/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.channel;

import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ChannelClosedForWritesException;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

public class ReadableByteChunksFrameChannelTest {

    @RunWith(value=Parameterized.class)
    public static class ParameterizedWithTestIndexTests
    extends InitializedNullHandlingTest {
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        private final FrameType frameType;
        private final int maxRowsPerFrame;
        private final int chunkSize;

        public ParameterizedWithTestIndexTests(FrameType frameType, int maxRowsPerFrame, int chunkSize) {
            this.frameType = frameType;
            this.maxRowsPerFrame = maxRowsPerFrame;
            this.chunkSize = chunkSize;
        }

        @Parameterized.Parameters(name="frameType = {0}, maxRowsPerFrame = {1}, chunkSize = {2}")
        public static Iterable<Object[]> constructorFeeder() {
            ArrayList<Object[]> constructors = new ArrayList<Object[]>();
            for (FrameType frameType : FrameType.values()) {
                for (int maxRowsPerFrame : new int[]{1, 50, Integer.MAX_VALUE}) {
                    for (int chunkSize : new int[]{1, 10, 1000, 5000, 50000, 1000000}) {
                        constructors.add(new Object[]{frameType, maxRowsPerFrame, chunkSize});
                    }
                }
            }
            return constructors;
        }

        @Test
        public void testWriteFullyThenRead() throws IOException {
            IncrementalIndexCursorFactory cursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File file = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory((CursorFactory)cursorFactory).maxRowsPerFrame(this.maxRowsPerFrame).frameType(this.frameType).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            ListenableFuture firstBackpressureFuture = null;
            long totalSize = 0L;
            Assert.assertEquals((long)0L, (long)channel.getBytesBuffered());
            try (Chunker chunker = new Chunker(new FileInputStream(file), this.chunkSize);){
                byte[] chunk;
                while ((chunk = chunker.nextChunk()) != null) {
                    ListenableFuture backpressureFuture = channel.addChunk(chunk);
                    Assert.assertEquals((long)channel.getBytesAdded(), (long)(totalSize += (long)chunk.length));
                    Assert.assertEquals((Object)channel.canRead(), (Object)(backpressureFuture != null ? 1 : 0));
                    if (backpressureFuture == null) continue;
                    if (firstBackpressureFuture == null) {
                        firstBackpressureFuture = backpressureFuture;
                        continue;
                    }
                    Assert.assertSame((Object)firstBackpressureFuture, (Object)backpressureFuture);
                }
                Assert.assertNotNull(firstBackpressureFuture);
                Assert.assertFalse((boolean)firstBackpressureFuture.isDone());
                channel.doneWriting();
            }
            FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromCursorFactory((CursorFactory)cursorFactory), FrameTestUtil.readRowsFromFrameChannel((ReadableFrameChannel)channel, FrameReader.create((RowSignature)cursorFactory.getRowSignature())));
        }

