/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.output;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.block.BlockAssertions;
import io.trino.execution.StateMachine;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
import io.trino.execution.buffer.OutputBuffer;
import io.trino.execution.buffer.OutputBufferInfo;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PagesSerde;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.operator.BucketPartitionFunction;
import io.trino.operator.DriverContext;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactories;
import io.trino.operator.OutputFactory;
import io.trino.operator.PartitionFunction;
import io.trino.operator.TaskContext;
import io.trino.operator.TrinoOperatorFactories;
import io.trino.operator.output.PartitionedOutputOperator;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.TestingBlockEncodingSerde;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTaskContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestPartitionedOutputOperator {
    private static final OperatorFactories TRINO_OPERATOR_FACTORIES = new TrinoOperatorFactories();
    private static final Session TEST_SESSION = TestingSession.testSessionBuilder().build();
    private static final DataSize MAX_MEMORY = DataSize.of((long)50L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
    private static final DataSize PARTITION_MAX_MEMORY = DataSize.of((long)5L, (DataSize.Unit)DataSize.Unit.MEGABYTE);
    private static final int POSITIONS_PER_PAGE = 8;
    private static final int PARTITION_COUNT = 2;
    private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new PagesSerdeFactory((BlockEncodingSerde)new TestingBlockEncodingSerde(), false);
    private static final PagesSerde PAGES_SERDE = PAGES_SERDE_FACTORY.createPagesSerde();
    private final Session testSession;
    private final OperatorFactories operatorFactories;
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private TestOutputBuffer outputBuffer;

    public TestPartitionedOutputOperator() {
        this(TEST_SESSION, TRINO_OPERATOR_FACTORIES);
    }

    protected TestPartitionedOutputOperator(Session testSession, OperatorFactories operatorFactories) {
        this.testSession = testSession;
        this.operatorFactories = operatorFactories;
    }

    @BeforeClass
    public void setUpClass() {
        this.executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-executor-%s")));
        this.scheduledExecutor = Executors.newScheduledThreadPool(1, Threads.daemonThreadsNamed((String)(this.getClass().getSimpleName() + "-scheduledExecutor-%s")));
    }

    @AfterClass(alwaysRun=true)
    public void tearDownClass() {
        this.executor.shutdownNow();
        this.executor = null;
        this.scheduledExecutor.shutdownNow();
        this.scheduledExecutor = null;
    }

    @BeforeMethod
    public void setUp() {
        this.outputBuffer = new TestOutputBuffer();
    }

    @Test
    public void testOutputForSimplePage() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, 8)});
        List<Object> expected = TestPartitionedOutputOperator.readLongs(Stream.of(page), 0);
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partitioned = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(), 0);
        Assertions.assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected);
        OperatorContext operatorContext = partitionedOutputOperator.getOperatorContext();
        Assert.assertEquals((long)operatorContext.getOutputDataSize().getTotalCount(), (long)page.getSizeInBytes());
        Assert.assertEquals((long)operatorContext.getOutputPositions().getTotalCount(), (long)page.getPositionCount());
    }

    @Test
    public void testOutputForEmptyPage() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongsBlock((Iterable<Long>)ImmutableList.of())});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partitioned = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(), 0);
        Assertions.assertThat(partitioned).isEmpty();
    }

    @Test
    public void testOutputForPageWithNoBlockPartitionFunction() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).withPartitionFunction((PartitionFunction)new BucketPartitionFunction(SystemPartitioningHandle.SystemPartitionFunction.ROUND_ROBIN.createBucketFunction(null, false, 2, null), IntStream.range(0, 2).toArray())).withPartitionChannels((ImmutableList<Integer>)ImmutableList.of()).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, 8)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactly(new Object[]{0L, 2L, 4L, 6L});
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactly(new Object[]{1L, 3L, 5L, 7L});
    }

    @Test
    public void testOutputForMultipleSimplePages() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page1 = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, 8)});
        Page page2 = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(1, 8)});
        Page page3 = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(2, 8)});
        List<Object> expected = TestPartitionedOutputOperator.readLongs(Stream.of(page1, page2, page3), 0);
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page1, page2, page3);
        List<Object> partitioned = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(), 0);
        Assertions.assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected);
    }

    @Test
    public void testOutputForSimplePageWithReplication() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).replicate().build();
        Page page = new Page(new Block[]{BlockAssertions.createLongsBlock(0L, 1L, 2L, 3L, null)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactly(new Object[]{0L, 2L, null});
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactly(new Object[]{0L, 1L, 3L});
    }

    @Test
    public void testOutputForSimplePageWithNullChannel() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).withNullChannel(0).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongsBlock(0L, 1L, 2L, 3L, null)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactlyInAnyOrder(new Object[]{0L, 2L, null});
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactlyInAnyOrder(new Object[]{1L, 3L, null});
    }

    @Test
    public void testOutputForSimplePageWithPartitionConstant() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).withPartitionConstants((List<Optional<NullableValue>>)ImmutableList.of(Optional.of(new NullableValue((Type)BigintType.BIGINT, (Object)1L)))).withPartitionChannels(-1).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongsBlock(0L, 1L, 2L, 3L, null)});
        List<Object> allValues = TestPartitionedOutputOperator.readLongs(Stream.of(page), 0);
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).isEmpty();
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactlyElementsOf(allValues);
    }

    @Test
    public void testOutputForSimplePageWithPartitionConstantAndHashBlock() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).withPartitionConstants((List<Optional<NullableValue>>)ImmutableList.of(Optional.empty(), Optional.of(new NullableValue((Type)BigintType.BIGINT, (Object)1L)))).withPartitionChannels(0, -1).withHashChannels(0, 1).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongsBlock(0L, 1L, 2L, 3L)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactly(new Object[]{1L, 3L});
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactly(new Object[]{0L, 2L});
    }

    @Test
    public void testPartitionPositionsWithRleNotNull() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT, BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{BlockAssertions.createRLEBlock(0L, 8), BlockAssertions.createLongSequenceBlock(0, 8)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 1);
        Assertions.assertThat(partition0).containsExactlyElementsOf(TestPartitionedOutputOperator.readLongs(Stream.of(page), 1));
        List<Object> partition0HashBlock = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0HashBlock).containsOnly(new Object[]{0L}).hasSize(8);
        Assertions.assertThat(this.outputBuffer.getEnqueuedDeserialized(1)).isEmpty();
    }

    @Test
    public void testPartitionPositionsWithRleNotNullWithReplication() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT, BigintType.BIGINT}).replicate().build();
        Page page = new Page(new Block[]{BlockAssertions.createRLEBlock(0L, 8), BlockAssertions.createLongSequenceBlock(0, 8)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 1);
        Assertions.assertThat(partition0).containsExactlyElementsOf(TestPartitionedOutputOperator.readLongs(Stream.of(page), 1));
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 1);
        Assertions.assertThat(partition1).containsExactly(new Object[]{0L});
    }

    @Test
    public void testPartitionPositionsWithRleNullWithNullChannel() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT, BigintType.BIGINT}).withNullChannel(0).build();
        Page page = new Page(new Block[]{new RunLengthEncodedBlock(BlockAssertions.createLongsBlock(new Long[]{null}), 8), BlockAssertions.createLongSequenceBlock(0, 8)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 1);
        Assertions.assertThat(partition0).containsExactlyElementsOf(TestPartitionedOutputOperator.readLongs(Stream.of(page), 1));
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 1);
        Assertions.assertThat(partition1).containsExactlyElementsOf(TestPartitionedOutputOperator.readLongs(Stream.of(page), 1));
    }

    @Test
    public void testOutputForDictionaryBlock() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongDictionaryBlock(0, 10)});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactlyElementsOf(Collections.nCopies(5, 0L));
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactlyElementsOf(Collections.nCopies(5, 1L));
    }

    @Test
    public void testOutputForOneValueDictionaryBlock() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{new DictionaryBlock(BlockAssertions.createLongsBlock(0), new int[]{0, 0, 0, 0})});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactlyElementsOf(Collections.nCopies(4, 0L));
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).isEmpty();
    }

    @Test
    public void testOutputForViewDictionaryBlock() {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT}).build();
        Page page = new Page(new Block[]{new DictionaryBlock(BlockAssertions.createLongSequenceBlock(4, 8), new int[]{1, 0, 3, 2})});
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partition0 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(0), 0);
        Assertions.assertThat(partition0).containsExactlyInAnyOrder(new Object[]{4L, 6L});
        List<Object> partition1 = TestPartitionedOutputOperator.readLongs(this.outputBuffer.getEnqueuedDeserialized(1), 0);
        Assertions.assertThat(partition1).containsExactlyInAnyOrder(new Object[]{5L, 7L});
    }

    @Test(dataProvider="types")
    public void testOutputForSimplePageWithType(Type type) {
        PartitionedOutputOperator partitionedOutputOperator = this.partitionedOutputOperator(new Type[]{BigintType.BIGINT, type}).build();
        Page page = new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, 8), this.createBlockForType(type, 8)});
        List<Object> expected = TestPartitionedOutputOperator.readChannel(Stream.of(page), 1, type);
        TestPartitionedOutputOperator.processPages(partitionedOutputOperator, page);
        List<Object> partitioned = TestPartitionedOutputOperator.readChannel(this.outputBuffer.getEnqueuedDeserialized(), 1, type);
        Assertions.assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected);
    }

    @DataProvider(name="types")
    public static Object[][] types() {
        return new Object[][]{{BigintType.BIGINT}, {BooleanType.BOOLEAN}, {IntegerType.INTEGER}, {CharType.createCharType((long)10L)}, {VarcharType.createUnboundedVarcharType()}, {DoubleType.DOUBLE}, {SmallintType.SMALLINT}, {TinyintType.TINYINT}, {UuidType.UUID}, {VarbinaryType.VARBINARY}, {DecimalType.createDecimalType((int)1)}, {DecimalType.createDecimalType((int)19)}, {new ArrayType((Type)BigintType.BIGINT)}};
    }

    private Block createBlockForType(Type type, int positionsPerPage) {
        return BlockAssertions.createRandomBlockForType(type, positionsPerPage, 0.2f);
    }

    private static void processPages(PartitionedOutputOperator partitionedOutputOperator, Page ... pages) {
        for (Page page : pages) {
            partitionedOutputOperator.addInput(page);
        }
        partitionedOutputOperator.finish();
    }

    private static List<Object> readLongs(Stream<Page> pages, int channel) {
        return TestPartitionedOutputOperator.readChannel(pages, channel, (Type)BigintType.BIGINT);
    }

    private static List<Object> readChannel(Stream<Page> pages, int channel, Type type) {
        ArrayList result = new ArrayList();
        pages.forEach(page -> {
            Block block = page.getBlock(channel);
            for (int i = 0; i < block.getPositionCount(); ++i) {
                if (block.isNull(i)) {
                    result.add(null);
                    continue;
                }
                result.add(type.getObjectValue(null, block, i));
            }
        });
        return Collections.unmodifiableList(result);
    }

    private PartitionedOutputOperatorBuilder partitionedOutputOperator(Type ... types) {
        return this.partitionedOutputOperator((List<Type>)ImmutableList.copyOf((Object[])types));
    }

    private PartitionedOutputOperatorBuilder partitionedOutputOperator(List<Type> types) {
        return this.partitionedOutputOperator().withTypes(types);
    }

    private PartitionedOutputOperatorBuilder partitionedOutputOperator() {
        return new PartitionedOutputOperatorBuilder(this.operatorFactories, this.testSession, this.executor, this.scheduledExecutor, this.outputBuffer);
    }

    private static class TestOutputBuffer
    implements OutputBuffer {
        private final Multimap<Integer, Slice> enqueued = ArrayListMultimap.create();

        private TestOutputBuffer() {
        }

        public Stream<Page> getEnqueuedDeserialized() {
            return this.getEnqueued().stream().map(arg_0 -> ((PagesSerde)PAGES_SERDE).deserialize(arg_0));
        }

        public List<Slice> getEnqueued() {
            return ImmutableList.copyOf((Collection)this.enqueued.values());
        }

        public Stream<Page> getEnqueuedDeserialized(int partition) {
            return this.getEnqueued(partition).stream().map(arg_0 -> ((PagesSerde)PAGES_SERDE).deserialize(arg_0));
        }

        public List<Slice> getEnqueued(int partition) {
            Collection serializedPages = this.enqueued.get((Object)partition);
            return serializedPages == null ? ImmutableList.of() : ImmutableList.copyOf((Collection)serializedPages);
        }

        public void enqueue(int partition, List<Slice> pages) {
            this.enqueued.putAll((Object)partition, pages);
        }

        public OutputBufferInfo getInfo() {
            return null;
        }

        public boolean isFinished() {
            return false;
        }

        public double getUtilization() {
            return 0.0;
        }

        public boolean isOverutilized() {
            return false;
        }

        public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        }

        public void setOutputBuffers(OutputBuffers newOutputBuffers) {
        }

        public ListenableFuture<BufferResult> get(OutputBuffers.OutputBufferId bufferId, long token, DataSize maxSize) {
            return null;
        }

        public void acknowledge(OutputBuffers.OutputBufferId bufferId, long token) {
        }

        public void abort(OutputBuffers.OutputBufferId bufferId) {
        }

        public ListenableFuture<Void> isFull() {
            return null;
        }

        public void enqueue(List<Slice> pages) {
        }

        public void setNoMorePages() {
        }

        public void destroy() {
        }

        public void fail() {
        }

        public long getPeakMemoryUsage() {
            return 0L;
        }
    }

    static class PartitionedOutputOperatorBuilder {
        private final OperatorFactories operatorFactories;
        private final Session testSession;
        private final ExecutorService executor;
        private final ScheduledExecutorService scheduledExecutor;
        private final OutputBuffer outputBuffer;
        private ImmutableList<Integer> partitionChannels = ImmutableList.of((Object)0);
        private List<Optional<NullableValue>> partitionConstants = ImmutableList.of();
        private PartitionFunction partitionFunction = new SumModuloPartitionFunction(2, 0);
        private boolean shouldReplicate;
        private OptionalInt nullChannel = OptionalInt.empty();
        private List<Type> types;

        PartitionedOutputOperatorBuilder(OperatorFactories operatorFactories, Session testSession, ExecutorService executor, ScheduledExecutorService scheduledExecutor, OutputBuffer outputBuffer) {
            this.operatorFactories = Objects.requireNonNull(operatorFactories, "operatorFactories is null");
            this.testSession = Objects.requireNonNull(testSession, "testSession is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
            this.scheduledExecutor = Objects.requireNonNull(scheduledExecutor, "scheduledExecutor is null");
            this.outputBuffer = Objects.requireNonNull(outputBuffer, "outputBuffer is null");
        }

        public PartitionedOutputOperatorBuilder withPartitionChannels(Integer ... partitionChannels) {
            return this.withPartitionChannels((ImmutableList<Integer>)ImmutableList.copyOf((Object[])partitionChannels));
        }

        public PartitionedOutputOperatorBuilder withPartitionChannels(ImmutableList<Integer> partitionChannels) {
            this.partitionChannels = partitionChannels;
            return this;
        }

        public PartitionedOutputOperatorBuilder withPartitionConstants(List<Optional<NullableValue>> partitionConstants) {
            this.partitionConstants = partitionConstants;
            return this;
        }

        public PartitionedOutputOperatorBuilder withHashChannels(int ... hashChannels) {
            return this.withPartitionFunction(new SumModuloPartitionFunction(2, hashChannels));
        }

        public PartitionedOutputOperatorBuilder withPartitionFunction(PartitionFunction partitionFunction) {
            this.partitionFunction = partitionFunction;
            return this;
        }

        public PartitionedOutputOperatorBuilder replicate() {
            return this.withShouldReplicate(true);
        }

        public PartitionedOutputOperatorBuilder withShouldReplicate(boolean shouldReplicate) {
            this.shouldReplicate = shouldReplicate;
            return this;
        }

        public PartitionedOutputOperatorBuilder withNullChannel(int nullChannel) {
            return this.withNullChannel(OptionalInt.of(nullChannel));
        }

        public PartitionedOutputOperatorBuilder withNullChannel(OptionalInt nullChannel) {
            this.nullChannel = nullChannel;
            return this;
        }

        public PartitionedOutputOperatorBuilder withTypes(List<Type> types) {
            this.types = types;
            return this;
        }

        public PartitionedOutputOperator build() {
            TaskContext taskContext = TestingTaskContext.builder((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)this.testSession).setMemoryPoolSize(MAX_MEMORY).build();
            DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
            OutputBuffers buffers = OutputBuffers.createInitialEmptyOutputBuffers((OutputBuffers.BufferType)OutputBuffers.BufferType.PARTITIONED);
            for (int partition = 0; partition < 2; ++partition) {
                buffers = buffers.withBuffer(new OutputBuffers.OutputBufferId(partition), partition);
            }
            OutputFactory operatorFactory = this.operatorFactories.partitionedOutput(taskContext, this.partitionFunction, this.partitionChannels, this.partitionConstants, this.shouldReplicate, this.nullChannel, this.outputBuffer, PARTITION_MAX_MEMORY);
            return (PartitionedOutputOperator)operatorFactory.createOutputOperator(0, new PlanNodeId("plan-node-0"), this.types, Function.identity(), PAGES_SERDE_FACTORY).createOperator(driverContext);
        }
    }

    private static class SumModuloPartitionFunction
    implements PartitionFunction {
        private final int[] hashChannels;
        private final int partitionCount;

        SumModuloPartitionFunction(int partitionCount, int ... hashChannels) {
            Preconditions.checkArgument((partitionCount > 0 ? 1 : 0) != 0);
            this.partitionCount = partitionCount;
            this.hashChannels = hashChannels;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        public int getPartition(Page page, int position) {
            long value = 0L;
            for (int i = 0; i < this.hashChannels.length; ++i) {
                value += page.getBlock(this.hashChannels[i]).getLong(position, 0);
            }
            return Math.toIntExact(Math.abs(value) % (long)this.partitionCount);
        }
    }
}

