/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.infer.exchange;

import com.antgroup.geaflow.infer.exchange.DataExchangeQueue;
import com.antgroup.geaflow.infer.exchange.UnSafeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

public class DataQueueInputStream
extends InputStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataQueueInputStream.class);
    private static final int BUFFER_SIZE = 10240;
    private static final int INT_SIZE = 4;
    private static final int SHORT_SIZE = 2;
    private static final int LONG_SIZE = 8;
    private final DataExchangeQueue dataExchangeQueue;
    private final byte[] dataBufferArray;
    private final ByteBuffer buffer;
    private final int queueCapacity;
    private final long initialAddress;
    private final int queueMask;

    public DataQueueInputStream(DataExchangeQueue dataExchangeQueue) {
        this.dataExchangeQueue = dataExchangeQueue;
        this.queueCapacity = dataExchangeQueue.getQueueCapacity();
        this.initialAddress = dataExchangeQueue.getInitialQueueAddress();
        this.queueMask = dataExchangeQueue.getQueueMask();
        this.dataBufferArray = new byte[10240];
        this.buffer = ByteBuffer.wrap(this.dataBufferArray, 0, this.dataBufferArray.length);
        this.buffer.order(ByteOrder.LITTLE_ENDIAN);
    }

    @Override
    public int read() throws IOException {
        int r = this.read(this.dataBufferArray, 0, 1);
        if (r == 1) {
            return this.dataBufferArray[0] & 0xFF;
        }
        return -1;
    }

    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
        int currentIndex;
        int currentLength;
        for (currentIndex = 0; currentIndex < length; currentIndex += currentLength) {
            try {
                currentLength = this.readFully(buffer, currentIndex + offset, length - currentIndex);
            }
            catch (InterruptedException e) {
                InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage());
                interruptedIOException.bytesTransferred = currentIndex;
                LOGGER.error("read infer data failed", (Throwable)e);
                throw interruptedIOException;
            }
            if (currentLength >= 0) continue;
            return currentIndex > 0 ? currentIndex : -1;
        }
        return currentIndex;
    }

    public int read(byte[] b, int size) throws IOException {
        return this.read(b, 0, size);
    }

    private int readFully(byte[] buffer, int offset, int length) throws InterruptedException {
        long nextPointIndex;
        long inputPointer = this.dataExchangeQueue.getInputPointer();
        long outputNextPointer = this.dataExchangeQueue.getOutputNextPointer();
        while (inputPointer >= outputNextPointer) {
            long outputPointer = this.dataExchangeQueue.getOutputPointerByVolatile();
            this.dataExchangeQueue.setOutputNextPointer(outputPointer);
            outputNextPointer = this.dataExchangeQueue.getOutputNextPointer();
            if (inputPointer < outputNextPointer || !this.dataExchangeQueue.enableFinished()) continue;
            long outputPointerByVolatile = this.dataExchangeQueue.getOutputPointerByVolatile();
            this.dataExchangeQueue.setOutputNextPointer(outputPointerByVolatile);
            outputNextPointer = this.dataExchangeQueue.getOutputNextPointer();
            if (inputPointer < outputNextPointer) break;
            return -1;
        }
        int remainByteNum = outputNextPointer > (nextPointIndex = DataExchangeQueue.getNextPointIndex(inputPointer, this.queueCapacity)) ? (int)(nextPointIndex - inputPointer) : (int)(outputNextPointer - inputPointer);
        int readableNum = Math.min(remainByteNum, length);
        long left = this.initialAddress + (inputPointer & (long)this.queueMask);
        int right = Unsafe.ARRAY_BYTE_BASE_OFFSET + offset;
        UnSafeUtils.UNSAFE.copyMemory(null, left, buffer, right, readableNum);
        this.dataExchangeQueue.setInputPointer(inputPointer + (long)readableNum);
        return readableNum;
    }

    @Override
    public int available() {
        int availRead;
        long writeCache;
        long currentRead = this.dataExchangeQueue.getInputPointer();
        if (currentRead >= (writeCache = this.dataExchangeQueue.getOutputNextPointer())) {
            this.dataExchangeQueue.setOutputNextPointer(this.dataExchangeQueue.getOutputPointerByVolatile());
            writeCache = this.dataExchangeQueue.getOutputNextPointer();
        }
        if ((availRead = (int)(writeCache - currentRead)) > 0) {
            return availRead;
        }
        return 0;
    }

    public int getInt() throws IOException {
        this.read(this.dataBufferArray, 4);
        this.buffer.clear();
        return this.buffer.getInt();
    }

    public short getShort() throws IOException {
        this.read(this.dataBufferArray, 2);
        this.buffer.clear();
        return this.buffer.getShort();
    }

    public long getLong() throws IOException {
        this.read(this.dataBufferArray, 8);
        this.buffer.clear();
        return this.buffer.getLong();
    }

    public double getDouble() throws IOException {
        this.read(this.dataBufferArray, 8);
        this.buffer.clear();
        return this.buffer.getDouble();
    }

    public float getFloat() throws IOException {
        this.read(this.dataBufferArray, 4);
        this.buffer.clear();
        return this.buffer.getFloat();
    }
}

