/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.base.VerifyException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.connector.CatalogName;
import io.trino.execution.ForQueryExecution;
import io.trino.execution.Lifespan;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.TaskDescriptor;
import io.trino.execution.scheduler.TaskSource;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.QueryId;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.NotPartitionedPartitionHandle;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceSplitter;
import io.trino.spi.exchange.ExchangeSourceStatistics;
import io.trino.split.SplitSource;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SplitSourceFactory;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableWriterNode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

public class StageTaskSourceFactory
implements TaskSourceFactory {
    private static final Logger log = Logger.get(StageTaskSourceFactory.class);
    private final SplitSourceFactory splitSourceFactory;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final int splitBatchSize;
    private final Executor executor;

    /**
     * Injection entry point. Reads the split batch size from {@code queryManagerConfig}
     * and hands everything to the explicit-argument constructor.
     *
     * @throws NullPointerException if {@code queryManagerConfig} (or any argument rejected
     *         by the delegate constructor) is null
     */
    @Inject
    public StageTaskSourceFactory(
            SplitSourceFactory splitSourceFactory,
            TableExecuteContextManager tableExecuteContextManager,
            QueryManagerConfig queryManagerConfig,
            @ForQueryExecution ExecutorService executor) {
        this(
                splitSourceFactory,
                tableExecuteContextManager,
                Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize(),
                executor);
    }

    /**
     * Creates a factory with an explicitly supplied split batch size.
     *
     * @param splitSourceFactory creates split sources for fragments; must not be null
     * @param tableExecuteContextManager passed on to source-distributed task sources; must not be null
     * @param splitBatchSize batch size requested from split sources (stored as given, not validated here)
     * @param executor executor used for split-loading callbacks; must not be null
     */
    public StageTaskSourceFactory(
            SplitSourceFactory splitSourceFactory,
            TableExecuteContextManager tableExecuteContextManager,
            int splitBatchSize,
            ExecutorService executor) {
        // Validate first (same order as the assignments below), then assign.
        Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        Objects.requireNonNull(executor, "executor is null");

        this.splitSourceFactory = splitSourceFactory;
        this.tableExecuteContextManager = tableExecuteContextManager;
        this.splitBatchSize = splitBatchSize;
        this.executor = executor;
    }

    /**
     * Picks the {@link TaskSource} implementation that matches the fragment's partitioning:
     * <ul>
     *   <li>single distribution - {@code SingleDistributionTaskSource}</li>
     *   <li>fixed arbitrary / scaled writer distribution - {@code ArbitraryDistributionTaskSource}</li>
     *   <li>fixed hash distribution or any partitioning with a connector id - {@code HashDistributionTaskSource}</li>
     *   <li>source distribution - {@code SourceDistributionTaskSource}</li>
     * </ul>
     *
     * @throws IllegalArgumentException if the partitioning is none of the above, or if a hash
     *         distributed fragment is given without {@code bucketToPartitionMap}
     */
    @Override
    public TaskSource create(Session session, PlanFragment fragment, Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, LongConsumer getSplitTimeRecorder, Optional<int[]> bucketToPartitionMap, Optional<BucketNodeMap> bucketNodeMap) {
        PartitioningHandle partitioning = fragment.getPartitioning();

        if (partitioning.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            return SingleDistributionTaskSource.create(session, fragment, exchangeSourceHandles);
        }

        boolean arbitrarilyDistributed = partitioning.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION);
        if (arbitrarilyDistributed) {
            return ArbitraryDistributionTaskSource.create(
                    session,
                    fragment,
                    sourceExchanges,
                    exchangeSourceHandles,
                    SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize(session));
        }

        boolean hashDistributed = partitioning.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || partitioning.getConnectorId().isPresent();
        if (hashDistributed) {
            // The bucket mapping is checked before any session property is read.
            int[] bucketToPartition = bucketToPartitionMap.orElseThrow(() -> new IllegalArgumentException("bucketToPartitionMap is expected to be present for hash distributed stages"));
            long targetSplitWeight = (long) SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount(session) * SplitWeight.standard().getRawValue();
            DataSize targetInputSize = SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize(session);
            boolean preserveInputPartitions = SystemSessionProperties.getFaultTolerantPreserveInputPartitionsInWriteStage(session);
            return HashDistributionTaskSource.create(
                    session,
                    fragment,
                    this.splitSourceFactory,
                    sourceExchanges,
                    exchangeSourceHandles,
                    this.splitBatchSize,
                    getSplitTimeRecorder,
                    bucketToPartition,
                    bucketNodeMap,
                    targetSplitWeight,
                    targetInputSize,
                    preserveInputPartitions,
                    this.executor);
        }

        if (partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            int minSplitCount = SystemSessionProperties.getFaultTolerantExecutionMinTaskSplitCount(session);
            long targetSplitWeight = (long) SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount(session) * SplitWeight.standard().getRawValue();
            int maxSplitCount = SystemSessionProperties.getFaultTolerantExecutionMaxTaskSplitCount(session);
            return SourceDistributionTaskSource.create(
                    session,
                    fragment,
                    this.splitSourceFactory,
                    exchangeSourceHandles,
                    this.tableExecuteContextManager,
                    this.splitBatchSize,
                    getSplitTimeRecorder,
                    minSplitCount,
                    targetSplitWeight,
                    maxSplitCount,
                    this.executor);
        }

        throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
    }

    /**
     * Maps every exchange source handle to the exchange registered for the fragment that the
     * handle is listed under. The result is keyed by handle identity, not by {@code equals}.
     *
     * @param sourceExchanges exchange per source fragment
     * @param exchangeSourceHandles handles grouped by source fragment
     * @return identity map from handle to its exchange
     * @throws NullPointerException if a fragment with handles has no entry in {@code sourceExchanges}
     */
    private static IdentityHashMap<ExchangeSourceHandle, Exchange> getExchangeForHandleMap(Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles) {
        IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle = new IdentityHashMap<>();
        for (Map.Entry<PlanFragmentId, ExchangeSourceHandle> entry : exchangeSourceHandles.entries()) {
            PlanFragmentId fragmentId = entry.getKey();
            // Supplier overload: the message is only built when the lookup actually fails.
            Exchange exchange = Objects.requireNonNull(sourceExchanges.get(fragmentId), () -> "Exchange not found for fragment " + fragmentId);
            exchangeForHandle.put(entry.getValue(), exchange);
        }
        return exchangeForHandle;
    }

    /**
     * Collects the exchange source handles feeding the fragment's remote sources whose
     * exchange type is {@code REPLICATE}, keyed by remote source node id.
     */
    private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles) {
        List<RemoteSourceNode> replicatedSources = fragment.getRemoteSourceNodes().stream()
                .filter(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE)
                .collect(ImmutableList.toImmutableList());
        return StageTaskSourceFactory.getInputsForRemoteSources(replicatedSources, handles);
    }

    /**
     * Collects the exchange source handles feeding the fragment's remote sources whose
     * exchange type is anything other than {@code REPLICATE}, keyed by remote source node id.
     */
    private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getPartitionedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles) {
        List<RemoteSourceNode> partitionedSources = fragment.getRemoteSourceNodes().stream()
                .filter(node -> node.getExchangeType() != ExchangeNode.Type.REPLICATE)
                .collect(ImmutableList.toImmutableList());
        return StageTaskSourceFactory.getInputsForRemoteSources(partitionedSources, handles);
    }

    /**
     * Builds a lookup from each source fragment id to the id of the remote source node that
     * reads it. {@code buildOrThrow} rejects a fragment id that appears more than once.
     */
    private static Map<PlanFragmentId, PlanNodeId> getSourceFragmentToRemoteSourceNodeIdMap(List<RemoteSourceNode> remoteSourceNodes) {
        ImmutableMap.Builder<PlanFragmentId, PlanNodeId> nodeIdByFragment = ImmutableMap.builder();
        remoteSourceNodes.forEach(node ->
                node.getSourceFragmentIds().forEach(fragmentId -> nodeIdByFragment.put(fragmentId, node.getId())));
        return nodeIdByFragment.buildOrThrow();
    }

    /**
     * Gathers, per remote source node, the exchange source handles of all its source fragments.
     * For {@code GATHER} and {@code REPLICATE} exchanges at most one handle per source fragment
     * is accepted.
     *
     * @throws IllegalArgumentException if a gather/replicate source fragment has more than one handle
     */
    private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getInputsForRemoteSources(List<RemoteSourceNode> remoteSources, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles) {
        ImmutableListMultimap.Builder<PlanNodeId, ExchangeSourceHandle> inputs = ImmutableListMultimap.builder();
        for (RemoteSourceNode remoteSource : remoteSources) {
            for (PlanFragmentId fragmentId : remoteSource.getSourceFragmentIds()) {
                // NOTE(review): Guava documents Multimap.get as returning an empty collection rather
                // than null, so this check likely never fires - confirm whether a stricter check was intended.
                Collection<ExchangeSourceHandle> handles = Objects.requireNonNull(
                        exchangeSourceHandles.get(fragmentId),
                        () -> "exchange source handle is missing for fragment: " + fragmentId);

                boolean singleHandleExpected = remoteSource.getExchangeType() == ExchangeNode.Type.GATHER
                        || remoteSource.getExchangeType() == ExchangeNode.Type.REPLICATE;
                if (singleHandleExpected) {
                    Preconditions.checkArgument(handles.size() <= 1, "at most 1 exchange source handle is expected, got: %s", handles);
                }

                inputs.putAll(remoteSource.getId(), handles);
            }
        }
        return inputs.build();
    }

    private static class SplitLoadingFuture
    extends AbstractFuture<LoadedSplits> {
        private final PlanNodeId planNodeId;
        private final SplitSource splitSource;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        private final Executor executor;
        @GuardedBy(value="this")
        private final List<Split> loadedSplits = new ArrayList<Split>();
        @GuardedBy(value="this")
        private ListenableFuture<SplitSource.SplitBatch> currentSplitBatch = Futures.immediateFuture(null);

        SplitLoadingFuture(PlanNodeId planNodeId, SplitSource splitSource, int splitBatchSize, LongConsumer getSplitTimeRecorder, Executor executor) {
            this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
        }

        public synchronized void load() {
            if (this.currentSplitBatch == null) {
                Preconditions.checkState((boolean)this.isCancelled(), (Object)"SplitLoadingFuture should be in cancelled state");
                return;
            }
            Preconditions.checkState((boolean)this.currentSplitBatch.isDone(), (Object)"next batch of splits requested before previous batch is done");
            this.currentSplitBatch = this.splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, Lifespan.taskWide(), this.splitBatchSize);
            final long start = System.nanoTime();
            Futures.addCallback(this.currentSplitBatch, (FutureCallback)new FutureCallback<SplitSource.SplitBatch>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void onSuccess(SplitSource.SplitBatch splitBatch) {
                    getSplitTimeRecorder.accept(start);
                    SplitLoadingFuture splitLoadingFuture = this;
                    synchronized (splitLoadingFuture) {
                        loadedSplits.addAll(splitBatch.getSplits());
                        if (splitBatch.isLastBatch()) {
                            this.set(new LoadedSplits(planNodeId, loadedSplits));
                            try {
                                splitSource.close();
                            }
                            catch (RuntimeException e) {
                                log.error((Throwable)e, "Error closing split source");
                            }
                        } else {
                            this.load();
                        }
                    }
                }

                public void onFailure(Throwable throwable) {
                    this.setException(throwable);
                }
            }, (Executor)this.executor);
        }

        protected synchronized void interruptTask() {
            if (this.currentSplitBatch != null) {
                this.currentSplitBatch.cancel(true);
                this.currentSplitBatch = null;
            }
        }
    }

    /**
     * Immutable pairing of a plan node id with the complete list of splits loaded for it.
     */
    private static class LoadedSplits {
        private final PlanNodeId planNodeId;
        private final List<Split> splits;

        private LoadedSplits(PlanNodeId planNodeId, List<Split> splits) {
            Objects.requireNonNull(planNodeId, "planNodeId is null");
            Objects.requireNonNull(splits, "splits is null");
            this.planNodeId = planNodeId;
            // Snapshot the list so later changes to the caller's list are not visible here.
            this.splits = ImmutableList.copyOf(splits);
        }

        /** Returns the id of the plan node the splits belong to. */
        public PlanNodeId getPlanNodeId() {
            return planNodeId;
        }

        /** Returns the immutable list of loaded splits. */
        public List<Split> getSplits() {
            return splits;
        }
    }

    /**
     * Task source for fragments with exactly one partitioned source and only replicated remote
     * sources.
     *
     * <p>Each {@link #getMoreTasks()} call pulls one batch from the split source, buffers the
     * splits, and emits a task descriptor for every group of buffered splits that is ready.
     * Remotely accessible splits share one buffer; all other splits are buffered once per host
     * address they list and are emitted with that host as a node requirement. When the last
     * batch arrives, whatever remains in the buffers is flushed into final tasks.
     */
    public static class SourceDistributionTaskSource
            implements TaskSource {
        private final QueryId queryId;
        private final PlanNodeId partitionedSourceNodeId;
        private final TableExecuteContextManager tableExecuteContextManager;
        private final SplitSource splitSource;
        private final ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        private final Optional<CatalogName> catalogRequirement;
        private final int minPartitionSplitCount;
        private final long targetPartitionSplitWeight;
        private final int maxPartitionSplitCount;
        private final DataSize taskMemory;
        private final Executor executor;
        @GuardedBy(value="this")
        private final Set<Split> remotelyAccessibleSplitBuffer = Sets.newIdentityHashSet();
        @GuardedBy(value="this")
        private final Map<HostAddress, Set<Split>> locallyAccessibleSplitBuffer = new HashMap<>();
        @GuardedBy(value="this")
        private int currentPartitionId;
        @GuardedBy(value="this")
        private boolean finished;
        @GuardedBy(value="this")
        private boolean closed;
        @GuardedBy(value="this")
        private ListenableFuture<SplitSource.SplitBatch> currentSplitBatchFuture = Futures.immediateFuture(null);

        /**
         * Validates the fragment shape, creates its split source, and builds the task source.
         *
         * @throws IllegalArgumentException if the fragment does not have exactly one partitioned
         *         source, or has a remote source that is not a replicated exchange
         */
        public static SourceDistributionTaskSource create(Session session, PlanFragment fragment, SplitSourceFactory splitSourceFactory, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, TableExecuteContextManager tableExecuteContextManager, int splitBatchSize, LongConsumer getSplitTimeRecorder, int minPartitionSplitCount, long targetPartitionSplitWeight, int maxPartitionSplitCount, Executor executor) {
            Preconditions.checkArgument(fragment.getPartitionedSources().size() == 1, "single partitioned source is expected, got: %s", fragment.getPartitionedSources());
            List<RemoteSourceNode> remoteSourceNodes = fragment.getRemoteSourceNodes();
            Preconditions.checkArgument(remoteSourceNodes.stream().allMatch(node -> node.getExchangeType() == ExchangeNode.Type.REPLICATE), "only replicated exchanges are expected in source distributed stage, got: %s", remoteSourceNodes);
            PlanNodeId partitionedSourceNodeId = Iterables.getOnlyElement(fragment.getPartitionedSources());
            Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
            SplitSource splitSource = splitSources.get(partitionedSourceNodeId);
            // Internal system connectors impose no catalog requirement on the node.
            Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName()).filter(catalog -> !CatalogName.isInternalSystemConnector(catalog));
            return new SourceDistributionTaskSource(
                    session.getQueryId(),
                    partitionedSourceNodeId,
                    tableExecuteContextManager,
                    splitSource,
                    StageTaskSourceFactory.getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles),
                    splitBatchSize,
                    getSplitTimeRecorder,
                    catalogName,
                    minPartitionSplitCount,
                    targetPartitionSplitWeight,
                    maxPartitionSplitCount,
                    SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory(session),
                    executor);
        }

        @VisibleForTesting
        SourceDistributionTaskSource(QueryId queryId, PlanNodeId partitionedSourceNodeId, TableExecuteContextManager tableExecuteContextManager, SplitSource splitSource, ListMultimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, Optional<CatalogName> catalogRequirement, int minPartitionSplitCount, long targetPartitionSplitWeight, int maxPartitionSplitCount, DataSize taskMemory, Executor executor) {
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.partitionedSourceNodeId = Objects.requireNonNull(partitionedSourceNodeId, "partitionedSourceNodeId is null");
            this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
            this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
            this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
            this.catalogRequirement = Objects.requireNonNull(catalogRequirement, "catalogRequirement is null");
            Preconditions.checkArgument(targetPartitionSplitWeight > 0L, "targetPartitionSplitCount must be greater than 0: %s", targetPartitionSplitWeight);
            this.targetPartitionSplitWeight = targetPartitionSplitWeight;
            Preconditions.checkArgument(minPartitionSplitCount >= 0, "minPartitionSplitCount must be greater than or equal to 0: %s", minPartitionSplitCount);
            this.minPartitionSplitCount = minPartitionSplitCount;
            Preconditions.checkArgument(maxPartitionSplitCount > 0, "maxPartitionSplitCount must be greater than 0: %s", maxPartitionSplitCount);
            Preconditions.checkArgument(maxPartitionSplitCount >= minPartitionSplitCount, "maxPartitionSplitCount(%s) must be greater than or equal to minPartitionSplitCount(%s)", maxPartitionSplitCount, minPartitionSplitCount);
            this.maxPartitionSplitCount = maxPartitionSplitCount;
            this.taskMemory = Objects.requireNonNull(taskMemory, "taskMemory is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
        }

        /**
         * Fetches the next split batch and turns the buffered splits into ready task descriptors.
         *
         * @return a future of the tasks that became ready with this batch; an already-completed
         *         empty list once the source is finished or closed
         * @throws IllegalStateException if called before the previous batch future completed
         */
        @Override
        public synchronized ListenableFuture<List<TaskDescriptor>> getMoreTasks() {
            if (finished || closed) {
                return Futures.immediateFuture(ImmutableList.of());
            }
            Preconditions.checkState(currentSplitBatchFuture.isDone(), "getMoreTasks called again before the previous batch of splits was ready");
            currentSplitBatchFuture = splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, Lifespan.taskWide(), splitBatchSize);
            long start = System.nanoTime();
            MoreFutures.addSuccessCallback(currentSplitBatchFuture, () -> getSplitTimeRecorder.accept(start));
            return Futures.transform(currentSplitBatchFuture, splitBatch -> {
                synchronized (this) {
                    // Step 1: route every split of the batch into the matching buffer.
                    for (Split split : splitBatch.getSplits()) {
                        if (split.isRemotelyAccessible()) {
                            remotelyAccessibleSplitBuffer.add(split);
                            continue;
                        }
                        List<HostAddress> addresses = split.getAddresses();
                        Preconditions.checkArgument(!addresses.isEmpty(), "split is not remotely accessible but the list of addresses is empty");
                        // The same split is buffered under each of its hosts; it is removed from
                        // the other hosts' buffers when one of them claims it.
                        for (HostAddress hostAddress : addresses) {
                            locallyAccessibleSplitBuffer.computeIfAbsent(hostAddress, key -> Sets.newIdentityHashSet()).add(split);
                        }
                    }

                    // Step 2: drain ready groups, first the shared buffer, then each host buffer.
                    ImmutableList.Builder<TaskDescriptor> readyTasksBuilder = ImmutableList.builder();
                    boolean isLastBatch = splitBatch.isLastBatch();
                    readyTasksBuilder.addAll(getReadyTasks(
                            remotelyAccessibleSplitBuffer,
                            ImmutableList.of(),
                            new NodeRequirements(catalogRequirement, ImmutableSet.of(), taskMemory),
                            isLastBatch));
                    for (HostAddress remoteHost : locallyAccessibleSplitBuffer.keySet()) {
                        List<Set<Split>> otherSplitSets = locallyAccessibleSplitBuffer.entrySet().stream()
                                .filter(entry -> !entry.getKey().equals(remoteHost))
                                .map(Map.Entry::getValue)
                                .collect(ImmutableList.toImmutableList());
                        readyTasksBuilder.addAll(getReadyTasks(
                                locallyAccessibleSplitBuffer.get(remoteHost),
                                otherSplitSets,
                                new NodeRequirements(catalogRequirement, ImmutableSet.of(remoteHost), taskMemory),
                                isLastBatch));
                    }
                    List<TaskDescriptor> readyTasks = readyTasksBuilder.build();

                    // Step 3: on the last batch publish table-execute info and release the source.
                    if (isLastBatch) {
                        Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
                        tableExecuteSplitsInfo.ifPresent(info -> {
                            TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(queryId);
                            tableExecuteContext.setSplitsInfo(info);
                        });
                        try {
                            splitSource.close();
                        }
                        catch (RuntimeException e) {
                            // Best effort: tasks are already computed, so only log.
                            log.error(e, "Error closing split source");
                        }
                        finished = true;
                    }
                    return readyTasks;
                }
            }, executor);
        }

        /**
         * Repeatedly carves ready tasks out of {@code splits}. When {@code includeRemainder} is
         * set, the leftover splits (if any) become one final task and are removed from
         * {@code otherSplitSets} as well.
         */
        private List<TaskDescriptor> getReadyTasks(Set<Split> splits, List<Set<Split>> otherSplitSets, NodeRequirements nodeRequirements, boolean includeRemainder) {
            ImmutableList.Builder<TaskDescriptor> result = ImmutableList.builder();
            while (true) {
                Optional<TaskDescriptor> readyTask = getReadyTask(splits, otherSplitSets, nodeRequirements);
                if (readyTask.isEmpty()) {
                    break;
                }
                result.add(readyTask.get());
            }
            if (includeRemainder && !splits.isEmpty()) {
                result.add(buildTaskDescriptor(splits, nodeRequirements));
                for (Set<Split> otherSplits : otherSplitSets) {
                    otherSplits.removeAll(splits);
                }
                splits.clear();
            }
            return result.build();
        }

        /**
         * Tries to form one task from the head of {@code splits}. A group is ready once it holds
         * at least {@code minPartitionSplitCount} splits and either its accumulated weight has
         * reached {@code targetPartitionSplitWeight} or it holds {@code maxPartitionSplitCount}
         * splits. Chosen splits are removed from {@code splits} and from every other split set.
         *
         * @return the task, or empty when the buffered splits do not yet form a ready group
         */
        private Optional<TaskDescriptor> getReadyTask(Set<Split> splits, List<Set<Split>> otherSplitSets, NodeRequirements nodeRequirements) {
            ImmutableList.Builder<Split> chosenSplitsBuilder = ImmutableList.builder();
            int splitCount = 0;
            // Accumulate in long: raw split weights are long, and a sum narrowed to int can wrap
            // and make the comparison against the (long) target weight wrong.
            long totalSplitWeight = 0;
            for (Split split : splits) {
                totalSplitWeight += split.getSplitWeight().getRawValue();
                chosenSplitsBuilder.add(split);
                splitCount++;
                if (splitCount < minPartitionSplitCount) {
                    continue;
                }
                if (totalSplitWeight < targetPartitionSplitWeight && splitCount < maxPartitionSplitCount) {
                    continue;
                }
                List<Split> chosenSplits = chosenSplitsBuilder.build();
                for (Set<Split> otherSplits : otherSplitSets) {
                    chosenSplits.forEach(otherSplits::remove);
                }
                // Safe despite the enclosing iteration: we return right after mutating the set.
                chosenSplits.forEach(splits::remove);
                return Optional.of(buildTaskDescriptor(chosenSplits, nodeRequirements));
            }
            return Optional.empty();
        }

        /** Wraps the splits into a descriptor and assigns it the next partition id. */
        private synchronized TaskDescriptor buildTaskDescriptor(Collection<Split> splits, NodeRequirements nodeRequirements) {
            return new TaskDescriptor(
                    currentPartitionId++,
                    ImmutableListMultimap.<PlanNodeId, Split>builder().putAll(partitionedSourceNodeId, splits).build(),
                    replicatedExchangeSourceHandles,
                    nodeRequirements);
        }

        @Override
        public synchronized boolean isFinished() {
            return finished;
        }

        /** Closes the split source; repeated calls after the first are no-ops. */
        @Override
        public synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;
            splitSource.close();
        }
    }

    public static class HashDistributionTaskSource
    implements TaskSource {
        private final Map<PlanNodeId, SplitSource> splitSources;
        private final IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
        private final int splitBatchSize;
        private final LongConsumer getSplitTimeRecorder;
        private final int[] bucketToPartitionMap;
        private final Optional<BucketNodeMap> bucketNodeMap;
        private final DataSize taskMemory;
        private final Optional<CatalogName> catalogRequirement;
        private final long targetPartitionSourceSizeInBytes;
        private final long targetPartitionSplitWeight;
        private final Executor executor;
        @GuardedBy(value="this")
        private ListenableFuture<List<LoadedSplits>> loadedSplitsFuture;
        @GuardedBy(value="this")
        private boolean finished;
        @GuardedBy(value="this")
        private boolean closed;

        /**
         * Creates the split sources for the fragment and assembles a hash-distributed task source.
         * When {@code preserveInputPartitionsInWriteStage} is set and the fragment contains a
         * table writer, the target partition source size is replaced by zero bytes.
         *
         * @throws IllegalArgumentException if the fragment has partitioned sources but no
         *         {@code bucketNodeMap} is supplied
         */
        public static HashDistributionTaskSource create(Session session, PlanFragment fragment, SplitSourceFactory splitSourceFactory, Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, int[] bucketToPartitionMap, Optional<BucketNodeMap> bucketNodeMap, long targetPartitionSplitWeight, DataSize targetPartitionSourceSize, boolean preserveInputPartitionsInWriteStage, Executor executor) {
            boolean bucketMappingAvailable = bucketNodeMap.isPresent() || fragment.getPartitionedSources().isEmpty();
            Preconditions.checkArgument(bucketMappingAvailable, "bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)");

            // Locals are computed in the order the constructor arguments were originally evaluated.
            Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
            IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle = StageTaskSourceFactory.getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles);
            Multimap<PlanNodeId, ExchangeSourceHandle> partitionedHandles = StageTaskSourceFactory.getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles);
            Multimap<PlanNodeId, ExchangeSourceHandle> replicatedHandles = StageTaskSourceFactory.getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles);
            Optional<CatalogName> catalogRequirement = fragment.getPartitioning().getConnectorId();

            DataSize effectiveSourceSize;
            if (preserveInputPartitionsInWriteStage && HashDistributionTaskSource.isWriteFragment(fragment)) {
                effectiveSourceSize = DataSize.of(0L, DataSize.Unit.BYTE);
            }
            else {
                effectiveSourceSize = targetPartitionSourceSize;
            }
            DataSize taskMemory = SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory(session);

            return new HashDistributionTaskSource(
                    splitSources,
                    exchangeForHandle,
                    partitionedHandles,
                    replicatedHandles,
                    splitBatchSize,
                    getSplitTimeRecorder,
                    bucketToPartitionMap,
                    bucketNodeMap,
                    catalogRequirement,
                    targetPartitionSplitWeight,
                    effectiveSourceSize,
                    taskMemory,
                    executor);
        }

        /**
         * Reports whether the fragment's plan tree contains a {@link TableWriterNode}, searching
         * depth-first from the root and stopping at the first match.
         */
        private static boolean isWriteFragment(PlanFragment fragment) {
            PlanVisitor<Boolean, Void> writerFinder = new PlanVisitor<Boolean, Void>() {
                @Override
                protected Boolean visitPlan(PlanNode node, Void context) {
                    // anyMatch short-circuits on the first subtree that contains a writer.
                    return node.getSources().stream()
                            .anyMatch(source -> source.accept(this, context));
                }

                @Override
                public Boolean visitTableWriter(TableWriterNode node, Void context) {
                    return true;
                }
            };
            return fragment.getRoot().accept(writerFinder, null);
        }

        @VisibleForTesting
        /**
         * Creates a task source that groups table splits and exchange source handles by partition.
         *
         * <p>All reference arguments must be non-null. The collection arguments are defensively
         * copied, so later changes made by the caller are not observed by this instance.
         *
         * @param splitSources split sources keyed by the plan node that reads them
         * @param exchangeForHandle exchange owning each exchange source handle (identity-keyed)
         * @param partitionedExchangeSourceHandles handles that are distributed across partitions
         * @param replicatedExchangeSourceHandles handles that are attached to every produced task
         * @param splitBatchSize batch size handed to the split loader
         * @param getSplitTimeRecorder recorder handed to the split loader
         * @param bucketToPartitionMap lookup table indexed by bucket, yielding the partition
         * @param bucketNodeMap bucket-to-node mapping; must be present when {@code splitSources} is non-empty
         * @param catalogRequirement catalog requirement copied into the node requirements of each task
         * @param targetPartitionSplitWeight split weight limit used when merging partitions into one task
         * @param targetPartitionSourceSize exchange data size limit used when merging partitions into one task
         * @param taskMemory memory requirement copied into the node requirements of each task
         * @param executor executor used for split loading and for assembling the task descriptors
         * @throws NullPointerException if any reference argument is null
         * @throws IllegalArgumentException if {@code splitSources} is non-empty but {@code bucketNodeMap} is absent
         */
        HashDistributionTaskSource(Map<PlanNodeId, SplitSource> splitSources, IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle, Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles, Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, int[] bucketToPartitionMap, Optional<BucketNodeMap> bucketNodeMap, Optional<CatalogName> catalogRequirement, long targetPartitionSplitWeight, DataSize targetPartitionSourceSize, DataSize taskMemory, Executor executor) {
            this.splitSources = ImmutableMap.copyOf(Objects.requireNonNull(splitSources, "splitSources is null"));
            // Copy into a fresh IdentityHashMap so lookups keep comparing handles by reference.
            this.exchangeForHandle = new IdentityHashMap<>();
            this.exchangeForHandle.putAll(Objects.requireNonNull(exchangeForHandle, "exchangeForHandle is null"));
            this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null"));
            this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
            this.splitBatchSize = splitBatchSize;
            this.getSplitTimeRecorder = Objects.requireNonNull(getSplitTimeRecorder, "getSplitTimeRecorder is null");
            this.bucketToPartitionMap = Objects.requireNonNull(bucketToPartitionMap, "bucketToPartitionMap is null");
            this.bucketNodeMap = Objects.requireNonNull(bucketNodeMap, "bucketNodeMap is null");
            this.taskMemory = Objects.requireNonNull(taskMemory, "taskMemory is null");
            // Table splits can only be mapped to partitions through the bucket node map.
            Preconditions.checkArgument(bucketNodeMap.isPresent() || splitSources.isEmpty(), "bucketNodeMap is expected to be set when the fragment reads partitioned sources (tables)");
            this.catalogRequirement = Objects.requireNonNull(catalogRequirement, "catalogRequirement is null");
            this.targetPartitionSourceSizeInBytes = Objects.requireNonNull(targetPartitionSourceSize, "targetPartitionSourceSize is null").toBytes();
            this.targetPartitionSplitWeight = targetPartitionSplitWeight;
            this.executor = Objects.requireNonNull(executor, "executor is null");
        }

        /**
         * Produces the task descriptors for this source in a single round.
         *
         * <p>A {@code SplitLoadingFuture} is started for every split source. Once all of them
         * complete, the loaded splits and the partitioned exchange source handles are grouped by
         * partition, one preliminary task is created per partition, and the preliminary tasks are
         * handed to {@link #postprocessTasks(List)}. The source is marked finished afterwards.
         *
         * @return a future holding the produced tasks; an already-completed empty list when this
         *         source is finished or closed
         * @throws IllegalStateException if called again while a previous split load is still tracked
         */
        @Override
        public synchronized ListenableFuture<List<TaskDescriptor>> getMoreTasks() {
            if (this.finished || this.closed) {
                return Futures.immediateFuture(ImmutableList.of());
            }
            Preconditions.checkState(this.loadedSplitsFuture == null, "getMoreTasks called again while splits are being loaded");
            List<SplitLoadingFuture> splitSourceCompletionFutures = this.splitSources.entrySet().stream()
                    .map(entry -> {
                        SplitLoadingFuture future = new SplitLoadingFuture(entry.getKey(), entry.getValue(), this.splitBatchSize, this.getSplitTimeRecorder, this.executor);
                        future.load();
                        return future;
                    })
                    .collect(ImmutableList.toImmutableList());
            this.loadedSplitsFuture = Futures.allAsList(splitSourceCompletionFutures);
            return Futures.transform(this.loadedSplitsFuture, loadedSplitsList -> {
                // The callback runs on the executor, so take the monitor again before touching state.
                synchronized (this) {
                    Map<Integer, ListMultimap<PlanNodeId, Split>> partitionToSplitsMap = new HashMap<>();
                    HashMultimap<Integer, HostAddress> partitionToNodeMap = HashMultimap.create();
                    for (LoadedSplits loadedSplits : loadedSplitsList) {
                        // Resolved inside the loop on purpose: with no split sources the map may be absent.
                        BucketNodeMap nodeMap = this.bucketNodeMap.orElseThrow(() -> new VerifyException("bucket to node map is expected to be present"));
                        for (Split split : loadedSplits.getSplits()) {
                            int bucket = nodeMap.getBucket(split);
                            int partition = this.getPartitionForBucket(bucket);
                            if (!nodeMap.isDynamic()) {
                                // Fixed bucket assignment: the partition is pinned to the assigned node.
                                HostAddress requiredAddress = nodeMap.getAssignedNode(split).get().getHostAndPort();
                                // Live view of the multimap; mutations below write through to it.
                                Set<HostAddress> existingRequirement = partitionToNodeMap.get(partition);
                                if (existingRequirement.isEmpty()) {
                                    existingRequirement.add(requiredAddress);
                                } else {
                                    Preconditions.checkState(existingRequirement.contains(requiredAddress), "Unable to satisfy host requirement for partition %s. Existing requirement %s; Current split requirement: %s;", partition, existingRequirement, requiredAddress);
                                    existingRequirement.removeIf(host -> !host.equals(requiredAddress));
                                }
                            }
                            if (!split.isRemotelyAccessible()) {
                                // The split must run on one of its own hosts; narrow the partition's hosts to the overlap.
                                Set<HostAddress> requiredAddresses = ImmutableSet.copyOf(split.getAddresses());
                                Verify.verify(!requiredAddresses.isEmpty(), "split is not remotely accessible but the list of addresses is empty: %s", split);
                                Set<HostAddress> existingRequirement = partitionToNodeMap.get(partition);
                                if (existingRequirement.isEmpty()) {
                                    existingRequirement.addAll(requiredAddresses);
                                } else {
                                    Set<HostAddress> intersection = Sets.intersection(requiredAddresses, existingRequirement);
                                    Preconditions.checkState(!intersection.isEmpty(), "Unable to satisfy host requirement for partition %s. Existing requirement %s; Current split requirement: %s;", partition, existingRequirement, requiredAddresses);
                                    // Snapshot the lazy intersection view before replacing the values it is computed from.
                                    partitionToNodeMap.replaceValues(partition, ImmutableSet.copyOf(intersection));
                                }
                            }
                            Multimap<PlanNodeId, Split> partitionSplits = partitionToSplitsMap.computeIfAbsent(partition, p -> ArrayListMultimap.create());
                            partitionSplits.put(loadedSplits.getPlanNodeId(), split);
                        }
                    }
                    Map<Integer, Multimap<PlanNodeId, ExchangeSourceHandle>> partitionToExchangeSourceHandlesMap = new HashMap<>();
                    for (Map.Entry<PlanNodeId, ExchangeSourceHandle> entry : this.partitionedExchangeSourceHandles.entries()) {
                        PlanNodeId planNodeId = entry.getKey();
                        ExchangeSourceHandle handle = entry.getValue();
                        int partition = handle.getPartitionId();
                        Multimap<PlanNodeId, ExchangeSourceHandle> partitionSourceHandles = partitionToExchangeSourceHandlesMap.computeIfAbsent(partition, p -> ArrayListMultimap.create());
                        partitionSourceHandles.put(planNodeId, handle);
                    }
                    int taskPartitionId = 0;
                    ImmutableList.Builder<TaskDescriptor> partitionTasks = ImmutableList.builder();
                    // One preliminary task per partition that has splits, exchange handles, or both.
                    for (Integer partition : Sets.union(partitionToSplitsMap.keySet(), partitionToExchangeSourceHandlesMap.keySet())) {
                        ListMultimap<PlanNodeId, Split> splits = partitionToSplitsMap.getOrDefault(partition, ImmutableListMultimap.of());
                        ListMultimap<PlanNodeId, ExchangeSourceHandle> exchangeSourceHandles = ImmutableListMultimap.<PlanNodeId, ExchangeSourceHandle>builder()
                                .putAll(partitionToExchangeSourceHandlesMap.getOrDefault(partition, ImmutableMultimap.of()))
                                .build();
                        Set<HostAddress> hostRequirement = partitionToNodeMap.get(partition);
                        partitionTasks.add(new TaskDescriptor(taskPartitionId++, splits, exchangeSourceHandles, new NodeRequirements(this.catalogRequirement, hostRequirement, this.taskMemory)));
                    }
                    List<TaskDescriptor> result = this.postprocessTasks(partitionTasks.build());
                    this.finished = true;
                    return result;
                }
            }, this.executor);
        }

        /**
         * Merges per-partition tasks that share the same node requirements into larger tasks.
         *
         * <p>Within each requirements group, tasks are accumulated in order. The accumulated batch
         * is emitted as one task when adding the next task would exceed either the target split
         * weight or the target source size (the replicated sources count towards the size). Every
         * emitted task also receives all replicated exchange source handles. Partition ids are
         * reassigned sequentially starting at zero.
         *
         * @param tasks preliminary tasks, one per partition
         * @return the merged tasks
         */
        private List<TaskDescriptor> postprocessTasks(List<TaskDescriptor> tasks) {
            ListMultimap<NodeRequirements, TaskDescriptor> taskGroups = this.groupCompatibleTasks(tasks);
            ImmutableList.Builder<TaskDescriptor> joinedTasks = ImmutableList.builder();
            long replicatedExchangeSourcesSize = this.replicatedExchangeSourceHandles.values().stream().mapToLong(this::sourceHandleSize).sum();
            int taskPartitionId = 0;
            for (Map.Entry<NodeRequirements, Collection<TaskDescriptor>> taskGroup : taskGroups.asMap().entrySet()) {
                NodeRequirements groupNodeRequirements = taskGroup.getKey();
                Collection<TaskDescriptor> groupTasks = taskGroup.getValue();
                ImmutableListMultimap.Builder<PlanNodeId, Split> splits = ImmutableListMultimap.builder();
                ImmutableListMultimap.Builder<PlanNodeId, ExchangeSourceHandle> exchangeSources = ImmutableListMultimap.builder();
                long splitsWeight = 0L;
                long exchangeSourcesSize = 0L;
                for (TaskDescriptor task : groupTasks) {
                    ListMultimap<PlanNodeId, Split> taskSplits = task.getSplits();
                    ListMultimap<PlanNodeId, ExchangeSourceHandle> taskExchangeSources = task.getExchangeSourceHandles();
                    long taskSplitWeight = taskSplits.values().stream().mapToLong(split -> split.getSplitWeight().getRawValue()).sum();
                    long taskExchangeSourcesSize = taskExchangeSources.values().stream().mapToLong(this::sourceHandleSize).sum();
                    // Never flush an empty batch: a single oversized task still becomes a task of its own.
                    boolean batchHasContent = splitsWeight > 0L || exchangeSourcesSize > 0L;
                    boolean wouldExceedTarget = splitsWeight + taskSplitWeight > this.targetPartitionSplitWeight
                            || exchangeSourcesSize + taskExchangeSourcesSize + replicatedExchangeSourcesSize > this.targetPartitionSourceSizeInBytes;
                    if (batchHasContent && wouldExceedTarget) {
                        exchangeSources.putAll(this.replicatedExchangeSourceHandles);
                        joinedTasks.add(new TaskDescriptor(taskPartitionId++, splits.build(), exchangeSources.build(), groupNodeRequirements));
                        splits = ImmutableListMultimap.builder();
                        exchangeSources = ImmutableListMultimap.builder();
                        splitsWeight = 0L;
                        exchangeSourcesSize = 0L;
                    }
                    splits.putAll(taskSplits);
                    exchangeSources.putAll(taskExchangeSources);
                    splitsWeight += taskSplitWeight;
                    exchangeSourcesSize += taskExchangeSourcesSize;
                }
                ImmutableListMultimap<PlanNodeId, Split> remainderSplits = splits.build();
                ImmutableListMultimap<PlanNodeId, ExchangeSourceHandle> remainderExchangeSources = exchangeSources.build();
                if (remainderSplits.isEmpty() && remainderExchangeSources.isEmpty()) {
                    continue;
                }
                // Emit whatever is left in the batch, again with the replicated sources attached.
                remainderExchangeSources = ImmutableListMultimap.<PlanNodeId, ExchangeSourceHandle>builder()
                        .putAll(remainderExchangeSources)
                        .putAll(this.replicatedExchangeSourceHandles)
                        .build();
                joinedTasks.add(new TaskDescriptor(taskPartitionId++, remainderSplits, remainderExchangeSources, groupNodeRequirements));
            }
            return joinedTasks.build();
        }

        /**
         * Returns the size in bytes that the owning exchange reports for the given handle.
         *
         * @param handle exchange source handle; looked up in {@code exchangeForHandle}
         * @return the size in bytes from the exchange source statistics
         */
        private long sourceHandleSize(ExchangeSourceHandle handle) {
            return this.exchangeForHandle.get(handle)
                    .getExchangeSourceStatistics(handle)
                    .getSizeInBytes();
        }

        /**
         * Indexes the tasks by their node requirements, so tasks with equal requirements share a key.
         *
         * @param tasks tasks to group
         * @return multimap from node requirements to the tasks that have them
         */
        private ListMultimap<NodeRequirements, TaskDescriptor> groupCompatibleTasks(List<TaskDescriptor> tasks) {
            return Multimaps.index(tasks, task -> task.getNodeRequirements());
        }

        /**
         * Looks up the partition for a bucket in {@code bucketToPartitionMap}.
         *
         * @param bucket index into the bucket-to-partition table
         * @return the partition stored at that index
         */
        private int getPartitionForBucket(int bucket) {
            int[] partitionByBucket = this.bucketToPartitionMap;
            return partitionByBucket[bucket];
        }

        /**
         * Reports whether this source has already produced its tasks.
         *
         * @return the current value of the {@code finished} flag
         */
        @Override
        public synchronized boolean isFinished() {
            boolean done = this.finished;
            return done;
        }

        /**
         * Closes every split source once; later calls do nothing.
         *
         * <p>Closing is best-effort: a {@link RuntimeException} from one split source is logged
         * and the remaining split sources are still closed.
         */
        @Override
        public synchronized void close() {
            if (!this.closed) {
                this.closed = true;
                this.splitSources.values().forEach(source -> {
                    try {
                        source.close();
                    }
                    catch (RuntimeException failure) {
                        log.error(failure, "Error closing split source");
                    }
                });
            }
        }
    }

    /**
     * Task source for fragments that read only from exchanges and carry no host or catalog
     * requirement.
     *
     * <p>Each partitioned exchange source handle is split by its exchange using the target
     * partition size. The resulting handles are packed, in order, into tasks whose accumulated
     * size stays within that target where possible. All replicated handles are added to every task.
     */
    public static class ArbitraryDistributionTaskSource
    implements TaskSource {
        private final IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles;
        private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
        private final long targetPartitionSizeInBytes;
        private final DataSize taskMemory;
        private boolean finished;

        /**
         * Creates a task source for the given fragment.
         *
         * @param session session supplying the default task memory
         * @param fragment fragment to schedule; must not have partitioned sources
         * @param sourceExchanges exchanges keyed by the fragment that produces them
         * @param exchangeSourceHandles exchange source handles keyed by producing fragment
         * @param targetPartitionSize target amount of exchange data per task
         * @return the new task source
         * @throws IllegalArgumentException if the fragment has partitioned sources
         */
        public static ArbitraryDistributionTaskSource create(Session session, PlanFragment fragment, Map<PlanFragmentId, Exchange> sourceExchanges, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles, DataSize targetPartitionSize) {
            Preconditions.checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources());
            IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandleMap = StageTaskSourceFactory.getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles);
            return new ArbitraryDistributionTaskSource(
                    exchangeForHandleMap,
                    (Multimap<PlanNodeId, ExchangeSourceHandle>) StageTaskSourceFactory.getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles),
                    (Multimap<PlanNodeId, ExchangeSourceHandle>) StageTaskSourceFactory.getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles),
                    targetPartitionSize,
                    SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory(session));
        }

        /**
         * @param sourceExchanges exchange owning each handle (identity-keyed); copied
         * @param partitionedExchangeSourceHandles handles to distribute across tasks; copied
         * @param replicatedExchangeSourceHandles handles added to every task; copied
         * @param targetPartitionSize target amount of exchange data per task
         * @param taskMemory memory requirement placed into each task's node requirements
         * @throws NullPointerException if any argument is null
         * @throws IllegalArgumentException if a handle has no entry in {@code sourceExchanges}
         */
        @VisibleForTesting
        ArbitraryDistributionTaskSource(IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges, Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles, Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles, DataSize targetPartitionSize, DataSize taskMemory) {
            this.sourceExchanges = new IdentityHashMap<>(Objects.requireNonNull(sourceExchanges, "sourceExchanges is null"));
            this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null"));
            this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
            this.taskMemory = Objects.requireNonNull(taskMemory, "taskMemory is null");
            // Every handle must map to an exchange, otherwise getMoreTasks could not split or size it.
            Preconditions.checkArgument(sourceExchanges.keySet().containsAll(partitionedExchangeSourceHandles.values()), "Unexpected entries in partitionedExchangeSourceHandles map: %s; allowed keys: %s", partitionedExchangeSourceHandles.values(), sourceExchanges.keySet());
            Preconditions.checkArgument(sourceExchanges.keySet().containsAll(replicatedExchangeSourceHandles.values()), "Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s", replicatedExchangeSourceHandles.values(), sourceExchanges.keySet());
            this.targetPartitionSizeInBytes = Objects.requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes();
        }

        /**
         * Produces all tasks in one call and marks the source finished.
         *
         * @return an already-completed future with the tasks; an empty list once finished
         * @throws IllegalStateException if an exchange source splitter reports that it is blocked
         */
        @Override
        public ListenableFuture<List<TaskDescriptor>> getMoreTasks() {
            if (this.finished) {
                return Futures.immediateFuture(ImmutableList.of());
            }
            NodeRequirements nodeRequirements = new NodeRequirements(Optional.empty(), ImmutableSet.of(), this.taskMemory);
            ImmutableList.Builder<TaskDescriptor> result = ImmutableList.builder();
            int currentPartitionId = 0;
            ImmutableListMultimap.Builder<PlanNodeId, ExchangeSourceHandle> assignedExchangeSourceHandles = ImmutableListMultimap.builder();
            long assignedExchangeDataSize = 0L;
            // Tracked separately from the size so that a batch of zero-byte handles is still emitted.
            int assignedExchangeSourceHandleCount = 0;
            for (Map.Entry<PlanNodeId, ExchangeSourceHandle> entry : this.partitionedExchangeSourceHandles.entries()) {
                PlanNodeId remoteSourcePlanNodeId = entry.getKey();
                ExchangeSourceHandle originalExchangeSourceHandle = entry.getValue();
                Exchange sourceExchange = this.sourceExchanges.get(originalExchangeSourceHandle);
                // NOTE(review): the splitter is never closed here; confirm whether it holds resources that need releasing.
                ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, this.targetPartitionSizeInBytes);
                ImmutableList.Builder<ExchangeSourceHandle> sourceHandles = ImmutableList.builder();
                while (true) {
                    // Only splitters that are immediately ready are handled; there is no waiting logic.
                    Preconditions.checkState(splitter.isBlocked().isDone(), "not supported");
                    Optional<ExchangeSourceHandle> next = splitter.getNext();
                    if (next.isEmpty()) {
                        break;
                    }
                    sourceHandles.add(next.get());
                }
                for (ExchangeSourceHandle handle : sourceHandles.build()) {
                    ExchangeSourceStatistics statistics = sourceExchange.getExchangeSourceStatistics(handle);
                    // Flush the current batch when this handle would push it past the target size.
                    if (assignedExchangeDataSize != 0L && assignedExchangeDataSize + statistics.getSizeInBytes() > this.targetPartitionSizeInBytes) {
                        assignedExchangeSourceHandles.putAll(this.replicatedExchangeSourceHandles);
                        result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
                        assignedExchangeSourceHandles = ImmutableListMultimap.builder();
                        assignedExchangeDataSize = 0L;
                        assignedExchangeSourceHandleCount = 0;
                    }
                    assignedExchangeSourceHandles.put(remoteSourcePlanNodeId, handle);
                    assignedExchangeDataSize += statistics.getSizeInBytes();
                    ++assignedExchangeSourceHandleCount;
                }
            }
            if (assignedExchangeSourceHandleCount > 0) {
                assignedExchangeSourceHandles.putAll(this.replicatedExchangeSourceHandles);
                result.add(new TaskDescriptor(currentPartitionId, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
            }
            this.finished = true;
            return Futures.immediateFuture(result.build());
        }

        /**
         * @return {@code true} once {@link #getMoreTasks()} has produced its tasks
         */
        @Override
        public boolean isFinished() {
            return this.finished;
        }

        /**
         * Does nothing; this class defines no cleanup of its own.
         */
        @Override
        public void close() {
        }
    }

    /**
     * Task source that produces exactly one task (partition id 0) reading all exchange source
     * handles, with no splits and no host or catalog requirement.
     */
    public static class SingleDistributionTaskSource
    implements TaskSource {
        private final ListMultimap<PlanNodeId, ExchangeSourceHandle> exchangeSourceHandles;
        private final DataSize taskMemory;
        private boolean finished;

        /**
         * Creates a task source for the given fragment.
         *
         * @param session session supplying the default task memory
         * @param fragment fragment to schedule; must not have partitioned sources
         * @param exchangeSourceHandles exchange source handles keyed by producing fragment
         * @return the new task source
         * @throws IllegalArgumentException if the fragment has partitioned sources
         */
        public static SingleDistributionTaskSource create(Session session, PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles) {
            Preconditions.checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources());
            return new SingleDistributionTaskSource(
                    StageTaskSourceFactory.getInputsForRemoteSources(fragment.getRemoteSourceNodes(), exchangeSourceHandles),
                    SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory(session));
        }

        /**
         * @param exchangeSourceHandles handles the single task reads, keyed by plan node; copied
         * @param taskMemory memory requirement placed into the task's node requirements
         * @throws NullPointerException if any argument is null
         */
        @VisibleForTesting
        SingleDistributionTaskSource(ListMultimap<PlanNodeId, ExchangeSourceHandle> exchangeSourceHandles, DataSize taskMemory) {
            this.exchangeSourceHandles = ImmutableListMultimap.copyOf(Objects.requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
            this.taskMemory = Objects.requireNonNull(taskMemory, "taskMemory is null");
        }

        /**
         * Produces the single task on the first call and marks the source finished.
         *
         * @return an already-completed future with one task; an empty list on later calls
         */
        @Override
        public ListenableFuture<List<TaskDescriptor>> getMoreTasks() {
            if (this.finished) {
                return Futures.immediateFuture(ImmutableList.of());
            }
            NodeRequirements nodeRequirements = new NodeRequirements(Optional.empty(), ImmutableSet.of(), this.taskMemory);
            List<TaskDescriptor> result = ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(), this.exchangeSourceHandles, nodeRequirements));
            this.finished = true;
            return Futures.immediateFuture(result);
        }

        /**
         * @return {@code true} once {@link #getMoreTasks()} has produced its task
         */
        @Override
        public boolean isFinished() {
            return this.finished;
        }

        /**
         * Does nothing; this class defines no cleanup of its own.
         */
        @Override
        public void close() {
        }
    }
}

