/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.ints.IntBidirectionalIterator;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.key.KeyTestUtils;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.ComposingOutputChannelFactory;
import org.apache.druid.frame.processor.FileOutputChannelFactory;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.frame.processor.SuperSorterProgressTracker;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Enclosed.class)
public class SuperSorterTest {
    private static final Logger log = new Logger(SuperSorterTest.class);

    private static List<ReadableFrameChannel> makeFileChannels(Sequence<Frame> frames, File tmpDir, int numChannels) throws IOException {
        ArrayList<File> files = new ArrayList<File>();
        final ArrayList<WritableFrameFileChannel> writableChannels = new ArrayList<WritableFrameFileChannel>();
        for (int i = 0; i < numChannels; ++i) {
            File file = new File(tmpDir, StringUtils.format((String)"channel-%d", (Object[])new Object[]{i}));
            files.add(file);
            writableChannels.add(new WritableFrameFileChannel(FrameFileWriter.open((WritableByteChannel)Channels.newChannel(Files.newOutputStream(file.toPath(), new OpenOption[0])), null, (ByteTracker)ByteTracker.unboundedTracker())));
        }
        frames.forEach((Consumer)new Consumer<Frame>(){
            private int i;

            @Override
            public void accept(Frame frame) {
                try {
                    ((WritableFrameChannel)writableChannels.get(this.i % writableChannels.size())).write(frame);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                ++this.i;
            }
        });
        ArrayList<ReadableFrameChannel> retVal = new ArrayList<ReadableFrameChannel>();
        for (int i = 0; i < writableChannels.size(); ++i) {
            WritableFrameChannel writableChannel = (WritableFrameChannel)writableChannels.get(i);
            writableChannel.close();
            retVal.add((ReadableFrameChannel)new ReadableFileFrameChannel(FrameFile.open((File)((File)files.get(i)), null, (FrameFile.Flag[])new FrameFile.Flag[0])));
        }
        return retVal;
    }

    private static ReadableFrameChannel duplicateOutputChannel(ReadableFrameChannel channel) {
        return new ReadableFileFrameChannel(((ReadableFileFrameChannel)channel).newFrameFileReference());
    }

    private static <T> long countSequence(Sequence<T> sequence) {
        return (Long)sequence.accumulate((Object)0L, (accumulated, in) -> accumulated + 1L);
    }

    @RunWith(value=Parameterized.class)
    public static class ParameterizedCasesTest
    extends InitializedNullHandlingTest {
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        private final int maxRowsPerFrame;
        private final int maxBytesPerFrame;
        private final int numChannels;
        private final int maxActiveProcessors;
        private final int maxChannelsPerProcessor;
        private final int numThreads;
        private final boolean isComposedStorage;
        private StorageAdapter adapter;
        private RowSignature signature;
        private FrameProcessorExecutor exec;
        private List<ReadableFrameChannel> inputChannels;
        private FrameReader frameReader;

        public ParameterizedCasesTest(int maxRowsPerFrame, int maxBytesPerFrame, int numChannels, int maxActiveProcessors, int maxChannelsPerProcessor, int numThreads, boolean isComposedStorage) {
            this.maxRowsPerFrame = maxRowsPerFrame;
            this.maxBytesPerFrame = maxBytesPerFrame;
            this.numChannels = numChannels;
            this.maxActiveProcessors = maxActiveProcessors;
            this.maxChannelsPerProcessor = maxChannelsPerProcessor;
            this.numThreads = numThreads;
            this.isComposedStorage = isComposedStorage;
        }

        @Parameterized.Parameters(name="maxRowsPerFrame = {0}, maxBytesPerFrame = {1}, numChannels = {2}, maxActiveProcessors = {3}, maxChannelsPerProcessor = {4}, numThreads = {5}, isComposedStorage = {6}")
        public static Iterable<Object[]> constructorFeeder() {
            ArrayList<Object[]> constructors = new ArrayList<Object[]>();
            for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50, 1}) {
                for (int maxBytesPerFrame : new int[]{20000, 200000}) {
                    for (int numChannels : new int[]{1, 3}) {
                        for (int maxActiveProcessors : new int[]{1, 2, 4}) {
                            for (int maxChannelsPerProcessor : new int[]{2, 3, 8}) {
                                for (int numThreads : new int[]{1, 3}) {
                                    for (boolean isComposedStorage : new boolean[]{true, false}) {
                                        if (maxActiveProcessors < maxChannelsPerProcessor) continue;
                                        constructors.add(new Object[]{maxRowsPerFrame, maxBytesPerFrame, numChannels, maxActiveProcessors, maxChannelsPerProcessor, numThreads, isComposedStorage});
                                    }
                                }
                            }
                        }
                    }
                }
            }
            return constructors;
        }