        @Test
        public void testWriteReadInterleaved() throws IOException {
            IncrementalIndexCursorFactory cursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File file = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory((CursorFactory)cursorFactory).maxRowsPerFrame(this.maxRowsPerFrame).frameType(this.frameType).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            BlockingQueueFrameChannel outChannel = new BlockingQueueFrameChannel(10000);
            ListenableFuture backpressureFuture = null;
            int iteration = 0;
            long totalSize = 0L;
            try (Chunker chunker = new Chunker(new FileInputStream(file), this.chunkSize);){
                byte[] chunk;
                while ((chunk = chunker.nextChunk()) != null) {
                    if (iteration % 3 == 0) {
                        while (channel.canRead()) {
                            outChannel.writable().write(channel.read());
                        }
                        Assert.assertTrue((backpressureFuture == null || backpressureFuture.isDone() ? 1 : 0) != 0);
                    } else if (iteration % 11 == 0 && channel.canRead()) {
                        outChannel.writable().write(channel.read());
                    }
                    if (backpressureFuture != null && backpressureFuture.isDone()) {
                        backpressureFuture = null;
                    }
                    ++iteration;
                    ListenableFuture addVal = channel.addChunk(chunk);
                    Assert.assertEquals((long)(totalSize += (long)chunk.length), (long)channel.getBytesAdded());
                    Assert.assertEquals((Object)channel.canRead(), (Object)(addVal != null ? 1 : 0));
                    if (addVal == null) continue;
                    if (backpressureFuture == null) {
                        backpressureFuture = addVal;
                        continue;
                    }
                    Assert.assertSame((Object)backpressureFuture, (Object)addVal);
                }
                channel.doneWriting();
                while (channel.canRead()) {
                    outChannel.writable().write(channel.read());
                }
                outChannel.writable().close();
            }
            FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromCursorFactory((CursorFactory)cursorFactory), FrameTestUtil.readRowsFromFrameChannel(outChannel.readable(), FrameReader.create((RowSignature)cursorFactory.getRowSignature())));
        }

        private static class Chunker
        implements Closeable {
            private final FileInputStream in;
            private final int chunkSize;
            private final byte[] buf;
            private boolean eof = false;

            public Chunker(FileInputStream in, int chunkSize) {
                this.in = in;
                this.chunkSize = chunkSize;
                this.buf = new byte[chunkSize];
            }

            @Nullable
            public byte[] nextChunk() throws IOException {
                int p;
                int r;
                if (this.eof) {
                    return null;
                }
                for (p = 0; p < this.chunkSize; p += r) {
                    r = this.in.read(this.buf, p, this.chunkSize - p);
                    if (r >= 0) continue;
                    this.eof = true;
                    break;
                }
                if (p > 0) {
                    return Arrays.copyOf(this.buf, p);
                }
                return null;
            }

            @Override
            public void close() throws IOException {
                this.in.close();
            }
        }
    }

    public static class NonParameterizedTests
    extends InitializedNullHandlingTest {
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        @Rule
        public ExpectedException expectedException = ExpectedException.none();

        @Test
        public void testZeroBytes() {
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            channel.doneWriting();
            Assert.assertTrue((boolean)channel.canRead());
            Assert.assertFalse((boolean)channel.isFinished());
            Assert.assertTrue((boolean)channel.isErrorOrFinished());
            this.expectedException.expect(IllegalStateException.class);
            this.expectedException.expectMessage("Incomplete or missing frame at end of stream (id = test, position = 0)");
            channel.read();
        }

        @Test
        public void testZeroBytesWithSpecialError() {
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            channel.setError((Throwable)new IllegalArgumentException("test error"));
            channel.doneWriting();
            Assert.assertTrue((boolean)channel.canRead());
            Assert.assertFalse((boolean)channel.isFinished());
            Assert.assertTrue((boolean)channel.isErrorOrFinished());
            this.expectedException.expect(IllegalArgumentException.class);
            this.expectedException.expectMessage("test error");
            channel.read();
        }

        @Test
        public void testEmptyFrameFile() throws IOException {
            File file = FrameTestUtil.writeFrameFile((Sequence<Frame>)Sequences.empty(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            channel.addChunk(Files.toByteArray((File)file));
            channel.doneWriting();
            Assert.assertEquals((long)file.length(), (long)channel.getBytesAdded());
            while (channel.canRead()) {
                Assert.assertFalse((boolean)channel.isFinished());
                Assert.assertFalse((boolean)channel.isErrorOrFinished());
                channel.read();
            }
            Assert.assertTrue((boolean)channel.isFinished());
            channel.close();
        }

        @Test
        public void testAddChunkAfterDoneWriting() {
            try (ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);){
                channel.doneWriting();
                Assert.assertThrows(ChannelClosedForWritesException.class, () -> channel.addChunk(new byte[0]));
            }
        }

        @Test
        public void testTruncatedFrameFile() throws IOException {
            int allocatorSize = 64000;
            int truncatedSize = 30000;
            IncrementalIndexCursorFactory cursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File file = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory((CursorFactory)cursorFactory).allocator((MemoryAllocator)ArenaMemoryAllocator.create((ByteBuffer)ByteBuffer.allocate(64000))).frameType(FrameType.COLUMNAR).frames(), this.temporaryFolder.newFile());
            byte[] truncatedFile = new byte[30000];
            try (FileInputStream in = new FileInputStream(file);){
                ByteStreams.readFully((InputStream)in, (byte[])truncatedFile);
            }
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            channel.addChunk(truncatedFile);
            channel.doneWriting();
            Assert.assertEquals((long)truncatedFile.length, (long)channel.getBytesAdded());
            Assert.assertTrue((boolean)channel.canRead());
            Assert.assertFalse((boolean)channel.isFinished());
            Assert.assertFalse((boolean)channel.isErrorOrFinished());
            channel.read();
            Assert.assertTrue((boolean)channel.canRead());
            Assert.assertFalse((boolean)channel.isFinished());
            Assert.assertFalse((boolean)channel.isErrorOrFinished());
            channel.read();
            Assert.assertTrue((boolean)channel.canRead());
            Assert.assertFalse((boolean)channel.isFinished());
            Assert.assertTrue((boolean)channel.isErrorOrFinished());
            this.expectedException.expect(IllegalStateException.class);
            this.expectedException.expectMessage(CoreMatchers.startsWith((String)"Incomplete or missing frame at end of stream"));
            channel.read();
        }

        @Test
        public void testSetError() throws IOException {
            int allocatorSize = 64000;
            int errorAtBytePosition = 30000;
            IncrementalIndexCursorFactory cursorFactory = new IncrementalIndexCursorFactory(TestIndex.getIncrementalTestIndex());
            File file = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromCursorFactory((CursorFactory)cursorFactory).allocator((MemoryAllocator)ArenaMemoryAllocator.create((ByteBuffer)ByteBuffer.allocate(64000))).frameType(FrameType.COLUMNAR).frames(), this.temporaryFolder.newFile());
            ReadableByteChunksFrameChannel channel = ReadableByteChunksFrameChannel.create((String)"test", (boolean)false);
            byte[] fileBytes = Files.toByteArray((File)file);
            byte[] chunk1 = new byte[30000];
            System.arraycopy(fileBytes, 0, chunk1, 0, chunk1.length);
            channel.addChunk(chunk1);
            Assert.assertEquals((long)chunk1.length, (long)channel.getBytesAdded());
            channel.setError((Throwable)new ISE("Test error!", new Object[0]));
            channel.doneWriting();
            Assert.assertEquals((long)chunk1.length, (long)channel.getBytesAdded());
            this.expectedException.expect(IllegalStateException.class);
            this.expectedException.expectMessage("Test error!");
            channel.read();
        }
    }
}

