/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.output;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.Threads;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.block.BlockAssertions;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.buffer.OutputBuffer;
import io.trino.execution.buffer.OutputBufferStateMachine;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.PartitionedOutputBuffer;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.TestingPagesSerdes;
import io.trino.jmh.Benchmarks;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.SimpleLocalMemoryContext;
import io.trino.operator.BucketPartitionFunction;
import io.trino.operator.DriverContext;
import io.trino.operator.HashGenerator;
import io.trino.operator.PageTestUtils;
import io.trino.operator.PartitionFunction;
import io.trino.operator.PrecomputedHashGenerator;
import io.trino.operator.output.PartitionedOutputOperator;
import io.trino.operator.output.PositionsAppenderFactory;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.HashBucketFunction;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import io.trino.type.BlockTypeOperators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@State(value=Scope.Thread)
@OutputTimeUnit(value=TimeUnit.MILLISECONDS)
@Fork(value=2)
@Warmup(iterations=10, time=500, timeUnit=TimeUnit.MILLISECONDS)
@Measurement(iterations=10, time=500, timeUnit=TimeUnit.MILLISECONDS)
@BenchmarkMode(value={Mode.AverageTime})
public class BenchmarkPartitionedOutputOperator {
    // Single appender factory shared by every PartitionedOutputFactory that
    // BenchmarkData.createPartitionedOutputOperator() builds.
    private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators());

    /**
     * Benchmark body: builds a fresh operator from {@code data}, feeds it the prepared
     * data page {@code data.getPageCount()} times and then finishes the operator.
     *
     * @param data benchmark state providing the operator, the page and the page count
     */
    @Benchmark
    public void addPage(BenchmarkData data) {
        PartitionedOutputOperator partitionedOutput = data.createPartitionedOutputOperator();
        // Neither value is changed by the loop below, so read each of them once.
        int pagesToAdd = data.getPageCount();
        Page page = data.getDataPage();
        for (int added = 0; added < pagesToAdd; added++) {
            partitionedOutput.addInput(page);
        }
        partitionedOutput.finish();
    }

    /**
     * Smoke test: runs {@link #addPage} once against a {@link BenchmarkData} that keeps its
     * field initialisers and is set up without a {@code Blackhole}.
     */
    @Test
    public void verifyAddPage() {
        BenchmarkData benchmarkData = new BenchmarkData();
        benchmarkData.setup(null);

        BenchmarkPartitionedOutputOperator benchmark = new BenchmarkPartitionedOutputOperator();
        benchmark.addPage(benchmarkData);
    }

    /**
     * Builds a {@link RowType} with one field per entry of {@code types}, named
     * {@code field0}, {@code field1}, ... in list order.
     *
     * @param types field types, in field order
     * @return the row type created by {@code RowType.from} for those fields
     */
    private static RowType rowTypeWithDefaultFieldNames(List<Type> types) {
        // The list must be typed as List<RowType.Field>: generics are invariant, so a
        // List<Object> cannot be initialised from an ArrayList<RowType.Field>.
        List<RowType.Field> fields = new ArrayList<>(types.size());
        for (int i = 0; i < types.size(); ++i) {
            fields.add(new RowType.Field(Optional.of("field" + i), types.get(i)));
        }
        // Hand over a read-only view, as the original code did.
        return RowType.from(Collections.unmodifiableList(fields));
    }

    /**
     * Creates a {@link MapType} for the given key and value types, backed by a newly
     * constructed {@link TypeOperators} instance.
     */
    private static MapType createMapType(Type keyType, Type valueType) {
        TypeOperators typeOperators = new TypeOperators();
        return new MapType(keyType, valueType, typeOperators);
    }

    /**
     * Runs {@link #addPage} over a fixed list of test types before the measured benchmark.
     * Each type is run twice: once with the default configuration and a single page, and
     * once with 256 partitions, 256 positions and 50 pages.
     *
     * <p>NOTE(review): presumably this exercises the operator with several block types so the
     * measured run does not see a single-type JIT profile - confirm.
     *
     * @throws RuntimeException wrapping anything thrown while running the warm-up
     */
    private static void pollute() {
        try {
            List<BenchmarkData.TestType> warmupTypes = List.of(
                    BenchmarkData.TestType.BIGINT,
                    BenchmarkData.TestType.DICTIONARY_BIGINT,
                    BenchmarkData.TestType.RLE_BIGINT,
                    BenchmarkData.TestType.LONG_DECIMAL,
                    BenchmarkData.TestType.INTEGER,
                    BenchmarkData.TestType.SMALLINT,
                    BenchmarkData.TestType.BOOLEAN,
                    BenchmarkData.TestType.VARCHAR,
                    BenchmarkData.TestType.ARRAY_BIGINT);
            BenchmarkPartitionedOutputOperator benchmark = new BenchmarkPartitionedOutputOperator();
            for (BenchmarkData.TestType warmupType : warmupTypes) {
                // Default partition/position counts, one page.
                BenchmarkData singlePage = new BenchmarkData();
                singlePage.setType(warmupType);
                singlePage.setupData(null);
                singlePage.setPageCount(1);
                benchmark.addPage(singlePage);

                // 256 partitions x 256 positions, 50 pages. The page count is set after
                // setupData because setupData overwrites it.
                BenchmarkData smallPages = new BenchmarkData();
                smallPages.setType(warmupType);
                smallPages.setPartitionCount(256);
                smallPages.setPositionCount(256);
                smallPages.setupData(null);
                smallPages.setPageCount(50);
                benchmark.addPage(smallPages);
            }
        }
        catch (Throwable failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Runs this benchmark class through {@link Benchmarks}, passing {@code -Xmx16g} as a JVM
     * argument for the benchmark run.
     */
    public static void main(String[] args) throws Exception {
        Benchmarks.benchmark(BenchmarkPartitionedOutputOperator.class)
                .withOptions(options -> options.jvmArgs("-Xmx16g"))
                .run();
    }

    @State(value=Scope.Thread)
    public static class BenchmarkData {
        // Not referenced by name in the visible code; positionCount below is initialised with
        // the same literal value.
        private static final int DEFAULT_POSITION_COUNT = 8192;
        // Passed to PartitionedOutputFactory by createPartitionedOutputOperator().
        private static final DataSize MAX_PARTITION_BUFFER_SIZE = DataSize.of((long)256L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
        // Daemon-thread executors shared by all BenchmarkData instances; used for the task
        // context and for the output buffer / its state machine.
        private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"BenchmarkPartitionedOutputOperator-executor-%s"));
        private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, Threads.daemonThreadsNamed((String)"BenchmarkPartitionedOutputOperator-scheduledExecutor-%s"));
        // JMH parameters. The field initialisers are the values in effect when an instance is
        // created directly (as verifyAddPage() and pollute() do) instead of by JMH.
        // Number of output partitions / buffers.
        @Param(value={"2", "16", "256"})
        private int partitionCount = 256;
        // Codec handed to the pages serde factory.
        @Param(value={"LZ4", "NONE"})
        private CompressionCodec compressionCodec = CompressionCodec.NONE;
        // Number of data channels requested from TestType.getTypes().
        @Param(value={"1", "2"})
        private int channelCount = 1;
        // Positions per generated page.
        @Param(value={"8192"})
        private int positionCount = 8192;
        // Selects channel type, page count and page generator (see TestType).
        @Param(value={"BIGINT", "BIGINT_PARTITION_CHANNEL_SKEWED", "DICTIONARY_BIGINT", "RLE_BIGINT", "BIGINT_PARTITION_CHANNEL_20_PERCENT", "BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT", "BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT", "BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT", "BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT", "BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1", "BIGINT_PARTITION_CHANNEL_RLE", "BIGINT_PARTITION_CHANNEL_RLE_NULL", "LONG_DECIMAL", "DICTIONARY_LONG_DECIMAL", "INTEGER", "DICTIONARY_INTEGER", "SMALLINT", "DICTIONARY_SMALLINT", "BOOLEAN", "DICTIONARY_BOOLEAN", "VARCHAR", "DICTIONARY_VARCHAR", "ARRAY_BIGINT", "ARRAY_VARCHAR", "ARRAY_ARRAY_BIGINT", "MAP_BIGINT_BIGINT", "MAP_BIGINT_MAP_BIGINT_BIGINT", "ROW_BIGINT_BIGINT", "ROW_ARRAY_BIGINT_ARRAY_BIGINT", "ROW_RLE_BIGINT_BIGINT"})
        private TestType type = TestType.BIGINT;
        // Null rate forwarded to the page generator.
        @Param(value={"0", "0.2"})
        private float nullRate = 0.2f;
        // State derived in setupData(): operator input types (data channels plus a trailing
        // BIGINT), number of pages per invocation, and the page that is added repeatedly.
        private List<Type> types;
        private int pageCount;
        private Page dataPage;
        // May be null (verifyAddPage() and pollute() pass null); see TestingPartitionedOutputBuffer.
        private Blackhole blackhole;

        /** Returns how many times {@code addPage} feeds the data page into the operator. */
        public int getPageCount() {
            return pageCount;
        }

        /** Overrides the page count; {@code setupData} resets it from the test type. */
        public void setPageCount(int pageCount) {
            this.pageCount = pageCount;
        }

        /** Sets the number of output partitions used when the buffer and operator are created. */
        public void setPartitionCount(int partitionCount) {
            this.partitionCount = partitionCount;
        }

        /** Sets the number of positions requested from the page generator. */
        public void setPositionCount(int positionCount) {
            this.positionCount = positionCount;
        }

        /**
         * Selects the test type.
         *
         * @throws NullPointerException if {@code type} is null
         */
        public void setType(TestType type) {
            this.type = Objects.requireNonNull(type, "type is null");
        }

        /** Returns the page generated by the last {@code setupData} call. */
        public Page getDataPage() {
            return dataPage;
        }

        /**
         * JMH setup hook: prepares this state object, then runs the outer class's
         * {@code pollute()} warm-up.
         *
         * @param blackhole sink for enqueued pages; may be null outside JMH
         */
        @Setup
        public void setup(Blackhole blackhole) {
            setupData(blackhole);
            pollute();
        }

        /**
         * Derives the benchmark inputs from the current parameters: the data page, the page
         * count of the selected test type, and the operator input types.
         *
         * @param blackhole sink later handed to the testing output buffer; may be null
         */
        private void setupData(Blackhole blackhole) {
            this.blackhole = blackhole;
            // The page generator only receives the data-channel types.
            List<Type> channelTypes = this.type.getTypes(this.channelCount);
            this.dataPage = this.type.getPageGenerator().createPage(channelTypes, this.positionCount, this.nullRate);
            this.pageCount = this.type.getPageCount();
            // Append a BIGINT type: createPartitionedOutputOperator() uses the last index
            // (types.size() - 1) as the partition channel. The explicit <Type> witness is
            // required; without it the builder is inferred as Builder<Object> and the result
            // is not assignable to List<Type>.
            this.types = ImmutableList.<Type>builder()
                    .addAll(channelTypes)
                    .add(BigintType.BIGINT)
                    .build();
        }

        /**
         * Assembles a page of {@code channelCount} blocks obtained from {@code standardBlock}
         * (one {@code get()} call per channel) followed by {@code partitionBlock} as the last
         * channel.
         *
         * @param positionCount position count passed to the {@link Page} constructor
         * @param channelCount number of blocks to request from {@code standardBlock}
         * @param standardBlock supplier invoked once per data channel
         * @param partitionBlock block placed after the data channels
         */
        private static Page page(int positionCount, int channelCount, Supplier<Block> standardBlock, Block partitionBlock) {
            // Typed builder instead of a raw one: no unchecked toArray/cast needed.
            ImmutableList.Builder<Block> blocks = ImmutableList.builder();
            for (int i = 0; i < channelCount; ++i) {
                blocks.add(standardBlock.get());
            }
            blocks.add(partitionBlock);
            return new Page(positionCount, blocks.build().toArray(new Block[0]));
        }

        /**
         * Creates the output buffer for the operator: a PARTITIONED buffer set with one buffer
         * id per partition (id {@code p} mapped to partition {@code p}), closed to further ids,
         * with a maximum size of {@code Long.MAX_VALUE} bytes.
         */
        private PartitionedOutputBuffer createPartitionedOutputBuffer() {
            PipelinedOutputBuffers outputBuffers = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED);
            for (int id = 0; id < partitionCount; id++) {
                outputBuffers = outputBuffers.withBuffer(new PipelinedOutputBuffers.OutputBufferId(id), id);
            }
            PipelinedOutputBuffers finalBuffers = outputBuffers.withNoMoreBufferIds();
            DataSize maxBufferSize = DataSize.of(Long.MAX_VALUE, DataSize.Unit.BYTE);
            return createPartitionedBuffer(finalBuffers, maxBufferSize);
        }

        /**
         * Builds a new {@link PartitionedOutputOperator} for the current parameters.
         *
         * <p>Rows are routed by a {@link BucketPartitionFunction} over {@code partitionCount}
         * buckets, with bucket {@code i} mapped to partition {@code i}. The only partition
         * channel is the last input channel, i.e. the BIGINT type appended by
         * {@code setupData}. The operator writes into a fresh testing output buffer.
         *
         * @return an operator created with a new driver context
         */
        private PartitionedOutputOperator createPartitionedOutputOperator() {
            // Locals are declared with the types the factory expects, so no upcasts are needed
            // at the call sites.
            HashGenerator hashGenerator = new PrecomputedHashGenerator(0);
            BucketFunction bucketFunction = new HashBucketFunction(hashGenerator, this.partitionCount);
            PartitionFunction partitionFunction = new BucketPartitionFunction(bucketFunction, IntStream.range(0, this.partitionCount).toArray());
            PagesSerdeFactory serdeFactory = TestingPagesSerdes.createTestingPagesSerdeFactory(this.compressionCodec);
            OutputBuffer buffer = this.createPartitionedOutputBuffer();
            // Typed lists replace the raw (List) casts, which bypassed generic type checking.
            PartitionedOutputOperator.PartitionedOutputFactory operatorFactory = new PartitionedOutputOperator.PartitionedOutputFactory(
                    partitionFunction,
                    ImmutableList.of(this.types.size() - 1),
                    ImmutableList.of(Optional.empty()),
                    false,
                    OptionalInt.empty(),
                    buffer,
                    MAX_PARTITION_BUFFER_SIZE,
                    POSITIONS_APPENDER_FACTORY,
                    Optional.empty(),
                    AggregatedMemoryContext.newSimpleAggregatedMemoryContext(),
                    0,
                    Optional.empty());
            return (PartitionedOutputOperator) operatorFactory
                    .createOutputOperator(0, new PlanNodeId("plan-node-0"), this.types, Function.identity(), serdeFactory)
                    .createOperator(this.createDriverContext());
        }

        /**
         * Creates a driver context from a new testing task context built on the shared
         * executors and the test session, via pipeline context 0.
         */
        private DriverContext createDriverContext() {
            // Locals carry the same static types the original casts selected.
            Executor executor = EXECUTOR;
            ScheduledExecutorService scheduler = SCHEDULER;
            Session session = SessionTestUtils.TEST_SESSION;
            return TestingTaskContext.builder(executor, scheduler, session)
                    .build()
                    .addPipelineContext(0, true, true, false)
                    .addDriverContext();
        }

        /**
         * Creates a {@link TestingPartitionedOutputBuffer} for the given buffer layout and size
         * limit. It uses a fixed task id, the shared scheduler as executor and this state's
         * blackhole (possibly null).
         *
         * @param buffers output buffer configuration
         * @param dataSize maximum buffer size passed to the buffer constructor
         */
        private TestingPartitionedOutputBuffer createPartitionedBuffer(PipelinedOutputBuffers buffers, DataSize dataSize) {
            TaskId taskId = new TaskId(new StageId(new QueryId("query"), 0), 0, 0);
            Executor stateMachineExecutor = SCHEDULER;
            OutputBufferStateMachine stateMachine = new OutputBufferStateMachine(taskId, stateMachineExecutor);
            // Every call of the supplier yields a context under its own new aggregated context.
            Supplier<LocalMemoryContext> memoryContexts =
                    () -> new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test");
            return new TestingPartitionedOutputBuffer(
                    "task-instance-id",
                    stateMachine,
                    buffers,
                    dataSize,
                    memoryContexts,
                    SCHEDULER,
                    this.blackhole);
        }

        public static enum TestType {
            // Each constant supplies: the type used for every data channel, the number of pages
            // fed to the operator per benchmark invocation, and optionally a custom page
            // generator (the two-argument constructor uses PageTestUtils::createRandomPage).
            BIGINT((Type)BigintType.BIGINT, 5000),
            // Data channels are random BIGINT blocks; the partition channel comes from
            // createRandomLongsBlock(positionCount, 2).
            BIGINT_PARTITION_CHANNEL_SKEWED((Type)BigintType.BIGINT, 5000, (types, positionCount, nullRate) -> BenchmarkData.page(positionCount, types.size(), () -> BlockAssertions.createRandomBlockForType((Type)BigintType.BIGINT, positionCount, nullRate), (Block)BlockAssertions.createRandomLongsBlock(positionCount, 2))),
            DICTIONARY_BIGINT((Type)BigintType.BIGINT, 5000, PageTestUtils::createRandomDictionaryPage),
            RLE_BIGINT((Type)BigintType.BIGINT, 3000, PageTestUtils::createRandomRlePage),
            // Partition channel holds the values i % (positionCount / 5) for i in [0, positionCount).
            BIGINT_PARTITION_CHANNEL_20_PERCENT((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> BenchmarkData.page(positionCount, types.size(), () -> BlockAssertions.createRandomBlockForType((Type)BigintType.BIGINT, positionCount, nullRate), (Block)BlockAssertions.createLongsBlock((Iterable)LongStream.range(0L, positionCount).mapToObj(value -> value % (long)(positionCount / 5)).collect(ImmutableList.toImmutableList())))),
            // Partition channel is a long dictionary block; the last argument is the dictionary
            // size, expressed as a share of positionCount.
            BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> TestType.createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount / 5)),
            BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> TestType.createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount / 2)),
            BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> TestType.createDictionaryPartitionChannelPage(types, positionCount, nullRate, (int)((double)positionCount * 0.8))),
            BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> TestType.createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount)),
            BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1((Type)BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> TestType.createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount - 1)),
            // Partition channel repeats the single value 42 for every position.
            BIGINT_PARTITION_CHANNEL_RLE((Type)BigintType.BIGINT, 5000, (types, positionCount, nullRate) -> BenchmarkData.page(positionCount, types.size(), () -> BlockAssertions.createRandomBlockForType((Type)BigintType.BIGINT, positionCount, nullRate), BlockAssertions.createRepeatedValuesBlock(42L, positionCount))),
            // Partition channel is a run-length-encoded block over a single null value.
            BIGINT_PARTITION_CHANNEL_RLE_NULL((Type)BigintType.BIGINT, 20, (types, positionCount, nullRate) -> BenchmarkData.page(positionCount, types.size(), () -> BlockAssertions.createRandomBlockForType((Type)BigintType.BIGINT, positionCount, nullRate), RunLengthEncodedBlock.create((Block)BlockAssertions.createLongsBlock(new Long[]{null}), (int)positionCount))),
            LONG_DECIMAL((Type)DecimalType.createDecimalType((int)19), 5000),
            DICTIONARY_LONG_DECIMAL((Type)DecimalType.createDecimalType((int)19), 5000, PageTestUtils::createRandomDictionaryPage),
            INTEGER((Type)IntegerType.INTEGER, 5000),
            DICTIONARY_INTEGER((Type)IntegerType.INTEGER, 5000, PageTestUtils::createRandomDictionaryPage),
            SMALLINT((Type)SmallintType.SMALLINT, 5000),
            DICTIONARY_SMALLINT((Type)SmallintType.SMALLINT, 5000, PageTestUtils::createRandomDictionaryPage),
            BOOLEAN((Type)BooleanType.BOOLEAN, 5000),
            DICTIONARY_BOOLEAN((Type)BooleanType.BOOLEAN, 5000, PageTestUtils::createRandomDictionaryPage),
            VARCHAR((Type)VarcharType.VARCHAR, 5000),
            DICTIONARY_VARCHAR((Type)VarcharType.VARCHAR, 5000, PageTestUtils::createRandomDictionaryPage),
            // Nested types use a smaller page count (1000).
            ARRAY_BIGINT((Type)new ArrayType((Type)BigintType.BIGINT), 1000),
            ARRAY_VARCHAR((Type)new ArrayType((Type)VarcharType.VARCHAR), 1000),
            ARRAY_ARRAY_BIGINT((Type)new ArrayType((Type)new ArrayType((Type)BigintType.BIGINT)), 1000),
            MAP_BIGINT_BIGINT((Type)BenchmarkPartitionedOutputOperator.createMapType((Type)BigintType.BIGINT, (Type)BigintType.BIGINT), 1000),
            MAP_BIGINT_MAP_BIGINT_BIGINT((Type)BenchmarkPartitionedOutputOperator.createMapType((Type)BigintType.BIGINT, (Type)BenchmarkPartitionedOutputOperator.createMapType((Type)BigintType.BIGINT, (Type)BigintType.BIGINT)), 1000),
            ROW_BIGINT_BIGINT((Type)BenchmarkPartitionedOutputOperator.rowTypeWithDefaultFieldNames((List<Type>)ImmutableList.of((Object)BigintType.BIGINT, (Object)BigintType.BIGINT)), 1000),
            ROW_ARRAY_BIGINT_ARRAY_BIGINT((Type)BenchmarkPartitionedOutputOperator.rowTypeWithDefaultFieldNames((List<Type>)ImmutableList.of((Object)new ArrayType((Type)BigintType.BIGINT), (Object)new ArrayType((Type)BigintType.BIGINT))), 1000),
            // Row blocks are built by hand for each channel type: the first field is an RLE block
            // of one constant long, the second field is a random longs block.
            ROW_RLE_BIGINT_BIGINT((Type)BenchmarkPartitionedOutputOperator.rowTypeWithDefaultFieldNames((List<Type>)ImmutableList.of((Object)BigintType.BIGINT, (Object)BigintType.BIGINT)), 1000, (types, positionCount, nullRate) -> PageTestUtils.createPage(types, positionCount, Optional.of(ImmutableList.of((Object)0)), (List)types.stream().map(type -> {
                // Row-level null mask; stays null (no mask) when nullRate is not positive.
                boolean[] isNull = null;
                if (nullRate > 0.0f) {
                    isNull = new boolean[positionCount];
                    Set<Integer> nullPositions = BlockAssertions.chooseNullPositions(positionCount, nullRate);
                    for (int nullPosition : nullPositions) {
                        isNull[nullPosition] = true;
                    }
                }
                return RowBlock.fromNotNullSuppressedFieldBlocks((int)positionCount, Optional.ofNullable(isNull), (Block[])new Block[]{RunLengthEncodedBlock.create((Block)BlockAssertions.createLongsBlock(-65128734213L), (int)positionCount), BlockAssertions.createRandomLongsBlock(positionCount, nullRate)});
            }).collect(ImmutableList.toImmutableList())));

            /** Type used for every data channel of the generated page. */
            private final Type type;
            /** Number of pages fed to the operator per benchmark invocation. */
            private final int pageCount;
            /** Strategy that produces the benchmark page. */
            private final PageGenerator pageGenerator;

            /** Creates a test type whose pages come from {@code PageTestUtils::createRandomPage}. */
            TestType(Type type, int pageCount) {
                this(type, pageCount, PageTestUtils::createRandomPage);
            }

            /**
             * Creates a test type with an explicit page generator.
             *
             * @throws NullPointerException if {@code type} or {@code pageGenerator} is null
             */
            TestType(Type type, int pageCount, PageGenerator pageGenerator) {
                // Validate in declaration order so the reported argument matches the original.
                Objects.requireNonNull(type, "type is null");
                Objects.requireNonNull(pageGenerator, "pageGenerator is null");
                this.type = type;
                this.pageCount = pageCount;
                this.pageGenerator = pageGenerator;
            }

            /** Returns the generator that builds this test type's page. */
            public PageGenerator getPageGenerator() {
                return pageGenerator;
            }

            /** Returns the number of pages configured for this test type. */
            public int getPageCount() {
                return pageCount;
            }

            /**
             * Returns a list holding this test type's channel type {@code channelCount} times.
             *
             * @param channelCount number of data channels
             */
            public List<Type> getTypes(int channelCount) {
                return Collections.nCopies(channelCount, type);
            }

            /**
             * Builds a page with one random block per entry of {@code types} (all generated for
             * the first type in the list) and a long dictionary block as the partition channel.
             *
             * @param types data-channel types; only the first entry and the size are used
             * @param positionCount positions per block
             * @param nullRate null rate for the data blocks
             * @param dictionarySize size argument passed to {@code createLongDictionaryBlock}
             */
            private static Page createDictionaryPartitionChannelPage(List<Type> types, int positionCount, float nullRate, int dictionarySize) {
                int dataChannels = types.size();
                Supplier<Block> dataBlocks = () -> BlockAssertions.createRandomBlockForType(types.get(0), positionCount, nullRate);
                Block partitionChannel = BlockAssertions.createLongDictionaryBlock(0, positionCount, dictionarySize);
                return BenchmarkData.page(positionCount, dataChannels, dataBlocks, partitionChannel);
            }

            /**
             * Strategy for producing the page a benchmark run adds to the operator. Implemented
             * by lambdas and method references on the enum constants.
             */
            @FunctionalInterface
            interface PageGenerator {
                /**
                 * Creates the benchmark page.
                 *
                 * @param types data-channel types ({@code setupData} passes the result of {@code getTypes})
                 * @param positionCount number of positions requested for the page
                 * @param nullRate null rate requested for generated data
                 * @return the generated page
                 */
                Page createPage(List<Type> types, int positionCount, float nullRate);
            }
        }

        private static class TestingPartitionedOutputBuffer
        extends PartitionedOutputBuffer {
            private final Blackhole blackhole;

            public TestingPartitionedOutputBuffer(String taskInstanceId, OutputBufferStateMachine stateMachine, PipelinedOutputBuffers outputBuffers, DataSize maxBufferSize, Supplier<LocalMemoryContext> memoryContextSupplier, Executor notificationExecutor, Blackhole blackhole) {
                super(taskInstanceId, stateMachine, outputBuffers, maxBufferSize, memoryContextSupplier, notificationExecutor);
                this.blackhole = blackhole;
            }

            public void enqueue(int partitionNumber, List<Slice> pages) {
                if (this.blackhole != null) {
                    this.blackhole.consume(pages);
                }
            }
        }
    }
}

