/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.exchange;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.trino.SequencePageBuilder;
import io.trino.Session;
import io.trino.block.BlockAssertions;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.HashGenerator;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.PageAssertions;
import io.trino.operator.exchange.LocalExchange;
import io.trino.operator.exchange.LocalExchangeBufferInfo;
import io.trino.operator.exchange.LocalExchangeSink;
import io.trino.operator.exchange.LocalExchangeSource;
import io.trino.operator.exchange.LocalPartitionGenerator;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.ValueBlock;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorBucketNodeMap;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTransactionHandle;
import io.trino.util.FinalizerService;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_METHOD)
@Execution(value=ExecutionMode.SAME_THREAD)
public class TestLocalExchange {
    // Single-element type list (BIGINT); passed as the type list of the partitioned exchanges built in this class.
    // The decompiler-inserted (Object) cast was dropped: ImmutableList.of(Object) cannot be inferred as List<Type>.
    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Retained size in bytes of the reference page produced by createPage(42).
    private static final DataSize RETAINED_PAGE_SIZE = DataSize.ofBytes(TestLocalExchange.createPage(42).getRetainedSizeInBytes());
    // Size in bytes (getSizeInBytes) of the same reference page.
    private static final DataSize PAGE_SIZE = DataSize.ofBytes(TestLocalExchange.createPage(42).getSizeInBytes());
    // Default buffer limit (32 MB) for tests that do not size the buffer in pages.
    private static final DataSize LOCAL_EXCHANGE_MAX_BUFFERED_BYTES = DataSize.of(32L, DataSize.Unit.MEGABYTE);
    private static final TypeOperators TYPE_OPERATORS = new TypeOperators();
    // Session with default test settings; tests that need system properties build their own session.
    private static final Session SESSION = TestingSession.testSessionBuilder().build();
    // Default writer-scaling threshold (32 MB) for tests that do not exercise scaling.
    private static final DataSize WRITER_SCALING_MIN_DATA_PROCESSED = DataSize.of(32L, DataSize.Unit.MEGABYTE);
    // Memory-usage supplier that always reports zero bytes.
    private static final Supplier<Long> TOTAL_MEMORY_USED = () -> 0L;
    // Providers looked up by the NodePartitioningManager created in setUp(), keyed by catalog handle.
    private final ConcurrentMap<CatalogHandle, ConnectorNodePartitioningProvider> partitionManagers = new ConcurrentHashMap<>();
    private NodePartitioningManager nodePartitioningManager;
    // Declared after partitionManagers on purpose.
    // NOTE(review): the helper presumably registers a provider in that map - confirm before reordering these fields.
    private final PartitioningHandle customScalingPartitioningHandle = this.getCustomScalingPartitioningHandle();

