/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.channel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;

public class ReadableByteChunksFrameChannel
implements ReadableFrameChannel {
    private static final Logger log = new Logger(ReadableByteChunksFrameChannel.class);
    private static final long MAX_FRAME_SIZE_BYTES = 100000000L;
    private static final int UNKNOWN_LENGTH = -1;
    private static final int FRAME_MARKER_BYTES = 1;
    private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES = 26;
    private final Object lock = new Object();
    private final String id;
    private final long bytesLimit;
    @GuardedBy(value="lock")
    private final List<Either<Throwable, byte[]>> chunks = new ArrayList<Either<Throwable, byte[]>>();
    @GuardedBy(value="lock")
    private SettableFuture<?> addChunkBackpressureFuture = null;
    @GuardedBy(value="lock")
    private SettableFuture<?> readyForReadingFuture = null;
    @GuardedBy(value="lock")
    private boolean noMoreWrites = false;
    @GuardedBy(value="lock")
    private int positionInFirstChunk = 0;
    @GuardedBy(value="lock")
    private long bytesBuffered = 0L;
    @GuardedBy(value="lock")
    private long bytesAdded = 0L;
    @GuardedBy(value="lock")
    private long nextCompressedFrameLength = -1L;
    @GuardedBy(value="lock")
    private StreamPart streamPart = StreamPart.MAGIC;

    private ReadableByteChunksFrameChannel(String id, long bytesLimit) {
        this.id = (String)Preconditions.checkNotNull((Object)id, (Object)"id");
        this.bytesLimit = bytesLimit;
    }

    public static ReadableByteChunksFrameChannel create(String id) {
        return new ReadableByteChunksFrameChannel(id, 1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public ListenableFuture<?> addChunk(byte[] chunk) {
        Object object = this.lock;
        synchronized (object) {
            if (this.noMoreWrites) {
                throw new ISE("Channel is no longer accepting writes", new Object[0]);
            }
            try {
                if (chunk.length > 0) {
                    this.bytesAdded += (long)chunk.length;
                    if (this.streamPart != StreamPart.FOOTER) {
                        this.chunks.add((Either<Throwable, byte[]>)Either.value((Object)chunk));
                        this.bytesBuffered += (long)chunk.length;
                    }
                    this.updateStreamState();
                    if (this.readyForReadingFuture != null && this.canReadFrame()) {
                        this.readyForReadingFuture.set(null);
                        this.readyForReadingFuture = null;
                    }
                }
                if (this.addChunkBackpressureFuture == null && this.bytesBuffered >= this.bytesLimit && this.canReadFrame()) {
                    this.addChunkBackpressureFuture = SettableFuture.create();
                }
                return this.addChunkBackpressureFuture;
            }
            catch (Throwable e) {
                this.setError(e);
                return null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setError(Throwable t) {
        Object object = this.lock;
        synchronized (object) {
            if (this.noMoreWrites) {
                log.noStackTrace().warn(t, "Channel is no longer accepting writes, cannot propagate exception", new Object[0]);
            } else {
                this.chunks.clear();
                this.chunks.add((Either<Throwable, byte[]>)Either.error((Object)t));
                this.nextCompressedFrameLength = -1L;
                this.doneWriting();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doneWriting() {
        Object object = this.lock;
        synchronized (object) {
            this.noMoreWrites = true;
            if (this.readyForReadingFuture != null) {
                this.readyForReadingFuture.set(null);
                this.readyForReadingFuture = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isFinished() {
        Object object = this.lock;
        synchronized (object) {
            return this.chunks.isEmpty() && this.noMoreWrites && !this.canRead();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean canRead() {
        Object object = this.lock;
        synchronized (object) {
            return this.canReadError() || this.canReadFrame() || this.streamPart != StreamPart.FOOTER && this.noMoreWrites;
        }
    }

    @Override
    public Frame read() {
        Object object = this.lock;
        synchronized (object) {
            if (this.canReadError()) {
                Throwable t = (Throwable)this.chunks.remove(0).map(bytes -> null).error();
                Throwables.propagateIfPossible((Throwable)t);
                throw new RuntimeException(t);
            }
            if (this.canReadFrame()) {
                return this.nextFrame();
            }
            if (this.noMoreWrites) {
                this.chunks.clear();
                this.nextCompressedFrameLength = -1L;
                throw new ISE("Incomplete or missing frame at end of stream (id = %s, position = %d)", new Object[]{this.id, this.bytesAdded - this.bytesBuffered});
            }
            assert (!this.canRead());
            throw new NoSuchElementException();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ListenableFuture<?> readabilityFuture() {
        Object object = this.lock;
        synchronized (object) {
            if (this.canRead() || this.isFinished()) {
                return Futures.immediateFuture(null);
            }
            if (this.readyForReadingFuture != null) {
                return this.readyForReadingFuture;
            }
            this.readyForReadingFuture = SettableFuture.create();
            return this.readyForReadingFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this.lock;
        synchronized (object) {
            this.chunks.clear();
            this.nextCompressedFrameLength = -1L;
            this.noMoreWrites = true;
        }
    }

    public String getId() {
        return this.id;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getBytesAdded() {
        Object object = this.lock;
        synchronized (object) {
            return this.bytesAdded;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isErrorOrFinished() {
        Object object = this.lock;
        synchronized (object) {
            return this.isFinished() || this.canRead() && !this.canReadFrame();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    long getBytesBuffered() {
        Object object = this.lock;
        synchronized (object) {
            return this.bytesBuffered;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Frame nextFrame() {
        Memory frameMemory;
        Object object = this.lock;
        synchronized (object) {
            if (!this.canReadFrame()) {
                throw new ISE("Frame of size [%,d] not yet ready to read", new Object[]{this.nextCompressedFrameLength});
            }
            if (this.nextCompressedFrameLength > 2147483621L) {
                throw new ISE("Cannot read frame of size [%,d] bytes", new Object[]{this.nextCompressedFrameLength});
            }
            int numBytes = Ints.checkedCast((long)(26L + this.nextCompressedFrameLength));
            frameMemory = this.copyFromQueuedChunks(numBytes).region(1L, 26L + this.nextCompressedFrameLength - 1L);
            this.deleteFromQueuedChunks(numBytes);
            this.updateStreamState();
        }
        Frame frame = Frame.decompress(frameMemory, 0L, frameMemory.getCapacity());
        log.debug("Read frame with [%,d] rows and [%,d] bytes.", new Object[]{frame.numRows(), frame.numBytes()});
        return frame;
    }

    @GuardedBy(value="lock")
    private void updateStreamState() {
        Memory memory;
        if (this.streamPart == StreamPart.MAGIC && this.bytesBuffered >= (long)FrameFileWriter.MAGIC.length) {
            memory = this.copyFromQueuedChunks(FrameFileWriter.MAGIC.length);
            if (memory.equalTo(0L, (Object)Memory.wrap((byte[])FrameFileWriter.MAGIC), 0L, (long)FrameFileWriter.MAGIC.length)) {
                this.streamPart = StreamPart.FRAMES;
                this.deleteFromQueuedChunks(FrameFileWriter.MAGIC.length);
            } else {
                throw new ISE("Invalid stream header (id = %s, position = %d)", new Object[]{this.id, this.bytesAdded - this.bytesBuffered});
            }
        }
        if (this.streamPart == StreamPart.FRAMES && this.bytesBuffered >= 1L) {
            memory = this.copyFromQueuedChunks(1);
            if (memory.getByte(0L) == 1) {
                int bytesRequiredToReadLength = 18;
                if (this.nextCompressedFrameLength == -1L && this.bytesBuffered >= 18L) {
                    this.nextCompressedFrameLength = this.copyFromQueuedChunks(18).getLong(2L);
                    if (this.nextCompressedFrameLength <= 0L || this.nextCompressedFrameLength >= 100000000L) {
                        throw new ISE("Invalid frame size (size = %,d B)", new Object[]{this.nextCompressedFrameLength});
                    }
                }
            } else if (memory.getByte(0L) == 2) {
                this.streamPart = StreamPart.FOOTER;
                this.nextCompressedFrameLength = -1L;
            } else {
                throw new ISE("Invalid midstream marker (id = %s, position = %d)", new Object[]{this.id, this.bytesAdded - this.bytesBuffered});
            }
        }
        if (this.streamPart == StreamPart.FOOTER) {
            if (this.bytesBuffered > 0L) {
                this.deleteFromQueuedChunks(this.bytesBuffered);
            }
            assert (this.bytesBuffered == 0L && this.chunks.isEmpty() && this.nextCompressedFrameLength == -1L);
        }
        if (!(this.addChunkBackpressureFuture == null || this.bytesBuffered >= this.bytesLimit && this.canReadFrame())) {
            this.addChunkBackpressureFuture.set(null);
            this.addChunkBackpressureFuture = null;
        }
    }

    @GuardedBy(value="lock")
    private boolean canReadError() {
        return this.chunks.size() > 0 && this.chunks.get(0).isError();
    }

    @GuardedBy(value="lock")
    private boolean canReadFrame() {
        return this.nextCompressedFrameLength != -1L && this.bytesBuffered >= 26L + this.nextCompressedFrameLength;
    }

    @GuardedBy(value="lock")
    private Memory copyFromQueuedChunks(int numBytes) {
        if (this.bytesBuffered < (long)numBytes) {
            throw new IAE("Cannot copy [%,d] bytes, only have [%,d] buffered", new Object[]{numBytes, this.bytesBuffered});
        }
        WritableMemory buf = WritableMemory.allocate((int)numBytes, (ByteOrder)ByteOrder.LITTLE_ENDIAN);
        int bufPos = 0;
        for (int chunkNumber = 0; chunkNumber < this.chunks.size(); ++chunkNumber) {
            byte[] chunk = (byte[])this.chunks.get(chunkNumber).valueOrThrow();
            int chunkPosition = chunkNumber == 0 ? this.positionInFirstChunk : 0;
            int len = Math.min(chunk.length - chunkPosition, numBytes - bufPos);
            buf.putByteArray((long)bufPos, chunk, chunkPosition, len);
            if ((bufPos += len) == numBytes) break;
        }
        return buf;
    }

    @GuardedBy(value="lock")
    private void deleteFromQueuedChunks(long numBytes) {
        if (this.bytesBuffered < numBytes) {
            throw new IAE("Cannot delete [%,d] bytes, only have [%,d] buffered", new Object[]{numBytes, this.bytesBuffered});
        }
        long toDelete = numBytes;
        while (toDelete > 0L) {
            byte[] chunk = (byte[])this.chunks.get(0).valueOrThrow();
            int bytesRemainingInChunk = chunk.length - this.positionInFirstChunk;
            if (toDelete >= (long)bytesRemainingInChunk) {
                toDelete -= (long)bytesRemainingInChunk;
                this.positionInFirstChunk = 0;
                this.chunks.remove(0);
                continue;
            }
            this.positionInFirstChunk = Ints.checkedCast((long)((long)this.positionInFirstChunk + toDelete));
            toDelete = 0L;
        }
        this.bytesBuffered -= numBytes;
        this.nextCompressedFrameLength = -1L;
    }

    private static enum StreamPart {
        MAGIC,
        FRAMES,
        FOOTER;

    }
}

