/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler.faulttolerant;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.Threads;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.TestingExchangeSourceHandle;
import io.trino.execution.scheduler.faulttolerant.EventDrivenTaskSource;
import io.trino.execution.scheduler.faulttolerant.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.faulttolerant.NodeRequirements;
import io.trino.execution.scheduler.faulttolerant.SplitAssigner;
import io.trino.execution.scheduler.faulttolerant.SplitAssignerTester;
import io.trino.execution.scheduler.faulttolerant.TaskDescriptor;
import io.trino.execution.scheduler.faulttolerant.TestingConnectorSplit;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.split.RemoteSplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.CONCURRENT)
public class TestEventDrivenTaskSource {
    private static final int INVOCATION_COUNT = 20;
    private static final long TIMEOUT = 60L;
    private static final PlanNodeId PLAN_NODE_1 = new PlanNodeId("plan-node-1");
    private static final PlanNodeId PLAN_NODE_2 = new PlanNodeId("plan-node-2");
    private static final PlanNodeId PLAN_NODE_3 = new PlanNodeId("plan-node-3");
    private static final PlanNodeId PLAN_NODE_4 = new PlanNodeId("plan-node-3");
    private static final PlanFragmentId FRAGMENT_1 = new PlanFragmentId("fragment-1");
    private static final PlanFragmentId FRAGMENT_2 = new PlanFragmentId("fragment-2");
    private static final PlanFragmentId FRAGMENT_3 = new PlanFragmentId("fragment-3");
    private final AtomicInteger nextId = new AtomicInteger();
    private ListeningScheduledExecutorService executor;