    /**
     * Creates a fresh {@link NodePartitioningManager} before every test.
     *
     * <p>The manager is backed by a {@link NodeScheduler} over an {@link InMemoryNodeManager}
     * constructed with an empty node array, and resolves partitioning providers from
     * {@code partitionManagers}, failing with an {@link IllegalArgumentException} when no
     * provider is registered for the requested catalog handle.
     */
    @BeforeEach
    public void setUp() {
        InternalNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig().setIncludeCoordinator(true);
        NodeTaskMap nodeTaskMap = new NodeTaskMap(new FinalizerService());
        NodeSelectorFactory selectorFactory = new UniformNodeSelectorFactory(nodeManager, schedulerConfig, nodeTaskMap);
        NodeScheduler nodeScheduler = new NodeScheduler(selectorFactory);
        this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler, new TypeOperators(), handle -> {
            ConnectorNodePartitioningProvider provider = this.partitionManagers.get(handle);
            Preconditions.checkArgument(provider != null, "No partition manager for catalog handle: %s", handle);
            return provider;
        });
    }

    /**
     * Walks a SINGLE_DISTRIBUTION exchange through its life cycle with one sink and one source.
     *
     * <p>Checks that a pending read completes when the first page is added, that the buffered
     * page and byte accounting follows every add and remove, and that the source is asserted
     * finished once the sink has finished and the last page has been removed.
     */
    @Test
    public void testGatherSingleWriter() {
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(99));
        LocalExchange gatherExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                8,
                SystemPartitioningHandle.SINGLE_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        this.run(gatherExchange, exchange -> {
            // The constructor is given 8, yet exactly one buffer is expected for this distribution.
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(1);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSource reader = exchange.getNextSource();
            TestLocalExchange.assertSource(reader, 0);
            LocalExchangeSink pageSink = factory.createSink();
            factory.close();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            TestLocalExchange.assertSource(reader, 0);

            // A read requested on the empty source stays pending until the first page arrives.
            ListenableFuture<?> pendingRead = reader.waitForReading();
            Assertions.assertThat(pendingRead.isDone()).isFalse();
            pageSink.addPage(TestLocalExchange.createPage(0));
            Assertions.assertThat(pendingRead.isDone()).isTrue();
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 1);
            TestLocalExchange.assertSource(reader, 1);

            pageSink.addPage(TestLocalExchange.createPage(1));
            TestLocalExchange.assertSource(reader, 2);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 2);

            // Pages are removed in the order they were added.
            TestLocalExchange.assertRemovePage(reader, TestLocalExchange.createPage(0));
            TestLocalExchange.assertSource(reader, 1);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 1);
            TestLocalExchange.assertRemovePage(reader, TestLocalExchange.createPage(1));
            TestLocalExchange.assertSource(reader, 0);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            pageSink.addPage(TestLocalExchange.createPage(2));
            pageSink.addPage(TestLocalExchange.createPage(3));
            TestLocalExchange.assertSource(reader, 2);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 2);

            // Finishing the sink leaves the two buffered pages available to the source.
            pageSink.finish();
            TestLocalExchange.assertSinkFinished(pageSink);
            TestLocalExchange.assertSource(reader, 2);
            TestLocalExchange.assertRemovePage(reader, TestLocalExchange.createPage(2));
            TestLocalExchange.assertSource(reader, 1);
            TestLocalExchange.assertSinkFinished(pageSink);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 1);
            TestLocalExchange.assertRemovePage(reader, TestLocalExchange.createPage(3));
            TestLocalExchange.assertSourceFinished(reader);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Adds 100 pages to a FIXED_ARBITRARY_DISTRIBUTION exchange with two buffers.
     *
     * <p>After every page the combined buffered bytes and pages of both sources must match the
     * number of pages added so far, and at the end each source must hold at least one page.
     */
    @Test
    public void testRandom() {
        LocalExchange randomExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        this.run(randomExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);

            // pagesAdded counts the pages written so far, including the one added in this iteration.
            for (int pagesAdded = 1; pagesAdded <= 100; ++pagesAdded) {
                pageSink.addPage(TestLocalExchange.createPage(0));
                TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, pagesAdded);
                LocalExchangeBufferInfo firstInfo = firstSource.getBufferInfo();
                LocalExchangeBufferInfo secondInfo = secondSource.getBufferInfo();
                long combinedBytes = firstInfo.getBufferedBytes() + secondInfo.getBufferedBytes();
                Assertions.assertThat(combinedBytes).isEqualTo(TestLocalExchange.retainedSizeOfPages(pagesAdded));
                int combinedPages = firstInfo.getBufferedPages() + secondInfo.getBufferedPages();
                Assertions.assertThat(combinedPages).isEqualTo(pagesAdded);
            }

            // Both sources must have received some of the pages.
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages() > 0).isTrue();
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages() > 0).isTrue();
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 100);
        });
    }

    /**
     * Checks page placement for SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION over three buffers when
     * the buffer limit is four retained pages and the writer-scaling threshold is two pages.
     *
     * <p>The first two pages are expected in the first source only, the third page in the second
     * source, and after the first source is drained three more pages are expected to leave the
     * sources holding 1, 2 and 1 pages respectively.
     */
    @Test
    public void testScaleWriter() {
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(4));
        DataSize writerScalingMinDataProcessed = DataSize.ofBytes(TestLocalExchange.sizeOfPages(2));
        LocalExchange scaledExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(scaledExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(3);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);

            // Two pages: everything is expected in the first source.
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createPage(0));
            }
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages()).isEqualTo(2);
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages()).isEqualTo(0);
            Assertions.assertThat(thirdSource.getBufferInfo().getBufferedPages()).isEqualTo(0);

            // One more page: expected in the second source.
            pageSink.addPage(TestLocalExchange.createPage(0));
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages()).isEqualTo(2);
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages()).isEqualTo(1);
            Assertions.assertThat(thirdSource.getBufferInfo().getBufferedPages()).isEqualTo(0);

            // Drain the first source, then add three pages: all three sources are expected to receive data.
            TestLocalExchange.assertRemovePage(firstSource, TestLocalExchange.createPage(0));
            TestLocalExchange.assertRemovePage(firstSource, TestLocalExchange.createPage(0));
            for (int added = 0; added < 3; ++added) {
                pageSink.addPage(TestLocalExchange.createPage(0));
            }
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages()).isEqualTo(1);
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages()).isEqualTo(2);
            Assertions.assertThat(thirdSource.getBufferInfo().getBufferedPages()).isEqualTo(1);
        });
    }

    /**
     * Checks that a SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION exchange keeps writing to the first
     * source when the buffer limit (four retained pages) is exceeded but the writer-scaling
     * threshold (ten pages) is not: all six added pages are expected in the first source.
     */
    @Test
    public void testNoWriterScalingWhenOnlyBufferSizeLimitIsExceeded() {
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(4));
        DataSize writerScalingMinDataProcessed = DataSize.ofBytes(TestLocalExchange.sizeOfPages(10));
        LocalExchange scaledExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(scaledExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(3);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);

            // Six pages exceed the four-page buffer limit but stay below the ten-page scaling threshold.
            for (int added = 0; added < 6; ++added) {
                pageSink.addPage(TestLocalExchange.createPage(0));
            }
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages()).isEqualTo(6);
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages()).isEqualTo(0);
            Assertions.assertThat(thirdSource.getBufferInfo().getBufferedPages()).isEqualTo(0);
        });
    }

    /**
     * Runs the two-partition scaling scenario first with the custom scaling partitioning handle
     * and then with SCALED_WRITER_HASH_DISTRIBUTION.
     */
    @Test
    public void testScalingWithTwoDifferentPartitions() {
        List<PartitioningHandle> handles = ImmutableList.of(
                this.customScalingPartitioningHandle,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION);
        for (PartitioningHandle handle : handles) {
            this.testScalingWithTwoDifferentPartitions(handle);
        }
    }

    /**
     * Scaling scenario with two partition values on a four-buffer exchange partitioned on channel 0.
     *
     * <p>The buffer limit is two retained pages, the writer-scaling threshold is 10 kB, and the
     * session sets a 20 kB skew rebalance threshold and 256 MB of per-node query memory. Pages
     * are added in batches and the expected page count of each source is asserted after every batch.
     *
     * @param partitioningHandle partitioning handle the exchange is created with
     */
    private void testScalingWithTwoDifferentPartitions(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .setSystemProperty("query_max_memory_per_node", "256MB")
                .build();
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(2));
        DataSize writerScalingMinDataProcessed = DataSize.of(10L, DataSize.Unit.KILOBYTE);
        LocalExchange partitionedExchange = new LocalExchange(
                this.nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(partitionedExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);
            LocalExchangeSource fourthSource = exchange.getNextSource();
            TestLocalExchange.assertSource(fourthSource, 0);

            // Batch 1: two (0, 1000) pages followed by two (1, 2) pages.
            // NOTE(review): the helper arguments are presumably (value, row count) - confirm against createSingleValuePage.
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 2));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 2);

            // Batch 2: four (0, 1000) pages.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 2);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 4);

            // Batch 3: four (0, 1000) pages.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 3);
            TestLocalExchange.assertSource(secondSource, 4);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 5);

            // Batch 4: four (0, 1000) pages; the third source is expected to receive its first page.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 4);
            TestLocalExchange.assertSource(secondSource, 5);
            TestLocalExchange.assertSource(thirdSource, 1);
            TestLocalExchange.assertSource(fourthSource, 6);

            // Batch 5: four (1, 10000) pages.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 10000));
            }
            TestLocalExchange.assertSource(firstSource, 5);
            TestLocalExchange.assertSource(secondSource, 8);
            TestLocalExchange.assertSource(thirdSource, 1);
            TestLocalExchange.assertSource(fourthSource, 6);

            // Batch 6: four (1, 10000) pages; each source is expected to gain exactly one page.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 10000));
            }
            TestLocalExchange.assertSource(firstSource, 6);
            TestLocalExchange.assertSource(secondSource, 9);
            TestLocalExchange.assertSource(thirdSource, 2);
            TestLocalExchange.assertSource(fourthSource, 7);
        });
    }

    /**
     * Checks that a SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION exchange does not spread pages to
     * further sources when the reported memory usage (11 MB) equals the session's
     * {@code query_max_memory_per_node} (11MB): all six pages are expected in the first source.
     */
    @Test
    public void testScaledWriterRoundRobinExchangerWhenTotalMemoryUsedIsGreaterThanLimit() {
        AtomicLong reportedMemory = new AtomicLong();
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("query_max_memory_per_node", "11MB")
                .build();
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(4));
        DataSize writerScalingMinDataProcessed = DataSize.ofBytes(TestLocalExchange.sizeOfPages(2));
        LocalExchange scaledExchange = new LocalExchange(
                this.nodePartitioningManager,
                session,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                reportedMemory::get);
        this.run(scaledExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(3);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);

            // Report 11 MB of used memory before any page is written.
            reportedMemory.set(DataSize.of(11L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 6; ++added) {
                pageSink.addPage(TestLocalExchange.createPage(0));
            }
            Assertions.assertThat(firstSource.getBufferInfo().getBufferedPages()).isEqualTo(6);
            Assertions.assertThat(secondSource.getBufferInfo().getBufferedPages()).isEqualTo(0);
            Assertions.assertThat(thirdSource.getBufferInfo().getBufferedPages()).isEqualTo(0);
        });
    }

    /**
     * Checks that a SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION exchange keeps writing to the first
     * source when the writer-scaling threshold (two pages) is exceeded but the buffer limit
     * (twenty retained pages) is not: all nine added pages are expected in the first source.
     *
     * <p>The former {@code physicalWrittenBytesA} counter was removed: it was created and set
     * but never read or handed to the exchange, so it had no influence on the scenario.
     */
    @Test
    public void testNoWriterScalingWhenOnlyWriterScalingMinDataProcessedLimitIsExceeded() {
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(20));
        DataSize writerScalingMinDataProcessed = DataSize.ofBytes(TestLocalExchange.sizeOfPages(2));
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(3);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            TestLocalExchange.assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            TestLocalExchange.assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            TestLocalExchange.assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            TestLocalExchange.assertSource(sourceC, 0);

            // Eight pages plus one more (nine in total), well past the two-page scaling threshold
            // but below the twenty-page buffer limit.
            IntStream.range(0, 8).forEach(i -> sink.addPage(TestLocalExchange.createPage(0)));
            sink.addPage(TestLocalExchange.createPage(0));

            Assertions.assertThat(sourceA.getBufferInfo().getBufferedPages()).isEqualTo(9);
            Assertions.assertThat(sourceB.getBufferInfo().getBufferedPages()).isEqualTo(0);
            Assertions.assertThat(sourceC.getBufferInfo().getBufferedPages()).isEqualTo(0);
        });
    }

    /**
     * Runs the skewed-writer scaling scenario first with the custom scaling partitioning handle
     * and then with SCALED_WRITER_HASH_DISTRIBUTION.
     */
    @Test
    public void testScalingForSkewedWriters() {
        List<PartitioningHandle> handles = ImmutableList.of(
                this.customScalingPartitioningHandle,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION);
        for (PartitioningHandle handle : handles) {
            this.testScalingForSkewedWriters(handle);
        }
    }

    /**
     * Skewed-writer scenario on a four-buffer exchange partitioned on channel 0.
     *
     * <p>The buffer limit is two retained pages, the writer-scaling threshold is 10 kB, and the
     * session sets a 20 kB skew rebalance threshold. Pages are added in batches and the expected
     * page count of each source is asserted after every batch.
     *
     * @param partitioningHandle partitioning handle the exchange is created with
     */
    private void testScalingForSkewedWriters(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(2));
        DataSize writerScalingMinDataProcessed = DataSize.of(10L, DataSize.Unit.KILOBYTE);
        LocalExchange partitionedExchange = new LocalExchange(
                this.nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(partitionedExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);
            LocalExchangeSource fourthSource = exchange.getNextSource();
            TestLocalExchange.assertSource(fourthSource, 0);

            // Batch 1: two (0, 1000) pages followed by two (1, 2) pages.
            // NOTE(review): the helper arguments are presumably (value, row count) - confirm against createSingleValuePage.
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 2));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 2);

            // Batch 2: four (0, 1000) pages.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 2);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 4);

            // Batch 3: four (0, 1000) pages.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 3);
            TestLocalExchange.assertSource(secondSource, 4);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 5);

            // Batch 4: six (0, 1000) pages; the third source is expected to receive its first pages.
            for (int added = 0; added < 6; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 5);
            TestLocalExchange.assertSource(secondSource, 5);
            TestLocalExchange.assertSource(thirdSource, 2);
            TestLocalExchange.assertSource(fourthSource, 6);
        });
    }

    /**
     * Runs the "data written below the scaling threshold" scenario first with the custom scaling
     * partitioning handle and then with SCALED_WRITER_HASH_DISTRIBUTION.
     */
    @Test
    public void testNoScalingWhenDataWrittenIsLessThanMinFileSize() {
        List<PartitioningHandle> handles = ImmutableList.of(
                this.customScalingPartitioningHandle,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION);
        for (PartitioningHandle handle : handles) {
            this.testNoScalingWhenDataWrittenIsLessThanMinFileSize(handle);
        }
    }

    /**
     * No-scaling scenario on a four-buffer exchange partitioned on channel 0, where the
     * writer-scaling threshold is 50 MB and the buffer limit is two retained pages.
     *
     * <p>After both batches only the first and fourth sources are expected to hold pages.
     *
     * @param partitioningHandle partitioning handle the exchange is created with
     */
    private void testNoScalingWhenDataWrittenIsLessThanMinFileSize(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        DataSize maxBufferedBytes = DataSize.ofBytes(TestLocalExchange.retainedSizeOfPages(2));
        DataSize writerScalingMinDataProcessed = DataSize.of(50L, DataSize.Unit.MEGABYTE);
        LocalExchange partitionedExchange = new LocalExchange(
                this.nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(partitionedExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);
            LocalExchangeSource fourthSource = exchange.getNextSource();
            TestLocalExchange.assertSource(fourthSource, 0);

            // Batch 1: two (0, 1000) pages followed by two (1, 2) pages.
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 2));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 2);

            // Batch 2: four (0, 1000) pages; all of them are expected in the fourth source.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 6);
        });
    }

    /**
     * Runs the "buffer utilization below the limit" scenario first with the custom scaling
     * partitioning handle and then with SCALED_WRITER_HASH_DISTRIBUTION.
     */
    @Test
    public void testNoScalingWhenBufferUtilizationIsLessThanLimit() {
        List<PartitioningHandle> handles = ImmutableList.of(
                this.customScalingPartitioningHandle,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION);
        for (PartitioningHandle handle : handles) {
            this.testNoScalingWhenBufferUtilizationIsLessThanLimit(handle);
        }
    }

    /**
     * No-scaling scenario on a four-buffer exchange partitioned on channel 0, where the buffer
     * limit is 50 MB and the writer-scaling threshold is 10 kB.
     *
     * <p>After both batches only the first and fourth sources are expected to hold pages.
     *
     * @param partitioningHandle partitioning handle the exchange is created with
     */
    private void testNoScalingWhenBufferUtilizationIsLessThanLimit(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        DataSize maxBufferedBytes = DataSize.of(50L, DataSize.Unit.MEGABYTE);
        DataSize writerScalingMinDataProcessed = DataSize.of(10L, DataSize.Unit.KILOBYTE);
        LocalExchange partitionedExchange = new LocalExchange(
                this.nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                maxBufferedBytes,
                TYPE_OPERATORS,
                writerScalingMinDataProcessed,
                TOTAL_MEMORY_USED);
        this.run(partitionedExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            TestLocalExchange.assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink pageSink = factory.createSink();
            TestLocalExchange.assertSinkCanWrite(pageSink);
            factory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            TestLocalExchange.assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            TestLocalExchange.assertSource(secondSource, 0);
            LocalExchangeSource thirdSource = exchange.getNextSource();
            TestLocalExchange.assertSource(thirdSource, 0);
            LocalExchangeSource fourthSource = exchange.getNextSource();
            TestLocalExchange.assertSource(fourthSource, 0);

            // Batch 1: two (0, 1000) pages followed by two (1, 2) pages.
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(1, 2));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 2);

            // Batch 2: four (0, 1000) pages; all of them are expected in the fourth source.
            for (int added = 0; added < 4; ++added) {
                pageSink.addPage(TestLocalExchange.createSingleValuePage(0, 1000));
            }
            TestLocalExchange.assertSource(firstSource, 2);
            TestLocalExchange.assertSource(secondSource, 0);
            TestLocalExchange.assertSource(thirdSource, 0);
            TestLocalExchange.assertSource(fourthSource, 6);
        });
    }

    /**
     * Runs the total-memory scenario once per partitioning handle: the custom scaling handle
     * held by this test instance first, then {@code SCALED_WRITER_HASH_DISTRIBUTION}.
     */
    @Test
    public void testNoScalingWhenTotalMemoryUsedIsGreaterThanLimit() {
        PartitioningHandle[] handles = {this.customScalingPartitioningHandle, SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION};
        for (PartitioningHandle handle : handles) {
            testNoScalingWhenTotalMemoryUsedIsGreaterThanLimit(handle);
        }
    }

    /**
     * Feeds a four-buffer exchange while raising the value returned by the memory supplier
     * (0, then 5MB, then 15MB, with {@code query_max_memory_per_node} set to 20MB) and asserts the
     * per-source page counts after each batch.
     *
     * @param partitioningHandle partitioning handle passed to the exchange under test
     */
    private void testNoScalingWhenTotalMemoryUsedIsGreaterThanLimit(PartitioningHandle partitioningHandle) {
        AtomicLong reportedMemory = new AtomicLong();
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                TestingSession.testSessionBuilder()
                        .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                        .setSystemProperty("query_max_memory_per_node", "20MB")
                        .build(),
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATORS,
                DataSize.of(10L, DataSize.Unit.KILOBYTE),
                reportedMemory::get);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink writer = factory.createSink();
            assertSinkCanWrite(writer);
            factory.close();

            // Every source starts out empty.
            LocalExchangeSource first = exchange.getNextSource();
            assertSource(first, 0);
            LocalExchangeSource second = exchange.getNextSource();
            assertSource(second, 0);
            LocalExchangeSource third = exchange.getNextSource();
            assertSource(third, 0);
            LocalExchangeSource fourth = exchange.getNextSource();
            assertSource(fourth, 0);

            // Supplier still reports 0: two 1000-row pages of value 0, then two 2-row pages of value 1.
            for (int added = 0; added < 2; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; added++) {
                writer.addPage(createSingleValuePage(1, 2));
            }
            assertSource(first, 2);
            assertSource(second, 0);
            assertSource(third, 0);
            assertSource(fourth, 2);

            // Supplier reports 5MB: four more pages of value 0 are split between the second and fourth source.
            reportedMemory.set(DataSize.of(5L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 4; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(first, 2);
            assertSource(second, 2);
            assertSource(third, 0);
            assertSource(fourth, 4);

            // Supplier reports 15MB: the third source still receives nothing.
            reportedMemory.set(DataSize.of(15L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 4; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(first, 2);
            assertSource(second, 4);
            assertSource(third, 0);
            assertSource(fourth, 6);
        });
    }

    /**
     * Runs the memory-above-limit scenario once per partitioning handle: the custom scaling
     * handle held by this test instance first, then {@code SCALED_WRITER_HASH_DISTRIBUTION}.
     */
    @Test
    public void testDoNotUpdateScalingStateWhenMemoryIsAboveLimit() {
        PartitioningHandle[] handles = {this.customScalingPartitioningHandle, SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION};
        for (PartitioningHandle handle : handles) {
            testDoNotUpdateScalingStateWhenMemoryIsAboveLimit(handle);
        }
    }

    /**
     * Feeds a four-buffer exchange while the memory supplier reports 0, 5MB, 15MB and finally
     * 13MB (with {@code query_max_memory_per_node} set to 20MB) and asserts the per-source page
     * counts after each batch.
     *
     * @param partitioningHandle partitioning handle passed to the exchange under test
     */
    private void testDoNotUpdateScalingStateWhenMemoryIsAboveLimit(PartitioningHandle partitioningHandle) {
        AtomicLong reportedMemory = new AtomicLong();
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                TestingSession.testSessionBuilder()
                        .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                        .setSystemProperty("query_max_memory_per_node", "20MB")
                        .build(),
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATORS,
                DataSize.of(10L, DataSize.Unit.KILOBYTE),
                reportedMemory::get);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(4);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink writer = factory.createSink();
            assertSinkCanWrite(writer);
            factory.close();

            // Every source starts out empty.
            LocalExchangeSource first = exchange.getNextSource();
            assertSource(first, 0);
            LocalExchangeSource second = exchange.getNextSource();
            assertSource(second, 0);
            LocalExchangeSource third = exchange.getNextSource();
            assertSource(third, 0);
            LocalExchangeSource fourth = exchange.getNextSource();
            assertSource(fourth, 0);

            // Supplier still reports 0: two 1000-row pages of value 0, then two 2-row pages of value 1.
            for (int added = 0; added < 2; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            for (int added = 0; added < 2; added++) {
                writer.addPage(createSingleValuePage(1, 2));
            }
            assertSource(first, 2);
            assertSource(second, 0);
            assertSource(third, 0);
            assertSource(fourth, 2);

            // Supplier reports 5MB: four more pages of value 0 are split between the second and fourth source.
            reportedMemory.set(DataSize.of(5L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 4; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(first, 2);
            assertSource(second, 2);
            assertSource(third, 0);
            assertSource(fourth, 4);

            // Supplier reports 15MB: the third source still receives nothing.
            reportedMemory.set(DataSize.of(15L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 4; added++) {
                writer.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(first, 2);
            assertSource(second, 4);
            assertSource(third, 0);
            assertSource(fourth, 6);

            // Supplier drops to 13MB: four 10-row pages of value 0 reach the first, second and fourth source.
            reportedMemory.set(DataSize.of(13L, DataSize.Unit.MEGABYTE).toBytes());
            for (int added = 0; added < 4; added++) {
                writer.addPage(createSingleValuePage(0, 10));
            }
            assertSource(first, 3);
            assertSource(second, 6);
            assertSource(third, 0);
            assertSource(fourth, 7);
        });
    }

    /**
     * Sends alternating 1000-row pages of value 0 and value 1 through a two-buffer
     * {@code SCALED_WRITER_HASH_DISTRIBUTION} exchange and asserts that each source gains exactly
     * one page per round.
     */
    @Test
    public void testNoScalingWhenNoWriterSkewness() {
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                TestingSession.testSessionBuilder()
                        .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                        .build(),
                2,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATORS,
                DataSize.of(50L, DataSize.Unit.KILOBYTE),
                TOTAL_MEMORY_USED);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink writer = factory.createSink();
            assertSinkCanWrite(writer);
            factory.close();

            LocalExchangeSource first = exchange.getNextSource();
            assertSource(first, 0);
            LocalExchangeSource second = exchange.getNextSource();
            assertSource(second, 0);

            // Two identical rounds; after round N both sources hold N pages.
            for (int round = 1; round <= 2; round++) {
                writer.addPage(createSingleValuePage(0, 1000));
                writer.addPage(createSingleValuePage(1, 1000));
                assertSource(first, round);
                assertSource(second, round);
            }
        });
    }

    /**
     * Exercises a two-buffer {@code FIXED_PASSTHROUGH_DISTRIBUTION} exchange sized to one retained
     * page: a page written by a sink shows up only in the matching source, the sink is
     * write-blocked while that page is buffered, and finishing sinks and sources drains the
     * exchange back to zero buffered bytes.
     */
    @Test
    public void testPassthrough() {
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(1)),
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink leftSink = factory.createSink();
            LocalExchangeSink rightSink = factory.createSink();
            assertSinkCanWrite(leftSink);
            assertSinkCanWrite(rightSink);
            factory.close();

            LocalExchangeSource leftSource = exchange.getNextSource();
            assertSource(leftSource, 0);
            LocalExchangeSource rightSource = exchange.getNextSource();
            assertSource(rightSource, 0);

            // A page from the left sink lands in the left source only and blocks the left sink.
            leftSink.addPage(createPage(0));
            assertSource(leftSource, 1);
            assertSource(rightSource, 0);
            assertSinkWriteBlocked(leftSink);
            assertSinkCanWrite(rightSink);

            // Same for the right side; two pages are now buffered in total.
            rightSink.addPage(createPage(1));
            assertSource(leftSource, 1);
            assertSource(rightSource, 1);
            assertSinkWriteBlocked(leftSink);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Draining the left source makes the left sink writable again; the right sink stays blocked.
            assertRemovePage(leftSource, createPage(0));
            assertSource(leftSource, 0);
            assertSinkCanWrite(leftSink);
            assertSinkWriteBlocked(rightSink);
            assertExchangeTotalBufferedBytes(exchange, 1);

            leftSink.finish();
            assertSinkFinished(leftSink);
            assertSource(rightSource, 1);

            // The right source's buffered page is still removable after both sources are finished.
            leftSource.finish();
            rightSource.finish();
            assertRemovePage(rightSource, createPage(1));
            assertSourceFinished(leftSource);
            assertSourceFinished(rightSource);
            assertSinkFinished(rightSink);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exercises a two-buffer {@code FIXED_HASH_DISTRIBUTION} exchange partitioned on channel 0:
     * each page written by the sink produces one page in each source, removed pages contain only
     * rows of the expected partition, and the exchange ends with zero buffered bytes.
     *
     * <p>The buffered-bytes checks compare the numeric values directly (instead of asserting a
     * pre-computed boolean) so that a failure reports both the actual and the expected size.
     */
    @Test
    public void testPartition() {
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // One input page is split into one page per source.
            sink.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            long bufferedAfterFirstPage = sourceA.getBufferInfo().getBufferedBytes() + sourceB.getBufferInfo().getBufferedBytes();
            Assertions.assertThat(bufferedAfterFirstPage).isGreaterThanOrEqualTo(retainedSizeOfPages(1));

            sink.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            long bufferedAfterSecondPage = sourceA.getBufferInfo().getBufferedBytes() + sourceB.getBufferInfo().getBufferedBytes();
            Assertions.assertThat(bufferedAfterSecondPage).isGreaterThanOrEqualTo(retainedSizeOfPages(2));

            // Drain sourceA: every removed page must hold rows of partition 0 only.
            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);
            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);

            // With the sink finished, the already-empty sourceA reports finished while sourceB still holds pages.
            sink.finish();
            assertSinkFinished(sink);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 2);

            // Drain sourceB: every removed page must hold rows of partition 1 only.
            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);
            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exercises a two-buffer exchange partitioned by a connector-provided bucket function.
     *
     * <p>The test registers an anonymous {@link ConnectorNodePartitioningProvider} under
     * {@code TEST_CATALOG_HANDLE} whose bucket function sends rows with value 42 in block 0 of the
     * page it receives to bucket 0 and every other row to bucket 1. The assertions themselves live
     * in {@code lambda$testPartitionCustomPartitioning$20}.
     */
    @Test
    public void testPartitionCustomPartitioning() {
        ConnectorPartitioningHandle connectorPartitioningHandle = new ConnectorPartitioningHandle(){};
        ConnectorNodePartitioningProvider connectorNodePartitioningProvider = new ConnectorNodePartitioningProvider(){

            @Override
            public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                return Optional.of(ConnectorBucketNodeMap.createBucketNodeMap(2));
            }

            @Override
            public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount) {
                return (page, position) -> {
                    long rowValue = BigintType.BIGINT.getLong(page.getBlock(0), position);
                    return rowValue == 42L ? 0 : 1;
                };
            }
        };
        // Typed list instead of a raw ImmutableList, so no unchecked casts are needed below.
        List<Type> types = ImmutableList.<Type>of(VarcharType.VARCHAR, BigintType.BIGINT);
        // Side effect: replaces whatever provider was registered for TEST_CATALOG_HANDLE.
        this.partitionManagers.put(TestingHandles.TEST_CATALOG_HANDLE, connectorNodePartitioningProvider);
        PartitioningHandle partitioningHandle = new PartitioningHandle(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), Optional.of(TestingTransactionHandle.create()), connectorPartitioningHandle);
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                partitioningHandle,
                ImmutableList.of(1),
                ImmutableList.<Type>of(BigintType.BIGINT),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        this.run(localExchange, exchange -> TestLocalExchange.lambda$testPartitionCustomPartitioning$20(types, exchange));
    }

    /**
     * Checks, on a two-buffer {@code FIXED_ARBITRARY_DISTRIBUTION} exchange with two sinks, that
     * both sinks stay writable after only one source finishes and that both sinks report
     * finished once the second source finishes too.
     */
    @Test
    public void writeUnblockWhenAllReadersFinish() {
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
            factory.noMoreSinkFactories();
            LocalExchangeSink leftSink = factory.createSink();
            assertSinkCanWrite(leftSink);
            LocalExchangeSink rightSink = factory.createSink();
            assertSinkCanWrite(rightSink);
            factory.close();

            LocalExchangeSource leftSource = exchange.getNextSource();
            assertSource(leftSource, 0);
            LocalExchangeSource rightSource = exchange.getNextSource();
            assertSource(rightSource, 0);

            // One finished reader is not enough to finish the sinks.
            leftSource.finish();
            assertSourceFinished(leftSource);
            assertSinkCanWrite(leftSink);
            assertSinkCanWrite(rightSink);

            // Once every reader is finished, both sinks are finished as well.
            rightSource.finish();
            assertSourceFinished(rightSource);
            assertSinkFinished(leftSink);
            assertSinkFinished(rightSink);
        });
    }

    /**
     * Checks, on a two-buffer {@code FIXED_PASSTHROUGH_DISTRIBUTION} exchange created with a
     * 2-byte size limit, that write-blocked sinks are released and report finished only after
     * both sources have been finished and their buffered pages removed.
     *
     * <p>The futures returned by {@code isFinished()} are held as {@code ListenableFuture<?>}
     * rather than as raw types; only {@code isDone()} is ever called on them.
     */
    @Test
    public void writeUnblockWhenAllReadersFinishAndPagesConsumed() {
        LocalExchange localExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(2L),
                TYPE_OPERATORS,
                WRITER_SCALING_MIN_DATA_PROCESSED,
                TOTAL_MEMORY_USED);
        run(localExchange, exchange -> {
            Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            ListenableFuture<?> sinkAFinished = sinkA.isFinished();
            Assertions.assertThat(sinkAFinished.isDone()).isFalse();
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            ListenableFuture<?> sinkBFinished = sinkB.isFinished();
            Assertions.assertThat(sinkBFinished.isDone()).isFalse();
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // One page per sink is enough to block both writers; keep the blocked futures for later.
            sinkA.addPage(createPage(0));
            ListenableFuture<Void> sinkAFuture = assertSinkWriteBlocked(sinkA);
            sinkB.addPage(createPage(0));
            ListenableFuture<Void> sinkBFuture = assertSinkWriteBlocked(sinkB);
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // sourceA only counts as finished once its buffered page has been removed.
            sourceA.finish();
            assertSource(sourceA, 1);
            assertRemovePage(sourceA, createPage(0));
            assertSourceFinished(sourceA);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // sinkB is still blocked while sourceB holds its page.
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkB);

            sourceB.finish();
            assertSource(sourceB, 1);
            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);

            // All readers finished and drained: blocked and finished futures must all be complete.
            Assertions.assertThat(sinkAFuture.isDone()).isTrue();
            Assertions.assertThat(sinkBFuture.isDone()).isTrue();
            Assertions.assertThat(sinkAFinished.isDone()).isTrue();
            Assertions.assertThat(sinkBFinished.isDone()).isTrue();
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Builds a connector-backed {@link PartitioningHandle} and registers its partitioning provider.
     *
     * <p>The anonymous provider reports a bucket-to-node map created for 4 buckets and a bucket
     * function that maps rows whose value in block 0 is 0 to bucket 2 and all other rows to
     * bucket 1.
     *
     * <p>Side effect: the provider is stored in {@code partitionManagers} under
     * {@code TEST_CATALOG_HANDLE}, replacing any previous entry for that key.
     *
     * @return a handle over {@code TEST_CATALOG_HANDLE}, created with {@code true} as the fourth
     *         constructor argument (NOTE(review): presumably the "scale writers" flag - confirm
     *         against {@link PartitioningHandle})
     */
    private PartitioningHandle getCustomScalingPartitioningHandle() {
        ConnectorPartitioningHandle connectorPartitioningHandle = new ConnectorPartitioningHandle(){};
        ConnectorNodePartitioningProvider connectorNodePartitioningProvider = new ConnectorNodePartitioningProvider(){

            @Override
            public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                return Optional.of(ConnectorBucketNodeMap.createBucketNodeMap(4));
            }

            @Override
            public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount) {
                return (page, position) -> {
                    long rowValue = BigintType.BIGINT.getLong(page.getBlock(0), position);
                    return rowValue == 0L ? 2 : 1;
                };
            }
        };
        this.partitionManagers.put(TestingHandles.TEST_CATALOG_HANDLE, connectorNodePartitioningProvider);
        return new PartitioningHandle(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), Optional.of(TestingTransactionHandle.create()), connectorPartitioningHandle, true);
    }

    /**
     * Applies {@code test} to {@code localExchange} synchronously on the calling thread.
     *
     * @param localExchange exchange instance handed to the test body
     * @param test test body to execute
     */
    private void run(LocalExchange localExchange, Consumer<LocalExchange> test) {
        test.accept(localExchange);
    }

    /**
     * Asserts that {@code source} is not finished and currently buffers exactly
     * {@code pageCount} pages.
     *
     * <p>For an empty source it additionally verifies that reading would block, that
     * {@code removePage()} returns {@code null}, and that the buffer info captured at the start
     * reports zero bytes. For a non-empty source it verifies that reading is possible and that
     * the buffered byte count is positive.
     *
     * <p>Numeric checks use AssertJ's number assertions rather than a pre-computed boolean, so a
     * failure message contains the actual value.
     *
     * @param source source to inspect
     * @param pageCount expected number of buffered pages
     */
    private static void assertSource(LocalExchangeSource source, int pageCount) {
        // Snapshot taken once, before any removePage() call below.
        LocalExchangeBufferInfo bufferInfo = source.getBufferInfo();
        Assertions.assertThat(bufferInfo.getBufferedPages()).isEqualTo(pageCount);
        Assertions.assertThat(source.isFinished()).isFalse();
        if (pageCount == 0) {
            Assertions.assertThat(source.waitForReading().isDone()).isFalse();
            Assertions.assertThat(source.removePage()).isNull();
            Assertions.assertThat(source.waitForReading().isDone()).isFalse();
            Assertions.assertThat(source.isFinished()).isFalse();
            Assertions.assertThat(bufferInfo.getBufferedBytes()).isZero();
        } else {
            Assertions.assertThat(source.waitForReading().isDone()).isTrue();
            Assertions.assertThat(bufferInfo.getBufferedBytes()).isPositive();
        }
    }

    /**
     * Asserts that {@code source} is finished, buffers no pages and no bytes, never blocks on
     * reading, and that an extra {@code removePage()} returns {@code null} without changing any
     * of that.
     */
    private static void assertSourceFinished(LocalExchangeSource source) {
        boolean finishedBefore = source.isFinished();
        Assertions.assertThat(finishedBefore).isTrue();

        LocalExchangeBufferInfo info = source.getBufferInfo();
        Assertions.assertThat(info.getBufferedPages()).isEqualTo(0);
        Assertions.assertThat((long) info.getBufferedBytes()).isEqualTo(0L);

        Assertions.assertThat(source.waitForReading().isDone()).isTrue();
        Page removed = source.removePage();
        Assertions.assertThat(removed).isNull();

        // State must be unchanged by the removal attempt.
        Assertions.assertThat(source.waitForReading().isDone()).isTrue();
        boolean finishedAfter = source.isFinished();
        Assertions.assertThat(finishedAfter).isTrue();
    }

    /**
     * Removes one page from {@code source} and compares it with {@code expectedPage}, using the
     * class-level {@code TYPES} list as the column types; delegates to the three-argument overload.
     */
    private static void assertRemovePage(LocalExchangeSource source, Page expectedPage) {
        TestLocalExchange.assertRemovePage(TYPES, source, expectedPage);
    }

    /**
     * Asserts that {@code source} is readable, removes one page from it and checks that the page
     * is non-null, has as many channels as {@code expectedPage}, and equals it under {@code types}.
     *
     * @param types column types used for the page comparison
     * @param source source to remove the page from
     * @param expectedPage page the removed page must equal
     */
    private static void assertRemovePage(List<Type> types, LocalExchangeSource source, Page expectedPage) {
        boolean readable = source.waitForReading().isDone();
        Assertions.assertThat(readable).isTrue();

        Page removed = source.removePage();
        Assertions.assertThat(removed).isNotNull();

        int expectedChannels = expectedPage.getChannelCount();
        Assertions.assertThat(removed.getChannelCount()).isEqualTo(expectedChannels);
        PageAssertions.assertPageEquals(types, removed, expectedPage);
    }

    /**
     * Removes one page from {@code source} and asserts that every position in it is assigned to
     * {@code partition} by a {@link LocalPartitionGenerator} built over channel 0 of {@code TYPES}
     * with {@code partitionCount} partitions.
     */
    private static void assertPartitionedRemovePage(LocalExchangeSource source, int partition, int partitionCount) {
        Assertions.assertThat(source.waitForReading().isDone()).isTrue();
        Page removed = source.removePage();
        Assertions.assertThat(removed).isNotNull();

        int[] hashChannels = {0};
        HashGenerator hashGenerator = InterpretedHashGenerator.createChannelsHashGenerator(TYPES, hashChannels, TYPE_OPERATORS);
        LocalPartitionGenerator generator = new LocalPartitionGenerator(hashGenerator, partitionCount);

        int row = 0;
        while (row < removed.getPositionCount()) {
            Assertions.assertThat(generator.getPartition(removed, row)).isEqualTo(partition);
            row++;
        }
    }

    /**
     * Asserts that {@code sink} is not finished and that its write future is already complete.
     */
    private static void assertSinkCanWrite(LocalExchangeSink sink) {
        boolean finished = sink.isFinished().isDone();
        Assertions.assertThat(finished).isFalse();

        boolean writable = sink.waitForWriting().isDone();
        Assertions.assertThat(writable).isTrue();
    }

    /**
     * Asserts that {@code sink} is not finished and that its write future is still pending.
     *
     * <p>The future is held with its type argument instead of as a raw {@code ListenableFuture},
     * which avoids an unchecked conversion on return.
     *
     * @param sink sink expected to be write-blocked
     * @return the pending future obtained from {@code sink.waitForWriting()}, so callers can
     *         later check when it completes
     */
    private static ListenableFuture<Void> assertSinkWriteBlocked(LocalExchangeSink sink) {
        Assertions.assertThat(sink.isFinished().isDone()).isFalse();
        ListenableFuture<Void> writeFuture = sink.waitForWriting();
        Assertions.assertThat(writeFuture.isDone()).isFalse();
        return writeFuture;
    }

    /**
     * Asserts that {@code sink} is finished and writable, then adds one more page and asserts
     * that both conditions still hold afterwards.
     */
    private static void assertSinkFinished(LocalExchangeSink sink) {
        boolean finishedBefore = sink.isFinished().isDone();
        Assertions.assertThat(finishedBefore).isTrue();
        boolean writableBefore = sink.waitForWriting().isDone();
        Assertions.assertThat(writableBefore).isTrue();

        // Adding a page to a finished sink must leave the sink's state untouched.
        Page extraPage = createPage(0);
        sink.addPage(extraPage);

        boolean finishedAfter = sink.isFinished().isDone();
        Assertions.assertThat(finishedAfter).isTrue();
        boolean writableAfter = sink.waitForWriting().isDone();
        Assertions.assertThat(writableAfter).isTrue();
    }

    /**
     * Sums the buffered bytes reported by every source of {@code exchange} and asserts that the
     * total equals {@code retainedSizeOfPages(pageCount)}.
     */
    private static void assertExchangeTotalBufferedBytes(LocalExchange exchange, int pageCount) {
        long total = 0L;
        int index = 0;
        while (index < exchange.getBufferCount()) {
            total += exchange.getSource(index).getBufferInfo().getBufferedBytes();
            index++;
        }
        long expected = retainedSizeOfPages(pageCount);
        Assertions.assertThat(total).isEqualTo(expected);
    }

    /**
     * Creates a test page via {@code SequencePageBuilder.createSequencePage(TYPES, 100, i)}.
     * NOTE(review): presumably 100 is the row count and {@code i} the initial sequence value -
     * confirm against {@code SequencePageBuilder}.
     */
    private static Page createPage(int i) {
        return SequencePageBuilder.createSequencePage(TYPES, 100, i);
    }

    /**
     * Builds a page with a single block that holds {@code length} copies of {@code value}.
     *
     * @param value value repeated in every position
     * @param length number of positions in the page
     * @return a one-block page produced through {@code BlockAssertions.createLongsBlock}
     */
    private static Page createSingleValuePage(int value, int length) {
        // Box explicitly as Long: the previous raw List was filled by an int-returning lambda,
        // i.e. with Integer objects. NOTE(review): assumes createLongsBlock consumes Long
        // elements - confirm against BlockAssertions.
        List<Long> values = IntStream.range(0, length)
                .mapToObj(ignored -> Long.valueOf(value))
                .collect(ImmutableList.toImmutableList());
        ValueBlock block = BlockAssertions.createLongsBlock(values);
        return new Page(new Block[]{block});
    }

    /**
     * Returns {@code PAGE_SIZE} in bytes multiplied by {@code count}.
     */
    private static long sizeOfPages(int count) {
        long bytesPerPage = PAGE_SIZE.toBytes();
        return bytesPerPage * count;
    }

    /**
     * Returns {@code RETAINED_PAGE_SIZE} in bytes multiplied by {@code count}.
     */
    public static long retainedSizeOfPages(int count) {
        long retainedBytesPerPage = RETAINED_PAGE_SIZE.toBytes();
        return retainedBytesPerPage * count;
    }

    /**
     * Test body of {@code testPartitionCustomPartitioning}: writes one page built with initial
     * values 1/42 and one built with 100/43, and asserts that the first is delivered unchanged to
     * the second source returned by the exchange and the second to the first source returned.
     *
     * @param types column types used to build and compare the pages
     * @param exchange two-buffer exchange under test
     */
    private static /* synthetic */ void lambda$testPartitionCustomPartitioning$20(List types, LocalExchange exchange) {
        Assertions.assertThat(exchange.getBufferCount()).isEqualTo(2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory factory = exchange.createSinkFactory();
        factory.noMoreSinkFactories();
        LocalExchangeSink writer = factory.createSink();
        assertSinkCanWrite(writer);
        factory.close();

        // Note the naming: the first source handed out is the one expected to receive the second page.
        LocalExchangeSource otherRowsSource = exchange.getNextSource();
        assertSource(otherRowsSource, 0);
        LocalExchangeSource matchingRowsSource = exchange.getNextSource();
        assertSource(matchingRowsSource, 0);

        Page matchingPage = SequencePageBuilder.createSequencePage(types, 1, 100, 42);
        writer.addPage(matchingPage);
        assertSource(matchingRowsSource, 1);
        assertSource(otherRowsSource, 0);
        assertRemovePage(types, matchingRowsSource, matchingPage);
        assertSource(matchingRowsSource, 0);

        Page otherPage = SequencePageBuilder.createSequencePage(types, 100, 100, 43);
        writer.addPage(otherPage);
        assertSource(matchingRowsSource, 0);
        assertSource(otherRowsSource, 1);
        assertRemovePage(types, otherRowsSource, otherPage);
        assertSource(otherRowsSource, 0);
    }
}

