/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.channel;

import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.concurrent.ExecutorService;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link ReadableInputStreamFrameChannel}.
 *
 * <p>Each test opens a channel over an input stream (a frame file on disk, a truncated copy of one,
 * or a stream that is closed early) and checks either that the rows read back match the test index
 * or that reading fails with the expected error message.
 *
 * <p>Every channel opened by a test is closed in a {@code finally} block, and the executor handed to
 * the channels is shut down after each test, so failing tests do not leak streams or threads.
 */
public class ReadableInputStreamFrameChannelTest extends InitializedNullHandlingTest {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    /** Source of the rows that are written to frame files and used as the expected result. */
    final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
    /** Executor passed to every {@code ReadableInputStreamFrameChannel.open} call in this class. */
    ExecutorService executorService = Execs.singleThreaded("input-stream-fetcher-test");

    /**
     * Stops the executor created for this test instance. JUnit 4 creates one instance per test
     * method, so without this every test would leave its own executor thread behind.
     */
    @After
    public void shutDownExecutor() {
        this.executorService.shutdownNow();
    }

    /** Rows read through the channel must equal the rows read directly from the adapter. */
    @Test
    public void testSimpleFrameFile() {
        ReadableInputStreamFrameChannel channel = this.openChannel(this.getInputStream(), "readSimpleFrameFile");
        try {
            FrameTestUtil.assertRowsEqual(
                FrameTestUtil.readRowsFromAdapter(this.adapter, null, false),
                FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())));
            Assert.assertTrue(channel.isFinished());
        }
        finally {
            channel.close();
        }
    }

    /** A frame file written from an empty sequence yields zero rows and a finished channel. */
    @Test
    public void testEmptyFrameFile() throws IOException {
        File file = FrameTestUtil.writeFrameFile(Sequences.<Frame>empty(), this.temporaryFolder.newFile());
        ReadableInputStreamFrameChannel channel = this.openChannel(Files.newInputStream(file.toPath()), "readEmptyFrameFile");
        try {
            int rowCount = FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())).toList().size();
            Assert.assertEquals(0, rowCount);
            Assert.assertTrue(channel.isFinished());
        }
        finally {
            channel.close();
        }
    }

    /** A completely empty (zero-byte) file is not a valid frame file and must fail on read. */
    @Test
    public void testZeroBytesFrameFile() throws IOException {
        File file = this.temporaryFolder.newFile();
        try (FileOutputStream outputStream = new FileOutputStream(file)) {
            outputStream.write(new byte[0]);
        }
        ReadableInputStreamFrameChannel channel = this.openChannel(Files.newInputStream(file.toPath()), "testZeroBytesFrameFile");
        try {
            IllegalStateException e = Assert.assertThrows(
                IllegalStateException.class,
                () -> FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())).toList());
            MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Incomplete or missing frame at end of stream")));
        }
        finally {
            channel.close();
        }
    }

    /** Reading only the first {@code truncatedSize} bytes of a frame file must fail with an ISE. */
    @Test
    public void testTruncatedFrameFile() throws IOException {
        final int allocatorSize = 64000;
        final int truncatedSize = 30000;
        File file = FrameTestUtil.writeFrameFile(
            FrameSequenceBuilder.fromAdapter(this.adapter)
                .allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(allocatorSize)))
                .frameType(FrameType.ROW_BASED)
                .frames(),
            this.temporaryFolder.newFile());
        byte[] truncatedFile = new byte[truncatedSize];
        try (FileInputStream in = new FileInputStream(file)) {
            ByteStreams.readFully(in, truncatedFile);
        }
        ReadableInputStreamFrameChannel channel = this.openChannel(new ByteArrayInputStream(truncatedFile), "readTruncatedFrameFile");
        this.expectedException.expect(ISE.class);
        this.expectedException.expectMessage("Incomplete or missing frame at end of stream");
        try {
            // Expected to throw; the ExpectedException rule verifies type and message.
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())).toList();
        }
        finally {
            channel.close();
        }
    }

    /** A file containing a single arbitrary byte is not a valid frame file and must fail with an ISE. */
    @Test
    public void testIncorrectFrameFile() throws IOException {
        File file = this.temporaryFolder.newFile();
        try (FileOutputStream outputStream = new FileOutputStream(file)) {
            outputStream.write(10);
        }
        ReadableInputStreamFrameChannel channel = this.openChannel(Files.newInputStream(file.toPath()), "readIncorrectFrameFile");
        this.expectedException.expect(ISE.class);
        this.expectedException.expectMessage("Incomplete or missing frame at end of stream");
        try {
            // Expected to throw; the ExpectedException rule verifies type and message.
            FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())).toList();
        }
        finally {
            channel.close();
        }
    }

    /** Closing the underlying stream right after opening the channel must surface as an ISE on read. */
    @Test
    public void closeInputStreamWhileReading() throws IOException {
        InputStream inputStream = this.getInputStream();
        ReadableInputStreamFrameChannel channel = this.openChannel(inputStream, "closeInputStreamWhileReading");
        inputStream.close();
        this.expectedException.expect(ISE.class);
        this.expectedException.expectMessage("Found error while reading input stream");
        try {
            // Expected to throw; the ExpectedException rule verifies type and message.
            FrameTestUtil.assertRowsEqual(
                FrameTestUtil.readRowsFromAdapter(this.adapter, null, false),
                FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(this.adapter.getRowSignature())));
        }
        finally {
            channel.close();
        }
    }

    /**
     * Same scenario as {@link #closeInputStreamWhileReading()}, but drives the channel directly:
     * polls {@code canRead()} and then calls {@code read()}, one of which is expected to throw.
     */
    @Test
    public void closeInputStreamWhileReadingCheckError() throws IOException, InterruptedException {
        InputStream inputStream = this.getInputStream();
        ReadableInputStreamFrameChannel channel = this.openChannel(inputStream, "closeInputStreamWhileReadingCheckError");
        inputStream.close();
        this.expectedException.expect(ISE.class);
        this.expectedException.expectMessage("Found error while reading input stream");
        try {
            while (!channel.canRead()) {
                Thread.sleep(10L);
            }
            channel.read();
        }
        finally {
            channel.close();
        }
    }

    /** Opens a channel over {@code inputStream} using this test's executor, with the last flag set to false as in every test here. */
    private ReadableInputStreamFrameChannel openChannel(InputStream inputStream, String id) {
        return ReadableInputStreamFrameChannel.open(inputStream, id, this.executorService, false);
    }

    /**
     * Writes the adapter's rows to a temporary frame file (row-based frames, at most 10 rows per
     * frame) and returns a stream over that file.
     *
     * @throws ISE if the temporary file cannot be created or opened
     */
    private InputStream getInputStream() {
        try {
            File file = FrameTestUtil.writeFrameFile(
                FrameSequenceBuilder.fromAdapter(this.adapter).maxRowsPerFrame(10).frameType(FrameType.ROW_BASED).frames(),
                this.temporaryFolder.newFile());
            return Files.newInputStream(file.toPath());
        }
        catch (IOException e) {
            throw new ISE(e, "Unable to create file input stream");
        }
    }
}