    @BeforeAll
    public void setUp() {
        this.executor = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newScheduledThreadPool(10, Threads.daemonThreadsNamed((String)this.getClass().getName())));
    }

    @AfterAll
    public void tearDown() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    @RepeatedTest(value=20)
    @Timeout(value=60L)
    public void testHappyPath() throws Exception {
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of((Object)PLAN_NODE_1, (Object)this.createSplit(0)));
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_1, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(0), this.createSplit(1)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_1, (Object[])new ConnectorSplit[]{this.createSplit(0)}).putAll((Object)PLAN_NODE_2, (Object[])new ConnectorSplit[]{this.createSplit(0)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_1, (Object[])new ConnectorSplit[]{this.createSplit(0)}).putAll((Object)PLAN_NODE_2, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(1)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_1, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(3), this.createSplit(4)}).putAll((Object)PLAN_NODE_2, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(1)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.of((Object)FRAGMENT_1, (Object)this.createSourceHandle(1)), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of((Object)FRAGMENT_1, (Object)PLAN_NODE_1), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of((Object)FRAGMENT_1, (Object)PLAN_NODE_1), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.builder().put((Object)FRAGMENT_1, (Object)PLAN_NODE_1).put((Object)FRAGMENT_2, (Object)PLAN_NODE_2).buildOrThrow(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).putAll((Object)FRAGMENT_2, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(3)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.builder().put((Object)FRAGMENT_1, (Object)PLAN_NODE_1).put((Object)FRAGMENT_2, (Object)PLAN_NODE_2).buildOrThrow(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).putAll((Object)FRAGMENT_2, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(3)}).putAll((Object)FRAGMENT_3, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(4)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.builder().put((Object)FRAGMENT_1, (Object)PLAN_NODE_1).put((Object)FRAGMENT_2, (Object)PLAN_NODE_1).put((Object)FRAGMENT_3, (Object)PLAN_NODE_2).buildOrThrow(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.of());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.of((Object)FRAGMENT_1, (Object)PLAN_NODE_1), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_3, (Object[])new ConnectorSplit[]{this.createSplit(0)}).putAll((Object)PLAN_NODE_4, (Object[])new ConnectorSplit[]{this.createSplit(0)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).putAll((Object)FRAGMENT_2, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(3)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.builder().put((Object)FRAGMENT_1, (Object)PLAN_NODE_3).put((Object)FRAGMENT_2, (Object)PLAN_NODE_4).buildOrThrow(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_1, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(3), this.createSplit(4)}).putAll((Object)PLAN_NODE_2, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(1)}).build());
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)ImmutableListMultimap.builder().putAll((Object)FRAGMENT_1, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(1)}).putAll((Object)FRAGMENT_2, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(1), this.createSourceHandle(3)}).putAll((Object)FRAGMENT_3, (Object[])new ExchangeSourceHandle[]{this.createSourceHandle(4)}).build(), (Map<PlanFragmentId, PlanNodeId>)ImmutableMap.builder().put((Object)FRAGMENT_1, (Object)PLAN_NODE_1).put((Object)FRAGMENT_2, (Object)PLAN_NODE_1).put((Object)FRAGMENT_3, (Object)PLAN_NODE_2).buildOrThrow(), (ListMultimap<PlanNodeId, ConnectorSplit>)ImmutableListMultimap.builder().putAll((Object)PLAN_NODE_3, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(3), this.createSplit(4)}).putAll((Object)PLAN_NODE_4, (Object[])new ConnectorSplit[]{this.createSplit(0), this.createSplit(1)}).build());
    }

    @RepeatedTest(value=20)
    @Timeout(value=60L)
    public void stressTest() throws Exception {
        ImmutableSet allFragments = ImmutableSet.of((Object)FRAGMENT_1, (Object)FRAGMENT_2, (Object)FRAGMENT_3);
        ImmutableMap remoteSources = ImmutableMap.of((Object)FRAGMENT_1, (Object)PLAN_NODE_1, (Object)FRAGMENT_2, (Object)PLAN_NODE_1, (Object)FRAGMENT_3, (Object)PLAN_NODE_2);
        ImmutableSet splitSources = ImmutableSet.of((Object)PLAN_NODE_3, (Object)PLAN_NODE_4);
        ArrayListMultimap sourceHandles = ArrayListMultimap.create();
        for (PlanFragmentId fragmentId : allFragments) {
            int numberOfHandles = ThreadLocalRandom.current().nextInt(100);
            for (int i = 0; i < numberOfHandles; ++i) {
                int partition = ThreadLocalRandom.current().nextInt(10);
                sourceHandles.put((Object)fragmentId, (Object)this.createSourceHandle(partition));
            }
        }
        ArrayListMultimap splits = ArrayListMultimap.create();
        for (PlanNodeId planNodeId : splitSources) {
            int numberOfSplits = ThreadLocalRandom.current().nextInt(100);
            for (int i = 0; i < numberOfSplits; ++i) {
                int partition = ThreadLocalRandom.current().nextInt(10);
                splits.put((Object)planNodeId, (Object)this.createSplit(partition));
            }
        }
        this.testStageTaskSourceSuccess((ListMultimap<PlanFragmentId, ExchangeSourceHandle>)sourceHandles, (Map<PlanFragmentId, PlanNodeId>)remoteSources, (ListMultimap<PlanNodeId, ConnectorSplit>)splits);
    }

    private void testStageTaskSourceSuccess(ListMultimap<PlanFragmentId, ExchangeSourceHandle> sourceHandles, Map<PlanFragmentId, PlanNodeId> remoteSources, ListMultimap<PlanNodeId, ConnectorSplit> splits) throws Exception {
        List<TaskDescriptor> taskDescriptors;
        ArrayList handleSources = new ArrayList();
        HashMap exchanges = new HashMap();
        Multimaps.asMap(sourceHandles).forEach((fragmentId, handles) -> {
            TestingExchangeSourceHandleSource handleSource = new TestingExchangeSourceHandleSource((ScheduledExecutorService)this.executor, (List<ExchangeSourceHandle>)handles);
            handleSources.add(handleSource);
            exchanges.put(fragmentId, new TestingExchange(handleSource));
        });
        remoteSources.keySet().forEach(fragmentId -> {
            if (!exchanges.containsKey(fragmentId)) {
                TestingExchangeSourceHandleSource handleSource = new TestingExchangeSourceHandleSource((ScheduledExecutorService)this.executor, (List<ExchangeSourceHandle>)ImmutableList.of());
                handleSources.add(handleSource);
                exchanges.put(fragmentId, new TestingExchange(handleSource));
            }
        });
        HashMap splitSources = new HashMap();
        Multimaps.asMap(splits).forEach((planNodeId, connectorSplits) -> splitSources.put(planNodeId, new TestingSplitSource((ScheduledExecutorService)this.executor, (List<ConnectorSplit>)connectorSplits)));
        SplitAssignerTester tester = new SplitAssignerTester();
        int partitionCount = TestEventDrivenTaskSource.getPartitionCount(sourceHandles.values(), splits.values());
        FaultTolerantPartitioningScheme partitioningScheme = TestEventDrivenTaskSource.createPartitioningScheme(partitionCount);
        AtomicLong getSplitInvocations = new AtomicLong();
        ImmutableSet allSources = ImmutableSet.builder().addAll(remoteSources.values()).addAll((Iterable)splits.keySet()).build();
        TestingSplitAssigner testingSplitAssigner = new TestingSplitAssigner((Set<PlanNodeId>)allSources);
        try (EventDrivenTaskSource taskSource = new EventDrivenTaskSource(new QueryId("query"), new TableExecuteContextManager(), exchanges, (SetMultimap)remoteSources.entrySet().stream().collect(ImmutableSetMultimap.toImmutableSetMultimap(Map.Entry::getValue, Map.Entry::getKey)), () -> splitSources, (SplitAssigner)testingSplitAssigner, (Executor)this.executor, 1, 1L, partitioningScheme, getSplitDuration -> getSplitInvocations.incrementAndGet());){
            while (tester.getTaskDescriptors().isEmpty()) {
                SplitAssigner.AssignmentResult result = (SplitAssigner.AssignmentResult)taskSource.process().get(10L, TimeUnit.SECONDS);
                tester.update(result);
            }
            taskDescriptors = tester.getTaskDescriptors().get();
        }
        for (TestingExchangeSourceHandleSource handleSource : handleSources) {
            Assertions.assertThat((boolean)handleSource.isClosed()).isTrue();
        }
        for (SplitSource splitSource : splitSources.values()) {
            if (splitSource instanceof TestingSplitSource) {
                Iterator source = (TestingSplitSource)splitSource;
                Assertions.assertThat((boolean)((TestingSplitSource)((Object)source)).isClosed()).isTrue();
                continue;
            }
            Fail.fail((String)("unexpected split source: " + splitSource.getClass()));
        }
        ((ListAssert)Assertions.assertThat(taskDescriptors).isNotNull()).isNotEmpty();
        HashMap<Integer, SetMultimap> expectedHandles = new HashMap<Integer, SetMultimap>();
        HashMap<Integer, SetMultimap> expectedSplits = new HashMap<Integer, SetMultimap>();
        for (Map.Entry entry : sourceHandles.entries()) {
            TestingExchangeSourceHandle handle = (TestingExchangeSourceHandle)entry.getValue();
            PlanNodeId planNodeId2 = remoteSources.get(entry.getKey());
            expectedHandles.computeIfAbsent(handle.getPartitionId(), key -> HashMultimap.create()).put((Object)planNodeId2, (Object)handle);
        }
        for (Map.Entry entry : splits.entries()) {
            TestingConnectorSplit split = (TestingConnectorSplit)entry.getValue();
            expectedSplits.computeIfAbsent(split.getBucket().orElseThrow(), key -> HashMultimap.create()).put((Object)((PlanNodeId)entry.getKey()), (Object)split);
        }
        HashMap<Integer, SetMultimap> actualHandles = new HashMap<Integer, SetMultimap>();
        HashMap<Integer, SetMultimap> actualSplits = new HashMap<Integer, SetMultimap>();
        for (TaskDescriptor taskDescriptor : taskDescriptors) {
            int partitionId = taskDescriptor.getPartitionId();
            for (Map.Entry entry : taskDescriptor.getSplits().getSplitsFlat().entries()) {
                if (((Split)entry.getValue()).getCatalogHandle().equals((Object)ExchangeOperator.REMOTE_CATALOG_HANDLE)) {
                    RemoteSplit remoteSplit = (RemoteSplit)((Split)entry.getValue()).getConnectorSplit();
                    SpoolingExchangeInput input = (SpoolingExchangeInput)remoteSplit.getExchangeInput();
                    for (ExchangeSourceHandle handle : input.getExchangeSourceHandles()) {
                        Assertions.assertThat((int)handle.getPartitionId()).isEqualTo(partitionId);
                        actualHandles.computeIfAbsent(partitionId, key -> HashMultimap.create()).put((Object)((PlanNodeId)entry.getKey()), (Object)((TestingExchangeSourceHandle)handle));
                    }
                    continue;
                }
                TestingConnectorSplit split = (TestingConnectorSplit)((Split)entry.getValue()).getConnectorSplit();
                Assertions.assertThat((int)split.getBucket().orElseThrow()).isEqualTo(partitionId);
                actualSplits.computeIfAbsent(partitionId, key -> HashMultimap.create()).put((Object)((PlanNodeId)entry.getKey()), (Object)split);
            }
        }
        Assertions.assertThat(actualHandles).isEqualTo(expectedHandles);
        Assertions.assertThat(actualSplits).isEqualTo(expectedSplits);
    }

    private static FaultTolerantPartitioningScheme createPartitioningScheme(int partitionCount) {
        return new FaultTolerantPartitioningScheme(partitionCount, Optional.of(IntStream.range(0, partitionCount).toArray()), Optional.of(split -> ((TestingConnectorSplit)split.getConnectorSplit()).getBucket().orElseThrow()), Optional.empty());
    }

    private static int getPartitionCount(Collection<ExchangeSourceHandle> sourceHandles, Collection<ConnectorSplit> splits) {
        int maxPartitionId = sourceHandles.stream().mapToInt(ExchangeSourceHandle::getPartitionId).max().orElse(-1);
        maxPartitionId = Math.max(maxPartitionId, splits.stream().map(TestingConnectorSplit.class::cast).map(TestingConnectorSplit::getBucket).mapToInt(OptionalInt::orElseThrow).max().orElse(-1));
        return Math.max(maxPartitionId + 1, 1);
    }

    private TestingExchangeSourceHandle createSourceHandle(int partitionId) {
        return new TestingExchangeSourceHandle(this.nextId.getAndIncrement(), partitionId, 0L);
    }

    private TestingConnectorSplit createSplit(int partitionId) {
        return new TestingConnectorSplit(this.nextId.getAndIncrement(), OptionalInt.of(partitionId), Optional.empty());
    }

    private static class TestingSplitAssigner
    implements SplitAssigner {
        private final Set<PlanNodeId> allSources;
        private final Set<Integer> partitions = new HashSet<Integer>();
        private final Set<PlanNodeId> finishedSources = new HashSet<PlanNodeId>();
        private boolean finished;

        private TestingSplitAssigner(Set<PlanNodeId> allSources) {
            this.allSources = ImmutableSet.copyOf((Collection)Objects.requireNonNull(allSources, "allSources is null"));
        }

        public SplitAssigner.AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Split> splitsMap, boolean noMoreSplits) {
            Preconditions.checkState((!this.finished ? 1 : 0) != 0, (Object)"finished is set");
            SplitAssigner.AssignmentResult.Builder result = SplitAssigner.AssignmentResult.builder();
            Multimaps.asMap(splitsMap).forEach((partition, splits) -> {
                if (this.partitions.add((Integer)partition)) {
                    result.addPartition(new SplitAssigner.Partition(partition.intValue(), new NodeRequirements(Optional.empty(), (Set)ImmutableSet.of())));
                    for (PlanNodeId finishedSource : this.finishedSources) {
                        result.updatePartition(new SplitAssigner.PartitionUpdate(partition.intValue(), finishedSource, false, (ListMultimap)ImmutableListMultimap.of(), true));
                    }
                }
                ImmutableListMultimap partitionSplits = ImmutableListMultimap.builder().putAll(partition, (Iterable)splits).build();
                result.updatePartition(new SplitAssigner.PartitionUpdate(partition.intValue(), planNodeId, true, (ListMultimap)partitionSplits, noMoreSplits));
            });
            if (noMoreSplits) {
                this.finishedSources.add(planNodeId);
                for (Integer partition2 : this.partitions) {
                    result.updatePartition(new SplitAssigner.PartitionUpdate(partition2.intValue(), planNodeId, false, (ListMultimap)ImmutableListMultimap.of(), true));
                }
            }
            if (this.finishedSources.containsAll(this.allSources)) {
                this.partitions.forEach(arg_0 -> ((SplitAssigner.AssignmentResult.Builder)result).sealPartition(arg_0));
            }
            return result.build();
        }

        public SplitAssigner.AssignmentResult finish() {
            SplitAssigner.AssignmentResult.Builder result = SplitAssigner.AssignmentResult.builder();
            if (this.finished) {
                return result.build();
            }
            this.finished = true;
            Preconditions.checkState((boolean)this.finishedSources.containsAll(this.allSources));
            if (this.partitions.isEmpty()) {
                this.partitions.add(0);
                result.addPartition(new SplitAssigner.Partition(0, new NodeRequirements(Optional.empty(), (Set)ImmutableSet.of()))).sealPartition(0);
            }
            return result.setNoMorePartitions().build();
        }

        public boolean isFinished() {
            return this.finished;
        }
    }

    private static class TestingExchangeSourceHandleSource
    implements ExchangeSourceHandleSource {
        private final ScheduledExecutorService executor;
        @GuardedBy(value="this")
        private final Queue<ExchangeSourceHandle> remainingHandles;
        @GuardedBy(value="this")
        private CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> currentFuture;
        @GuardedBy(value="this")
        private boolean closed;

        private TestingExchangeSourceHandleSource(ScheduledExecutorService executor, List<ExchangeSourceHandle> handles) {
            this.executor = Objects.requireNonNull(executor, "executor is null");
            this.remainingHandles = new LinkedList<ExchangeSourceHandle>((Collection)Objects.requireNonNull(handles, "handles is null"));
        }

        public synchronized CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> getNextBatch() {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"closed");
            Preconditions.checkState((this.currentFuture == null || this.currentFuture.isDone() ? 1 : 0) != 0, (Object)"currentFuture is still running");
            this.currentFuture = new CompletableFuture();
            long delay = ThreadLocalRandom.current().nextInt(3);
            if (delay == 0L) {
                this.setNextBatch();
            } else {
                this.executor.schedule(this::setNextBatch, delay, TimeUnit.MILLISECONDS);
            }
            return this.currentFuture;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void setNextBatch() {
            ExchangeSourceHandleSource.ExchangeSourceHandleBatch batch;
            CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> future;
            TestingExchangeSourceHandleSource testingExchangeSourceHandleSource = this;
            synchronized (testingExchangeSourceHandleSource) {
                future = this.currentFuture;
                ExchangeSourceHandle handle = this.remainingHandles.poll();
                boolean lastBatch = this.remainingHandles.isEmpty();
                batch = new ExchangeSourceHandleSource.ExchangeSourceHandleBatch((List)(handle == null ? ImmutableList.of() : ImmutableList.of((Object)handle)), lastBatch);
            }
            if (future != null) {
                future.complete(batch);
            }
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.currentFuture != null) {
                this.currentFuture.cancel(true);
                this.currentFuture = null;
            }
            this.remainingHandles.clear();
        }

        public synchronized boolean isClosed() {
            return this.closed;
        }
    }

    private static class TestingSplitSource
    implements SplitSource {
        private final ScheduledExecutorService executor;
        @GuardedBy(value="this")
        private final Queue<ConnectorSplit> remainingSplits;
        @GuardedBy(value="this")
        private SettableFuture<SplitSource.SplitBatch> currentFuture;
        @GuardedBy(value="this")
        private boolean finished;
        @GuardedBy(value="this")
        private boolean closed;

        public TestingSplitSource(ScheduledExecutorService executor, List<ConnectorSplit> splits) {
            this.executor = Objects.requireNonNull(executor, "executor is null");
            this.remainingSplits = new LinkedList<ConnectorSplit>(splits);
        }

        public CatalogHandle getCatalogHandle() {
            return TestingHandles.TEST_CATALOG_HANDLE;
        }

        public synchronized ListenableFuture<SplitSource.SplitBatch> getNextBatch(int maxSize) {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"closed");
            Preconditions.checkState((this.currentFuture == null || this.currentFuture.isDone() ? 1 : 0) != 0, (Object)"currentFuture is still running");
            this.currentFuture = SettableFuture.create();
            long delay = ThreadLocalRandom.current().nextInt(3);
            if (delay == 0L) {
                this.setNextBatch();
            } else {
                this.executor.schedule(this::setNextBatch, delay, TimeUnit.MILLISECONDS);
            }
            return this.currentFuture;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void setNextBatch() {
            SplitSource.SplitBatch batch;
            SettableFuture<SplitSource.SplitBatch> future;
            TestingSplitSource testingSplitSource = this;
            synchronized (testingSplitSource) {
                future = this.currentFuture;
                ConnectorSplit split = this.remainingSplits.poll();
                boolean lastBatch = this.remainingSplits.isEmpty();
                batch = new SplitSource.SplitBatch((List)(split == null ? ImmutableList.of() : ImmutableList.of((Object)new Split(TestingHandles.TEST_CATALOG_HANDLE, split))), lastBatch);
                if (lastBatch) {
                    this.finished = true;
                }
            }
            if (future != null) {
                future.set((Object)batch);
            }
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.currentFuture != null) {
                this.currentFuture.cancel(true);
                this.currentFuture = null;
            }
            this.remainingSplits.clear();
        }

        public synchronized boolean isFinished() {
            return this.finished || this.closed;
        }

        public Optional<List<Object>> getTableExecuteSplitsInfo() {
            return Optional.empty();
        }

        public synchronized boolean isClosed() {
            return this.closed;
        }
    }

    private static class TestingExchange
    implements Exchange {
        @GuardedBy(value="this")
        private ExchangeSourceHandleSource exchangeSourceHandleSource;
        @GuardedBy(value="this")
        private boolean closed;

        public TestingExchange(ExchangeSourceHandleSource exchangeSourceHandleSource) {
            this.exchangeSourceHandleSource = Objects.requireNonNull(exchangeSourceHandleSource, "exchangeSourceHandleSource is null");
        }

        public ExchangeId getId() {
            throw new UnsupportedOperationException();
        }

        public ExchangeSinkHandle addSink(int taskPartitionId) {
            throw new UnsupportedOperationException();
        }

        public void noMoreSinks() {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<ExchangeSinkInstanceHandle> instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<ExchangeSinkInstanceHandle> updateSinkInstanceHandle(ExchangeSinkHandle sinkHandle, int taskAttemptId) {
            throw new UnsupportedOperationException();
        }

        public void sinkFinished(ExchangeSinkHandle sinkHandle, int taskAttemptId) {
            throw new UnsupportedOperationException();
        }

        public void allRequiredSinksFinished() {
            throw new UnsupportedOperationException();
        }

        public synchronized ExchangeSourceHandleSource getSourceHandles() {
            Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)"already closed");
            Preconditions.checkState((this.exchangeSourceHandleSource != null ? 1 : 0) != 0, (Object)"already retrieved");
            ExchangeSourceHandleSource result = this.exchangeSourceHandleSource;
            this.exchangeSourceHandleSource = null;
            return result;
        }

        public synchronized void close() {
            this.closed = true;
        }
    }
}

