/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.trino.SequencePageBuilder;
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.execution.Lifespan;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.HashGenerator;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.PageAssertions;
import io.trino.operator.PipelineExecutionStrategy;
import io.trino.operator.exchange.LocalExchange;
import io.trino.operator.exchange.LocalExchangeBufferInfo;
import io.trino.operator.exchange.LocalExchangeSink;
import io.trino.operator.exchange.LocalExchangeSource;
import io.trino.operator.exchange.LocalPartitionGenerator;
import io.trino.spi.Page;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.ConnectorBucketNodeMap;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTransactionHandle;
import io.trino.type.BlockTypeOperators;
import io.trino.util.FinalizerService;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestLocalExchange {
    // Channel layout shared by most tests: a single BIGINT column.
    // Must be declared before RETAINED_PAGE_SIZE, whose initializer calls createPage(), which reads TYPES.
    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Retained size of one page built by createPage(42); retainedSizeOfPages() multiplies this by a page count.
    // NOTE(review): the tests assume every createPage(i) page has this same retained size - confirm.
    private static final DataSize RETAINED_PAGE_SIZE = DataSize.ofBytes(createPage(42).getRetainedSizeInBytes());
    // Buffer limit passed to exchanges in tests that do not exercise write blocking.
    private static final DataSize LOCAL_EXCHANGE_MAX_BUFFERED_BYTES = DataSize.of(32L, DataSize.Unit.MEGABYTE);
    private static final BlockTypeOperators TYPE_OPERATOR_FACTORY = new BlockTypeOperators(new TypeOperators());
    private static final Session SESSION = TestingSession.testSessionBuilder().build();
    // Re-created in setUp() before every test method.
    private NodePartitioningManager nodePartitioningManager;

    /**
     * Creates a fresh {@link NodePartitioningManager} before each test method, backed by a
     * {@link UniformNodeSelectorFactory} over an {@link InMemoryNodeManager} with the
     * coordinator included in scheduling.
     */
    @BeforeMethod
    public void setUp() {
        InternalNodeManager nodeManager = new InMemoryNodeManager();
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig().setIncludeCoordinator(true);
        NodeTaskMap nodeTaskMap = new NodeTaskMap(new FinalizerService());
        NodeSelectorFactory selectorFactory = new UniformNodeSelectorFactory(nodeManager, schedulerConfig, nodeTaskMap);
        NodeScheduler nodeScheduler = new NodeScheduler(selectorFactory);
        BlockTypeOperators blockTypeOperators = new BlockTypeOperators(new TypeOperators());
        this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler, blockTypeOperators);
    }

    /**
     * TestNG data provider supplying one row per {@link PipelineExecutionStrategy} under test:
     * ungrouped first, then grouped.
     */
    @DataProvider
    public static Object[][] executionStrategy() {
        Object[] ungroupedRow = {PipelineExecutionStrategy.UNGROUPED_EXECUTION};
        Object[] groupedRow = {PipelineExecutionStrategy.GROUPED_EXECUTION};
        return new Object[][]{ungroupedRow, groupedRow};
    }

    /**
     * SINGLE_DISTRIBUTION exchange with one sink and one source.
     *
     * <p>Checks that pages are read back in the order they were written, that the exchange's
     * buffered-byte total follows the number of unread pages, that a pending read future
     * completes when the first page arrives, and that pages written before {@code sink.finish()}
     * remain readable until the source is drained.
     */
    @Test(dataProvider="executionStrategy")
    public void testGatherSingleWriter(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.SINGLE_DISTRIBUTION,
                8,
                TYPES,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                DataSize.ofBytes(retainedSizeOfPages(99)),
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            // The factory was given 8 as its fourth argument, yet exactly one buffer is expected.
            Assert.assertEquals(exchange.getBufferCount(), 1);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSource source = exchange.getSource(0);
            assertSource(source, 0);

            LocalExchangeSink sink = sinkFactory.createSink();
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();
            assertSinkCanWrite(sink);
            assertSource(source, 0);

            // A read future taken while the buffer is empty must complete once a page is added.
            ListenableFuture<?> readFuture = source.waitForReading();
            Assert.assertFalse(readFuture.isDone());
            sink.addPage(createPage(0));
            Assert.assertTrue(readFuture.isDone());
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertSource(source, 1);

            sink.addPage(createPage(1));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Pages come back in write order.
            assertRemovePage(source, createPage(0));
            assertSource(source, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertRemovePage(source, createPage(1));
            assertSource(source, 0);
            assertExchangeTotalBufferedBytes(exchange, 0);

            sink.addPage(createPage(2));
            sink.addPage(createPage(3));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Finishing the sink keeps already-buffered pages readable.
            sink.finish();
            assertSinkFinished(sink);
            assertSource(source, 2);
            assertRemovePage(source, createPage(2));
            assertSource(source, 1);
            assertSinkFinished(sink);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // Draining the last page after the sink finished finishes the source.
            assertRemovePage(source, createPage(3));
            assertSourceFinished(source);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * FIXED_BROADCAST_DISTRIBUTION exchange with two sinks and two sources.
     *
     * <p>Checks that every page written by either sink becomes visible in both sources, and
     * that the exchange's buffered-byte total only drops for a page once both sources have
     * removed it.
     */
    @Test(dataProvider="executionStrategy")
    public void testBroadcast(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // One write shows up in both sources but counts once in the exchange total.
            sinkA.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            sinkA.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Draining only sourceA leaves the total unchanged because sourceB still holds both pages.
            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            sinkA.finish();
            assertSinkFinished(sinkA);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // The second sink can still broadcast after the first one finished.
            sinkB.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            sinkB.finish();
            assertSinkFinished(sinkB);
            assertSource(sourceA, 1);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            // With all sinks finished, a source finishes as soon as it is drained.
            assertRemovePage(sourceA, createPage(0));
            assertSourceFinished(sourceA);
            assertSource(sourceB, 3);
            assertExchangeTotalBufferedBytes(exchange, 3);

            assertRemovePage(sourceB, createPage(0));
            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * FIXED_ARBITRARY_DISTRIBUTION exchange with one sink and two sources.
     *
     * <p>Writes 100 pages and checks after each write that the two sources together hold exactly
     * the pages written so far, then that each source received at least one page.
     */
    @Test(dataProvider="executionStrategy")
    public void testRandom(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            for (int written = 1; written <= 100; ++written) {
                sink.addPage(createPage(0));
                assertExchangeTotalBufferedBytes(exchange, written);

                // Each page lands in exactly one of the two sources.
                LocalExchangeBufferInfo bufferInfoA = sourceA.getBufferInfo();
                LocalExchangeBufferInfo bufferInfoB = sourceB.getBufferInfo();
                Assert.assertEquals(bufferInfoA.getBufferedBytes() + bufferInfoB.getBufferedBytes(), retainedSizeOfPages(written));
                Assert.assertEquals(bufferInfoA.getBufferedPages() + bufferInfoB.getBufferedPages(), written);
            }

            // Only require that neither source was starved; the exact split is not asserted.
            Assert.assertTrue(sourceA.getBufferInfo().getBufferedPages() > 0);
            Assert.assertTrue(sourceB.getBufferInfo().getBufferedPages() > 0);
            assertExchangeTotalBufferedBytes(exchange, 100);
        });
    }

    /**
     * FIXED_PASSTHROUGH_DISTRIBUTION exchange with two sinks, two sources and a buffer limit
     * of one page.
     *
     * <p>Checks that a page written to the first sink is only visible in source 0 and a page
     * written to the second sink only in source 1, that a sink becomes write-blocked after
     * writing one page while the other sink can still write, and that finishing both sources
     * finishes the remaining sink once the buffered page has been removed.
     */
    @Test(dataProvider="executionStrategy")
    public void testPassthrough(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                DataSize.ofBytes(retainedSizeOfPages(1)),
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // sinkA's page reaches only sourceA and blocks sinkA, while sinkB stays writable.
            sinkA.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 0);
            assertSinkWriteBlocked(sinkA);
            assertSinkCanWrite(sinkB);

            sinkB.addPage(createPage(1));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkA);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Removing sourceA's page unblocks sinkA; sinkB remains blocked by its own page.
            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 0);
            assertSinkCanWrite(sinkA);
            assertSinkWriteBlocked(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 1);

            sinkA.finish();
            assertSinkFinished(sinkA);
            assertSource(sourceB, 1);

            // sourceB's buffered page is still removable after finish() was called on it.
            sourceA.finish();
            sourceB.finish();
            assertRemovePage(sourceB, createPage(1));
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * FIXED_HASH_DISTRIBUTION exchange partitioned on channel 0 with one sink and two sources.
     *
     * <p>Checks that each written page produces one page in each source, and that every row
     * read from source 0 hashes to partition 0 and every row read from source 1 hashes to
     * partition 1 (verified with {@code assertPartitionedRemovePage}).
     */
    @Test(dataProvider="executionStrategy")
    public void testPartition(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(0),
                Optional.empty(),
                executionStrategy,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // Buffered bytes are only bounded from below here, not compared exactly.
            sink.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(1));

            sink.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(2));

            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);

            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);

            // sourceA is already drained, so it finishes as soon as the sink does.
            sink.finish();
            assertSinkFinished(sink);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 2);

            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);

            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Exchange partitioned by a connector-provided bucket function registered under catalog
     * {@code "foo"}.
     *
     * <p>The provider declares two buckets and maps a row to bucket 0 when the BIGINT read from
     * block 0 of the page handed to the bucket function equals 42, and to bucket 1 otherwise.
     * The exchange is created over (VARCHAR, BIGINT) pages with partition channel 1. The
     * assertions themselves live in {@code lambda$testPartitionCustomPartitioning$5}, a method
     * the decompiler emitted for the original lambda body.
     */
    @Test(dataProvider="executionStrategy")
    public void testPartitionCustomPartitioning(PipelineExecutionStrategy executionStrategy) {
        ConnectorPartitioningHandle connectorPartitioningHandle = new ConnectorPartitioningHandle(){};
        ConnectorNodePartitioningProvider connectorNodePartitioningProvider = new ConnectorNodePartitioningProvider(){

            @Override
            public ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                return ConnectorBucketNodeMap.createBucketNodeMap(2);
            }

            @Override
            public ToIntFunction<ConnectorSplit> getSplitBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                // This test does not supply an implementation for split bucketing.
                throw new UnsupportedOperationException();
            }

            @Override
            public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount) {
                // NOTE(review): block 0 is presumably the partition channel (BIGINT) - confirm against the caller.
                return (page, position) -> {
                    long rowValue = BigintType.BIGINT.getLong(page.getBlock(0), position);
                    return rowValue == 42L ? 0 : 1;
                };
            }
        };
        List<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT);
        this.nodePartitioningManager.addPartitioningProvider(new CatalogName("foo"), connectorNodePartitioningProvider);
        PartitioningHandle partitioningHandle = new PartitioningHandle(
                Optional.of(new CatalogName("foo")),
                Optional.of(TestingTransactionHandle.create()),
                connectorPartitioningHandle);
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                partitioningHandle,
                2,
                types,
                ImmutableList.of(1),
                Optional.empty(),
                executionStrategy,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> lambda$testPartitionCustomPartitioning$5(localExchangeSinkFactoryId, types, exchange));
    }

    /**
     * Broadcast exchange with two sinks and two sources where the readers finish without any
     * page having been written.
     *
     * <p>Checks that the sinks stay writable while only one source has finished, and that both
     * sinks report finished once every source has finished.
     */
    @Test(dataProvider="executionStrategy")
    public void writeUnblockWhenAllReadersFinish(PipelineExecutionStrategy executionStrategy) {
        List<Type> types = ImmutableList.of(BigintType.BIGINT);
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION,
                2,
                types,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // One finished reader is not enough to finish the writers.
            sourceA.finish();
            assertSourceFinished(sourceA);
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);

            // Once the last reader finishes, every sink must report finished.
            sourceB.finish();
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Broadcast exchange with a one-byte buffer limit, so a single page blocks both sinks.
     *
     * <p>Checks that a source on which {@code finish()} was called still hands out its buffered
     * page before reporting finished, that the sinks stay blocked while the other source still
     * holds the page, and that the write futures captured while blocked complete and both sinks
     * finish once every source has finished and been drained.
     */
    @Test(dataProvider="executionStrategy")
    public void writeUnblockWhenAllReadersFinishAndPagesConsumed(PipelineExecutionStrategy executionStrategy) {
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(),
                Optional.empty(),
                executionStrategy,
                DataSize.ofBytes(1L),
                TYPE_OPERATOR_FACTORY);
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        this.run(localExchangeFactory, executionStrategy, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource sourceA = exchange.getSource(0);
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getSource(1);
            assertSource(sourceB, 0);

            // A single page exceeds the one-byte limit and blocks both sinks.
            sinkA.addPage(createPage(0));
            ListenableFuture<Void> sinkAFuture = assertSinkWriteBlocked(sinkA);
            ListenableFuture<Void> sinkBFuture = assertSinkWriteBlocked(sinkB);
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // finish() on a source with a buffered page does not finish it until the page is removed.
            sourceA.finish();
            assertSource(sourceA, 1);
            assertRemovePage(sourceA, createPage(0));
            assertSourceFinished(sourceA);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // sourceB still holds the page, so the writers remain blocked.
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkA);
            assertSinkWriteBlocked(sinkB);

            sourceB.finish();
            assertSource(sourceB, 1);
            assertRemovePage(sourceB, createPage(0));
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);

            // The futures captured while blocked must have completed by now.
            Assert.assertTrue(sinkAFuture.isDone());
            Assert.assertTrue(sinkBFuture.isDone());
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Requesting an exchange whose {@link Lifespan} kind does not match the factory's declared
     * execution strategy must fail with an {@link IllegalArgumentException} carrying the
     * expected message: a driver-group lifespan on an ungrouped factory, and a task-wide
     * lifespan on a grouped factory.
     */
    @Test
    public void testMismatchedExecutionStrategy() {
        LocalExchange.LocalExchangeFactory ungroupedLocalExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(0),
                Optional.empty(),
                PipelineExecutionStrategy.UNGROUPED_EXECUTION,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        Assertions.assertThatThrownBy(() -> ungroupedLocalExchangeFactory.getLocalExchange(Lifespan.driverGroup(3)))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("LocalExchangeFactory is declared as UNGROUPED_EXECUTION. Driver-group exchange cannot be created.");

        LocalExchange.LocalExchangeFactory groupedLocalExchangeFactory = new LocalExchange.LocalExchangeFactory(
                this.nodePartitioningManager,
                SESSION,
                SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION,
                2,
                TYPES,
                ImmutableList.of(0),
                Optional.empty(),
                PipelineExecutionStrategy.GROUPED_EXECUTION,
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY);
        Assertions.assertThatThrownBy(() -> groupedLocalExchangeFactory.getLocalExchange(Lifespan.taskWide()))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("LocalExchangeFactory is declared as GROUPED_EXECUTION. Task-wide exchange cannot be created.");
    }

    /**
     * Runs {@code test} against exchanges obtained from {@code localExchangeFactory}.
     *
     * <p>For ungrouped execution the scenario runs once on the task-wide exchange. For grouped
     * execution it runs three times, on the exchanges of driver groups 1, 12 and 23, in that
     * order.
     *
     * @throws IllegalArgumentException if the strategy is neither of the two handled values
     */
    private void run(LocalExchange.LocalExchangeFactory localExchangeFactory, PipelineExecutionStrategy pipelineExecutionStrategy, Consumer<LocalExchange> test) {
        switch (pipelineExecutionStrategy) {
            case UNGROUPED_EXECUTION:
                test.accept(localExchangeFactory.getLocalExchange(Lifespan.taskWide()));
                break;
            case GROUPED_EXECUTION:
                for (int driverGroupId : new int[]{1, 12, 23}) {
                    test.accept(localExchangeFactory.getLocalExchange(Lifespan.driverGroup(driverGroupId)));
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown pipelineExecutionStrategy");
        }
    }

    /**
     * Asserts that {@code source} is not finished and currently buffers exactly
     * {@code pageCount} pages.
     *
     * <p>With pages buffered, the read future must be done and the buffered byte count
     * positive. With none buffered, the read future must be pending both before and after a
     * {@code removePage()} call that returns {@code null}, and the buffered byte count must be
     * zero.
     */
    private static void assertSource(LocalExchangeSource source, int pageCount) {
        LocalExchangeBufferInfo bufferInfo = source.getBufferInfo();
        Assert.assertEquals(bufferInfo.getBufferedPages(), pageCount);
        Assert.assertFalse(source.isFinished());

        if (pageCount != 0) {
            Assert.assertTrue(source.waitForReading().isDone());
            Assert.assertTrue(bufferInfo.getBufferedBytes() > 0L);
            return;
        }

        // Empty source: polling it must neither yield a page nor change its state.
        Assert.assertFalse(source.waitForReading().isDone());
        Assert.assertNull(source.removePage());
        Assert.assertFalse(source.waitForReading().isDone());
        Assert.assertFalse(source.isFinished());
        Assert.assertEquals(bufferInfo.getBufferedBytes(), 0L);
    }

    /**
     * Asserts that {@code source} is finished: it buffers no pages or bytes, its read future is
     * done, and a {@code removePage()} call returns {@code null} and leaves it finished.
     */
    private static void assertSourceFinished(LocalExchangeSource source) {
        Assert.assertTrue(source.isFinished());

        LocalExchangeBufferInfo info = source.getBufferInfo();
        Assert.assertEquals(info.getBufferedPages(), 0);
        Assert.assertEquals(info.getBufferedBytes(), 0L);

        // Polling a finished source must not block and must not change its state.
        Assert.assertTrue(source.waitForReading().isDone());
        Assert.assertNull(source.removePage());
        Assert.assertTrue(source.waitForReading().isDone());
        Assert.assertTrue(source.isFinished());
    }

    /**
     * Removes the next page from {@code source} and compares it to {@code expectedPage} using
     * the class-wide {@code TYPES} layout.
     */
    private static void assertRemovePage(LocalExchangeSource source, Page expectedPage) {
        assertRemovePage(TYPES, source, expectedPage);
    }

    /**
     * Asserts that {@code source} is readable, removes its next page, and checks that the page
     * is non-null, has the same channel count as {@code expectedPage}, and equals it under
     * {@code types}.
     */
    private static void assertRemovePage(List<Type> types, LocalExchangeSource source, Page expectedPage) {
        Assert.assertTrue(source.waitForReading().isDone());

        Page removed = source.removePage();
        Assert.assertNotNull(removed);
        Assert.assertEquals(removed.getChannelCount(), expectedPage.getChannelCount());
        PageAssertions.assertPageEquals(types, removed, expectedPage);
    }

    /**
     * Removes the next page from {@code source} and asserts that every position in it maps to
     * {@code partition} according to a {@link LocalPartitionGenerator} that hashes channel 0
     * over {@code partitionCount} partitions.
     */
    private static void assertPartitionedRemovePage(LocalExchangeSource source, int partition, int partitionCount) {
        Assert.assertTrue(source.waitForReading().isDone());

        Page removed = source.removePage();
        Assert.assertNotNull(removed);

        // Recompute each row's partition independently of the exchange and compare.
        HashGenerator hashGenerator = new InterpretedHashGenerator(TYPES, new int[]{0}, TYPE_OPERATOR_FACTORY);
        LocalPartitionGenerator partitionGenerator = new LocalPartitionGenerator(hashGenerator, partitionCount);
        for (int row = 0; row < removed.getPositionCount(); ++row) {
            Assert.assertEquals(partitionGenerator.getPartition(removed, row), partition);
        }
    }

    /**
     * Asserts that {@code sink} is not finished and that its write future is already done.
     */
    private static void assertSinkCanWrite(LocalExchangeSink sink) {
        boolean finished = sink.isFinished();
        Assert.assertFalse(finished);
        boolean writable = sink.waitForWriting().isDone();
        Assert.assertTrue(writable);
    }

    /**
     * Asserts that {@code sink} is not finished and that its write future is still pending.
     *
     * @return the pending write future, so callers can later assert that it completed
     */
    private static ListenableFuture<Void> assertSinkWriteBlocked(LocalExchangeSink sink) {
        Assert.assertFalse(sink.isFinished());
        // Typed local instead of a raw ListenableFuture, so the return needs no unchecked conversion.
        ListenableFuture<Void> writeFuture = sink.waitForWriting();
        Assert.assertFalse(writeFuture.isDone());
        return writeFuture;
    }

    /**
     * Asserts that {@code sink} is finished and its write future is done, then adds a page and
     * asserts that the sink is still finished with a done write future afterwards.
     */
    private static void assertSinkFinished(LocalExchangeSink sink) {
        Assert.assertTrue(sink.isFinished());
        Assert.assertTrue(sink.waitForWriting().isDone());

        // Writing to a finished sink is expected to return normally and leave its state unchanged.
        Page extraPage = createPage(0);
        sink.addPage(extraPage);

        Assert.assertTrue(sink.isFinished());
        Assert.assertTrue(sink.waitForWriting().isDone());
    }

    /**
     * Asserts that the exchange's buffered byte total equals the retained size of
     * {@code pageCount} pages as computed by {@code retainedSizeOfPages}.
     */
    private static void assertExchangeTotalBufferedBytes(LocalExchange exchange, int pageCount) {
        long expectedBytes = retainedSizeOfPages(pageCount);
        Assert.assertEquals(exchange.getBufferedBytes(), expectedBytes);
    }

    /**
     * Builds a test page over {@code TYPES} by calling
     * {@code SequencePageBuilder.createSequencePage(TYPES, 100, i)}.
     * NOTE(review): presumably 100 is the row count and {@code i} the first sequence value -
     * confirm against SequencePageBuilder.
     */
    private static Page createPage(int i) {
        Page sequencePage = SequencePageBuilder.createSequencePage(TYPES, 100, i);
        return sequencePage;
    }

    /**
     * Returns {@code count} times the byte size stored in {@code RETAINED_PAGE_SIZE}.
     *
     * @param count number of pages
     * @return the expected retained size, in bytes, of that many test pages
     */
    public static long retainedSizeOfPages(int count) {
        long bytesPerPage = RETAINED_PAGE_SIZE.toBytes();
        return bytesPerPage * count;
    }

    /**
     * Scenario body of {@code testPartitionCustomPartitioning}, emitted by the decompiler as a
     * separate method in place of the original lambda.
     *
     * <p>Writes a page built with {@code createSequencePage(types, 1, 100, 42)} and expects it
     * only in the source at index 1, then a page built with
     * {@code createSequencePage(types, 100, 100, 43)} and expects it only in the source at
     * index 0. Each page is read back and compared to what was written.
     *
     * <p>NOTE(review): the name follows javac's naming pattern for synthetic lambda methods and
     * may clash with a compiler-generated method; consider renaming it together with its single
     * caller.
     * NOTE(review): presumably the first int passed to createSequencePage is the row count and
     * the rest are per-channel start values - confirm against SequencePageBuilder.
     *
     * @param localExchangeSinkFactoryId id of the sink factory registered for the exchange
     * @param types channel types of the pages written and compared
     * @param exchange the exchange under test
     */
    private static void lambda$testPartitionCustomPartitioning$5(LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId, List<Type> types, LocalExchange exchange) {
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.getSinkFactory(localExchangeSinkFactoryId);
        LocalExchangeSink sink = sinkFactory.createSink();
        assertSinkCanWrite(sink);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        // sourceA is deliberately the source at index 1 and sourceB the one at index 0.
        LocalExchangeSource sourceA = exchange.getSource(1);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(0);
        assertSource(sourceB, 0);

        Page pageA = SequencePageBuilder.createSequencePage(types, 1, 100, 42);
        sink.addPage(pageA);
        assertSource(sourceA, 1);
        assertSource(sourceB, 0);
        assertRemovePage(types, sourceA, pageA);
        assertSource(sourceA, 0);

        Page pageB = SequencePageBuilder.createSequencePage(types, 100, 100, 43);
        sink.addPage(pageB);
        assertSource(sourceA, 0);
        assertSource(sourceB, 1);
        assertRemovePage(types, sourceB, pageB);
        assertSource(sourceB, 0);
    }
}

