001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.FilterInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.nio.ByteBuffer;
025import java.util.Objects;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.Condition;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.commons.io.build.AbstractStreamBuilder;
034
035/**
036 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
037 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
038 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
039 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
040 * <p>
041 * To build an instance, see {@link Builder}.
042 * </p>
043 * <p>
044 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
045 * </p>
046 *
047 * @since 2.9.0
048 */
049public class ReadAheadInputStream extends FilterInputStream {
050
051    /**
052     * Builds a new {@link ReadAheadInputStream} instance.
053     * <p>
054     * For example:
055     * </p>
056     * <pre>{@code
057     * ReadAheadInputStream s = ReadAheadInputStream.builder()
058     *   .setPath(path)
059     *   .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread))
060     *   .get();}
061     * </pre>
062     *
063     * @since 2.12.0
064     */
065    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
066
067        private ExecutorService executorService;
068
069        /**
070         * Constructs a new instance.
071         * <p>
072         * This builder use the aspects InputStream, OpenOption[], buffer size, ExecutorService.
073         * </p>
074         * <p>
075         * You must provide an origin that can be converted to an InputStream by this builder, otherwise, this call will throw an
076         * {@link UnsupportedOperationException}.
077         * </p>
078         *
079         * @return a new instance.
080         * @throws UnsupportedOperationException if the origin cannot provide an InputStream.
081         * @see #getInputStream()
082         */
083        @SuppressWarnings("resource")
084        @Override
085        public ReadAheadInputStream get() throws IOException {
086            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
087                    executorService == null);
088        }
089
090        /**
091         * Sets the executor service for the read-ahead thread.
092         *
093         * @param executorService the executor service for the read-ahead thread.
094         * @return this
095         */
096        public Builder setExecutorService(final ExecutorService executorService) {
097            this.executorService = executorService;
098            return this;
099        }
100
101    }
102
103    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
104
105    /**
106     * Constructs a new {@link Builder}.
107     *
108     * @return a new {@link Builder}.
109     * @since 2.12.0
110     */
111    public static Builder builder() {
112        return new Builder();
113    }
114
115    /**
116     * Constructs a new daemon thread.
117     *
118     * @param r the thread's runnable.
119     * @return a new daemon thread.
120     */
121    private static Thread newDaemonThread(final Runnable r) {
122        final Thread thread = new Thread(r, "commons-io-read-ahead");
123        thread.setDaemon(true);
124        return thread;
125    }
126
127    /**
128     * Constructs a new daemon executor service.
129     *
130     * @return a new daemon executor service.
131     */
132    private static ExecutorService newExecutorService() {
133        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
134    }
135
136    private final ReentrantLock stateChangeLock = new ReentrantLock();
137
138    // @GuardedBy("stateChangeLock")
139    private ByteBuffer activeBuffer;
140
141    // @GuardedBy("stateChangeLock")
142    private ByteBuffer readAheadBuffer;
143
144    // @GuardedBy("stateChangeLock")
145    private boolean endOfStream;
146
147    // @GuardedBy("stateChangeLock")
148    // true if async read is in progress
149    private boolean readInProgress;
150
151    // @GuardedBy("stateChangeLock")
152    // true if read is aborted due to an exception in reading from underlying input stream.
153    private boolean readAborted;
154
155    // @GuardedBy("stateChangeLock")
156    private Throwable readException;
157
158    // @GuardedBy("stateChangeLock")
159    // whether the close method is called.
160    private boolean isClosed;
161
162    // @GuardedBy("stateChangeLock")
163    // true when the close method will close the underlying input stream. This is valid only if
164    // `isClosed` is true.
165    private boolean isUnderlyingInputStreamBeingClosed;
166
167    // @GuardedBy("stateChangeLock")
168    // whether there is a read ahead task running,
169    private boolean isReading;
170
171    // Whether there is a reader waiting for data.
172    private final AtomicBoolean isWaiting = new AtomicBoolean(false);
173
174    private final ExecutorService executorService;
175
176    private final boolean shutdownExecutorService;
177
178    private final Condition asyncReadComplete = stateChangeLock.newCondition();
179
180    /**
181     * Constructs an instance with the specified buffer size and read-ahead threshold
182     *
183     * @param inputStream       The underlying input stream.
184     * @param bufferSizeInBytes The buffer size.
185     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
186     */
187    @Deprecated
188    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
189        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
190    }
191
192    /**
193     * Constructs an instance with the specified buffer size and read-ahead threshold
194     *
195     * @param inputStream       The underlying input stream.
196     * @param bufferSizeInBytes The buffer size.
197     * @param executorService   An executor service for the read-ahead thread.
198     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
199     */
200    @Deprecated
201    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
202        this(inputStream, bufferSizeInBytes, executorService, false);
203    }
204
205    /**
206     * Constructs an instance with the specified buffer size and read-ahead threshold
207     *
208     * @param inputStream             The underlying input stream.
209     * @param bufferSizeInBytes       The buffer size.
210     * @param executorService         An executor service for the read-ahead thread.
211     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
212     */
213    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
214            final boolean shutdownExecutorService) {
215        super(Objects.requireNonNull(inputStream, "inputStream"));
216        if (bufferSizeInBytes <= 0) {
217            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
218        }
219        this.executorService = Objects.requireNonNull(executorService, "executorService");
220        this.shutdownExecutorService = shutdownExecutorService;
221        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
222        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
223        this.activeBuffer.flip();
224        this.readAheadBuffer.flip();
225    }
226
227    @Override
228    public int available() throws IOException {
229        stateChangeLock.lock();
230        // Make sure we have no integer overflow.
231        try {
232            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
233        } finally {
234            stateChangeLock.unlock();
235        }
236    }
237
238    private void checkReadException() throws IOException {
239        if (readAborted) {
240            if (readException instanceof IOException) {
241                throw (IOException) readException;
242            }
243            throw new IOException(readException);
244        }
245    }
246
247    @Override
248    public void close() throws IOException {
249        boolean isSafeToCloseUnderlyingInputStream = false;
250        stateChangeLock.lock();
251        try {
252            if (isClosed) {
253                return;
254            }
255            isClosed = true;
256            if (!isReading) {
257                // Nobody is reading, so we can close the underlying input stream in this method.
258                isSafeToCloseUnderlyingInputStream = true;
259                // Flip this to make sure the read ahead task will not close the underlying input stream.
260                isUnderlyingInputStreamBeingClosed = true;
261            }
262        } finally {
263            stateChangeLock.unlock();
264        }
265
266        if (shutdownExecutorService) {
267            try {
268                executorService.shutdownNow();
269                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
270            } catch (final InterruptedException e) {
271                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
272                iio.initCause(e);
273                throw iio;
274            } finally {
275                if (isSafeToCloseUnderlyingInputStream) {
276                    super.close();
277                }
278            }
279        }
280    }
281
282    private void closeUnderlyingInputStreamIfNecessary() {
283        boolean needToCloseUnderlyingInputStream = false;
284        stateChangeLock.lock();
285        try {
286            isReading = false;
287            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
288                // close method cannot close underlyingInputStream because we were reading.
289                needToCloseUnderlyingInputStream = true;
290            }
291        } finally {
292            stateChangeLock.unlock();
293        }
294        if (needToCloseUnderlyingInputStream) {
295            try {
296                super.close();
297            } catch (final IOException ignored) {
298                // TODO Rethrow as UncheckedIOException?
299            }
300        }
301    }
302
303    private boolean isEndOfStream() {
304        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
305    }
306
307    @Override
308    public int read() throws IOException {
309        if (activeBuffer.hasRemaining()) {
310            // short path - just get one byte.
311            return activeBuffer.get() & 0xFF;
312        }
313        final byte[] oneByteArray = BYTE_ARRAY_1.get();
314        oneByteArray[0] = 0;
315        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
316    }
317
318    @Override
319    public int read(final byte[] b, final int offset, int len) throws IOException {
320        if (offset < 0 || len < 0 || len > b.length - offset) {
321            throw new IndexOutOfBoundsException();
322        }
323        if (len == 0) {
324            return 0;
325        }
326
327        if (!activeBuffer.hasRemaining()) {
328            // No remaining in active buffer - lock and switch to write ahead buffer.
329            stateChangeLock.lock();
330            try {
331                waitForAsyncReadComplete();
332                if (!readAheadBuffer.hasRemaining()) {
333                    // The first read.
334                    readAsync();
335                    waitForAsyncReadComplete();
336                    if (isEndOfStream()) {
337                        return EOF;
338                    }
339                }
340                // Swap the newly read ahead buffer in place of empty active buffer.
341                swapBuffers();
342                // After swapping buffers, trigger another async read for read ahead buffer.
343                readAsync();
344            } finally {
345                stateChangeLock.unlock();
346            }
347        }
348        len = Math.min(len, activeBuffer.remaining());
349        activeBuffer.get(b, offset, len);
350
351        return len;
352    }
353
354    /**
355     * Read data from underlyingInputStream to readAheadBuffer asynchronously.
356     *
357     * @throws IOException if an I/O error occurs.
358     */
359    private void readAsync() throws IOException {
360        stateChangeLock.lock();
361        final byte[] arr;
362        try {
363            arr = readAheadBuffer.array();
364            if (endOfStream || readInProgress) {
365                return;
366            }
367            checkReadException();
368            readAheadBuffer.position(0);
369            readAheadBuffer.flip();
370            readInProgress = true;
371        } finally {
372            stateChangeLock.unlock();
373        }
374        executorService.execute(() -> {
375            stateChangeLock.lock();
376            try {
377                if (isClosed) {
378                    readInProgress = false;
379                    return;
380                }
381                // Flip this so that the close method will not close the underlying input stream when we
382                // are reading.
383                isReading = true;
384            } finally {
385                stateChangeLock.unlock();
386            }
387
388            // Please note that it is safe to release the lock and read into the read ahead buffer
389            // because either of following two conditions will hold:
390            //
391            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
392            //
393            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
394            // for this async read to complete.
395            //
396            // So there is no race condition in both the situations.
397            int read = 0;
398            int off = 0, len = arr.length;
399            Throwable exception = null;
400            try {
401                // try to fill the read ahead buffer.
402                // if a reader is waiting, possibly return early.
403                do {
404                    read = in.read(arr, off, len);
405                    if (read <= 0) {
406                        break;
407                    }
408                    off += read;
409                    len -= read;
410                } while (len > 0 && !isWaiting.get());
411            } catch (final Throwable ex) {
412                exception = ex;
413                if (ex instanceof Error) {
414                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
415                    // The user can see Error in UncaughtExceptionHandler.
416                    throw (Error) ex;
417                }
418            } finally {
419                stateChangeLock.lock();
420                try {
421                    readAheadBuffer.limit(off);
422                    if (read < 0 || exception instanceof EOFException) {
423                        endOfStream = true;
424                    } else if (exception != null) {
425                        readAborted = true;
426                        readException = exception;
427                    }
428                    readInProgress = false;
429                    signalAsyncReadComplete();
430                } finally {
431                    stateChangeLock.unlock();
432                }
433                closeUnderlyingInputStreamIfNecessary();
434            }
435        });
436    }
437
438    private void signalAsyncReadComplete() {
439        stateChangeLock.lock();
440        try {
441            asyncReadComplete.signalAll();
442        } finally {
443            stateChangeLock.unlock();
444        }
445    }
446
447    @Override
448    public long skip(final long n) throws IOException {
449        if (n <= 0L) {
450            return 0L;
451        }
452        if (n <= activeBuffer.remaining()) {
453            // Only skipping from active buffer is sufficient
454            activeBuffer.position((int) n + activeBuffer.position());
455            return n;
456        }
457        stateChangeLock.lock();
458        final long skipped;
459        try {
460            skipped = skipInternal(n);
461        } finally {
462            stateChangeLock.unlock();
463        }
464        return skipped;
465    }
466
467    /**
468     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
469     * calling this function.
470     *
471     * @param n the number of bytes to be skipped.
472     * @return the actual number of bytes skipped.
473     * @throws IOException if an I/O error occurs.
474     */
475    private long skipInternal(final long n) throws IOException {
476        assert stateChangeLock.isLocked();
477        waitForAsyncReadComplete();
478        if (isEndOfStream()) {
479            return 0;
480        }
481        if (available() >= n) {
482            // we can skip from the internal buffers
483            int toSkip = (int) n;
484            // We need to skip from both active buffer and read ahead buffer
485            toSkip -= activeBuffer.remaining();
486            assert toSkip > 0; // skipping from activeBuffer already handled.
487            activeBuffer.position(0);
488            activeBuffer.flip();
489            readAheadBuffer.position(toSkip + readAheadBuffer.position());
490            swapBuffers();
491            // Trigger async read to emptied read ahead buffer.
492            readAsync();
493            return n;
494        }
495        final int skippedBytes = available();
496        final long toSkip = n - skippedBytes;
497        activeBuffer.position(0);
498        activeBuffer.flip();
499        readAheadBuffer.position(0);
500        readAheadBuffer.flip();
501        final long skippedFromInputStream = in.skip(toSkip);
502        readAsync();
503        return skippedBytes + skippedFromInputStream;
504    }
505
506    /**
507     * Flips the active and read ahead buffer
508     */
509    private void swapBuffers() {
510        final ByteBuffer temp = activeBuffer;
511        activeBuffer = readAheadBuffer;
512        readAheadBuffer = temp;
513    }
514
515    private void waitForAsyncReadComplete() throws IOException {
516        stateChangeLock.lock();
517        try {
518            isWaiting.set(true);
519            // There is only one reader, and one writer, so the writer should signal only once,
520            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
521            while (readInProgress) {
522                asyncReadComplete.await();
523            }
524        } catch (final InterruptedException e) {
525            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
526            iio.initCause(e);
527            throw iio;
528        } finally {
529            try {
530                isWaiting.set(false);
531            } finally {
532                stateChangeLock.unlock();
533            }
534        }
535        checkReadException();
536    }
537}