/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file;

import alluxio.PositionReader;
import alluxio.client.file.FileInStream;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
import alluxio.network.protocol.databuffer.PooledDirectNioByteBuf;
import alluxio.shaded.client.com.amazonaws.annotation.NotThreadSafe;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.collect.EvictingQueue;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
 * A {@link FileInStream} whose reads are all served through a {@link PositionReader}.
 *
 * <p>A small prefetch cache sits in front of the reader. Every read call is recorded, and the
 * length of the most recent run of back-to-back (contiguous) calls decides how many bytes are
 * fetched ahead on the next cache miss.
 *
 * <p>Each read goes through three stages, stopping as soon as the destination is full:
 * <ol>
 *   <li>copy whatever the cache already holds for the requested position;</li>
 *   <li>refill the cache starting at the current position and copy from it again;</li>
 *   <li>read the remainder straight from the position reader.</li>
 * </ol>
 */
@NotThreadSafe
public class PositionReadFileInStream
extends FileInStream {
    /** Length of the file, as supplied by the creator of this stream. */
    private final long mLength;
    /** Current position of the stream; only moved by {@code read}, {@code seek} and {@code skip}. */
    private long mPos = 0L;
    private boolean mClosed;
    private final PositionReader mPositionReader;
    private final PrefetchCache mCache;

    /**
     * Creates a stream over the given reader.
     *
     * <p>The value of {@code USER_POSITION_READER_STREAMING_MULTIPLIER} is read from the
     * configuration and used as the number of recent read calls the prefetch cache remembers.
     *
     * @param reader the position reader that supplies the data
     * @param length the length of the file
     */
    public PositionReadFileInStream(PositionReader reader, long length) {
        this.mPositionReader = reader;
        this.mLength = length;
        this.mCache = new PrefetchCache(Configuration.getInt(PropertyKey.USER_POSITION_READER_STREAMING_MULTIPLIER), this.mLength);
    }

    /**
     * @return the number of bytes between the current position and the end of the file
     */
    @Override
    public long remaining() {
        return this.mLength - this.mPos;
    }

    /**
     * @return the number of readable bytes currently held by the prefetch cache
     */
    @VisibleForTesting
    int getBufferedLength() {
        return this.mCache.mCache.readableBytes();
    }

    /**
     * @return the file position that the first byte of the prefetch cache corresponds to
     */
    @VisibleForTesting
    long getBufferedPosition() {
        return this.mCache.mCacheStartPos;
    }

    /**
     * @return the prefetch size derived from the recorded read calls
     */
    @VisibleForTesting
    int getPrefetchSize() {
        return this.mCache.mPrefetchSize;
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at index {@code off}, advancing the
     * stream position by the number of bytes read.
     *
     * @param b the destination array
     * @param off the index in {@code b} of the first byte to write
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, or -1 if nothing was read because the end was reached
     * @throws NullPointerException if {@code b} is null
     * @throws IOException if the direct read from the position reader fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Objects.requireNonNull(b, "Read buffer cannot be null");
        return this.read(ByteBuffer.wrap(b), off, len);
    }

    /**
     * Reads up to {@code len} bytes into {@code byteBuffer}, advancing the stream position by the
     * number of bytes read.
     *
     * <p>The buffer's position is set to {@code off} and its limit to {@code off + len} before
     * any data is copied.
     *
     * @param byteBuffer the destination buffer
     * @param off the buffer position of the first byte to write
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, or -1 if nothing was read because the end was reached
     * @throws IOException if the direct read from the position reader fails
     */
    @Override
    public int read(ByteBuffer byteBuffer, int off, int len) throws IOException {
        byteBuffer.position(off).limit(off + len);
        // Record the call first so the prefetch size already reflects this read.
        this.mCache.addTrace(this.mPos, len);
        int totalBytesRead = 0;

        // Stage 1: serve as much as possible from data that is already cached.
        int bytesReadFromCache = this.mCache.fillWithCache(this.mPos, byteBuffer);
        totalBytesRead += bytesReadFromCache;
        this.mPos += (long)bytesReadFromCache;
        if (!byteBuffer.hasRemaining()) {
            return totalBytesRead;
        }

        // Stage 2: refill the cache at the current position and try the cache again.
        int bytesPrefetched = this.mCache.prefetch(this.mPositionReader, this.mPos, byteBuffer.remaining());
        if (bytesPrefetched < 0) {
            // A negative count from the reader is treated as end of data.
            if (totalBytesRead == 0) {
                return -1;
            }
            return totalBytesRead;
        }
        bytesReadFromCache = this.mCache.fillWithCache(this.mPos, byteBuffer);
        totalBytesRead += bytesReadFromCache;
        this.mPos += (long)bytesReadFromCache;
        if (!byteBuffer.hasRemaining()) {
            return totalBytesRead;
        }

        // Stage 3: the cache could not satisfy the request; read the rest directly.
        int bytesRead = this.mPositionReader.read(this.mPos, byteBuffer, byteBuffer.remaining());
        if (bytesRead < 0) {
            if (totalBytesRead == 0) {
                return -1;
            }
            return totalBytesRead;
        }
        this.mPos += (long)bytesRead;
        return totalBytesRead + bytesRead;
    }

    /**
     * Reads up to {@code len} bytes starting at the given file position without moving the
     * stream position. The prefetch cache is consulted and refilled exactly as for a regular read.
     *
     * @param position the file position to start reading from
     * @param buffer the destination array
     * @param offset the index in {@code buffer} of the first byte to write
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, or -1 if nothing was read because the end was reached
     * @throws IOException if the direct read from the position reader fails
     */
    @Override
    public int positionedRead(long position, byte[] buffer, int offset, int len) throws IOException {
        long pos = position;
        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, offset, len);
        this.mCache.addTrace(position, len);
        int totalBytesRead = 0;

        // Stage 1: serve as much as possible from data that is already cached.
        int bytesReadFromCache = this.mCache.fillWithCache(pos, byteBuffer);
        totalBytesRead += bytesReadFromCache;
        pos += (long)bytesReadFromCache;
        if (!byteBuffer.hasRemaining()) {
            return totalBytesRead;
        }

        // Stage 2: refill the cache at the current position and try the cache again.
        int bytesPrefetched = this.mCache.prefetch(this.mPositionReader, pos, byteBuffer.remaining());
        if (bytesPrefetched < 0) {
            // A negative count from the reader is treated as end of data.
            if (totalBytesRead == 0) {
                return -1;
            }
            return totalBytesRead;
        }
        bytesReadFromCache = this.mCache.fillWithCache(pos, byteBuffer);
        totalBytesRead += bytesReadFromCache;
        pos += (long)bytesReadFromCache;
        if (!byteBuffer.hasRemaining()) {
            return totalBytesRead;
        }

        // Stage 3: the cache could not satisfy the request; read the rest directly.
        int bytesRead = this.mPositionReader.read(pos, byteBuffer, byteBuffer.remaining());
        if (bytesRead < 0) {
            if (totalBytesRead == 0) {
                return -1;
            }
            return totalBytesRead;
        }
        return totalBytesRead + bytesRead;
    }

    /**
     * @return the current position of the stream
     */
    @Override
    public long getPos() throws IOException {
        return this.mPos;
    }

    /**
     * Moves the stream position to {@code pos}. The prefetch cache is left untouched.
     *
     * @param pos the new position; must be between 0 and the file length, inclusive
     * @throws IllegalStateException if the stream is closed
     * @throws IllegalArgumentException if {@code pos} is negative or past the file length
     */
    @Override
    public void seek(long pos) throws IOException {
        Preconditions.checkState(!this.mClosed, "Cannot do operations on a closed BlockInStream");
        Preconditions.checkArgument(pos >= 0L, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), pos);
        Preconditions.checkArgument(pos <= this.mLength, "Seek position past the end of the read region (block or file).");
        if (pos == this.mPos) {
            return;
        }
        this.mPos = pos;
    }

    /**
     * Skips forward by at most {@code n} bytes, never moving past the end of the file.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes actually skipped; 0 when {@code n} is not positive
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public long skip(long n) throws IOException {
        Preconditions.checkState(!this.mClosed, "Cannot do operations on a closed BlockInStream");
        if (n <= 0L) {
            return 0L;
        }
        long toSkip = Math.min(this.remaining(), n);
        this.seek(this.mPos + toSkip);
        return toSkip;
    }

    /**
     * Closes the position reader and releases the prefetch cache. Calling this more than once
     * has no further effect.
     *
     * @throws IOException if closing the position reader fails
     */
    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        try {
            this.mPositionReader.close();
        } finally {
            // mClosed is already set, so a second close() would return early; the cache
            // buffer must therefore be released here even when the reader fails to close.
            this.mCache.close();
        }
    }

    /** A single recorded read call: where it started and how many bytes were requested. */
    private static class CallTrace {
        final long mPosition;
        final int mLength;

        private CallTrace(long pos, int length) {
            this.mPosition = pos;
            this.mLength = length;
        }
    }

    /**
     * Holds one contiguous range of prefetched file data and a bounded history of recent read
     * calls from which the prefetch size is derived.
     */
    private static class PrefetchCache
    implements AutoCloseable {
        private final long mFileLength;
        /** Most recent read calls; the oldest entry is evicted once the queue is full. */
        private final EvictingQueue<CallTrace> mCallHistory;
        /** Total requested length of the latest run of contiguous calls in the history. */
        private int mPrefetchSize = 0;
        /** Cached bytes; readable bytes start at index 0 and map to {@link #mCacheStartPos}. */
        private ByteBuf mCache = Unpooled.wrappedBuffer(new byte[0]);
        /** File position of the first byte in {@link #mCache}. */
        private long mCacheStartPos = 0L;

        /**
         * @param prefetchMultiplier the maximum number of read calls kept in the history
         * @param fileLength the length of the file, used to cap prefetch requests
         */
        PrefetchCache(int prefetchMultiplier, long fileLength) {
            this.mCallHistory = EvictingQueue.create(prefetchMultiplier);
            this.mFileLength = fileLength;
        }

        /**
         * Recomputes the prefetch size by walking the call history in order. A call that starts
         * exactly where the previous one ended extends the current run; any other call starts a
         * new run. The length of the final run becomes the prefetch size.
         */
        private void update() {
            int consecutiveReadLength = 0;
            long lastReadEnd = -1L;
            for (CallTrace trace : this.mCallHistory) {
                if (trace.mPosition == lastReadEnd) {
                    lastReadEnd += (long)trace.mLength;
                    consecutiveReadLength += trace.mLength;
                    continue;
                }
                lastReadEnd = trace.mPosition + (long)trace.mLength;
                consecutiveReadLength = trace.mLength;
            }
            this.mPrefetchSize = consecutiveReadLength;
        }

        /**
         * Records a read call and refreshes the prefetch size.
         *
         * @param pos the file position the call starts at
         * @param size the number of bytes the call requested
         */
        private void addTrace(long pos, int size) {
            this.mCallHistory.add(new CallTrace(pos, size));
            this.update();
        }

        /**
         * Copies cached bytes for {@code targetStartPos} into {@code outBuffer}, advancing the
         * buffer's position by the number of bytes copied.
         *
         * @param targetStartPos the file position the caller wants data for
         * @param outBuffer the destination buffer
         * @return the number of bytes copied; 0 if the position is not covered by the cache
         */
        private int fillWithCache(long targetStartPos, ByteBuffer outBuffer) {
            if (this.mCacheStartPos <= targetStartPos) {
                if (targetStartPos - this.mCacheStartPos < (long)this.mCache.readableBytes()) {
                    int posInCache = (int)(targetStartPos - this.mCacheStartPos);
                    int size = Math.min(outBuffer.remaining(), this.mCache.readableBytes() - posInCache);
                    // Copy through a slice limited to `size` so the transfer stops at the
                    // intended number of bytes; the real buffer is advanced by hand afterwards.
                    ByteBuffer slice = outBuffer.slice();
                    slice.limit(size);
                    this.mCache.getBytes(posInCache, slice);
                    outBuffer.position(outBuffer.position() + size);
                    return size;
                }
                return 0;
            }
            return 0;
        }

        /**
         * Refills the cache with data starting at {@code pos}. The amount requested is the
         * larger of the current prefetch size and {@code minBytesToRead}, capped at the number
         * of bytes between {@code pos} and the file length.
         *
         * <p>When {@code pos} is at or beyond the file length there is nothing to prefetch: the
         * reader is not called, the existing cache content is kept, and 0 is returned.
         *
         * @param reader the reader to fetch data from
         * @param pos the file position to start prefetching at
         * @param minBytesToRead the minimum number of bytes to ask for
         * @return the count returned by the reader (negative values are passed through), or 0
         *         if nothing was prefetched or the reader failed with an {@link IOException}
         */
        private int prefetch(PositionReader reader, long pos, int minBytesToRead) {
            long bytesLeftInFile = this.mFileLength - pos;
            if (bytesLeftInFile <= 0L) {
                // Avoid handing the reader a zero or negative length; the caller's follow-up
                // direct read decides how a position at or past the end is reported.
                return 0;
            }
            int prefetchSize = Math.max(this.mPrefetchSize, minBytesToRead);
            prefetchSize = (int)Math.min(bytesLeftInFile, (long)prefetchSize);
            if (this.mCache.capacity() < prefetchSize) {
                // The current buffer is too small: swap it for a larger one.
                this.mCache.release();
                this.mCache = PooledDirectNioByteBuf.allocate(prefetchSize);
                this.mCacheStartPos = 0L;
            }
            this.mCache.clear();
            try {
                int bytesPrefetched = reader.read(pos, this.mCache, prefetchSize);
                if (bytesPrefetched > 0) {
                    this.mCache.readerIndex(0).writerIndex(bytesPrefetched);
                    this.mCacheStartPos = pos;
                }
                return bytesPrefetched;
            }
            catch (IOException ignored) {
                // Prefetching is best-effort: drop any partial data and report that nothing
                // was cached so the caller falls back to reading directly from the reader.
                this.mCache.clear();
                return 0;
            }
        }

        /** Releases the cache buffer and resets the cache to an empty state. */
        @Override
        public void close() {
            this.mCache.release();
            this.mCache = Unpooled.wrappedBuffer(new byte[0]);
            this.mCacheStartPos = 0L;
        }
    }
}

