/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.VerifyException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.connector.CatalogName;
import io.trino.execution.Lifespan;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.TaskDescriptor;
import io.trino.execution.scheduler.TaskSource;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.QueryId;
import io.trino.spi.connector.NotPartitionedPartitionHandle;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceSplitter;
import io.trino.spi.exchange.ExchangeSourceStatistics;
import io.trino.split.SplitSource;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SplitSourceFactory;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.LongConsumer;
import javax.inject.Inject;

public class StageTaskSourceFactory
implements TaskSourceFactory {
    private static final Logger log = Logger.get(StageTaskSourceFactory.class);
    private final SplitSourceFactory splitSourceFactory;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final int splitBatchSize;

    /**
     * Injection constructor: reads the split batch size from the query manager configuration
     * and delegates to the explicit-batch-size constructor.
     *
     * @param splitSourceFactory factory used to create split sources for table scans
     * @param tableExecuteContextManager manager handed to source-distributed task sources
     * @param queryManagerConfig configuration providing the schedule split batch size
     * @throws NullPointerException if {@code queryManagerConfig} is null
     */
    @Inject
    public StageTaskSourceFactory(
            SplitSourceFactory splitSourceFactory,
            TableExecuteContextManager tableExecuteContextManager,
            QueryManagerConfig queryManagerConfig) {
        this(
                splitSourceFactory,
                tableExecuteContextManager,
                Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize());
    }

    /**
     * Creates a factory with an explicit split batch size.
     *
     * @param splitSourceFactory factory used to create split sources; must not be null
     * @param tableExecuteContextManager manager handed to source-distributed task sources; must not be null
     * @param splitBatchSize batch size passed to the task sources that read splits (stored as given, not validated)
     * @throws NullPointerException if either collaborator is null
     */
    public StageTaskSourceFactory(
            SplitSourceFactory splitSourceFactory,
            TableExecuteContextManager tableExecuteContextManager,
            int splitBatchSize) {
        this.splitSourceFactory = Objects.requireNonNull(
                splitSourceFactory,
                "splitSourceFactory is null");
        this.tableExecuteContextManager = Objects.requireNonNull(
                tableExecuteContextManager,
                "tableExecuteContextManager is null");
        this.splitBatchSize = splitBatchSize;
    }

    /**
     * Picks the {@link TaskSource} implementation that matches the fragment's partitioning.
     *
     * <ul>
     * <li>single distribution: {@link SingleDistributionTaskSource}</li>
     * <li>fixed arbitrary / scaled writer distribution: {@link ArbitraryDistributionTaskSource}</li>
     * <li>fixed hash distribution, or any partitioning that carries a connector id:
     * {@link HashDistributionTaskSource}</li>
     * <li>source distribution: {@link SourceDistributionTaskSource}</li>
     * </ul>
     *
     * @throws IllegalArgumentException if the partitioning is none of the above, or if a hash
     *         distributed fragment is requested without {@code bucketToPartitionMap}
     */
    @Override
    public TaskSource create(
            Session session,
            PlanFragment fragment,
            Map<PlanFragmentId, Exchange> sourceExchanges,
            Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
            LongConsumer getSplitTimeRecorder,
            Optional<int[]> bucketToPartitionMap,
            Optional<BucketNodeMap> bucketNodeMap) {
        PartitioningHandle handle = fragment.getPartitioning();

        if (handle.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            return SingleDistributionTaskSource.create(fragment, exchangeSourceHandles);
        }

        if (handle.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)
                || handle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            DataSize targetTaskInputSize = SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize(session);
            return ArbitraryDistributionTaskSource.create(fragment, sourceExchanges, exchangeSourceHandles, targetTaskInputSize);
        }

        // Connector-provided partitionings are handled the same way as the system hash distribution.
        if (handle.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || handle.getConnectorId().isPresent()) {
            int[] bucketToPartition = bucketToPartitionMap.orElseThrow(
                    () -> new IllegalArgumentException("bucketToPartitionMap is expected to be present for hash distributed stages"));
            return HashDistributionTaskSource.create(
                    session,
                    fragment,
                    splitSourceFactory,
                    exchangeSourceHandles,
                    splitBatchSize,
                    getSplitTimeRecorder,
                    bucketToPartition,
                    bucketNodeMap);
        }

        if (handle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            int targetTaskSplitCount = SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount(session);
            return SourceDistributionTaskSource.create(
                    session,
                    fragment,
                    splitSourceFactory,
                    exchangeSourceHandles,
                    tableExecuteContextManager,
                    splitBatchSize,
                    getSplitTimeRecorder,
                    targetTaskSplitCount);
        }

        throw new IllegalArgumentException("Unexpected partitioning: " + handle);
    }

    /**
     * Returns the exchange source handles feeding the fragment's remote sources whose exchange
     * type is {@code REPLICATE}, keyed by remote source plan node id.
     */
    private static Multimap<PlanNodeId, ExchangeSourceHandle> getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles) {
        ImmutableList.Builder<RemoteSourceNode> replicatedSources = ImmutableList.builder();
        for (RemoteSourceNode candidate : fragment.getRemoteSourceNodes()) {
            if (candidate.getExchangeType() == ExchangeNode.Type.REPLICATE) {
                replicatedSources.add(candidate);
            }
        }
        return getInputsForRemoteSources(replicatedSources.build(), handles);
    }

    /**
     * Returns the exchange source handles feeding the fragment's remote sources whose exchange
     * type is anything other than {@code REPLICATE}, keyed by remote source plan node id.
     */
    private static Multimap<PlanNodeId, ExchangeSourceHandle> getPartitionedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles) {
        ImmutableList.Builder<RemoteSourceNode> partitionedSources = ImmutableList.builder();
        for (RemoteSourceNode candidate : fragment.getRemoteSourceNodes()) {
            if (candidate.getExchangeType() != ExchangeNode.Type.REPLICATE) {
                partitionedSources.add(candidate);
            }
        }
        return getInputsForRemoteSources(partitionedSources.build(), handles);
    }

    /**
     * Builds the reverse lookup from each source fragment id to the id of the remote source node
     * that reads it.
     *
     * @throws IllegalArgumentException (from the immutable map builder) if the same source
     *         fragment id appears more than once
     */
    private static Map<PlanFragmentId, PlanNodeId> getSourceFragmentToRemoteSourceNodeIdMap(List<RemoteSourceNode> remoteSourceNodes) {
        ImmutableMap.Builder<PlanFragmentId, PlanNodeId> fragmentToNode = ImmutableMap.builder();
        for (RemoteSourceNode remoteSource : remoteSourceNodes) {
            for (PlanFragmentId sourceFragment : remoteSource.getSourceFragmentIds()) {
                fragmentToNode.put(sourceFragment, remoteSource.getId());
            }
        }
        return fragmentToNode.build();
    }

    /**
     * Regroups exchange source handles from "per source fragment" to "per remote source node".
     *
     * <p>For {@code GATHER} and {@code REPLICATE} remote sources each source fragment may
     * contribute at most one handle.
     *
     * @param remoteSources remote source nodes whose inputs should be collected
     * @param exchangeSourceHandles available handles keyed by producing fragment id
     * @return handles keyed by the id of the remote source node consuming them
     * @throws IllegalArgumentException if a gather/replicate source fragment has more than one handle
     */
    private static Multimap<PlanNodeId, ExchangeSourceHandle> getInputsForRemoteSources(List<RemoteSourceNode> remoteSources, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles) {
        ImmutableListMultimap.Builder<PlanNodeId, ExchangeSourceHandle> inputs = ImmutableListMultimap.builder();
        for (RemoteSourceNode source : remoteSources) {
            for (PlanFragmentId sourceFragment : source.getSourceFragmentIds()) {
                Collection<ExchangeSourceHandle> fragmentHandles = Objects.requireNonNull(
                        exchangeSourceHandles.get(sourceFragment),
                        () -> "exchange source handle is missing for fragment: " + sourceFragment);
                ExchangeNode.Type exchangeType = source.getExchangeType();
                boolean singleHandleExpected = exchangeType == ExchangeNode.Type.GATHER || exchangeType == ExchangeNode.Type.REPLICATE;
                if (singleHandleExpected) {
                    Preconditions.checkArgument(fragmentHandles.size() <= 1, "at most 1 exchange source handle is expected, got: %s", fragmentHandles);
                }
                inputs.putAll(source.getId(), fragmentHandles);
            }
        }
        return inputs.build();
    }

    public static class SourceDistributionTaskSource
    implements TaskSource {
        private final QueryId queryId;
        private final PlanNodeId partitionedSourceNodeId;
        private final TableExecuteContextManager tableExecuteContextManager;
        private final SplitSource splitSource;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        private final Optional<CatalogName> catalogRequirement;
        private final int targetPartitionSplitCount;
        private final Queue<Split> remotelyAccessibleSplitBuffer = new ArrayDeque<Split>();
        private final Map<HostAddress, Set<Split>> locallyAccessibleSplitBuffer = new HashMap<HostAddress, Set<Split>>();
        private int currentPartitionId;
        private boolean finished;
        private boolean closed;

        public static SourceDistributionTaskSource create(Session session, PlanFragment fragment, SplitSourceFactory splitSourceFactory, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, TableExecuteContextManager tableExecuteContextManager, int splitBatchSize, LongConsumer getSplitTimeRecorder, int targetPartitionSplitCount) {
            Preconditions.checkArgument((fragment.getPartitionedSources().size() == 1 ? 1 : 0) != 0, (String)"single partitioned source is expected, got: %s", fragment.getPartitionedSources());
            List<RemoteSourceNode> remoteSourceNodes = fragment.getRemoteSourceNodes();
            Preconditions.checkArgument((boolean)remoteSourceNodes.stream().allMatch(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE), (String)"only replicated exchanges are expected in source distributed stage, got: %s", remoteSourceNodes);
            PlanNodeId partitionedSourceNodeId = (PlanNodeId)Iterables.getOnlyElement(fragment.getPartitionedSources());
            Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
            SplitSource splitSource = splitSources.get(partitionedSourceNodeId);
            Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName()).filter(catalog -> !CatalogName.isInternalSystemConnector(catalog));
            return new SourceDistributionTaskSource(session.getQueryId(), partitionedSourceNodeId, tableExecuteContextManager, splitSource, StageTaskSourceFactory.getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles), splitBatchSize, getSplitTimeRecorder, catalogName, targetPartitionSplitCount);
        }

        public SourceDistributionTaskSource(QueryId queryId, PlanNodeId partitionedSourceNodeId, TableExecuteContextManager tableExecuteContextManager, SplitSource splitSource, Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, Optional<CatalogName> catalogRequirement, int targetPartitionSplitCount) {
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.partitionedSourceNodeId = Objects.requireNonNull(partitionedSourceNodeId, "partitionedSourceNodeId is null");
            this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
            this.catalogRequirement = Objects.requireNonNull(catalogRequirement, "catalogRequirement is null");
            Preconditions.checkArgument((targetPartitionSplitCount > 0 ? 1 : 0) != 0, (String)"targetPartitionSplitCount must be positive: %s", (int)targetPartitionSplitCount);
            this.targetPartitionSplitCount = targetPartitionSplitCount;
        }

        /*
         * Exception decompiling
         */
        @Override
        public List<TaskDescriptor> getMoreTasks() {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[DOLOOP]], but top level block is 4[UNCONDITIONALDOLOOP]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        private static <T> List<T> removeN(Collection<T> collection, int n) {
            ImmutableList.Builder result = ImmutableList.builder();
            Iterator<T> iterator = collection.iterator();
            for (int i = 0; i < n && iterator.hasNext(); ++i) {
                T item = iterator.next();
                iterator.remove();
                result.add(item);
            }
            return result.build();
        }

        @Override
        public boolean isFinished() {
            return this.finished;
        }

        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.splitSource.close();
        }

        private static /* synthetic */ void lambda$getMoreTasks$6(List splits, Set otherHostSplits) {
            splits.forEach(otherHostSplits::remove);
        }

        private /* synthetic */ void lambda$getMoreTasks$5(List info) {
            TableExecuteContext tableExecuteContext = this.tableExecuteContextManager.getTableExecuteContextForQuery(this.queryId);
            tableExecuteContext.setSplitsInfo(info);
        }

        private static /* synthetic */ Set lambda$getMoreTasks$4(HostAddress key) {
            return Sets.newIdentityHashSet();
        }

        private /* synthetic */ void lambda$getMoreTasks$3(long start) {
            this.getSplitTimeRecorder.accept(start);
        }

        private static /* synthetic */ void lambda$getMoreTasks$2(List splits, Set otherHostSplits) {
            splits.forEach(otherHostSplits::remove);
        }
    }

    public static class HashDistributionTaskSource
    implements TaskSource {
        private final Map<PlanNodeId, SplitSource> splitSources;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        private final int[] bucketToPartitionMap;
        private final Optional<BucketNodeMap> bucketNodeMap;
        private final Optional<CatalogName> catalogRequirement;
        private boolean finished;
        private boolean closed;

        public static HashDistributionTaskSource create(Session session, PlanFragment fragment, SplitSourceFactory splitSourceFactory, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, int[] bucketToPartitionMap, Optional<BucketNodeMap> bucketNodeMap) {
            Preconditions.checkArgument((bucketNodeMap.isPresent() || fragment.getPartitionedSources().isEmpty() ? 1 : 0) != 0, (Object)"bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)");
            Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
            return new HashDistributionTaskSource(splitSources, StageTaskSourceFactory.getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles), StageTaskSourceFactory.getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles), splitBatchSize, getSplitTimeRecorder, bucketToPartitionMap, bucketNodeMap, fragment.getPartitioning().getConnectorId());
        }

        public HashDistributionTaskSource(Map<PlanNodeId, SplitSource> splitSources, Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles, Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, int[] bucketToPartitionMap, Optional<BucketNodeMap> bucketNodeMap, Optional<CatalogName> catalogRequirement) {
            this.splitSources = ImmutableMap.copyOf(Objects.requireNonNull(splitSources, "splitSources is null"));
            this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null"));
            this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
            this.bucketToPartitionMap = Objects.requireNonNull(bucketToPartitionMap, "bucketToPartitionMap is null");
            this.bucketNodeMap = Objects.requireNonNull(bucketNodeMap, "bucketNodeMap is null");
            Preconditions.checkArgument((bucketNodeMap.isPresent() || splitSources.isEmpty() ? 1 : 0) != 0, (Object)"bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)");
            this.catalogRequirement = Objects.requireNonNull(catalogRequirement, "catalogRequirement is null");
        }

        /*
         * WARNING - void declaration
         */
        @Override
        public List<TaskDescriptor> getMoreTasks() {
            if (this.finished || this.closed) {
                return ImmutableList.of();
            }
            HashMap<Integer, Object> partitionToSplitsMap = new HashMap<Integer, Object>();
            HashMap<Integer, HostAddress> partitionToNodeMap = new HashMap<Integer, HostAddress>();
            block0: for (Map.Entry<PlanNodeId, SplitSource> entry : this.splitSources.entrySet()) {
                SplitSource splitSource = entry.getValue();
                BucketNodeMap bucketNodeMap = this.bucketNodeMap.orElseThrow(() -> new VerifyException("bucket to node map is expected to be present"));
                while (!splitSource.isFinished()) {
                    ListenableFuture<SplitSource.SplitBatch> splitBatchFuture = splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, Lifespan.taskWide(), this.splitBatchSize);
                    long start = System.nanoTime();
                    MoreFutures.addSuccessCallback(splitBatchFuture, () -> this.getSplitTimeRecorder.accept(start));
                    SplitSource.SplitBatch splitBatch = (SplitSource.SplitBatch)MoreFutures.getFutureValue(splitBatchFuture);
                    for (Split split : splitBatch.getSplits()) {
                        int bucket = bucketNodeMap.getBucket(split);
                        int partition = this.getPartitionForBucket(bucket);
                        if (!bucketNodeMap.isDynamic()) {
                            HostAddress existingValue = partitionToNodeMap.put(partition, bucketNodeMap.getAssignedNode(split).get().getHostAndPort());
                            Preconditions.checkState((existingValue == null ? 1 : 0) != 0, (String)"host already assigned for partition %s: %s", (int)partition, (Object)existingValue);
                        }
                        Multimap partitionSplits = partitionToSplitsMap.computeIfAbsent(partition, p -> ArrayListMultimap.create());
                        partitionSplits.put((Object)entry.getKey(), (Object)split);
                    }
                    if (!splitBatch.isLastBatch()) continue;
                    splitSource.close();
                    continue block0;
                }
            }
            HashMap<Integer, Object> partitionToExchangeSourceHandlesMap = new HashMap<Integer, Object>();
            for (Map.Entry entry : this.partitionedExchangeSourceHandles.entries()) {
                PlanNodeId planNodeId = (PlanNodeId)entry.getKey();
                ExchangeSourceHandle handle = (ExchangeSourceHandle)entry.getValue();
                int partition = handle.getPartitionId();
                Multimap partitionSourceHandles = partitionToExchangeSourceHandlesMap.computeIfAbsent(partition, p -> ArrayListMultimap.create());
                partitionSourceHandles.put((Object)planNodeId, (Object)handle);
            }
            boolean bl = false;
            ImmutableList.Builder result = ImmutableList.builder();
            for (Integer partition : Sets.union(partitionToSplitsMap.keySet(), partitionToExchangeSourceHandlesMap.keySet())) {
                void var4_7;
                Multimap splits = (Multimap)partitionToSplitsMap.getOrDefault(partition, ImmutableMultimap.of());
                ImmutableListMultimap exchangeSourceHandles = ImmutableListMultimap.builder().putAll((Multimap)partitionToExchangeSourceHandlesMap.getOrDefault(partition, ImmutableMultimap.of())).putAll(this.replicatedExchangeSourceHandles).build();
                HostAddress host = (HostAddress)partitionToNodeMap.get(partition);
                ImmutableSet hostRequirement = host == null ? ImmutableSet.of() : ImmutableSet.of((Object)host);
                result.add((Object)new TaskDescriptor((int)(++var4_7), (Multimap<PlanNodeId, Split>)splits, (Multimap<PlanNodeId, ExchangeSourceHandle>)exchangeSourceHandles, new NodeRequirements(this.catalogRequirement, (Set<HostAddress>)hostRequirement)));
            }
            this.finished = true;
            return result.build();
        }

        private int getPartitionForBucket(int bucket) {
            return this.bucketToPartitionMap[bucket];
        }

        @Override
        public boolean isFinished() {
            return this.finished;
        }

        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            for (SplitSource splitSource : this.splitSources.values()) {
                try {
                    splitSource.close();
                }
                catch (RuntimeException e) {
                    log.error((Throwable)e, "Error closing split source");
                }
            }
        }
    }

    public static class ArbitraryDistributionTaskSource
    implements TaskSource {
        private final Map<PlanFragmentId, PlanNodeId> sourceFragmentToRemoteSourceNodeIdMap;
        private final Map<PlanFragmentId, Exchange> sourceExchanges;
        private final Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles;
        private final long targetPartitionSizeInBytes;
        private boolean finished;

        public static ArbitraryDistributionTaskSource create(PlanFragment fragment, Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, DataSize targetPartitionSize) {
            Preconditions.checkArgument((boolean)fragment.getPartitionedSources().isEmpty(), (String)"no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources());
            Preconditions.checkArgument((boolean)fragment.getRemoteSourceNodes().stream().noneMatch(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE), (String)"replicated exchanges are not expected in source distributed stage, got: %s", fragment.getRemoteSourceNodes());
            return new ArbitraryDistributionTaskSource(StageTaskSourceFactory.getSourceFragmentToRemoteSourceNodeIdMap(fragment.getRemoteSourceNodes()), sourceExchanges, exchangeSourceHandles, targetPartitionSize);
        }

        public ArbitraryDistributionTaskSource(Map<PlanFragmentId, PlanNodeId> sourceFragmentToRemoteSourceNodeIdMap, Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, DataSize targetPartitionSize) {
            this.sourceFragmentToRemoteSourceNodeIdMap = ImmutableMap.copyOf(Objects.requireNonNull(sourceFragmentToRemoteSourceNodeIdMap, "sourceFragmentToRemoteSourceNodeIdMap is null"));
            this.sourceExchanges = ImmutableMap.copyOf(Objects.requireNonNull(sourceExchanges, "sourceExchanges is null"));
            Preconditions.checkArgument((boolean)sourceFragmentToRemoteSourceNodeIdMap.keySet().equals(sourceExchanges.keySet()), (String)"sourceFragmentToRemoteSourceNodeIdMap and sourceExchanges are expected to have the same set of keys: %s != %s", sourceFragmentToRemoteSourceNodeIdMap.keySet(), sourceExchanges.keySet());
            this.exchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
            Preconditions.checkArgument((boolean)sourceExchanges.keySet().containsAll(exchangeSourceHandles.keySet()), (String)"Unexpected keys in exchangeSourceHandles map: %s; allowed keys: %s", (Object)exchangeSourceHandles.keySet(), sourceExchanges.keySet());
            this.targetPartitionSizeInBytes = Objects.requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes();
        }

        @Override
        public List<TaskDescriptor> getMoreTasks() {
            NodeRequirements nodeRequirements = new NodeRequirements(Optional.empty(), (Set<HostAddress>)ImmutableSet.of());
            ImmutableList.Builder result = ImmutableList.builder();
            int currentPartitionId = 0;
            ImmutableListMultimap.Builder assignedExchangeSourceHandles = ImmutableListMultimap.builder();
            long assignedExchangeDataSize = 0L;
            int assignedExchangeSourceHandleCount = 0;
            for (Map.Entry entry : this.exchangeSourceHandles.entries()) {
                PlanFragmentId sourceFragmentId = (PlanFragmentId)entry.getKey();
                PlanNodeId remoteSourcePlanNodeId = this.sourceFragmentToRemoteSourceNodeIdMap.get(sourceFragmentId);
                ExchangeSourceHandle originalExchangeSourceHandle = (ExchangeSourceHandle)entry.getValue();
                Exchange sourceExchange = this.sourceExchanges.get(sourceFragmentId);
                ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, this.targetPartitionSizeInBytes);
                ImmutableList.Builder sourceHandles = ImmutableList.builder();
                while (true) {
                    Preconditions.checkState((boolean)splitter.isBlocked().isDone(), (Object)"not supported");
                    Optional next = splitter.getNext();
                    if (next.isEmpty()) break;
                    sourceHandles.add((Object)((ExchangeSourceHandle)next.get()));
                }
                for (ExchangeSourceHandle handle : sourceHandles.build()) {
                    ExchangeSourceStatistics statistics = sourceExchange.getExchangeSourceStatistics(handle);
                    if (assignedExchangeDataSize != 0L && assignedExchangeDataSize + statistics.getSizeInBytes() > this.targetPartitionSizeInBytes) {
                        result.add((Object)new TaskDescriptor(currentPartitionId++, (Multimap<PlanNodeId, Split>)ImmutableListMultimap.of(), (Multimap<PlanNodeId, ExchangeSourceHandle>)assignedExchangeSourceHandles.build(), nodeRequirements));
                        assignedExchangeSourceHandles = ImmutableListMultimap.builder();
                        assignedExchangeDataSize = 0L;
                        assignedExchangeSourceHandleCount = 0;
                    }
                    assignedExchangeSourceHandles.put((Object)remoteSourcePlanNodeId, (Object)handle);
                    assignedExchangeDataSize += statistics.getSizeInBytes();
                    ++assignedExchangeSourceHandleCount;
                }
            }
            if (assignedExchangeSourceHandleCount > 0) {
                result.add((Object)new TaskDescriptor(currentPartitionId, (Multimap<PlanNodeId, Split>)ImmutableListMultimap.of(), (Multimap<PlanNodeId, ExchangeSourceHandle>)assignedExchangeSourceHandles.build(), nodeRequirements));
            }
            this.finished = true;
            return result.build();
        }

        @Override
        public boolean isFinished() {
            return this.finished;
        }

        @Override
        public void close() {
        }
    }

    /**
     * Task source for single-distribution fragments: produces exactly one task (partition 0)
     * that reads all exchange source handles and has no splits or node requirements.
     */
    public static class SingleDistributionTaskSource
    implements TaskSource {
        private final Multimap<PlanNodeId, ExchangeSourceHandle> exchangeSourceHandles;
        private boolean finished;

        /**
         * Validates the fragment shape and creates the task source.
         *
         * @throws IllegalArgumentException if the fragment has partitioned sources (table scans)
         */
        public static SingleDistributionTaskSource create(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles) {
            Preconditions.checkArgument(
                    fragment.getPartitionedSources().isEmpty(),
                    "no partitioned sources (table scans) expected, got: %s",
                    fragment.getPartitionedSources());
            return new SingleDistributionTaskSource(getInputsForRemoteSources(fragment.getRemoteSourceNodes(), exchangeSourceHandles));
        }

        /**
         * @param exchangeSourceHandles inputs of the single task, keyed by remote source node id
         * @throws NullPointerException if {@code exchangeSourceHandles} is null
         */
        public SingleDistributionTaskSource(Multimap<PlanNodeId, ExchangeSourceHandle> exchangeSourceHandles) {
            this.exchangeSourceHandles = ImmutableMultimap.copyOf(Objects.requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
        }

        /**
         * Returns the single task on the first call. Later calls return an empty list so that
         * partition 0 is never handed out twice.
         */
        @Override
        public List<TaskDescriptor> getMoreTasks() {
            if (finished) {
                return ImmutableList.of();
            }
            NodeRequirements noRequirements = new NodeRequirements(Optional.empty(), ImmutableSet.of());
            List<TaskDescriptor> result = ImmutableList.of(new TaskDescriptor(0, ImmutableMultimap.of(), exchangeSourceHandles, noRequirements));
            finished = true;
            return result;
        }

        @Override
        public boolean isFinished() {
            return finished;
        }

        /** Nothing to release: this source holds no closeable resources. */
        @Override
        public void close() {
        }
    }
}

