/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.File;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.processor.FrameChannelMuxer;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
import org.apache.druid.frame.processor.test.FailingFrameProcessor;
import org.apache.druid.frame.processor.test.InfiniteFrameProcessor;
import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
import org.apache.druid.frame.processor.test.SuperBlasterFrameProcessor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CloseableUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Enclosed.class)
public class FrameProcessorExecutorTest {
    private static ReadableFrameChannel openFileChannel(File file) {
        try {
            return new ReadableFileFrameChannel(FrameFile.open((File)file, (FrameFile.Flag[])new FrameFile.Flag[0]));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static abstract class BaseFrameProcessorExecutorTestSuite
    extends InitializedNullHandlingTest {
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
        public final int numThreads;
        FrameProcessorExecutor exec;

        public BaseFrameProcessorExecutorTestSuite(int numThreads) {
            this.numThreads = numThreads;
        }

        @Before
        public void setUp() throws Exception {
            this.exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)this.numThreads, (String)(StringUtils.encodeForFormat((String)((Object)((Object)this)).getClass().getName()) + "-%s"))));
        }

        @After
        public void tearDown() throws Exception {
            this.exec.getExecutorService().shutdownNow();
            if (!this.exec.getExecutorService().awaitTermination(1L, TimeUnit.MINUTES)) {
                throw new ISE("Executor service did not terminate within 1 minute", new Object[0]);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        List<File> writeToNFiles(StorageAdapter adapter, int numFiles) throws IOException {
            ArrayList<File> files = new ArrayList<File>();
            final ArrayList<FrameFileWriter> writers = new ArrayList<FrameFileWriter>();
            try {
                for (int i = 0; i < numFiles; ++i) {
                    files.add(this.temporaryFolder.newFile());
                    writers.add(FrameFileWriter.open((WritableByteChannel)Channels.newChannel(Files.newOutputStream(((File)files.get(i)).toPath(), new OpenOption[0])), null));
                }
                Consumer<Frame> writer = new Consumer<Frame>(){
                    private int j = 0;

                    @Override
                    public void accept(Frame frame) {
                        try {
                            ((FrameFileWriter)writers.get(this.j % writers.size())).writeFrame(frame, -1);
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                        ++this.j;
                    }
                };
                FrameSequenceBuilder.fromAdapter(adapter).frameType(FrameType.ROW_BASED).allocator((MemoryAllocator)ArenaMemoryAllocator.createOnHeap((int)1000000)).maxRowsPerFrame(3).frames().forEach((Consumer)writer);
            }
            finally {
                CloseableUtils.closeAll(writers);
            }
            return files;
        }
    }

    @RunWith(value=Parameterized.class)
    public static class MiscTests
    extends BaseFrameProcessorExecutorTestSuite {
        public MiscTests(int numThreads) {
            super(numThreads);
        }

        @Parameterized.Parameters(name="numThreads = {0}")
        public static Collection<Object[]> constructorFeeder() {
            ArrayList<Object[]> constructors = new ArrayList<Object[]>();
            for (int numThreads : new int[]{1, 3, 12}) {
                constructors.add(new Object[]{numThreads});
            }
            return constructors;
        }

        @Test
        public void test_runFully_errors() throws Exception {
            IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
            File inFile = (File)Iterables.getOnlyElement(this.writeToNFiles((StorageAdapter)adapter, 1));
            ReadableFrameChannel inChannel = FrameProcessorExecutorTest.openFileChannel(inFile);
            BlockingQueueFrameChannel outChannel = BlockingQueueFrameChannel.minimal();
            FailingFrameProcessor failer = new FailingFrameProcessor(inChannel, outChannel.writable(), 0);
            ListenableFuture failerFuture = this.exec.runFully((FrameProcessor)failer, null);
            ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> failerFuture.get());
            MatcherAssert.assertThat((Object)e.getCause().getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)"failure!")));
            ReadableFrameChannel outReadableChannel = outChannel.readable();
            Assert.assertTrue((boolean)outReadableChannel.canRead());
            RuntimeException readException = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> ((ReadableFrameChannel)outReadableChannel).read());
            MatcherAssert.assertThat((Object)readException.getCause(), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.containsString((String)"failure!")));
            Assert.assertTrue((boolean)outReadableChannel.isFinished());
        }

        @Test
        public void test_registerCancelableFuture() throws InterruptedException {
            SettableFuture future = SettableFuture.create();
            String cancellationId = "xyzzy";
            Assert.assertSame((Object)future, (Object)this.exec.registerCancelableFuture((ListenableFuture)future, false, "xyzzy"));
            this.exec.cancel("xyzzy");
            Assert.assertTrue((boolean)future.isDone());
            Assert.assertTrue((boolean)future.isCancelled());
        }

        @Test
        public void test_cancel_sleepy() throws Exception {
            SleepyFrameProcessor processor = new SleepyFrameProcessor();
            String cancellationId = "xyzzy";
            ListenableFuture future = this.exec.runFully((FrameProcessor)processor, "xyzzy");
            processor.awaitRun();
            this.exec.cancel("xyzzy");
            Assert.assertTrue((boolean)future.isDone());
            Assert.assertTrue((boolean)future.isCancelled());
            Assert.assertTrue((boolean)processor.didGetInterrupt());
            Assert.assertTrue((boolean)processor.didCleanup());
        }

        @Test(timeout=30000L)
        public void test_futureCancel_sleepy() throws Exception {
            SleepyFrameProcessor processor = new SleepyFrameProcessor();
            String cancellationId = "xyzzy";
            ListenableFuture future = this.exec.runFully((FrameProcessor)processor, "xyzzy");
            processor.awaitRun();
            Assert.assertTrue((boolean)future.cancel(true));
            Assert.assertTrue((boolean)future.isDone());
            Assert.assertTrue((boolean)future.isCancelled());
            processor.awaitCleanup();
            Assert.assertTrue((boolean)processor.didGetInterrupt());
            Assert.assertTrue((boolean)processor.didCleanup());
        }

        @Test
        public void test_cancel_concurrency() throws Exception {
            ChompingFrameProcessor chomper;
            int numSystems = 1000;
            int numGeneratorsPerSystem = 8;
            Frame frame = (Frame)Iterables.getOnlyElement((Iterable)FrameSequenceBuilder.fromAdapter((StorageAdapter)new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex())).frameType(FrameType.ROW_BASED).frames().toList());
            HashSet<String> systemIds = new HashSet<String>();
            HashMap<String, Object> systemGeneratorsMap = new HashMap<String, Object>();
            HashMap<String, ChompingFrameProcessor> systemChomperMap = new HashMap<String, ChompingFrameProcessor>();
            IdentityHashMap<Object, ListenableFuture> processorFutureMap = new IdentityHashMap<Object, ListenableFuture>();
            HashMap<String, Boolean> systemCleanStopMap = new HashMap<String, Boolean>();
            boolean doCleanStop = false;
            for (int systemNumber = 0; systemNumber < 1000; ++systemNumber) {
                String string = UUID.randomUUID().toString();
                ArrayList generators = new ArrayList(8);
                ArrayList<ReadableFrameChannel> channels = new ArrayList<ReadableFrameChannel>(8);
                for (int i = 0; i < 8; ++i) {
                    BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
                    generators.add(new InfiniteFrameProcessor(frame, channel.writable()));
                    channels.add(channel.readable());
                }
                chomper = new ChompingFrameProcessor(channels);
                systemIds.add(string);
                systemGeneratorsMap.put(string, generators);
                systemChomperMap.put(string, chomper);
                systemCleanStopMap.put(string, doCleanStop);
                doCleanStop = !doCleanStop;
            }
            for (String string : systemGeneratorsMap.keySet()) {
                for (InfiniteFrameProcessor generator : (List)systemGeneratorsMap.get(string)) {
                    processorFutureMap.put(generator, this.exec.runFully((FrameProcessor)generator, string));
                }
                ChompingFrameProcessor chomper2 = (ChompingFrameProcessor)systemChomperMap.get(string);
                processorFutureMap.put(chomper2, this.exec.runFully((FrameProcessor)chomper2, string));
            }
            for (Map.Entry entry : systemChomperMap.entrySet()) {
                String systemId = (String)entry.getKey();
                ((ChompingFrameProcessor)entry.getValue()).awaitRead();
                if (((Boolean)systemCleanStopMap.get(systemId)).booleanValue()) {
                    ((List)systemGeneratorsMap.get(systemId)).forEach(InfiniteFrameProcessor::stop);
                    continue;
                }
                this.exec.cancel(systemId);
            }
            for (String string : systemIds) {
                boolean cleanStop = (Boolean)systemCleanStopMap.get(string);
                List generators = (List)systemGeneratorsMap.get(string);
                chomper = (ChompingFrameProcessor)systemChomperMap.get(string);
                if (cleanStop) {
                    long systemFrameCount = 0L;
                    for (InfiniteFrameProcessor generator : generators) {
                        Long retVal = (Long)((ListenableFuture)processorFutureMap.get(generator)).get();
                        Assert.assertNotNull((Object)retVal);
                        Assert.assertEquals((long)generator.getNumFrames(), (long)retVal);
                        systemFrameCount += retVal.longValue();
                    }
                    Long retVal = (Long)((ListenableFuture)processorFutureMap.get(chomper)).get();
                    Assert.assertNotNull((Object)retVal);
                    Assert.assertEquals((long)systemFrameCount, (long)retVal);
                } else {
                    ImmutableList allProcessors = ImmutableList.copyOf((Iterable)Iterables.concat((Iterable)generators, Collections.singleton(chomper)));
                    for (FrameProcessor processor : allProcessors) {
                        ListenableFuture future = (ListenableFuture)processorFutureMap.get(processor);
                        Assert.assertTrue((boolean)future.isDone());
                        Assert.assertTrue((boolean)future.isCancelled());
                        Exception e = (Exception)Assert.assertThrows(Exception.class, () -> future.get());
                        MatcherAssert.assertThat((Object)e, (Matcher)CoreMatchers.instanceOf(CancellationException.class));
                    }
                }
                for (InfiniteFrameProcessor generator : generators) {
                    Assert.assertTrue((boolean)generator.didCleanup());
                }
                Assert.assertTrue((boolean)chomper.didCleanup());
            }
        }

        @Test
        public void test_cancel_nonexistentCancellationId() throws InterruptedException {
            this.exec.cancel("nonexistent");
        }
    }

    @RunWith(value=Parameterized.class)
    public static class SuperBlasterTests
    extends BaseFrameProcessorExecutorTestSuite {
        private final SuperBlasterFrameProcessor.AwaitStyle awaitStyle;

        public SuperBlasterTests(int numThreads, SuperBlasterFrameProcessor.AwaitStyle awaitStyle) {
            super(numThreads);
            this.awaitStyle = awaitStyle;
        }

        @Parameterized.Parameters(name="numThreads = {0}, awaitStyle = {1}")
        public static Collection<Object[]> constructorFeeder() {
            ArrayList<Object[]> constructors = new ArrayList<Object[]>();
            for (int numThreads : new int[]{1, 3, 12}) {
                for (SuperBlasterFrameProcessor.AwaitStyle awaitStyle : SuperBlasterFrameProcessor.AwaitStyle.values()) {
                    constructors.add(new Object[]{numThreads, awaitStyle});
                }
            }
            return constructors;
        }

        @Test
        public void test_runFully() throws Exception {
            IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
            List<File> inFiles = this.writeToNFiles((StorageAdapter)adapter, 3);
            File outFile = this.temporaryFolder.newFile();
            BlockingQueueFrameChannel memoryChannel1 = BlockingQueueFrameChannel.minimal();
            BlockingQueueFrameChannel memoryChannel2 = BlockingQueueFrameChannel.minimal();
            SuperBlasterFrameProcessor blaster = new SuperBlasterFrameProcessor(inFiles.stream().map(x$0 -> FrameProcessorExecutorTest.openFileChannel(x$0)).collect(Collectors.toList()), (List<WritableFrameChannel>)ImmutableList.of((Object)memoryChannel1.writable(), (Object)memoryChannel2.writable()), this.awaitStyle);
            FrameChannelMuxer muxer = new FrameChannelMuxer((List)ImmutableList.of((Object)memoryChannel1.readable(), (Object)memoryChannel2.readable()), (WritableFrameChannel)new WritableFrameFileChannel(FrameFileWriter.open((WritableByteChannel)Channels.newChannel(Files.newOutputStream(outFile.toPath(), new OpenOption[0])), null)));
            ListenableFuture blasterFuture = this.exec.runFully((FrameProcessor)blaster, null);
            ListenableFuture muxerFuture = this.exec.runFully((FrameProcessor)muxer, null);
            Assert.assertEquals((long)adapter.getNumRows(), (long)((Long)blasterFuture.get()));
            Assert.assertEquals((long)(adapter.getNumRows() * 2), (long)((Long)muxerFuture.get()));
            Assert.assertEquals((long)(adapter.getNumRows() * 2), (long)FrameTestUtil.readRowsFromFrameChannel((ReadableFrameChannel)new ReadableFileFrameChannel(FrameFile.open((File)outFile, (FrameFile.Flag[])new FrameFile.Flag[0])), FrameReader.create((RowSignature)adapter.getRowSignature())).toList().size());
        }
    }
}