        @Before
        public void setUp() {
            this.exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)this.numThreads, (String)(this.getClass().getSimpleName() + "[%d]"))));
            this.adapter = new QueryableIndexStorageAdapter(TestIndex.getNoRollupMMappedTestIndex());
        }

        @After
        public void tearDown() throws Exception {
            if (this.exec != null) {
                this.exec.getExecutorService().shutdownNow();
                if (!this.exec.getExecutorService().awaitTermination(5L, TimeUnit.SECONDS)) {
                    log.warn("Executor did not terminate after 5 seconds", new Object[0]);
                }
            }
        }

        private void setUpInputChannels(ClusterBy clusterBy) throws Exception {
            if (this.signature != null || this.inputChannels != null) {
                throw new ISE("Channels already created for this case", new Object[0]);
            }
            FrameSequenceBuilder frameSequenceBuilder = FrameSequenceBuilder.fromAdapter(this.adapter).maxRowsPerFrame(this.maxRowsPerFrame).sortBy(clusterBy.getColumns()).allocator((MemoryAllocator)ArenaMemoryAllocator.create((ByteBuffer)ByteBuffer.allocate(this.maxBytesPerFrame))).frameType(FrameType.ROW_BASED).populateRowNumber();
            this.inputChannels = SuperSorterTest.makeFileChannels((Sequence<Frame>)frameSequenceBuilder.frames(), this.temporaryFolder.newFolder(), this.numChannels);
            this.signature = frameSequenceBuilder.signature();
            this.frameReader = FrameReader.create((RowSignature)this.signature);
        }

        private OutputChannels verifySuperSorter(ClusterBy clusterBy, ClusterByPartitions clusterByPartitions) throws Exception {
            File tempFolder = this.temporaryFolder.newFolder();
            ComposingOutputChannelFactory outputChannelFactory = this.isComposedStorage ? new ComposingOutputChannelFactory((List)ImmutableList.of((Object)new FileOutputChannelFactory(new File(tempFolder, "1"), this.maxBytesPerFrame, null), (Object)new FileOutputChannelFactory(new File(tempFolder, "2"), this.maxBytesPerFrame, null)), this.maxBytesPerFrame) : new FileOutputChannelFactory(tempFolder, this.maxBytesPerFrame, null);
            RowKeyReader keyReader = clusterBy.keyReader((ColumnInspector)this.signature);
            Comparator keyComparator = clusterBy.keyComparator();
            SettableFuture clusterByPartitionsFuture = SettableFuture.create();
            SuperSorterProgressTracker superSorterProgressTracker = new SuperSorterProgressTracker();
            SuperSorter superSorter = new SuperSorter(this.inputChannels, this.frameReader, clusterBy.getColumns(), (ListenableFuture)clusterByPartitionsFuture, this.exec, (OutputChannelFactory)new FileOutputChannelFactory(tempFolder, this.maxBytesPerFrame, null), (OutputChannelFactory)outputChannelFactory, this.maxActiveProcessors, this.maxChannelsPerProcessor, -1L, null, superSorterProgressTracker);
            superSorter.setNoWorkRunnable(() -> clusterByPartitionsFuture.set((Object)clusterByPartitions));
            OutputChannels outputChannels = (OutputChannels)superSorter.run().get();
            Assert.assertEquals((long)clusterByPartitions.size(), (long)outputChannels.getAllChannels().size());
            Assert.assertEquals((double)1.0, (double)superSorterProgressTracker.snapshot().getProgressDigest(), (double)0.0);
            int[] clusterByPartColumns = clusterBy.getColumns().stream().mapToInt(part -> this.signature.indexOf(part.columnName())).toArray();
            ArrayList<Sequence<List<Object>>> outputSequences = new ArrayList<Sequence<List<Object>>>();
            IntBidirectionalIterator intBidirectionalIterator = outputChannels.getPartitionNumbers().iterator();
            while (intBidirectionalIterator.hasNext()) {
                int partitionNumber = (Integer)intBidirectionalIterator.next();
                ClusterByPartition partition = clusterByPartitions.get(partitionNumber);
                ReadableFrameChannel outputChannel = ((OutputChannel)Iterables.getOnlyElement((Iterable)outputChannels.getChannelsForPartition(partitionNumber))).getReadableChannel();
                FrameTestUtil.readRowsFromFrameChannel(SuperSorterTest.duplicateOutputChannel(outputChannel), this.frameReader).forEach(row -> {
                    Object[] array = new Object[clusterByPartColumns.length];
                    for (int i = 0; i < array.length; ++i) {
                        array[i] = row.get(clusterByPartColumns[i]);
                    }
                    RowKey key = this.createKey(clusterBy, array);
                    Assert.assertTrue((String)StringUtils.format((String)"Key %s >= partition %,d start %s", (Object[])new Object[]{keyReader.read(key), partitionNumber, partition.getStart() == null ? null : keyReader.read(partition.getStart())}), (partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0 ? 1 : 0) != 0);
                    Assert.assertTrue((String)StringUtils.format((String)"Key %s < partition %,d end %s", (Object[])new Object[]{keyReader.read(key), partitionNumber, partition.getEnd() == null ? null : keyReader.read(partition.getEnd())}), (partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0 ? 1 : 0) != 0);
                });
                outputSequences.add(FrameTestUtil.readRowsFromFrameChannel(SuperSorterTest.duplicateOutputChannel(outputChannel), this.frameReader));
            }
            Sequence expectedRows = Sequences.sort(FrameTestUtil.readRowsFromAdapter(this.adapter, this.signature, true), Comparator.comparing(row -> {
                Object[] array = new Object[clusterByPartColumns.length];
                for (int i = 0; i < array.length; ++i) {
                    array[i] = row.get(clusterByPartColumns[i]);
                }
                return this.createKey(clusterBy, array);
            }, keyComparator));
            FrameTestUtil.assertRowsEqual((Sequence<List<Object>>)expectedRows, (Sequence<List<Object>>)Sequences.concat(outputSequences));
            return outputChannels;
        }

        @Test
        public void test_clusterByQualityLongAscRowNumberAsc_onePartition() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("qualityLong", KeyOrder.ASCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            this.verifySuperSorter(clusterBy, ClusterByPartitions.oneUniversalPartition());
        }

        @Test
        public void test_clusterByQualityLongAscRowNumberAsc_twoPartitionsOneEmpty() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("qualityLong", KeyOrder.ASCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            RowKey zeroZero = this.createKey(clusterBy, 0L, 0L);
            OutputChannels outputChannels = this.verifySuperSorter(clusterBy, new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(null, zeroZero), (Object)new ClusterByPartition(zeroZero, null))));
            Assert.assertEquals((long)0L, (long)SuperSorterTest.countSequence(FrameTestUtil.readRowsFromFrameChannel(((OutputChannel)Iterables.getOnlyElement((Iterable)outputChannels.getChannelsForPartition(0))).getReadableChannel(), this.frameReader)));
            Assert.assertEquals((long)this.adapter.getNumRows(), (long)SuperSorterTest.countSequence(FrameTestUtil.readRowsFromFrameChannel(((OutputChannel)Iterables.getOnlyElement((Iterable)outputChannels.getChannelsForPartition(1))).getReadableChannel(), this.frameReader)));
        }

        @Test
        public void test_clusterByQualityDescRowNumberAsc_fourPartitions() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("quality", KeyOrder.DESCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            ClusterByPartitions partitions = new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(this.createKey(clusterBy, "travel", 8L), this.createKey(clusterBy, "premium", 506L)), (Object)new ClusterByPartition(this.createKey(clusterBy, "premium", 506L), this.createKey(clusterBy, "mezzanine", 204L)), (Object)new ClusterByPartition(this.createKey(clusterBy, "mezzanine", 204L), this.createKey(clusterBy, "health", 900L)), (Object)new ClusterByPartition(this.createKey(clusterBy, "health", 900L), null)));
            Assert.assertEquals((long)4L, (long)partitions.size());
            this.verifySuperSorter(clusterBy, partitions);
        }

        @Test
        public void test_clusterByTimeAscMarketAscRowNumberAsc_fourPartitions() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("__time", KeyOrder.ASCENDING), (Object)new KeyColumn("market", KeyOrder.ASCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            ClusterByPartitions partitions = new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(this.createKey(clusterBy, 1294790400000L, "spot", 0L), this.createKey(clusterBy, 1296864000000L, "spot", 302L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1296864000000L, "spot", 302L), this.createKey(clusterBy, 1298851200000L, "spot", 604L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1298851200000L, "spot", 604L), this.createKey(clusterBy, 1300838400000L, "total_market", 906L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1300838400000L, "total_market", 906L), null)));
            Assert.assertEquals((long)4L, (long)partitions.size());
            this.verifySuperSorter(clusterBy, partitions);
        }

        @Test
        public void test_clusterByPlacementishDescRowNumberAsc_fourPartitions() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("placementish", KeyOrder.DESCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            ClusterByPartitions partitions = new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(this.createKey(clusterBy, ImmutableList.of((Object)"preferred", (Object)"t"), 7L), this.createKey(clusterBy, ImmutableList.of((Object)"p", (Object)"preferred"), 506L)), (Object)new ClusterByPartition(this.createKey(clusterBy, ImmutableList.of((Object)"p", (Object)"preferred"), 506L), this.createKey(clusterBy, ImmutableList.of((Object)"m", (Object)"preferred"), 204L)), (Object)new ClusterByPartition(this.createKey(clusterBy, ImmutableList.of((Object)"m", (Object)"preferred"), 204L), this.createKey(clusterBy, ImmutableList.of((Object)"h", (Object)"preferred"), 900L)), (Object)new ClusterByPartition(this.createKey(clusterBy, ImmutableList.of((Object)"h", (Object)"preferred"), 900L), null)));
            Assert.assertEquals((long)4L, (long)partitions.size());
            this.verifySuperSorter(clusterBy, partitions);
        }

        @Test
        public void test_clusterByQualityLongDescRowNumberAsc_fourPartitions() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("qualityLong", KeyOrder.DESCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            ClusterByPartitions partitions = new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(this.createKey(clusterBy, 1800L, 8L), this.createKey(clusterBy, 1600L, 506L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1600L, 506L), this.createKey(clusterBy, 1400L, 204L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1400L, 204L), this.createKey(clusterBy, 1300L, 900L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1300L, 900L), null)));
            Assert.assertEquals((long)4L, (long)partitions.size());
            this.verifySuperSorter(clusterBy, partitions);
        }

        @Test
        public void test_clusterByQualityLongDescRowNumberAsc_fourPartitions_durableStorage() throws Exception {
            ClusterBy clusterBy = new ClusterBy((List)ImmutableList.of((Object)new KeyColumn("qualityLong", KeyOrder.DESCENDING), (Object)new KeyColumn("__row_number", KeyOrder.ASCENDING)), 0);
            this.setUpInputChannels(clusterBy);
            ClusterByPartitions partitions = new ClusterByPartitions((List)ImmutableList.of((Object)new ClusterByPartition(this.createKey(clusterBy, 1800L, 8L), this.createKey(clusterBy, 1600L, 506L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1600L, 506L), this.createKey(clusterBy, 1400L, 204L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1400L, 204L), this.createKey(clusterBy, 1300L, 900L)), (Object)new ClusterByPartition(this.createKey(clusterBy, 1300L, 900L), null)));
            Assert.assertEquals((long)4L, (long)partitions.size());
            this.verifySuperSorter(clusterBy, partitions);
        }

        private RowKey createKey(ClusterBy clusterBy, Object ... objects) {
            RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), (ColumnInspector)this.signature);
            return KeyTestUtils.createKey(keySignature, objects);
        }
    }

    public static class NonParameterizedCasesTest
    extends InitializedNullHandlingTest {
        private static final int NUM_THREADS = 1;
        private static final int FRAME_SIZE = 1000000;
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        private FrameProcessorExecutor exec;

        @Before
        public void setUp() {
            this.exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)1, (String)"super-sorter-test-%d")));
        }

        @After
        public void tearDown() {
            this.exec.getExecutorService().shutdownNow();
        }

        @Test
        public void testSingleEmptyInputChannel_fileStorage() throws Exception {
            BlockingQueueFrameChannel inputChannel = BlockingQueueFrameChannel.minimal();
            inputChannel.writable().close();
            SettableFuture outputPartitionsFuture = SettableFuture.create();
            SuperSorterProgressTracker superSorterProgressTracker = new SuperSorterProgressTracker();
            File tempFolder = this.temporaryFolder.newFolder();
            SuperSorter superSorter = new SuperSorter(Collections.singletonList(inputChannel.readable()), FrameReader.create((RowSignature)RowSignature.empty()), Collections.emptyList(), (ListenableFuture)outputPartitionsFuture, this.exec, (OutputChannelFactory)new FileOutputChannelFactory(tempFolder, 1000000, null), (OutputChannelFactory)new FileOutputChannelFactory(tempFolder, 1000000, null), 2, 2, -1L, null, superSorterProgressTracker);
            superSorter.setNoWorkRunnable(() -> outputPartitionsFuture.set((Object)ClusterByPartitions.oneUniversalPartition()));
            OutputChannels channels = (OutputChannels)superSorter.run().get();
            Assert.assertEquals((long)1L, (long)channels.getAllChannels().size());
            ReadableFrameChannel channel = ((OutputChannel)Iterables.getOnlyElement((Iterable)channels.getAllChannels())).getReadableChannel();
            Assert.assertTrue((boolean)channel.isFinished());
            Assert.assertEquals((double)1.0, (double)superSorterProgressTracker.snapshot().getProgressDigest(), (double)0.0);
            channel.close();
        }
    }
}

