/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CosNFileReadTask;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.NativeFileSystemStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CosNFSInputStream
extends FSInputStream {
    public static final Logger LOG = LoggerFactory.getLogger(CosNFSInputStream.class);
    private FileSystem.Statistics statistics;
    private final Configuration conf;
    private final NativeFileSystemStore store;
    private final String key;
    private long position = 0L;
    private long nextPos = 0L;
    private long lastByteStart = -1L;
    private long fileSize;
    private long partRemaining;
    private long bufferStart;
    private long bufferEnd;
    private final long preReadPartSize;
    private final int maxReadPartNumber;
    private byte[] buffer;
    private boolean closed = false;
    private final int socketErrMaxRetryTimes;
    private final ExecutorService readAheadExecutorService;
    private final Queue<ReadBuffer> readBufferQueue;

    public CosNFSInputStream(Configuration conf, NativeFileSystemStore store, FileSystem.Statistics statistics, String key, long fileSize, ExecutorService readAheadExecutorService) {
        this.conf = conf;
        this.store = store;
        this.statistics = statistics;
        this.key = key;
        this.fileSize = fileSize;
        this.bufferStart = -1L;
        this.bufferEnd = -1L;
        this.preReadPartSize = conf.getLong("fs.cosn.read.ahead.block.size", 0x100000L);
        this.maxReadPartNumber = conf.getInt("fs.cosn.read.ahead.queue.size", 8);
        this.socketErrMaxRetryTimes = conf.getInt("fs.cosn.socket.error.maxRetries", 5);
        this.readAheadExecutorService = readAheadExecutorService;
        this.readBufferQueue = new ArrayDeque<ReadBuffer>(this.maxReadPartNumber);
        this.closed = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void reopen(long pos) throws IOException {
        long partSize = 0L;
        if (pos < 0L) {
            throw new EOFException("Cannot seek to a negative offset");
        }
        if (pos > this.fileSize) {
            throw new EOFException("Attempted to seek or read past the end of the file");
        }
        partSize = pos + this.preReadPartSize > this.fileSize ? this.fileSize - pos : this.preReadPartSize;
        this.buffer = null;
        this.bufferStart = -1L;
        this.bufferEnd = -1L;
        boolean isRandomIO = true;
        if (pos == this.nextPos) {
            isRandomIO = false;
        } else {
            while (this.readBufferQueue.size() != 0 && this.readBufferQueue.element().getStart() != pos) {
                this.readBufferQueue.poll();
            }
        }
        this.nextPos = pos + partSize;
        int currentBufferQueueSize = this.readBufferQueue.size();
        if (currentBufferQueueSize == 0) {
            this.lastByteStart = pos - partSize;
        } else {
            ReadBuffer[] readBuffers = this.readBufferQueue.toArray(new ReadBuffer[currentBufferQueueSize]);
            this.lastByteStart = readBuffers[currentBufferQueueSize - 1].getStart();
        }
        int maxLen = this.maxReadPartNumber - currentBufferQueueSize;
        for (int i = 0; i < maxLen && i < (currentBufferQueueSize + 1) * 2 && this.lastByteStart + partSize * (long)(i + 1) <= this.fileSize; ++i) {
            ReadBuffer readBuffer;
            long byteStart = this.lastByteStart + partSize * (long)(i + 1);
            long byteEnd = byteStart + partSize - 1L;
            if (byteEnd >= this.fileSize) {
                byteEnd = this.fileSize - 1L;
            }
            if ((readBuffer = new ReadBuffer(byteStart, byteEnd)).getBuffer().length == 0) {
                readBuffer.setStatus(0);
            } else {
                this.readAheadExecutorService.execute(new CosNFileReadTask(this.conf, this.key, this.store, readBuffer, this.socketErrMaxRetryTimes));
            }
            this.readBufferQueue.add(readBuffer);
            if (isRandomIO) break;
        }
        ReadBuffer readBuffer = this.readBufferQueue.poll();
        IOException innerException = null;
        readBuffer.lock();
        try {
            readBuffer.await(1);
            if (readBuffer.getStatus() == -1) {
                innerException = readBuffer.getException();
                this.buffer = null;
                this.bufferStart = -1L;
                this.bufferEnd = -1L;
            } else {
                this.buffer = readBuffer.getBuffer();
                this.bufferStart = readBuffer.getStart();
                this.bufferEnd = readBuffer.getEnd();
            }
        }
        catch (InterruptedException e) {
            LOG.warn("interrupted exception occurs when wait a read buffer.");
        }
        finally {
            readBuffer.unLock();
        }
        if (null == this.buffer) {
            LOG.error(String.format("Null IO stream key:%s", this.key), (Throwable)innerException);
            throw new IOException("Null IO stream.", innerException);
        }
        this.position = pos;
        this.partRemaining = partSize;
    }

    public void seek(long pos) throws IOException {
        this.checkOpened();
        if (pos < 0L) {
            throw new EOFException("Cannot seek to a negative offset");
        }
        if (pos > this.fileSize) {
            throw new EOFException("Attempted to seek or read past the end of the file");
        }
        if (this.position == pos) {
            return;
        }
        if (pos >= this.bufferStart && pos <= this.bufferEnd) {
            LOG.debug("seek cache hit lastpos {}, pos {}, this buffer start {}, end {}", new Object[]{this.position, pos, this.bufferStart, this.bufferEnd});
            this.position = pos;
            this.partRemaining = this.bufferEnd - pos + 1L;
        } else {
            this.position = pos;
            this.partRemaining = -1L;
        }
    }

    public long getPos() throws IOException {
        return this.position;
    }

    public boolean seekToNewSource(long targetPos) throws IOException {
        return false;
    }

    public int read() throws IOException {
        this.checkOpened();
        if (this.partRemaining <= 0L && this.position < this.fileSize) {
            this.reopen(this.position);
        }
        int byteRead = -1;
        if (this.partRemaining != 0L) {
            byteRead = this.buffer[(int)((long)this.buffer.length - this.partRemaining)] & 0xFF;
        }
        if (byteRead >= 0) {
            ++this.position;
            --this.partRemaining;
            if (null != this.statistics) {
                this.statistics.incrementBytesRead(1L);
            }
        }
        return byteRead;
    }

    public int read(byte[] b, int off, int len) throws IOException {
        this.checkOpened();
        if (len == 0) {
            return 0;
        }
        if (off < 0 || len < 0 || len > b.length) {
            throw new IndexOutOfBoundsException();
        }
        int bytesRead = 0;
        while (this.position < this.fileSize && bytesRead < len) {
            if (this.partRemaining <= 0L) {
                this.reopen(this.position);
            }
            int bytes = 0;
            for (int i = this.buffer.length - (int)this.partRemaining; i < this.buffer.length; ++i) {
                b[off + bytesRead] = this.buffer[i];
                ++bytes;
                if (off + ++bytesRead >= len) break;
            }
            if (bytes > 0) {
                this.position += (long)bytes;
                this.partRemaining -= (long)bytes;
                continue;
            }
            if (this.partRemaining == 0L) continue;
            throw new IOException("Failed to read from stream. Remaining: " + this.partRemaining);
        }
        if (null != this.statistics && bytesRead > 0) {
            this.statistics.incrementBytesRead((long)bytesRead);
        }
        return bytesRead == 0 ? -1 : bytesRead;
    }

    public int available() throws IOException {
        this.checkOpened();
        long remaining = this.fileSize - this.position;
        if (remaining > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int)remaining;
    }

    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.buffer = null;
    }

    private void checkOpened() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
    }

    public static class ReadBuffer {
        public static final int INIT = 1;
        public static final int SUCCESS = 0;
        public static final int ERROR = -1;
        private final Lock lock = new ReentrantLock();
        private Condition readyCondition = this.lock.newCondition();
        private byte[] buffer;
        private int status;
        private long start;
        private long end;
        private IOException exception;

        public ReadBuffer(long start, long end) {
            this.start = start;
            this.end = end;
            this.buffer = new byte[(int)(this.end - this.start) + 1];
            this.status = 1;
            this.exception = null;
        }

        public void lock() {
            this.lock.lock();
        }

        public void unLock() {
            this.lock.unlock();
        }

        public void await(int waitStatus) throws InterruptedException {
            while (this.status == waitStatus) {
                this.readyCondition.await();
            }
        }

        public void signalAll() {
            this.readyCondition.signalAll();
        }

        public byte[] getBuffer() {
            return this.buffer;
        }

        public int getStatus() {
            return this.status;
        }

        public void setStatus(int status) {
            this.status = status;
        }

        public void setException(IOException e) {
            this.exception = e;
        }

        public IOException getException() {
            return this.exception;
        }

        public long getStart() {
            return this.start;
        }

        public long getEnd() {
            return this.end;
        }
    }
}

