/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.infer.exchange;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.infer.exchange.DataExchangeQueue;
import com.antgroup.geaflow.infer.exchange.UnSafeUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import sun.misc.Unsafe;

/**
 * {@link OutputStream} that copies bytes into the memory region described by a
 * {@link DataExchangeQueue}.
 *
 * <p>Bytes are copied with {@code Unsafe.copyMemory} to the absolute address
 * {@code getInitialQueueAddress() + (outputPointer & queueMask)}, and the queue's output
 * pointer is advanced after every copied segment. Before copying, {@link #write(byte[], int, int)}
 * spins (with {@link Thread#yield()}) until the queue's input pointer and barrier address
 * allow the write.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; it looks like it is
 * meant to be used by a single writer thread - confirm against callers.
 */
public class DataQueueOutputStream
extends OutputStream {
    /** Size in bytes of the internal scratch array. */
    private static final int BUFFER_SIZE = 10240;
    private final DataExchangeQueue dataExchangeQueue;
    /** Scratch array; slot 0 is used by {@link #write(int)} to stage a single byte. */
    private final byte[] dataBufferArray;
    /** Little-endian view over {@link #dataBufferArray}; not read anywhere in this class. */
    private final ByteBuffer buffer;
    private final int queueCapacity;
    private final int queueMask;

    /**
     * Creates a stream that writes into the given queue.
     *
     * @param dataExchangeQueue queue supplying the capacity, mask, pointers and base address
     *                          used by this stream
     */
    public DataQueueOutputStream(DataExchangeQueue dataExchangeQueue) {
        this.dataExchangeQueue = dataExchangeQueue;
        this.queueCapacity = dataExchangeQueue.getQueueCapacity();
        this.dataBufferArray = new byte[BUFFER_SIZE];
        this.queueMask = dataExchangeQueue.getQueueMask();
        this.buffer = ByteBuffer.wrap(this.dataBufferArray, 0, this.dataBufferArray.length);
        this.buffer.order(ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * Writes the low eight bits of {@code b} to the queue.
     *
     * @param b value whose low-order byte is written
     * @throws IOException declared by the {@link OutputStream} contract
     */
    @Override
    public void write(int b) throws IOException {
        this.dataBufferArray[0] = (byte)(b & 0xFF);
        this.write(this.dataBufferArray, 0, 1);
    }

    /**
     * Copies {@code size} bytes of {@code buffer}, starting at {@code offset}, into the queue.
     *
     * <p>The call spins until the queue reports room for {@code size} bytes and the barrier
     * address does not exceed the current buffer address. The data is then copied in segments
     * bounded by {@code DataExchangeQueue.getNextPointIndex}, and the output pointer is
     * published after each segment.
     *
     * <p>NOTE(review): when {@code size} exceeds the queue capacity the wait condition appears
     * impossible to satisfy, so the call would spin until the queue is marked finished -
     * confirm and consider rejecting or chunking such writes.
     *
     * @param buffer source array
     * @param offset index of the first byte to copy
     * @param size   number of bytes to copy
     * @throws NullPointerException      if {@code buffer} is {@code null}
     * @throws IndexOutOfBoundsException if {@code offset} or {@code size} is negative, or the
     *                                   range extends past the end of {@code buffer}
     * @throws GeaflowRuntimeException   if the queue is marked finished while waiting for room
     * @throws IOException               declared by the {@link OutputStream} contract
     */
    @Override
    public void write(byte[] buffer, int offset, int size) throws IOException {
        // The copy below uses raw memory access, which performs no bounds checks of its own:
        // an invalid range must be rejected here, before it can touch unrelated memory.
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        if (offset < 0 || size < 0 || size > buffer.length - offset) {
            throw new IndexOutOfBoundsException(
                "offset=" + offset + ", size=" + size + ", buffer.length=" + buffer.length);
        }

        long outputPointer = this.dataExchangeQueue.getOutputPointer();
        // The cached input pointer must be strictly greater than this value for the write to fit.
        long currentInputIndex = outputPointer - (long)(this.queueCapacity - size);
        while (this.mustWait(currentInputIndex)) {
            // Refresh the cached input pointer from its volatile source, then re-check.
            this.dataExchangeQueue.setInputNextPointer(this.dataExchangeQueue.getInputPointerByVolatile());
            if (!this.mustWait(currentInputIndex)) {
                continue;
            }
            if (this.dataExchangeQueue.enableFinished()) {
                throw new GeaflowRuntimeException("output queue is marked finished");
            }
            Thread.yield();
        }

        int currentOutputNum = 0;
        while (currentOutputNum < size) {
            long nextPointIndex = DataExchangeQueue.getNextPointIndex(outputPointer, this.queueCapacity);
            int remainNum = (int)(nextPointIndex - outputPointer);
            int bytesToWrite = Math.min(size - currentOutputNum, remainNum);
            // Computed in long so the array base offset cannot overflow int for huge arrays.
            long sourceOffset = (long)Unsafe.ARRAY_BYTE_BASE_OFFSET + offset + currentOutputNum;
            long targetAddress = this.dataExchangeQueue.getInitialQueueAddress() + (outputPointer & (long)this.queueMask);
            UnSafeUtils.UNSAFE.copyMemory(buffer, sourceOffset, null, targetAddress, bytesToWrite);
            this.dataExchangeQueue.setOutputPointer(outputPointer + (long)bytesToWrite);
            currentOutputNum += bytesToWrite;
            outputPointer += (long)bytesToWrite;
        }
        this.dataExchangeQueue.setOutputPointer(outputPointer);
    }

    /**
     * Reports whether a write must keep waiting: either the cached input pointer has not moved
     * past {@code currentInputIndex}, or the barrier address is greater than the current
     * buffer address.
     */
    private boolean mustWait(long currentInputIndex) {
        return this.dataExchangeQueue.getInputNextPointer() <= currentInputIndex
            || this.dataExchangeQueue.getBarrierAddress() > this.dataExchangeQueue.getCurrentBufferAddress();
    }

    /**
     * Checks, without blocking, whether the queue currently reports room for {@code len} bytes.
     *
     * <p>If the cached input pointer is not sufficient, it is refreshed once from its volatile
     * source before the final comparison. Unlike {@link #write(byte[], int, int)}, this check
     * does not consult the barrier address.
     *
     * @param len number of bytes the caller intends to write
     * @return {@code true} if the cached input pointer is strictly greater than
     *         {@code outputPointer - (queueCapacity - len)}
     */
    public boolean tryReserveBeforeWrite(int len) {
        long outputPointer = this.dataExchangeQueue.getOutputPointer();
        long currentInputIndex = outputPointer - (long)(this.queueCapacity - len);
        if (this.dataExchangeQueue.getInputNextPointer() <= currentInputIndex) {
            this.dataExchangeQueue.setInputNextPointer(this.dataExchangeQueue.getInputPointerByVolatile());
        }
        return this.dataExchangeQueue.getInputNextPointer() > currentInputIndex;
    }

    /** Marks the underlying queue as finished. */
    @Override
    public void close() {
        this.dataExchangeQueue.markFinished();
    }
}

