/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler.faulttolerant;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.primitives.ImmutableLongArray;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.errorprone.annotations.CheckReturnValue;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.cost.RuntimeInfoProvider;
import io.trino.cost.StaticRuntimeInfoProvider;
import io.trino.exchange.ExchangeContextInstance;
import io.trino.exchange.ExchangeInput;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.BasicStageInfo;
import io.trino.execution.BasicStageStats;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryState;
import io.trino.execution.QueryStateMachine;
import io.trino.execution.RemoteTask;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.StageInfo;
import io.trino.execution.StageState;
import io.trino.execution.StateMachine;
import io.trino.execution.TableInfo;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.buffer.SpoolingOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import io.trino.execution.scheduler.ErrorCodes;
import io.trino.execution.scheduler.Exchanges;
import io.trino.execution.scheduler.OutputDataSizeEstimate;
import io.trino.execution.scheduler.QueryScheduler;
import io.trino.execution.scheduler.SchedulingUtils;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.faulttolerant.EventDrivenTaskSource;
import io.trino.execution.scheduler.faulttolerant.EventDrivenTaskSourceFactory;
import io.trino.execution.scheduler.faulttolerant.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.faulttolerant.FaultTolerantPartitioningSchemeFactory;
import io.trino.execution.scheduler.faulttolerant.NodeAllocator;
import io.trino.execution.scheduler.faulttolerant.NodeAllocatorService;
import io.trino.execution.scheduler.faulttolerant.NodeRequirements;
import io.trino.execution.scheduler.faulttolerant.OutputStatsEstimator;
import io.trino.execution.scheduler.faulttolerant.OutputStatsEstimatorFactory;
import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimator;
import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimatorFactory;
import io.trino.execution.scheduler.faulttolerant.SplitAssigner;
import io.trino.execution.scheduler.faulttolerant.SplitsMapping;
import io.trino.execution.scheduler.faulttolerant.StageExecutionStats;
import io.trino.execution.scheduler.faulttolerant.TaskDescriptor;
import io.trino.execution.scheduler.faulttolerant.TaskDescriptorStorage;
import io.trino.execution.scheduler.faulttolerant.TaskExecutionClass;
import io.trino.failuredetector.FailureDetector;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Metadata;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.split.RemoteSplit;
import io.trino.sql.planner.AdaptivePlanner;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SubPlan;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.TopologicalOrderSubPlanVisitor;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.LimitNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.SoftReference;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class EventDrivenFaultTolerantQueryScheduler
implements QueryScheduler {
    private static final Logger log = Logger.get(EventDrivenFaultTolerantQueryScheduler.class);
    private final QueryStateMachine queryStateMachine;
    private final Metadata metadata;
    private final RemoteTaskFactory remoteTaskFactory;
    private final TaskDescriptorStorage taskDescriptorStorage;
    private final EventDrivenTaskSourceFactory taskSourceFactory;
    private final boolean summarizeTaskInfo;
    private final NodeTaskMap nodeTaskMap;
    private final ExecutorService queryExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Tracer tracer;
    private final SplitSchedulerStats schedulerStats;
    private final PartitionMemoryEstimatorFactory memoryEstimatorFactory;
    private final OutputStatsEstimatorFactory outputStatsEstimatorFactory;
    private final NodePartitioningManager nodePartitioningManager;
    private final ExchangeManager exchangeManager;
    private final NodeAllocatorService nodeAllocatorService;
    private final FailureDetector failureDetector;
    private final DynamicFilterService dynamicFilterService;
    private final TaskExecutionStats taskExecutionStats;
    private final Optional<AdaptivePlanner> adaptivePlanner;
    private final StageExecutionStats stageExecutionStats;
    private final SubPlan originalPlan;
    private final boolean stageEstimationForEagerParentEnabled;
    private final StageRegistry stageRegistry;
    @GuardedBy(value="this")
    private boolean started;
    @GuardedBy(value="this")
    private Scheduler scheduler;

    public EventDrivenFaultTolerantQueryScheduler(QueryStateMachine queryStateMachine, Metadata metadata, RemoteTaskFactory remoteTaskFactory, TaskDescriptorStorage taskDescriptorStorage, EventDrivenTaskSourceFactory taskSourceFactory, boolean summarizeTaskInfo, NodeTaskMap nodeTaskMap, ExecutorService queryExecutor, ScheduledExecutorService scheduledExecutorService, Tracer tracer, SplitSchedulerStats schedulerStats, PartitionMemoryEstimatorFactory memoryEstimatorFactory, OutputStatsEstimatorFactory outputStatsEstimatorFactory, NodePartitioningManager nodePartitioningManager, ExchangeManager exchangeManager, NodeAllocatorService nodeAllocatorService, FailureDetector failureDetector, DynamicFilterService dynamicFilterService, TaskExecutionStats taskExecutionStats, AdaptivePlanner adaptivePlanner, StageExecutionStats stageExecutionStats, SubPlan originalPlan) {
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        RetryPolicy retryPolicy = SystemSessionProperties.getRetryPolicy(queryStateMachine.getSession());
        Verify.verify((retryPolicy == RetryPolicy.TASK ? 1 : 0) != 0, (String)"unexpected retry policy: %s", (Object)((Object)retryPolicy));
        this.metadata = Objects.requireNonNull(metadata, "metadata is null");
        this.remoteTaskFactory = Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
        this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
        this.taskSourceFactory = Objects.requireNonNull(taskSourceFactory, "taskSourceFactory is null");
        this.summarizeTaskInfo = summarizeTaskInfo;
        this.nodeTaskMap = Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        this.queryExecutor = Objects.requireNonNull(queryExecutor, "queryExecutor is null");
        this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
        this.tracer = Objects.requireNonNull(tracer, "tracer is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.memoryEstimatorFactory = Objects.requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null");
        this.outputStatsEstimatorFactory = Objects.requireNonNull(outputStatsEstimatorFactory, "outputStatsEstimatorFactory is null");
        this.nodePartitioningManager = Objects.requireNonNull(nodePartitioningManager, "partitioningSchemeFactory is null");
        this.exchangeManager = Objects.requireNonNull(exchangeManager, "exchangeManager is null");
        this.nodeAllocatorService = Objects.requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
        this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.taskExecutionStats = Objects.requireNonNull(taskExecutionStats, "taskExecutionStats is null");
        this.adaptivePlanner = SystemSessionProperties.isFaultTolerantExecutionAdaptiveQueryPlanningEnabled(queryStateMachine.getSession()) ? Optional.of(Objects.requireNonNull(adaptivePlanner, "adaptivePlanner is null")) : Optional.empty();
        this.originalPlan = Objects.requireNonNull(originalPlan, "originalPlan is null");
        this.stageExecutionStats = Objects.requireNonNull(stageExecutionStats, "stageExecutionStats is null");
        this.stageEstimationForEagerParentEnabled = SystemSessionProperties.isFaultTolerantExecutionStageEstimationForEagerParentEnabled(queryStateMachine.getSession());
        this.stageRegistry = new StageRegistry(queryStateMachine, originalPlan);
    }

    @Override
    public synchronized void start() {
        Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"already started");
        this.started = true;
        if (this.queryStateMachine.isDone()) {
            return;
        }
        this.taskDescriptorStorage.initialize(this.queryStateMachine.getQueryId());
        this.queryStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                this.taskDescriptorStorage.destroy(this.queryStateMachine.getQueryId());
            }
        });
        this.queryStateMachine.addStateChangeListener(state -> {
            Scheduler scheduler;
            if (!state.isDone()) {
                return;
            }
            EventDrivenFaultTolerantQueryScheduler eventDrivenFaultTolerantQueryScheduler = this;
            synchronized (eventDrivenFaultTolerantQueryScheduler) {
                scheduler = this.scheduler;
                this.scheduler = null;
            }
            if (scheduler != null) {
                scheduler.abort();
            }
            this.queryStateMachine.updateQueryInfo(Optional.ofNullable(this.stageRegistry.getStageInfo()));
        });
        Session session = this.queryStateMachine.getSession();
        int maxPartitionCount = SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount(session);
        FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory = new FaultTolerantPartitioningSchemeFactory(this.nodePartitioningManager, session, maxPartitionCount);
        Closer closer = Closer.create();
        NodeAllocator nodeAllocator = (NodeAllocator)closer.register((Closeable)this.nodeAllocatorService.getNodeAllocator(session));
        try {
            this.scheduler = new Scheduler(this.queryStateMachine, this.metadata, this.remoteTaskFactory, this.taskDescriptorStorage, this.taskSourceFactory, this.summarizeTaskInfo, this.nodeTaskMap, this.queryExecutor, this.scheduledExecutorService, this.tracer, this.schedulerStats, this.memoryEstimatorFactory, this.outputStatsEstimatorFactory.create(session), partitioningSchemeFactory, this.exchangeManager, SystemSessionProperties.getTaskRetryAttemptsPerTask(session) + 1, SystemSessionProperties.getMaxTasksWaitingForNodePerStage(session), SystemSessionProperties.getMaxTasksWaitingForExecutionPerQuery(session), nodeAllocator, this.failureDetector, this.stageRegistry, this.taskExecutionStats, this.stageExecutionStats, this.dynamicFilterService, new SchedulingDelayer(SystemSessionProperties.getRetryInitialDelay(session), SystemSessionProperties.getRetryMaxDelay(session), SystemSessionProperties.getRetryDelayScaleFactor(session), Stopwatch.createUnstarted()), this.originalPlan, maxPartitionCount, SystemSessionProperties.isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled(session), SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(session), SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(session), this.stageEstimationForEagerParentEnabled, this.adaptivePlanner);
            this.queryExecutor.submit(this.scheduler::run);
        }
        catch (Throwable t) {
            block5: {
                try {
                    closer.close();
                }
                catch (Throwable closerFailure) {
                    if (t == closerFailure) break block5;
                    t.addSuppressed(closerFailure);
                }
            }
            throw t;
        }
    }

    @Override
    public void cancelStage(StageId stageId) {
        throw new UnsupportedOperationException("partial cancel is not supported in fault tolerant mode");
    }

    @Override
    public void failTask(TaskId taskId, Throwable failureCause) {
        this.stageRegistry.failTaskRemotely(taskId, failureCause);
    }

    @Override
    public BasicStageStats getBasicStageStats() {
        return this.stageRegistry.getBasicStageStats();
    }

    @Override
    public BasicStageInfo getBasicStageInfo() {
        return new BasicStageInfo(this.stageRegistry.getStageInfo());
    }

    @Override
    public StageInfo getStageInfo() {
        return this.stageRegistry.getStageInfo();
    }

    @Override
    public long getUserMemoryReservation() {
        return this.stageRegistry.getUserMemoryReservation();
    }

    @Override
    public long getTotalMemoryReservation() {
        return this.stageRegistry.getTotalMemoryReservation();
    }

    @Override
    public Duration getTotalCpuTime() {
        return this.stageRegistry.getTotalCpuTime();
    }

    private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector selector) {
        return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput((List<ExchangeSourceHandle>)ImmutableList.of(), Optional.of(selector))));
    }

    @ThreadSafe
    private static class StageRegistry {
        private final QueryStateMachine queryStateMachine;
        private final AtomicReference<SubPlan> plan;
        private final Map<StageId, SqlStage> stages = new ConcurrentHashMap<StageId, SqlStage>();

        public StageRegistry(QueryStateMachine queryStateMachine, SubPlan plan) {
            this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
            this.plan = new AtomicReference<SubPlan>(Objects.requireNonNull(plan, "plan is null"));
        }

        public void add(SqlStage stage) {
            Verify.verify((this.stages.putIfAbsent(stage.getStageId(), stage) == null ? 1 : 0) != 0, (String)"stage %s is already present", (Object)stage.getStageId());
        }

        public void updatePlan(SubPlan plan) {
            this.plan.set(Objects.requireNonNull(plan, "plan is null"));
        }

        public StageInfo getStageInfo() {
            Map stageInfos = (Map)this.stages.values().stream().collect(ImmutableMap.toImmutableMap(stage -> stage.getFragment().getId(), SqlStage::getStageInfo));
            SubPlan plan = Objects.requireNonNull(this.plan.get(), "plan is null");
            HashSet<PlanFragmentId> reportedFragments = new HashSet<PlanFragmentId>();
            return this.getStageInfo(plan, stageInfos, reportedFragments);
        }

        private StageInfo getStageInfo(SubPlan plan, Map<PlanFragmentId, StageInfo> infos, Set<PlanFragmentId> reportedFragments) {
            PlanFragmentId fragmentId = plan.getFragment().getId();
            reportedFragments.add(fragmentId);
            StageInfo info = infos.get(fragmentId);
            if (info == null) {
                info = StageInfo.createInitial(this.queryStateMachine.getQueryId(), this.queryStateMachine.getQueryState().isDone() ? StageState.ABORTED : StageState.PLANNED, plan.getFragment());
            }
            List sourceStages = (List)plan.getChildren().stream().map(source -> this.getStageInfo((SubPlan)source, infos, reportedFragments)).collect(ImmutableList.toImmutableList());
            return info.withSubStages(sourceStages);
        }

        public BasicStageStats getBasicStageStats() {
            List stageStats = (List)this.stages.values().stream().map(SqlStage::getBasicStageStats).collect(ImmutableList.toImmutableList());
            return BasicStageStats.aggregateBasicStageStats(stageStats);
        }

        public long getUserMemoryReservation() {
            return this.stages.values().stream().mapToLong(SqlStage::getUserMemoryReservation).sum();
        }

        public long getTotalMemoryReservation() {
            return this.stages.values().stream().mapToLong(SqlStage::getTotalMemoryReservation).sum();
        }

        public Duration getTotalCpuTime() {
            long millis = this.stages.values().stream().mapToLong(stage -> stage.getTotalCpuTime().toMillis()).sum();
            return new Duration((double)millis, TimeUnit.MILLISECONDS);
        }

        public void failTaskRemotely(TaskId taskId, Throwable failureCause) {
            SqlStage sqlStage = Objects.requireNonNull(this.stages.get(taskId.getStageId()), () -> "stage not found: %s" + String.valueOf(taskId.getStageId()));
            sqlStage.failTaskRemotely(taskId, failureCause);
        }

        public void logDebugInfo() {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("SqlStages:");
            this.stages.forEach((stageId, stage) -> log.debug("SqlStage %s: %s", new Object[]{stageId, stage}));
        }
    }

    private static class Scheduler
    implements EventListener<Void> {
        private static final int EVENT_BUFFER_CAPACITY = 100;
        private static final long EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS = TimeUnit.MINUTES.toMillis(1L);
        private static final long SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(10L);
        private static final long SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS = TimeUnit.MINUTES.toMillis(10L);
        private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(30L);
        private static final long SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(60L);
        private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10;
        private static final int TASK_FAILURES_LOG_SIZE = 5;
        private final QueryStateMachine queryStateMachine;
        private final Metadata metadata;
        private final RemoteTaskFactory remoteTaskFactory;
        private final TaskDescriptorStorage taskDescriptorStorage;
        private final EventDrivenTaskSourceFactory taskSourceFactory;
        private final boolean summarizeTaskInfo;
        private final NodeTaskMap nodeTaskMap;
        private final ExecutorService queryExecutor;
        private final ScheduledExecutorService scheduledExecutorService;
        private final Tracer tracer;
        private final Span schedulerSpan;
        private final SplitSchedulerStats schedulerStats;
        private final PartitionMemoryEstimatorFactory memoryEstimatorFactory;
        private final OutputStatsEstimator outputStatsEstimator;
        private final FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory;
        private final ExchangeManager exchangeManager;
        private final int maxTaskExecutionAttempts;
        private final int maxTasksWaitingForNode;
        private final int maxTasksWaitingForExecution;
        private final NodeAllocator nodeAllocator;
        private final FailureDetector failureDetector;
        private final StageRegistry stageRegistry;
        private final TaskExecutionStats taskExecutionStats;
        private final StageExecutionStats stageExecutionStats;
        private final DynamicFilterService dynamicFilterService;
        private final int maxPartitionCount;
        private final boolean runtimeAdaptivePartitioningEnabled;
        private final int runtimeAdaptivePartitioningPartitionCount;
        private final long runtimeAdaptivePartitioningMaxTaskSizeInBytes;
        private final boolean stageEstimationForEagerParentEnabled;
        private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>();
        private final List<Event> eventBuffer = new ArrayList<Event>(100);
        private final Stopwatch noEventsStopwatch = Stopwatch.createUnstarted();
        private final Stopwatch debugInfoStopwatch = Stopwatch.createUnstarted();
        private final Optional<EventDebugInfos> eventDebugInfos;
        private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures = new ArrayDeque<Map.Entry<TaskId, RuntimeException>>(5);
        private boolean started;
        private boolean runtimeAdaptivePartitioningApplied;
        private SubPlan plan;
        private List<SubPlan> planInTopologicalOrder;
        private final Optional<AdaptivePlanner> adaptivePlanner;
        private final Map<StageId, StageExecution> stageExecutions = new HashMap<StageId, StageExecution>();
        private final Map<SubPlan, IsReadyForExecutionResult> isReadyForExecutionCache = new HashMap<SubPlan, IsReadyForExecutionResult>();
        private final SetMultimap<StageId, StageId> stageConsumers = HashMultimap.create();
        private final SchedulingQueue schedulingQueue = new SchedulingQueue();
        private int nextSchedulingPriority;
        private final Map<ScheduledTask, PreSchedulingTaskContext> preSchedulingTaskContexts = new HashMap<ScheduledTask, PreSchedulingTaskContext>();
        private final SchedulingDelayer schedulingDelayer;
        private boolean queryOutputSet;

        public Scheduler(QueryStateMachine queryStateMachine, Metadata metadata, RemoteTaskFactory remoteTaskFactory, TaskDescriptorStorage taskDescriptorStorage, EventDrivenTaskSourceFactory taskSourceFactory, boolean summarizeTaskInfo, NodeTaskMap nodeTaskMap, ExecutorService queryExecutor, ScheduledExecutorService scheduledExecutorService, Tracer tracer, SplitSchedulerStats schedulerStats, PartitionMemoryEstimatorFactory memoryEstimatorFactory, OutputStatsEstimator outputStatsEstimator, FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory, ExchangeManager exchangeManager, int maxTaskExecutionAttempts, int maxTasksWaitingForNode, int maxTasksWaitingForExecution, NodeAllocator nodeAllocator, FailureDetector failureDetector, StageRegistry stageRegistry, TaskExecutionStats taskExecutionStats, StageExecutionStats stageExecutionStats, DynamicFilterService dynamicFilterService, SchedulingDelayer schedulingDelayer, SubPlan plan, int maxPartitionCount, boolean runtimeAdaptivePartitioningEnabled, int runtimeAdaptivePartitioningPartitionCount, DataSize runtimeAdaptivePartitioningMaxTaskSize, boolean stageEstimationForEagerParentEnabled, Optional<AdaptivePlanner> adaptivePlanner) {
            this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
            this.metadata = Objects.requireNonNull(metadata, "metadata is null");
            this.remoteTaskFactory = Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
            this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
            this.taskSourceFactory = Objects.requireNonNull(taskSourceFactory, "taskSourceFactory is null");
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.nodeTaskMap = Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
            this.queryExecutor = Objects.requireNonNull(queryExecutor, "queryExecutor is null");
            this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
            this.tracer = Objects.requireNonNull(tracer, "tracer is null");
            this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
            this.memoryEstimatorFactory = Objects.requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null");
            this.outputStatsEstimator = Objects.requireNonNull(outputStatsEstimator, "outputStatsEstimator is null");
            this.partitioningSchemeFactory = Objects.requireNonNull(partitioningSchemeFactory, "partitioningSchemeFactory is null");
            this.exchangeManager = Objects.requireNonNull(exchangeManager, "exchangeManager is null");
            Preconditions.checkArgument((maxTaskExecutionAttempts > 0 ? 1 : 0) != 0, (String)"maxTaskExecutionAttempts must be greater than zero: %s", (int)maxTaskExecutionAttempts);
            this.maxTaskExecutionAttempts = maxTaskExecutionAttempts;
            this.maxTasksWaitingForNode = maxTasksWaitingForNode;
            this.maxTasksWaitingForExecution = maxTasksWaitingForExecution;
            this.nodeAllocator = Objects.requireNonNull(nodeAllocator, "nodeAllocator is null");
            this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
            this.stageRegistry = Objects.requireNonNull(stageRegistry, "stageRegistry is null");
            this.taskExecutionStats = Objects.requireNonNull(taskExecutionStats, "taskExecutionStats is null");
            this.stageExecutionStats = Objects.requireNonNull(stageExecutionStats, "stageExecutionStats is null");
            this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
            this.schedulingDelayer = Objects.requireNonNull(schedulingDelayer, "schedulingDelayer is null");
            this.plan = Objects.requireNonNull(plan, "plan is null");
            this.maxPartitionCount = maxPartitionCount;
            this.runtimeAdaptivePartitioningEnabled = runtimeAdaptivePartitioningEnabled;
            this.runtimeAdaptivePartitioningPartitionCount = runtimeAdaptivePartitioningPartitionCount;
            this.runtimeAdaptivePartitioningMaxTaskSizeInBytes = Objects.requireNonNull(runtimeAdaptivePartitioningMaxTaskSize, "runtimeAdaptivePartitioningMaxTaskSize is null").toBytes();
            this.adaptivePlanner = Objects.requireNonNull(adaptivePlanner, "adaptivePlanner is null");
            this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled;
            this.schedulerSpan = tracer.spanBuilder("scheduler").setParent(Context.current().with((ImplicitContextKeyed)queryStateMachine.getSession().getQuerySpan())).setAttribute(TrinoAttributes.QUERY_ID, (Object)queryStateMachine.getQueryId().toString()).startSpan();
            this.eventDebugInfos = log.isDebugEnabled() ? Optional.of(new EventDebugInfos(queryStateMachine.getQueryId().toString(), 10)) : Optional.empty();
            this.planInTopologicalOrder = TopologicalOrderSubPlanVisitor.sortPlanInTopologicalOrder(plan);
            this.noEventsStopwatch.start();
        }

        public void run() {
            Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"already started");
            this.started = true;
            this.queryStateMachine.addStateChangeListener(state -> {
                if (state.isDone()) {
                    this.eventQueue.add(Event.WAKE_UP);
                }
            });
            Optional<Throwable> failure = Optional.empty();
            try {
                if (this.schedule()) {
                    while (this.processEvents() && (this.schedulingDelayer.getRemainingDelayInMillis() > 0L || this.schedule())) {
                    }
                }
            }
            catch (Throwable t) {
                failure = Optional.of(t);
            }
            for (StageExecution execution : this.stageExecutions.values()) {
                failure = this.closeAndAddSuppressed(failure, execution::abort);
            }
            for (PreSchedulingTaskContext context : this.preSchedulingTaskContexts.values()) {
                failure = this.closeAndAddSuppressed(failure, context.getNodeLease()::release);
            }
            this.preSchedulingTaskContexts.clear();
            failure = this.closeAndAddSuppressed(failure, this.nodeAllocator);
            failure.ifPresent(fail -> {
                this.queryStateMachine.transitionToFailed((Throwable)fail);
                this.schedulerSpan.addEvent("scheduler_failure", Attributes.of(TrinoAttributes.FAILURE_MESSAGE, (Object)fail.getMessage()));
            });
            this.schedulerSpan.end();
        }

        private Optional<Throwable> closeAndAddSuppressed(Optional<Throwable> existingFailure, Closeable closeable) {
            block3: {
                try {
                    closeable.close();
                }
                catch (Throwable t) {
                    if (existingFailure.isEmpty()) {
                        return Optional.of(t);
                    }
                    if (existingFailure.get() == t) break block3;
                    existingFailure.get().addSuppressed(t);
                }
            }
            return existingFailure;
        }

        private boolean processEvents() {
            try {
                Event event = this.eventQueue.poll(EVENT_PROCESSING_ENFORCED_FREQUENCY_MILLIS, TimeUnit.MILLISECONDS);
                if (event != null) {
                    this.eventBuffer.add(event);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            boolean eventDebugInfoRecorded = false;
            boolean aborted = false;
            while (!aborted) {
                this.eventQueue.drainTo(this.eventBuffer, 100 - this.eventBuffer.size());
                if (this.eventBuffer.isEmpty()) break;
                for (Event e : this.eventBuffer) {
                    eventDebugInfoRecorded |= this.recordEventsDebugInfo(e);
                    if (e == Event.ABORT) {
                        aborted = true;
                        break;
                    }
                    if (e == Event.WAKE_UP) continue;
                    e.accept(this);
                }
                this.eventBuffer.clear();
            }
            this.queryStateMachine.getFailureInfo().ifPresent(failureInfo -> {
                if (failureInfo.getErrorCode() == StandardErrorCode.EXCEEDED_TIME_LIMIT.toErrorCode() && this.noEventsStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS) {
                    this.logDebugInfoSafe(String.format("Scheduler stalled for %s on EXCEEDED_TIME_LIMIT", this.noEventsStopwatch.elapsed()));
                } else if (failureInfo.getErrorCode() == StandardErrorCode.USER_CANCELED.toErrorCode() && this.noEventsStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_ON_USER_CANCELED_THRESHOLD_MILLIS) {
                    this.logDebugInfoSafe(String.format("Scheduler stalled for %s on USER_CANCELED", this.noEventsStopwatch.elapsed()));
                }
            });
            if (eventDebugInfoRecorded) {
                this.noEventsStopwatch.reset().start();
                this.debugInfoStopwatch.reset();
            } else if (log.isDebugEnabled() && (!this.debugInfoStopwatch.isRunning() || this.debugInfoStopwatch.elapsed().toMillis() > SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS) && this.noEventsStopwatch.elapsed().toMillis() > SCHEDULER_STALLED_DURATION_THRESHOLD_MILLIS) {
                this.logDebugInfoSafe("Scheduler stalled for %s".formatted(this.noEventsStopwatch.elapsed()));
                this.debugInfoStopwatch.reset().start();
            }
            return !aborted;
        }

        private boolean recordEventsDebugInfo(Event event) {
            if (this.eventDebugInfos.isEmpty()) {
                return false;
            }
            return this.eventDebugInfos.orElseThrow().add(event);
        }

        private void logDebugInfoSafe(String reason) {
            try {
                this.logDebugInfo(reason);
            }
            catch (Throwable e) {
                log.error(e, "Unexpected error while logging debug info for %s", new Object[]{reason});
            }
        }

        private void logDebugInfo(String reason) {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("Scheduler debug info for %s START; reason=%s", new Object[]{this.queryStateMachine.getQueryId(), reason});
            log.debug("General state: %s", new Object[]{MoreObjects.toStringHelper((Object)this).add("queryState", (Object)this.queryStateMachine.getQueryState()).add("finalQueryInfo", this.queryStateMachine.getFinalQueryInfo()).add("maxTaskExecutionAttempts", this.maxTaskExecutionAttempts).add("maxTasksWaitingForNode", this.maxTasksWaitingForNode).add("maxTasksWaitingForExecution", this.maxTasksWaitingForExecution).add("maxPartitionCount", this.maxPartitionCount).add("runtimeAdaptivePartitioningEnabled", this.runtimeAdaptivePartitioningEnabled).add("runtimeAdaptivePartitioningPartitionCount", this.runtimeAdaptivePartitioningPartitionCount).add("runtimeAdaptivePartitioningMaxTaskSizeInBytes", this.runtimeAdaptivePartitioningMaxTaskSizeInBytes).add("stageEstimationForEagerParentEnabled", this.stageEstimationForEagerParentEnabled).add("started", this.started).add("runtimeAdaptivePartitioningApplied", this.runtimeAdaptivePartitioningApplied).add("nextSchedulingPriority", this.nextSchedulingPriority).add("preSchedulingTaskContexts", this.preSchedulingTaskContexts).add("schedulingDelayer", (Object)this.schedulingDelayer).add("queryOutputSet", this.queryOutputSet).toString()});
            this.stageRegistry.logDebugInfo();
            log.debug("StageExecutions:");
            this.stageExecutions.forEach((stageId, stageExecution) -> stageExecution.logDebugInfo());
            this.eventDebugInfos.ifPresent(EventDebugInfos::log);
            log.debug("Scheduler debug info for %s END", new Object[]{this.queryStateMachine.getQueryId()});
        }

        private boolean schedule() {
            if (this.checkComplete()) {
                return false;
            }
            this.optimize();
            this.updateStageExecutions();
            this.scheduleTasks();
            this.processNodeAcquisitions();
            this.updateMemoryRequirements();
            this.loadMoreTaskDescriptorsIfNecessary();
            return true;
        }

        private boolean checkComplete() {
            if (this.queryStateMachine.isDone()) {
                return true;
            }
            for (StageExecution execution : this.stageExecutions.values()) {
                if (execution.getState() != StageState.FAILED) continue;
                StageInfo stageInfo = execution.getStageInfo();
                ExecutionFailureInfo failureCause = stageInfo.getFailureCause();
                Throwable failure = failureCause == null ? new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "stage failed due to unknown error: %s".formatted(execution.getStageId())) : failureCause.toException();
                this.taskFailures.forEach(arg_0 -> Scheduler.lambda$checkComplete$4((RuntimeException)failure, arg_0));
                this.queryStateMachine.transitionToFailed(failure);
                return true;
            }
            this.setQueryOutputIfReady();
            return false;
        }

        private void setQueryOutputIfReady() {
            StageId rootStageId = this.getStageId(this.plan.getFragment().getId());
            final StageExecution rootStageExecution = this.stageExecutions.get(rootStageId);
            if (!this.queryOutputSet && rootStageExecution != null && rootStageExecution.getState() == StageState.FINISHED) {
                ListenableFuture<List<ExchangeSourceHandle>> sourceHandles = Exchanges.getAllSourceHandles(rootStageExecution.getExchange().getSourceHandles());
                Futures.addCallback(sourceHandles, (FutureCallback)new FutureCallback<List<ExchangeSourceHandle>>(this){
                    final /* synthetic */ Scheduler this$0;
                    {
                        this.this$0 = this$0;
                    }

                    public void onSuccess(List<ExchangeSourceHandle> handles) {
                        try {
                            this.this$0.queryStateMachine.updateInputsForQueryResults((List<ExchangeInput>)ImmutableList.of((Object)new SpoolingExchangeInput(handles, Optional.of(rootStageExecution.getSinkOutputSelector()))), true);
                            this.this$0.queryStateMachine.transitionToFinishing();
                        }
                        catch (Throwable t) {
                            this.onFailure(t);
                        }
                    }

                    public void onFailure(Throwable t) {
                        this.this$0.queryStateMachine.transitionToFailed(t);
                    }
                }, (Executor)this.queryExecutor);
                this.queryOutputSet = true;
            }
        }

        private void optimize() {
            SubPlan oldPlan = this.plan;
            this.plan = this.optimizePlan(this.plan);
            if (this.plan != oldPlan) {
                this.planInTopologicalOrder = TopologicalOrderSubPlanVisitor.sortPlanInTopologicalOrder(this.plan);
                this.stageRegistry.updatePlan(this.plan);
            }
        }

        private SubPlan optimizePlan(SubPlan plan) {
            if (this.adaptivePlanner.isEmpty()) {
                return plan;
            }
            for (SubPlan subPlan : this.planInTopologicalOrder) {
                PlanFragment fragment = subPlan.getFragment();
                StageId stageId = this.getStageId(fragment.getId());
                if (this.stageExecutions.containsKey(stageId) || subPlan.getChildren().isEmpty()) continue;
                IsReadyForExecutionResult isReadyForExecutionResult = this.isReadyForExecution(subPlan);
                IsReadyForExecutionResult oldValue = this.isReadyForExecutionCache.put(subPlan, isReadyForExecutionResult);
                if (!isReadyForExecutionResult.isReadyForExecution() || oldValue != null && oldValue.isReadyForExecution()) continue;
                return this.adaptivePlanner.get().optimize(plan, this.createRuntimeInfoProvider());
            }
            return plan;
        }

        private RuntimeInfoProvider createRuntimeInfoProvider() {
            ImmutableMap.Builder stageRuntimeOutputStats = ImmutableMap.builder();
            ImmutableMap.Builder planFragments = ImmutableMap.builder();
            this.planInTopologicalOrder.forEach(subPlan -> planFragments.put((Object)subPlan.getFragment().getId(), (Object)subPlan.getFragment()));
            this.stageExecutions.forEach((stageId, stageExecution) -> {
                if (this.isStageRuntimeStatsReady((StageExecution)stageExecution)) {
                    OutputStatsEstimator.OutputStatsEstimateResult runtimeOutputStats = stageExecution.getOutputStats(this.stageExecutions::get, false).get();
                    stageRuntimeOutputStats.put((Object)stageExecution.getStageFragment().getId(), (Object)runtimeOutputStats);
                }
            });
            return new StaticRuntimeInfoProvider((Map<PlanFragmentId, OutputStatsEstimator.OutputStatsEstimateResult>)stageRuntimeOutputStats.buildOrThrow(), (Map<PlanFragmentId, PlanFragment>)planFragments.buildOrThrow());
        }

        private boolean isStageRuntimeStatsReady(StageExecution stageExecution) {
            return stageExecution.getOutputStats(this.stageExecutions::get, false).map(OutputStatsEstimator.OutputStatsEstimateResult::isAccurate).orElse(false);
        }

        private void updateStageExecutions() {
            HashSet<StageId> currentPlanStages = new HashSet<StageId>();
            PlanFragmentId rootFragmentId = this.plan.getFragment().getId();
            for (SubPlan subPlan : this.planInTopologicalOrder) {
                IsReadyForExecutionResult result;
                PlanFragmentId fragmentId = subPlan.getFragment().getId();
                StageId stageId2 = this.getStageId(fragmentId);
                currentPlanStages.add(stageId2);
                StageExecution stageExecution2 = this.stageExecutions.get(stageId2);
                if (stageExecution2 == null && (result = this.isReadyForExecutionCache.computeIfAbsent(subPlan, ignored -> this.isReadyForExecution(subPlan))).isReadyForExecution()) {
                    this.createStageExecution(subPlan, fragmentId.equals(rootFragmentId), result.getSourceOutputSizeEstimates(), this.nextSchedulingPriority++, result.isSpeculative(), result.isEager());
                }
                if (stageExecution2 == null || !stageExecution2.getState().equals((Object)StageState.FINISHED) || stageExecution2.isExchangeClosed()) continue;
                this.closeSourceExchanges(subPlan);
            }
            this.stageExecutions.forEach((stageId, stageExecution) -> {
                if (!currentPlanStages.contains(stageId)) {
                    stageExecution.abort();
                }
            });
            this.isReadyForExecutionCache.clear();
        }

        private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan) {
            boolean standardTasksInQueue = this.schedulingQueue.getTaskCount(TaskExecutionClass.STANDARD) > 0;
            boolean standardTasksWaitingForNode = this.preSchedulingTaskContexts.values().stream().anyMatch(task -> task.getExecutionClass() == TaskExecutionClass.STANDARD && !task.getNodeLease().getNode().isDone());
            boolean eager = this.stageEstimationForEagerParentEnabled && this.shouldScheduleEagerly(subPlan);
            boolean speculative = false;
            int finishedSourcesCount = 0;
            HashMap<String, Integer> estimateCountByKind = new HashMap<String, Integer>();
            ImmutableMap.Builder sourceOutputStatsEstimates = ImmutableMap.builder();
            boolean someSourcesMadeProgress = false;
            for (SubPlan source : subPlan.getChildren()) {
                Optional<OutputStatsEstimator.OutputStatsEstimateResult> result;
                StageExecution sourceStageExecution = this.stageExecutions.get(this.getStageId(source.getFragment().getId()));
                if (sourceStageExecution == null) {
                    return IsReadyForExecutionResult.notReady();
                }
                if (sourceStageExecution.getState() != StageState.FINISHED) {
                    if (!this.exchangeManager.supportsConcurrentReadAndWrite()) {
                        return IsReadyForExecutionResult.notReady();
                    }
                    if (this.runtimeAdaptivePartitioningApplied) {
                        return IsReadyForExecutionResult.notReady();
                    }
                    if ((standardTasksInQueue || standardTasksWaitingForNode) && !eager) {
                        return IsReadyForExecutionResult.notReady();
                    }
                } else {
                    result = sourceStageExecution.getOutputStats(this.stageExecutions::get, eager).orElseThrow();
                    Verify.verify((boolean)Objects.equals(((OutputStatsEstimator.OutputStatsEstimateResult)((Object)result)).kind(), "FINISHED"), (String)"expected FINISHED status but got %s", (Object)((OutputStatsEstimator.OutputStatsEstimateResult)((Object)result)).kind());
                    ++finishedSourcesCount;
                    sourceOutputStatsEstimates.put((Object)sourceStageExecution.getStageId(), (Object)((OutputStatsEstimator.OutputStatsEstimateResult)((Object)result)).outputDataSizeEstimate());
                    someSourcesMadeProgress = true;
                    continue;
                }
                speculative = true;
                if (!this.canOutputDataEarly(source)) {
                    return IsReadyForExecutionResult.notReady();
                }
                if (!SchedulingUtils.canStream(subPlan, source)) {
                    return IsReadyForExecutionResult.notReady();
                }
                result = sourceStageExecution.getOutputStats(this.stageExecutions::get, eager);
                if (result.isEmpty()) {
                    return IsReadyForExecutionResult.notReady();
                }
                estimateCountByKind.compute(((OutputStatsEstimator.OutputStatsEstimateResult)result.orElseThrow()).kind(), (k, v) -> v == null ? 0 : v + 1);
                sourceOutputStatsEstimates.put((Object)sourceStageExecution.getStageId(), (Object)((OutputStatsEstimator.OutputStatsEstimateResult)result.orElseThrow()).outputDataSizeEstimate());
                someSourcesMadeProgress = someSourcesMadeProgress || sourceStageExecution.isSomeProgressMade();
            }
            if (!(subPlan.getChildren().isEmpty() || someSourcesMadeProgress || eager)) {
                return IsReadyForExecutionResult.notReady();
            }
            if (speculative) {
                log.debug("scheduling speculative %s/%s; sources: finished=%s; kinds=%s", new Object[]{this.queryStateMachine.getQueryId(), subPlan.getFragment().getId(), finishedSourcesCount, estimateCountByKind});
                estimateCountByKind.forEach(this.stageExecutionStats::recordSourceOutputEstimationOnStageStart);
            } else {
                this.stageExecutionStats.recordSourcesFinishedOnStageStart(subPlan.getChildren().size());
            }
            return IsReadyForExecutionResult.ready((Map<StageId, OutputDataSizeEstimate>)sourceOutputStatsEstimates.buildOrThrow(), eager, speculative);
        }

        private boolean shouldScheduleEagerly(SubPlan subPlan) {
            return Scheduler.hasSmallFinalLimitNode(subPlan);
        }

        private static boolean hasSmallFinalLimitNode(SubPlan subPlan) {
            if (!subPlan.getFragment().getPartitioning().isSingleNode()) {
                return false;
            }
            return PlanNodeSearcher.searchFrom(subPlan.getFragment().getRoot()).where(node -> {
                LimitNode limitNode;
                return node instanceof LimitNode && !(limitNode = (LimitNode)node).isPartial() && limitNode.getCount() < 1000000L;
            }).matches();
        }

        private boolean canOutputDataEarly(SubPlan source) {
            PlanFragment fragment = source.getFragment();
            return this.canOutputDataEarly(fragment.getRoot());
        }

        private boolean canOutputDataEarly(PlanNode node) {
            if (node instanceof AggregationNode) {
                AggregationNode aggregationNode = (AggregationNode)node;
                return aggregationNode.getStep().isOutputPartial();
            }
            return node.getSources().stream().allMatch(this::canOutputDataEarly);
        }

        private void closeSourceExchanges(SubPlan subPlan) {
            for (SubPlan source : subPlan.getChildren()) {
                StageExecution sourceStageExecution = this.stageExecutions.get(this.getStageId(source.getFragment().getId()));
                if (sourceStageExecution == null || !sourceStageExecution.getState().isDone()) continue;
                sourceStageExecution.closeExchange();
            }
        }

        private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<StageId, OutputDataSizeEstimate> sourceOutputSizeEstimates, int schedulingPriority, boolean speculative, boolean eager) {
            Closer closer = Closer.create();
            try {
                PlanFragment fragment = subPlan.getFragment();
                Session session = this.queryStateMachine.getSession();
                StageId stageId = this.getStageId(fragment.getId());
                SqlStage stage = SqlStage.createSqlStage(stageId, fragment, TableInfo.extract(session, this.metadata, fragment), this.remoteTaskFactory, session, this.summarizeTaskInfo, this.nodeTaskMap, this.queryStateMachine.getStateMachineExecutor(), this.tracer, this.schedulerSpan, this.schedulerStats);
                closer.register(stage::abort);
                this.stageRegistry.add(stage);
                stage.addFinalStageInfoListener(status -> this.queryStateMachine.updateQueryInfo(Optional.ofNullable(this.stageRegistry.getStageInfo())));
                ImmutableMap.Builder sourceExchangesBuilder = ImmutableMap.builder();
                HashMap<PlanFragmentId, OutputDataSizeEstimate> sourceOutputEstimatesByFragmentId = new HashMap<PlanFragmentId, OutputDataSizeEstimate>();
                for (SubPlan subPlan2 : subPlan.getChildren()) {
                    PlanFragmentId sourceFragmentId = subPlan2.getFragment().getId();
                    StageId sourceStageId = this.getStageId(sourceFragmentId);
                    StageExecution sourceStageExecution = this.getStageExecution(sourceStageId);
                    sourceExchangesBuilder.put((Object)sourceFragmentId, (Object)sourceStageExecution.getExchange());
                    OutputDataSizeEstimate outputDataSizeResult = sourceOutputSizeEstimates.get(sourceStageId);
                    Verify.verify((outputDataSizeResult != null ? 1 : 0) != 0, (String)"No output data size estimate in %s map for stage %s", sourceOutputSizeEstimates, (Object)sourceStageId);
                    sourceOutputEstimatesByFragmentId.put(sourceFragmentId, outputDataSizeResult);
                    this.stageConsumers.put((Object)sourceStageExecution.getStageId(), (Object)stageId);
                }
                ImmutableMap.Builder outputDataSizeEstimates = ImmutableMap.builder();
                for (RemoteSourceNode remoteSource : stage.getFragment().getRemoteSourceNodes()) {
                    ArrayList<OutputDataSizeEstimate> estimates = new ArrayList<OutputDataSizeEstimate>();
                    for (PlanFragmentId fragmentId : remoteSource.getSourceFragmentIds()) {
                        OutputDataSizeEstimate fragmentEstimate = (OutputDataSizeEstimate)sourceOutputEstimatesByFragmentId.get(fragmentId);
                        Verify.verify((fragmentEstimate != null ? 1 : 0) != 0, (String)"fragmentEstimate not found for fragment %s", (Object)fragmentId);
                        estimates.add(fragmentEstimate);
                    }
                    outputDataSizeEstimates.put((Object)remoteSource.getId(), (Object)OutputDataSizeEstimate.merge(estimates));
                }
                ImmutableMap immutableMap = sourceExchangesBuilder.buildOrThrow();
                EventDrivenTaskSource taskSource = (EventDrivenTaskSource)closer.register((Closeable)this.taskSourceFactory.create(session, stage.getStageSpan(), fragment, (Map<PlanFragmentId, Exchange>)immutableMap, this.partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()), stage::recordGetSplitTime, (Map<PlanNodeId, OutputDataSizeEstimate>)outputDataSizeEstimates.buildOrThrow()));
                FaultTolerantPartitioningScheme sinkPartitioningScheme = this.partitioningSchemeFactory.get(fragment.getOutputPartitioningScheme().getPartitioning().getHandle(), fragment.getOutputPartitioningScheme().getPartitionCount());
                ExchangeContextInstance exchangeContext = new ExchangeContextInstance(this.queryStateMachine.getQueryId(), new ExchangeId("external-exchange-" + stage.getStageId().getId()), this.schedulerSpan);
                boolean preserveOrderWithinPartition = rootFragment && stage.getFragment().getPartitioning().equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION);
                Exchange exchange = (Exchange)closer.register((Closeable)this.exchangeManager.createExchange((ExchangeContext)exchangeContext, sinkPartitioningScheme.getPartitionCount(), preserveOrderWithinPartition));
                boolean coordinatorStage = stage.getFragment().getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION);
                if (eager) {
                    immutableMap.values().forEach(sourceExchange -> sourceExchange.setSourceHandlesDeliveryMode(Exchange.SourceHandlesDeliveryMode.EAGER));
                }
                Function<PlanFragmentId, PlanFragment> planFragmentLookup = planFragmentId -> {
                    StageExecution stageExecution = this.stageExecutions.get(this.getStageId((PlanFragmentId)planFragmentId));
                    Preconditions.checkArgument((stageExecution != null ? 1 : 0) != 0, (String)"stage for fragment %s not started yet", (Object)planFragmentId);
                    return stageExecution.getStageInfo().getPlan();
                };
                StageExecution execution = new StageExecution(this.taskDescriptorStorage, this.taskFailures, stage, taskSource, sinkPartitioningScheme, exchange, this.stageExecutionStats, this.memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup), this.outputStatsEstimator, coordinatorStage ? 1 : this.maxTaskExecutionAttempts, schedulingPriority, eager, speculative, this.dynamicFilterService);
                this.stageExecutions.put(execution.getStageId(), execution);
                for (SubPlan source : subPlan.getChildren()) {
                    PlanFragmentId sourceFragmentId = source.getFragment().getId();
                    StageExecution sourceExecution = this.getStageExecution(this.getStageId(sourceFragmentId));
                    execution.setSourceOutputSelector(sourceFragmentId, sourceExecution.getSinkOutputSelector());
                }
            }
            catch (Throwable t) {
                block9: {
                    try {
                        closer.close();
                    }
                    catch (Throwable closerFailure) {
                        if (closerFailure == t) break block9;
                        t.addSuppressed(closerFailure);
                    }
                }
                throw t;
            }
        }

        private StageId getStageId(PlanFragmentId fragmentId) {
            return StageId.create(this.queryStateMachine.getQueryId(), fragmentId);
        }

        private void scheduleTasks() {
            long standardTasksWaitingForNode = this.getWaitingForNodeTasksCount(TaskExecutionClass.STANDARD);
            long speculativeTasksWaitingForNode = this.getWaitingForNodeTasksCount(TaskExecutionClass.SPECULATIVE);
            long eagerSpeculativeTasksWaitingForNode = this.getWaitingForNodeTasksCount(TaskExecutionClass.EAGER_SPECULATIVE);
            block5: while (!this.schedulingQueue.isEmpty()) {
                int partitionId;
                Optional<NodeRequirements> nodeRequirements;
                StageExecution stageExecution;
                PrioritizedScheduledTask scheduledTask;
                if (this.schedulingQueue.getTaskCount(TaskExecutionClass.EAGER_SPECULATIVE) > 0 && eagerSpeculativeTasksWaitingForNode < (long)this.maxTasksWaitingForNode) {
                    scheduledTask = this.schedulingQueue.pollOrThrow(TaskExecutionClass.EAGER_SPECULATIVE);
                } else if (this.schedulingQueue.getTaskCount(TaskExecutionClass.STANDARD) > 0) {
                    if (standardTasksWaitingForNode >= (long)this.maxTasksWaitingForNode) break;
                    scheduledTask = this.schedulingQueue.pollOrThrow(TaskExecutionClass.STANDARD);
                } else {
                    if (this.schedulingQueue.getTaskCount(TaskExecutionClass.SPECULATIVE) <= 0 || standardTasksWaitingForNode > 0L || speculativeTasksWaitingForNode >= (long)this.maxTasksWaitingForNode) break;
                    scheduledTask = this.schedulingQueue.pollOrThrow(TaskExecutionClass.SPECULATIVE);
                }
                if ((stageExecution = this.getStageExecution(scheduledTask.task().stageId())).getState().isDone() || (nodeRequirements = stageExecution.getNodeRequirements(partitionId = scheduledTask.task().partitionId())).isEmpty()) continue;
                PartitionMemoryEstimator.MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId);
                NodeAllocator.NodeLease lease = this.nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.getExecutionClass());
                lease.getNode().addListener(() -> this.eventQueue.add(Event.WAKE_UP), (Executor)this.queryExecutor);
                this.preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.getExecutionClass()));
                switch (scheduledTask.getExecutionClass()) {
                    case STANDARD: {
                        ++standardTasksWaitingForNode;
                        continue block5;
                    }
                    case SPECULATIVE: {
                        ++speculativeTasksWaitingForNode;
                        continue block5;
                    }
                    case EAGER_SPECULATIVE: {
                        ++eagerSpeculativeTasksWaitingForNode;
                        continue block5;
                    }
                }
                throw new IllegalArgumentException("Unknown execution class " + String.valueOf((Object)scheduledTask.getExecutionClass()));
            }
        }

        private long getWaitingForNodeTasksCount(TaskExecutionClass executionClass) {
            return this.preSchedulingTaskContexts.values().stream().filter(context -> !context.getNodeLease().getNode().isDone()).filter(context -> context.getExecutionClass() == executionClass).count();
        }

        private void processNodeAcquisitions() {
            Iterator<Map.Entry<ScheduledTask, PreSchedulingTaskContext>> iterator = this.preSchedulingTaskContexts.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<ScheduledTask, PreSchedulingTaskContext> entry = iterator.next();
                ScheduledTask scheduledTask = entry.getKey();
                PreSchedulingTaskContext context = entry.getValue();
                if (context.isWaitingForSinkInstanceHandle()) {
                    Verify.verify((boolean)context.getNodeLease().getNode().isDone(), (String)"isWaitingForSinkInstanceHandle true but node not set", (Object[])new Object[0]);
                    continue;
                }
                NodeAllocator.NodeLease nodeLease = context.getNodeLease();
                StageExecution stageExecution = this.getStageExecution(scheduledTask.stageId());
                if (stageExecution.getState().isDone()) {
                    iterator.remove();
                    nodeLease.release();
                    continue;
                }
                if (!nodeLease.getNode().isDone()) continue;
                context.setWaitingForSinkInstanceHandle(true);
                Optional<GetExchangeSinkInstanceHandleResult> getExchangeSinkInstanceHandleResult = stageExecution.getExchangeSinkInstanceHandle(scheduledTask.partitionId());
                if (getExchangeSinkInstanceHandleResult.isPresent()) {
                    CompletableFuture<ExchangeSinkInstanceHandle> sinkInstanceHandleFuture = getExchangeSinkInstanceHandleResult.get().exchangeSinkInstanceHandleFuture();
                    sinkInstanceHandleFuture.whenComplete((sinkInstanceHandle, throwable) -> {
                        if (throwable != null) {
                            this.eventQueue.add(new StageFailureEvent(scheduledTask.stageId, (Throwable)throwable));
                        } else {
                            this.eventQueue.add(new SinkInstanceHandleAcquiredEvent(scheduledTask.stageId(), scheduledTask.partitionId(), nodeLease, ((GetExchangeSinkInstanceHandleResult)getExchangeSinkInstanceHandleResult.get()).attempt(), (ExchangeSinkInstanceHandle)sinkInstanceHandle));
                        }
                    });
                    continue;
                }
                iterator.remove();
                nodeLease.release();
            }
        }

        private void updateMemoryRequirements() {
            for (StageExecution stageExecution : this.stageExecutions.values()) {
                stageExecution.updateMemoryRequirements();
            }
            for (Map.Entry entry : this.preSchedulingTaskContexts.entrySet()) {
                ScheduledTask scheduledTask = (ScheduledTask)entry.getKey();
                PreSchedulingTaskContext taskContext = (PreSchedulingTaskContext)entry.getValue();
                PartitionMemoryEstimator.MemoryRequirements currentPartitionMemoryRequirements = this.stageExecutions.get(scheduledTask.stageId()).getMemoryRequirements(scheduledTask.partitionId());
                taskContext.getNodeLease().setMemoryRequirement(currentPartitionMemoryRequirements.getRequiredMemory());
            }
        }

        @Override
        public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkInstanceHandleAcquiredEvent) {
            ScheduledTask scheduledTask = new ScheduledTask(sinkInstanceHandleAcquiredEvent.getStageId(), sinkInstanceHandleAcquiredEvent.getPartitionId());
            PreSchedulingTaskContext context = this.preSchedulingTaskContexts.remove(scheduledTask);
            Verify.verify((context != null ? 1 : 0) != 0, (String)"expected %s in preSchedulingTaskContexts", (Object)scheduledTask);
            Verify.verify((boolean)context.getNodeLease().getNode().isDone(), (String)"expected node set for %s", (Object)scheduledTask);
            Verify.verify((boolean)context.isWaitingForSinkInstanceHandle(), (String)"expected isWaitingForSinkInstanceHandle set for %s", (Object)scheduledTask);
            NodeAllocator.NodeLease nodeLease = sinkInstanceHandleAcquiredEvent.getNodeLease();
            int partitionId = sinkInstanceHandleAcquiredEvent.getPartitionId();
            StageId stageId = sinkInstanceHandleAcquiredEvent.getStageId();
            int attempt = sinkInstanceHandleAcquiredEvent.getAttempt();
            ExchangeSinkInstanceHandle sinkInstanceHandle = sinkInstanceHandleAcquiredEvent.getSinkInstanceHandle();
            StageExecution stageExecution = this.getStageExecution(stageId);
            Optional<RemoteTask> remoteTask = stageExecution.schedule(partitionId, sinkInstanceHandle, attempt, nodeLease, context.getExecutionClass().isSpeculative());
            remoteTask.ifPresent(task -> {
                task.addStateChangeListener(this.createExchangeSinkInstanceHandleUpdateRequiredListener());
                task.addStateChangeListener(taskStatus -> {
                    if (taskStatus.getState().isDone()) {
                        nodeLease.release();
                    }
                });
                task.addFinalTaskInfoListener(this.taskExecutionStats::update);
                task.addFinalTaskInfoListener(taskInfo -> this.eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus())));
                nodeLease.attachTaskId(task.getTaskId());
                task.start();
                if (this.queryStateMachine.getQueryState() == QueryState.STARTING) {
                    this.queryStateMachine.transitionToRunning();
                }
            });
            if (remoteTask.isEmpty()) {
                nodeLease.release();
            }
            return null;
        }

        private StateMachine.StateChangeListener<TaskStatus> createExchangeSinkInstanceHandleUpdateRequiredListener() {
            AtomicLong respondedToVersion = new AtomicLong(-1L);
            return taskStatus -> {
                long localVersion;
                OutputBufferStatus outputBufferStatus = taskStatus.getOutputBufferStatus();
                if (outputBufferStatus.getOutputBuffersVersion().isEmpty()) {
                    return;
                }
                if (!outputBufferStatus.isExchangeSinkInstanceHandleUpdateRequired()) {
                    return;
                }
                long remoteVersion = outputBufferStatus.getOutputBuffersVersion().getAsLong();
                while (remoteVersion > (localVersion = respondedToVersion.get())) {
                    if (!respondedToVersion.compareAndSet(localVersion, remoteVersion)) continue;
                    this.eventQueue.add(new RemoteTaskExchangeSinkUpdateRequiredEvent((TaskStatus)taskStatus));
                    break;
                }
            };
        }

        private void loadMoreTaskDescriptorsIfNecessary() {
            boolean schedulingQueueIsFull = this.schedulingQueue.getTaskCount(TaskExecutionClass.STANDARD) >= this.maxTasksWaitingForExecution;
            for (final StageExecution stageExecution : this.stageExecutions.values()) {
                if (schedulingQueueIsFull && !stageExecution.hasOpenTaskRunning() && !stageExecution.isEager()) continue;
                stageExecution.loadMoreTaskDescriptors().ifPresent(future -> Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<SplitAssigner.AssignmentResult>(this){
                    final /* synthetic */ Scheduler this$0;
                    {
                        this.this$0 = this$0;
                    }

                    public void onSuccess(SplitAssigner.AssignmentResult result) {
                        this.this$0.eventQueue.add(new SplitAssignmentEvent(stageExecution.getStageId(), result));
                    }

                    public void onFailure(Throwable t) {
                        this.this$0.eventQueue.add(new StageFailureEvent(stageExecution.getStageId(), t));
                    }
                }, (Executor)this.queryExecutor));
            }
        }

        public void abort() {
            this.eventQueue.clear();
            this.eventQueue.add(Event.ABORT);
        }

        @Override
        public Void onRemoteTaskCompleted(RemoteTaskCompletedEvent event) {
            TaskStatus taskStatus = event.getTaskStatus();
            TaskId taskId = taskStatus.getTaskId();
            TaskState taskState = taskStatus.getState();
            StageExecution stageExecution = this.getStageExecution(taskId.getStageId());
            if (taskState == TaskState.FINISHED) {
                stageExecution.taskFinished(taskId, taskStatus);
            } else if (taskState == TaskState.FAILED) {
                ExecutionFailureInfo failureInfo = taskStatus.getFailures().stream().findFirst().map(this::rewriteTransportFailure).orElseGet(() -> Failures.toFailure(new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason")));
                List<PrioritizedScheduledTask> replacementTasks = stageExecution.taskFailed(taskId, failureInfo, taskStatus);
                replacementTasks.forEach(this.schedulingQueue::addOrUpdate);
                if (this.shouldDelayScheduling(failureInfo.getErrorCode())) {
                    this.schedulingDelayer.startOrProlongDelayIfNecessary();
                    this.scheduledExecutorService.schedule(() -> this.eventQueue.add(Event.WAKE_UP), this.schedulingDelayer.getRemainingDelayInMillis(), TimeUnit.MILLISECONDS);
                }
            }
            ExchangeSourceOutputSelector outputSelector = stageExecution.getSinkOutputSelector();
            for (StageId consumerStageId : this.stageConsumers.get((Object)stageExecution.getStageId())) {
                this.getStageExecution(consumerStageId).setSourceOutputSelector(stageExecution.getStageFragmentId(), outputSelector);
            }
            return null;
        }

        @Override
        public Void onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) {
            TaskId taskId = event.getTaskStatus().getTaskId();
            StageExecution stageExecution = this.getStageExecution(taskId.getStageId());
            stageExecution.initializeUpdateOfExchangeSinkInstanceHandle(taskId, this.eventQueue);
            return null;
        }

        @Override
        public Void onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) {
            TaskId taskId = event.getTaskId();
            StageExecution stageExecution = this.getStageExecution(taskId.getStageId());
            stageExecution.finalizeUpdateOfExchangeSinkInstanceHandle(taskId, event.getExchangeSinkInstanceHandle());
            return null;
        }

        @Override
        public Void onSplitAssignment(SplitAssignmentEvent event) {
            StageExecution stageExecution = this.getStageExecution(event.getStageId());
            SplitAssigner.AssignmentResult assignment = event.getAssignmentResult();
            for (SplitAssigner.Partition partition : assignment.partitionsAdded()) {
                stageExecution.addPartition(partition.partitionId(), partition.nodeRequirements());
            }
            for (SplitAssigner.PartitionUpdate partitionUpdate : assignment.partitionUpdates()) {
                Optional<PrioritizedScheduledTask> scheduledTask = stageExecution.updatePartition(partitionUpdate.partitionId(), partitionUpdate.planNodeId(), partitionUpdate.readyForScheduling(), partitionUpdate.splits(), partitionUpdate.noMoreSplits());
                scheduledTask.ifPresent(this.schedulingQueue::addOrUpdate);
            }
            assignment.sealedPartitions().forEach(partitionId -> {
                Optional<PrioritizedScheduledTask> scheduledTask = stageExecution.sealPartition(partitionId);
                scheduledTask.ifPresent(prioritizedTask -> {
                    PreSchedulingTaskContext context = this.preSchedulingTaskContexts.get(prioritizedTask.task());
                    if (context != null) {
                        context.setExecutionClass(prioritizedTask.getExecutionClass());
                        context.getNodeLease().setExecutionClass(prioritizedTask.getExecutionClass());
                        return;
                    }
                    this.schedulingQueue.addOrUpdate((PrioritizedScheduledTask)prioritizedTask);
                });
            });
            if (assignment.noMorePartitions()) {
                stageExecution.noMorePartitions();
            }
            stageExecution.taskDescriptorLoadingComplete();
            return null;
        }

        @Override
        public Void onStageFailure(StageFailureEvent event) {
            StageExecution stageExecution = this.getStageExecution(event.getStageId());
            stageExecution.fail(event.getFailure());
            return null;
        }

        private StageExecution getStageExecution(StageId stageId) {
            StageExecution execution = this.stageExecutions.get(stageId);
            Preconditions.checkState((execution != null ? 1 : 0) != 0, (String)"stage execution does not exist for stage: %s", (Object)stageId);
            return execution;
        }

        private boolean shouldDelayScheduling(@Nullable ErrorCode errorCode) {
            return errorCode == null || errorCode.getType() == ErrorType.INTERNAL_ERROR || errorCode.getType() == ErrorType.EXTERNAL;
        }

        private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo) {
            if (executionFailureInfo.getRemoteHost() == null || this.failureDetector.getState(executionFailureInfo.getRemoteHost()) != FailureDetector.State.GONE) {
                return executionFailureInfo;
            }
            return new ExecutionFailureInfo(executionFailureInfo.getType(), executionFailureInfo.getMessage(), executionFailureInfo.getCause(), executionFailureInfo.getSuppressed(), executionFailureInfo.getStack(), executionFailureInfo.getErrorLocation(), StandardErrorCode.REMOTE_HOST_GONE.toErrorCode(), executionFailureInfo.getRemoteHost());
        }

        private static /* synthetic */ void lambda$checkComplete$4(RuntimeException failure, Map.Entry taskFailure) {
            failure.addSuppressed(new RuntimeException("Task " + String.valueOf(taskFailure.getKey()) + " failed", (Throwable)taskFailure.getValue()));
        }

        private static class IsReadyForExecutionResult {
            private final boolean readyForExecution;
            private final boolean speculative;
            private final Optional<Map<StageId, OutputDataSizeEstimate>> sourceOutputSizeEstimates;
            private final boolean eager;

            @CheckReturnValue
            public static IsReadyForExecutionResult ready(Map<StageId, OutputDataSizeEstimate> sourceOutputSizeEstimates, boolean eager, boolean speculative) {
                return new IsReadyForExecutionResult(true, Optional.of(sourceOutputSizeEstimates), eager, speculative);
            }

            @CheckReturnValue
            public static IsReadyForExecutionResult notReady() {
                return new IsReadyForExecutionResult(false, Optional.empty(), false, false);
            }

            private IsReadyForExecutionResult(boolean readyForExecution, Optional<Map<StageId, OutputDataSizeEstimate>> sourceOutputSizeEstimates, boolean eager, boolean speculative) {
                Objects.requireNonNull(sourceOutputSizeEstimates, "sourceOutputSizeEstimates is null");
                if (readyForExecution) {
                    Preconditions.checkArgument((boolean)sourceOutputSizeEstimates.isPresent(), (Object)"expected sourceOutputSizeEstimates to be set");
                }
                if (!readyForExecution) {
                    Preconditions.checkArgument((boolean)sourceOutputSizeEstimates.isEmpty(), (Object)"expected sourceOutputSizeEstimates to be not set");
                }
                this.readyForExecution = readyForExecution;
                this.speculative = speculative;
                this.sourceOutputSizeEstimates = sourceOutputSizeEstimates.map(ImmutableMap::copyOf);
                this.eager = eager;
            }

            public boolean isReadyForExecution() {
                return this.readyForExecution;
            }

            public Map<StageId, OutputDataSizeEstimate> getSourceOutputSizeEstimates() {
                return this.sourceOutputSizeEstimates.orElseThrow();
            }

            public boolean isEager() {
                return this.eager;
            }

            public boolean isSpeculative() {
                return this.speculative;
            }
        }
    }

    private static class SchedulingDelayer {
        private final long minRetryDelayInMillis;
        private final long maxRetryDelayInMillis;
        private final double retryDelayScaleFactor;
        private final Stopwatch stopwatch;
        private long currentDelayInMillis;

        private SchedulingDelayer(Duration minRetryDelay, Duration maxRetryDelay, double retryDelayScaleFactor, Stopwatch stopwatch) {
            this.minRetryDelayInMillis = Objects.requireNonNull(minRetryDelay, "minRetryDelay is null").toMillis();
            this.maxRetryDelayInMillis = Objects.requireNonNull(maxRetryDelay, "maxRetryDelay is null").toMillis();
            Preconditions.checkArgument((retryDelayScaleFactor >= 1.0 ? 1 : 0) != 0, (String)"retryDelayScaleFactor is expected to be greater than or equal to 1: %s", (Object)retryDelayScaleFactor);
            this.retryDelayScaleFactor = retryDelayScaleFactor;
            this.stopwatch = Objects.requireNonNull(stopwatch, "stopwatch is null");
        }

        public void startOrProlongDelayIfNecessary() {
            if (this.stopwatch.isRunning()) {
                if (this.stopwatch.elapsed(TimeUnit.MILLISECONDS) > this.currentDelayInMillis) {
                    this.stopwatch.reset().start();
                    this.currentDelayInMillis = Math.min(Math.round((double)this.currentDelayInMillis * this.retryDelayScaleFactor), this.maxRetryDelayInMillis);
                }
            } else {
                this.stopwatch.start();
                this.currentDelayInMillis = this.minRetryDelayInMillis;
            }
        }

        public long getRemainingDelayInMillis() {
            if (this.stopwatch.isRunning()) {
                return Math.max(0L, this.currentDelayInMillis - this.stopwatch.elapsed(TimeUnit.MILLISECONDS));
            }
            return 0L;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("minRetryDelayInMillis", this.minRetryDelayInMillis).add("maxRetryDelayInMillis", this.maxRetryDelayInMillis).add("retryDelayScaleFactor", this.retryDelayScaleFactor).add("stopwatch", (Object)this.stopwatch).add("currentDelayInMillis", this.currentDelayInMillis).toString();
        }
    }

    private static class PreSchedulingTaskContext {
        private final NodeAllocator.NodeLease nodeLease;
        private TaskExecutionClass executionClass;
        private boolean waitingForSinkInstanceHandle;

        public PreSchedulingTaskContext(NodeAllocator.NodeLease nodeLease, TaskExecutionClass executionClass) {
            this.nodeLease = Objects.requireNonNull(nodeLease, "nodeLease is null");
            this.executionClass = Objects.requireNonNull(executionClass, "executionClass is null");
        }

        public NodeAllocator.NodeLease getNodeLease() {
            return this.nodeLease;
        }

        public TaskExecutionClass getExecutionClass() {
            return this.executionClass;
        }

        public void setExecutionClass(TaskExecutionClass executionClass) {
            Preconditions.checkArgument((boolean)this.executionClass.canTransitionTo(executionClass), (String)"cannot change execution class from %s to %s", (Object)((Object)this.executionClass), (Object)((Object)executionClass));
            this.executionClass = executionClass;
        }

        public boolean isWaitingForSinkInstanceHandle() {
            return this.waitingForSinkInstanceHandle;
        }

        public void setWaitingForSinkInstanceHandle(boolean waitingForSinkInstanceHandle) {
            this.waitingForSinkInstanceHandle = waitingForSinkInstanceHandle;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("nodeLease", (Object)this.nodeLease).add("executionClass", (Object)this.executionClass).add("waitingForSinkInstanceHandle", this.waitingForSinkInstanceHandle).toString();
        }
    }

    private record GetExchangeSinkInstanceHandleResult(CompletableFuture<ExchangeSinkInstanceHandle> exchangeSinkInstanceHandleFuture, int attempt) {
        public GetExchangeSinkInstanceHandleResult {
            Objects.requireNonNull(exchangeSinkInstanceHandleFuture, "exchangeSinkInstanceHandleFuture is null");
        }
    }

    private static abstract class StageEvent
    implements Event {
        private final StageId stageId;

        protected StageEvent(StageId stageId) {
            this.stageId = Objects.requireNonNull(stageId, "stageId is null");
        }

        public StageId getStageId() {
            return this.stageId;
        }
    }

    private static class StageFailureEvent
    extends StageEvent {
        private final Throwable failure;

        public StageFailureEvent(StageId stageId, Throwable failure) {
            super(stageId);
            this.failure = Objects.requireNonNull(failure, "failure is null");
        }

        public Throwable getFailure() {
            return this.failure;
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onStageFailure(this);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("stageId", (Object)this.getStageId()).add("failure", (Object)this.failure).toString();
        }
    }

    private static class SplitAssignmentEvent
    extends StageEvent {
        private final SplitAssigner.AssignmentResult assignmentResult;

        public SplitAssignmentEvent(StageId stageId, SplitAssigner.AssignmentResult assignmentResult) {
            super(stageId);
            this.assignmentResult = Objects.requireNonNull(assignmentResult, "assignmentResult is null");
        }

        public SplitAssigner.AssignmentResult getAssignmentResult() {
            return this.assignmentResult;
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onSplitAssignment(this);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("stageId", (Object)this.getStageId()).add("assignmentResult", (Object)this.assignmentResult).toString();
        }
    }

    private static abstract class RemoteTaskEvent
    implements Event {
        private final TaskStatus taskStatus;

        protected RemoteTaskEvent(TaskStatus taskStatus) {
            this.taskStatus = Objects.requireNonNull(taskStatus, "taskStatus is null");
        }

        public TaskStatus getTaskStatus() {
            return this.taskStatus;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("taskStatus", (Object)this.taskStatus).toString();
        }
    }

    private static class RemoteTaskExchangeUpdatedSinkAcquired
    implements Event {
        private final TaskId taskId;
        private final ExchangeSinkInstanceHandle exchangeSinkInstanceHandle;

        private RemoteTaskExchangeUpdatedSinkAcquired(TaskId taskId, ExchangeSinkInstanceHandle exchangeSinkInstanceHandle) {
            this.taskId = Objects.requireNonNull(taskId, "taskId is null");
            this.exchangeSinkInstanceHandle = Objects.requireNonNull(exchangeSinkInstanceHandle, "exchangeSinkInstanceHandle is null");
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onRemoteTaskExchangeUpdatedSinkAcquired(this);
        }

        public TaskId getTaskId() {
            return this.taskId;
        }

        public ExchangeSinkInstanceHandle getExchangeSinkInstanceHandle() {
            return this.exchangeSinkInstanceHandle;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("taskId", (Object)this.taskId).add("exchangeSinkInstanceHandle", (Object)this.exchangeSinkInstanceHandle).toString();
        }
    }

    private static class RemoteTaskExchangeSinkUpdateRequiredEvent
    extends RemoteTaskEvent {
        protected RemoteTaskExchangeSinkUpdateRequiredEvent(TaskStatus taskStatus) {
            super(taskStatus);
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onRemoteTaskExchangeSinkUpdateRequired(this);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("taskStatus", (Object)this.getTaskStatus()).toString();
        }
    }

    private static class RemoteTaskCompletedEvent
    extends RemoteTaskEvent {
        public RemoteTaskCompletedEvent(TaskStatus taskStatus) {
            super(taskStatus);
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onRemoteTaskCompleted(this);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("taskStatus", (Object)this.getTaskStatus()).toString();
        }
    }

    private static class SinkInstanceHandleAcquiredEvent
    implements Event {
        private final StageId stageId;
        private final int partitionId;
        private final NodeAllocator.NodeLease nodeLease;
        private final int attempt;
        private final ExchangeSinkInstanceHandle sinkInstanceHandle;

        public SinkInstanceHandleAcquiredEvent(StageId stageId, int partitionId, NodeAllocator.NodeLease nodeLease, int attempt, ExchangeSinkInstanceHandle sinkInstanceHandle) {
            this.stageId = Objects.requireNonNull(stageId, "stageId is null");
            this.partitionId = partitionId;
            this.nodeLease = Objects.requireNonNull(nodeLease, "nodeLease is null");
            this.attempt = attempt;
            this.sinkInstanceHandle = Objects.requireNonNull(sinkInstanceHandle, "sinkInstanceHandle is null");
        }

        public StageId getStageId() {
            return this.stageId;
        }

        public int getPartitionId() {
            return this.partitionId;
        }

        public NodeAllocator.NodeLease getNodeLease() {
            return this.nodeLease;
        }

        public int getAttempt() {
            return this.attempt;
        }

        public ExchangeSinkInstanceHandle getSinkInstanceHandle() {
            return this.sinkInstanceHandle;
        }

        @Override
        public <T> T accept(EventListener<T> listener) {
            return listener.onSinkInstanceHandleAcquired(this);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("stageId", (Object)this.stageId).add("partitionId", this.partitionId).add("nodeLease", (Object)this.nodeLease).add("attempt", this.attempt).add("sinkInstanceHandle", (Object)this.sinkInstanceHandle).toString();
        }
    }

    private static interface EventListener<T> {
        default public T onRemoteTaskCompleted(RemoteTaskCompletedEvent event) {
            return this.onRemoteTaskEvent(event);
        }

        default public T onRemoteTaskExchangeSinkUpdateRequired(RemoteTaskExchangeSinkUpdateRequiredEvent event) {
            return this.onRemoteTaskEvent(event);
        }

        default public T onRemoteTaskEvent(RemoteTaskEvent event) {
            return this.onEvent(event);
        }

        default public T onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) {
            return this.onEvent(event);
        }

        default public T onSplitAssignment(SplitAssignmentEvent event) {
            return this.onStageEvent(event);
        }

        default public T onStageFailure(StageFailureEvent event) {
            return this.onStageEvent(event);
        }

        default public T onStageEvent(StageEvent event) {
            return this.onEvent(event);
        }

        default public T onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent event) {
            return this.onEvent(event);
        }

        default public T onEvent(Event unused) {
            throw new RuntimeException("EventListener no implemented");
        }
    }

    private static interface Event {
        public static final Event ABORT = new Event(){

            @Override
            public <T> T accept(EventListener<T> listener) {
                throw new UnsupportedOperationException();
            }
        };
        public static final Event WAKE_UP = new Event(){

            @Override
            public <T> T accept(EventListener<T> listener) {
                throw new UnsupportedOperationException();
            }
        };

        public <T> T accept(EventListener<T> var1);
    }

    private static class SchedulingQueue {
        private final Map<TaskExecutionClass, IndexedPriorityQueue<ScheduledTask>> queues = ImmutableMap.builder().put((Object)TaskExecutionClass.STANDARD, new IndexedPriorityQueue(IndexedPriorityQueue.PriorityOrdering.LOW_TO_HIGH)).put((Object)TaskExecutionClass.SPECULATIVE, new IndexedPriorityQueue(IndexedPriorityQueue.PriorityOrdering.LOW_TO_HIGH)).put((Object)TaskExecutionClass.EAGER_SPECULATIVE, new IndexedPriorityQueue(IndexedPriorityQueue.PriorityOrdering.LOW_TO_HIGH)).buildOrThrow();

        public boolean isEmpty() {
            return this.queues.values().stream().allMatch(IndexedPriorityQueue::isEmpty);
        }

        private int getTaskCount(TaskExecutionClass executionClass) {
            return this.queues.get((Object)executionClass).size();
        }

        public PrioritizedScheduledTask pollOrThrow(TaskExecutionClass executionClass) {
            IndexedPriorityQueue.Prioritized<ScheduledTask> task = this.queues.get((Object)executionClass).pollPrioritized();
            Preconditions.checkState((task != null ? 1 : 0) != 0, (String)"queue for %s is empty", (Object)((Object)executionClass));
            return SchedulingQueue.getPrioritizedTask(executionClass, task);
        }

        public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) {
            this.queues.values().forEach(queue -> queue.remove(prioritizedTask.task()));
            this.queues.get((Object)prioritizedTask.getExecutionClass()).addOrUpdate(prioritizedTask.task(), prioritizedTask.priority());
        }

        private static PrioritizedScheduledTask getPrioritizedTask(TaskExecutionClass executionClass, IndexedPriorityQueue.Prioritized<ScheduledTask> task) {
            return new PrioritizedScheduledTask(task.getValue(), executionClass, Math.toIntExact(task.getPriority()));
        }
    }

    private record PrioritizedScheduledTask(ScheduledTask task, TaskExecutionClass executionClass, int priority) {
        private PrioritizedScheduledTask {
            Objects.requireNonNull(task, "task is null");
            Objects.requireNonNull(executionClass, "executionClass is null");
            Preconditions.checkArgument((priority >= 0 ? 1 : 0) != 0, (String)"priority must be greater than or equal to zero: %s", (int)priority);
        }

        public static PrioritizedScheduledTask create(StageId stageId, int partitionId, int priority) {
            return new PrioritizedScheduledTask(new ScheduledTask(stageId, partitionId), TaskExecutionClass.STANDARD, priority);
        }

        public static PrioritizedScheduledTask createSpeculative(StageId stageId, int partitionId, int priority, boolean eager) {
            return new PrioritizedScheduledTask(new ScheduledTask(stageId, partitionId), eager ? TaskExecutionClass.EAGER_SPECULATIVE : TaskExecutionClass.SPECULATIVE, priority);
        }

        public TaskExecutionClass getExecutionClass() {
            return this.executionClass;
        }

        @Override
        public String toString() {
            return String.valueOf(this.task.stageId()) + "/" + this.task.partitionId() + "[" + String.valueOf((Object)this.executionClass) + "/" + this.priority + "]";
        }
    }

    private record ScheduledTask(StageId stageId, int partitionId) {
        private ScheduledTask {
            Objects.requireNonNull(stageId, "stageId is null");
            Preconditions.checkArgument((partitionId >= 0 ? 1 : 0) != 0, (String)"partitionId must be greater than or equal to zero: %s", (int)partitionId);
        }
    }

    private static class OpenTaskDescriptor {
        private final SplitsMapping splits;
        private final Set<PlanNodeId> noMoreSplits;
        private final NodeRequirements nodeRequirements;

        private OpenTaskDescriptor(SplitsMapping splits, Set<PlanNodeId> noMoreSplits, NodeRequirements nodeRequirements) {
            this.splits = Objects.requireNonNull(splits, "splits is null");
            this.noMoreSplits = ImmutableSet.copyOf((Collection)Objects.requireNonNull(noMoreSplits, "noMoreSplits is null"));
            this.nodeRequirements = Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
        }

        private static Map<PlanNodeId, ListMultimap<Integer, Split>> copySplits(Map<PlanNodeId, ListMultimap<Integer, Split>> splits) {
            ImmutableMap.Builder splitsBuilder = ImmutableMap.builder();
            splits.forEach((planNodeId, planNodeSplits) -> splitsBuilder.put(planNodeId, (Object)ImmutableListMultimap.copyOf((Multimap)planNodeSplits)));
            return splitsBuilder.buildOrThrow();
        }

        public SplitsMapping getSplits() {
            return this.splits;
        }

        public Set<PlanNodeId> getNoMoreSplits() {
            return this.noMoreSplits;
        }

        public NodeRequirements getNodeRequirements() {
            return this.nodeRequirements;
        }

        public OpenTaskDescriptor update(PlanNodeId planNodeId, ListMultimap<Integer, Split> splits, boolean noMoreSplits) {
            SplitsMapping.Builder updatedSplitsMapping = SplitsMapping.builder(this.splits);
            for (Map.Entry entry : Multimaps.asMap(splits).entrySet()) {
                Integer sourcePartition = (Integer)entry.getKey();
                List partitionSplits = (List)entry.getValue();
                updatedSplitsMapping.addSplits(planNodeId, sourcePartition, partitionSplits);
            }
            ImmutableSet updatedNoMoreSplits = this.noMoreSplits;
            if (noMoreSplits && !updatedNoMoreSplits.contains(planNodeId)) {
                updatedNoMoreSplits = ImmutableSet.builder().addAll(this.noMoreSplits).add((Object)planNodeId).build();
            }
            return new OpenTaskDescriptor(updatedSplitsMapping.build(), (Set<PlanNodeId>)updatedNoMoreSplits, this.nodeRequirements);
        }

        public TaskDescriptor createTaskDescriptor(int partitionId) {
            Sets.SetView missingNoMoreSplits = Sets.difference(this.splits.getPlanNodeIds(), this.noMoreSplits);
            Preconditions.checkState((boolean)missingNoMoreSplits.isEmpty(), (String)"missing no more splits for plan nodes: %s", (Object)missingNoMoreSplits);
            return new TaskDescriptor(partitionId, this.splits, this.nodeRequirements);
        }
    }

    private static class StagePartition {
        private final TaskDescriptorStorage taskDescriptorStorage;
        private final StageId stageId;
        private final int partitionId;
        private final ExchangeSinkHandle exchangeSinkHandle;
        private final Set<PlanNodeId> remoteSourceIds;
        private Optional<OpenTaskDescriptor> openTaskDescriptor;
        private PartitionMemoryEstimator.MemoryRequirements memoryRequirements;
        private boolean failureObserved;
        private int remainingAttempts;
        private final Map<TaskId, RemoteTask> tasks = new HashMap<TaskId, RemoteTask>();
        private final Map<TaskId, SpoolingOutputBuffers> taskOutputBuffers = new HashMap<TaskId, SpoolingOutputBuffers>();
        private final Set<TaskId> runningTasks = new HashSet<TaskId>();
        private final Map<TaskId, NodeAllocator.NodeLease> taskNodeLeases = new HashMap<TaskId, NodeAllocator.NodeLease>();
        private final Set<PlanNodeId> finalSelectors = new HashSet<PlanNodeId>();
        private final Set<PlanNodeId> noMoreSplits = new HashSet<PlanNodeId>();
        private boolean taskScheduled;
        private boolean finished;

        public StagePartition(TaskDescriptorStorage taskDescriptorStorage, StageId stageId, int partitionId, ExchangeSinkHandle exchangeSinkHandle, Set<PlanNodeId> remoteSourceIds, NodeRequirements nodeRequirements, PartitionMemoryEstimator.MemoryRequirements memoryRequirements, int maxTaskExecutionAttempts) {
            this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
            this.stageId = Objects.requireNonNull(stageId, "stageId is null");
            this.partitionId = partitionId;
            this.exchangeSinkHandle = Objects.requireNonNull(exchangeSinkHandle, "exchangeSinkHandle is null");
            this.remoteSourceIds = ImmutableSet.copyOf((Collection)Objects.requireNonNull(remoteSourceIds, "remoteSourceIds is null"));
            Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
            this.openTaskDescriptor = Optional.of(new OpenTaskDescriptor(SplitsMapping.EMPTY, (Set<PlanNodeId>)ImmutableSet.of(), nodeRequirements));
            this.memoryRequirements = Objects.requireNonNull(memoryRequirements, "memoryRequirements is null");
            this.remainingAttempts = maxTaskExecutionAttempts;
        }

        public ExchangeSinkHandle getExchangeSinkHandle() {
            return this.exchangeSinkHandle;
        }

        public void addSplits(PlanNodeId planNodeId, ListMultimap<Integer, Split> splits, boolean noMoreSplits) {
            Preconditions.checkState((boolean)this.openTaskDescriptor.isPresent(), (Object)"openTaskDescriptor is empty");
            this.openTaskDescriptor = Optional.of(this.openTaskDescriptor.get().update(planNodeId, splits, noMoreSplits));
            if (noMoreSplits) {
                this.noMoreSplits.add(planNodeId);
            }
            for (RemoteTask task : this.tasks.values()) {
                task.addSplits((Multimap<PlanNodeId, Split>)ImmutableListMultimap.builder().putAll((Object)planNodeId, (Iterable)splits.values()).build());
                if (!noMoreSplits || !this.isFinalOutputSelectorDelivered(planNodeId)) continue;
                task.noMoreSplits(planNodeId);
            }
        }

        private boolean isFinalOutputSelectorDelivered(PlanNodeId planNodeId) {
            if (!this.remoteSourceIds.contains(planNodeId)) {
                return true;
            }
            return this.finalSelectors.contains(planNodeId);
        }

        public void seal() {
            Preconditions.checkState((boolean)this.openTaskDescriptor.isPresent(), (Object)"openTaskDescriptor is empty");
            TaskDescriptor taskDescriptor = this.openTaskDescriptor.get().createTaskDescriptor(this.partitionId);
            this.openTaskDescriptor = Optional.empty();
            if (!this.finished) {
                this.taskDescriptorStorage.put(this.stageId, taskDescriptor);
                for (TaskId runningTaskId : this.runningTasks) {
                    RemoteTask runningTask = this.tasks.get(runningTaskId);
                    runningTask.setSpeculative(false);
                    this.taskNodeLeases.get(runningTaskId).setExecutionClass(TaskExecutionClass.STANDARD);
                }
            }
        }

        public SplitsMapping getSplits() {
            if (this.finished) {
                return SplitsMapping.EMPTY;
            }
            return this.openTaskDescriptor.map(OpenTaskDescriptor::getSplits).or(() -> this.taskDescriptorStorage.get(this.stageId, this.partitionId).map(TaskDescriptor::getSplits)).orElse(SplitsMapping.EMPTY);
        }

        public boolean isNoMoreSplits(PlanNodeId planNodeId) {
            if (this.finished) {
                return true;
            }
            return this.openTaskDescriptor.map(taskDescriptor -> taskDescriptor.getNoMoreSplits().contains(planNodeId)).orElse(true);
        }

        public boolean isSealed() {
            return this.openTaskDescriptor.isEmpty();
        }

        public Optional<NodeRequirements> getNodeRequirements() {
            if (this.finished) {
                return Optional.empty();
            }
            if (this.openTaskDescriptor.isPresent()) {
                return this.openTaskDescriptor.map(OpenTaskDescriptor::getNodeRequirements);
            }
            Optional<TaskDescriptor> taskDescriptor = this.taskDescriptorStorage.get(this.stageId, this.partitionId);
            if (taskDescriptor.isPresent()) {
                return taskDescriptor.map(TaskDescriptor::getNodeRequirements);
            }
            return Optional.empty();
        }

        public PartitionMemoryEstimator.MemoryRequirements getMemoryRequirements() {
            return this.memoryRequirements;
        }

        public void updateInitialMemoryRequirements(PartitionMemoryEstimator.MemoryRequirements memoryRequirements) {
            if (this.failureObserved && memoryRequirements.getRequiredMemory().toBytes() < this.memoryRequirements.getRequiredMemory().toBytes()) {
                return;
            }
            this.memoryRequirements = memoryRequirements;
            for (TaskId runningTaskId : this.runningTasks) {
                this.taskNodeLeases.get(runningTaskId).setMemoryRequirement(memoryRequirements.getRequiredMemory());
            }
        }

        public void setPostFailureMemoryRequirements(PartitionMemoryEstimator.MemoryRequirements memoryRequirements) {
            this.memoryRequirements = Objects.requireNonNull(memoryRequirements, "memoryRequirements is null");
        }

        public int getRemainingAttempts() {
            return this.remainingAttempts;
        }

        public void addTask(RemoteTask remoteTask, SpoolingOutputBuffers outputBuffers, NodeAllocator.NodeLease nodeLease) {
            TaskId taskId = remoteTask.getTaskId();
            this.tasks.put(taskId, remoteTask);
            this.taskOutputBuffers.put(taskId, outputBuffers);
            this.taskNodeLeases.put(taskId, nodeLease);
            this.runningTasks.add(taskId);
        }

        public SpoolingOutputStats.Snapshot taskFinished(TaskId taskId) {
            RemoteTask remoteTask = this.tasks.get(taskId);
            Preconditions.checkArgument((remoteTask != null ? 1 : 0) != 0, (String)"task not found: %s", (Object)taskId);
            SpoolingOutputStats.Snapshot outputStats = remoteTask.retrieveAndDropSpoolingOutputStats();
            this.runningTasks.remove(taskId);
            this.tasks.values().forEach(RemoteTask::abort);
            this.finished = true;
            if (this.isSealed()) {
                this.taskDescriptorStorage.remove(this.stageId, this.partitionId);
            }
            return outputStats;
        }

        public void taskFailed(TaskId taskId) {
            this.runningTasks.remove(taskId);
            this.failureObserved = true;
            --this.remainingAttempts;
        }

        public void updateExchangeSinkInstanceHandle(TaskId taskId, ExchangeSinkInstanceHandle handle) {
            SpoolingOutputBuffers outputBuffers = this.taskOutputBuffers.get(taskId);
            Preconditions.checkArgument((outputBuffers != null ? 1 : 0) != 0, (String)"output buffers not found: %s", (Object)taskId);
            RemoteTask remoteTask = this.tasks.get(taskId);
            Preconditions.checkArgument((remoteTask != null ? 1 : 0) != 0, (String)"task not found: %s", (Object)taskId);
            SpoolingOutputBuffers updatedOutputBuffers = outputBuffers.withExchangeSinkInstanceHandle(handle);
            this.taskOutputBuffers.put(taskId, updatedOutputBuffers);
            remoteTask.setOutputBuffers(updatedOutputBuffers);
        }

        public void updateExchangeSourceOutputSelector(PlanNodeId planNodeId, ExchangeSourceOutputSelector selector) {
            if (selector.isFinal()) {
                this.finalSelectors.add(planNodeId);
            }
            for (TaskId taskId : this.runningTasks) {
                RemoteTask task = this.tasks.get(taskId);
                Verify.verify((task != null ? 1 : 0) != 0, (String)"task is null: %s", (Object)taskId);
                task.addSplits((Multimap<PlanNodeId, Split>)ImmutableListMultimap.of((Object)planNodeId, (Object)EventDrivenFaultTolerantQueryScheduler.createOutputSelectorSplit(selector)));
                if (!selector.isFinal() || !this.noMoreSplits.contains(planNodeId)) continue;
                task.noMoreSplits(planNodeId);
            }
        }

        public boolean isRunning() {
            return !this.runningTasks.isEmpty();
        }

        public boolean isTaskScheduled() {
            return this.taskScheduled;
        }

        public void setTaskScheduled(boolean taskScheduled) {
            Preconditions.checkArgument((boolean)taskScheduled, (Object)"taskScheduled must be true");
            this.taskScheduled = taskScheduled;
        }

        public boolean isFinished() {
            return this.finished;
        }

        public String getDebugInfo() {
            return MoreObjects.toStringHelper((Object)this).add("stageId", (Object)this.stageId).add("partitionId", this.partitionId).add("exchangeSinkHandle", (Object)this.exchangeSinkHandle).add("remoteSourceIds", this.remoteSourceIds).add("openTaskDescriptor", this.openTaskDescriptor).add("memoryRequirements", (Object)this.memoryRequirements).add("failureObserved", this.failureObserved).add("remainingAttempts", this.remainingAttempts).add("tasks", this.tasks).add("taskOutputBuffers", this.taskOutputBuffers).add("runningTasks", this.runningTasks).add("taskNodeLeases", this.taskNodeLeases).add("finalSelectors", this.finalSelectors).add("noMoreSplits", this.noMoreSplits).add("taskScheduled", this.taskScheduled).add("finished", this.finished).toString();
        }
    }

    public static class StageExecution {
        private final TaskDescriptorStorage taskDescriptorStorage;
        private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures;
        private final SqlStage stage;
        private final EventDrivenTaskSource taskSource;
        private final FaultTolerantPartitioningScheme sinkPartitioningScheme;
        private final Exchange exchange;
        private final StageExecutionStats stageExecutionStats;
        private final PartitionMemoryEstimator partitionMemoryEstimator;
        private final OutputStatsEstimator outputStatsEstimator;
        private final int maxTaskExecutionAttempts;
        private final int schedulingPriority;
        private final boolean eager;
        private boolean speculative;
        private final DynamicFilterService dynamicFilterService;
        private final long[] outputDataSize;
        private long outputRowCount;
        private final Int2ObjectMap<StagePartition> partitions = new Int2ObjectOpenHashMap();
        private boolean noMorePartitions;
        private final IntSet runningPartitions = new IntOpenHashSet();
        private final IntSet remainingPartitions = new IntOpenHashSet();
        private ExchangeSourceOutputSelector.Builder sinkOutputSelectorBuilder;
        private ExchangeSourceOutputSelector finalSinkOutputSelector;
        private final Set<PlanNodeId> remoteSourceIds;
        private final Map<PlanFragmentId, RemoteSourceNode> remoteSources;
        private final Map<PlanFragmentId, ExchangeSourceOutputSelector> sourceOutputSelectors = new HashMap<PlanFragmentId, ExchangeSourceOutputSelector>();
        private boolean taskDescriptorLoadingActive;
        private boolean exchangeClosed;
        private final long startTime = System.currentTimeMillis();
        private OptionalLong nonSpeculativeSwitchTime;
        private PartitionMemoryEstimator.MemoryRequirements initialMemoryRequirements;

        private StageExecution(TaskDescriptorStorage taskDescriptorStorage, Queue<Map.Entry<TaskId, RuntimeException>> taskFailures, SqlStage stage, EventDrivenTaskSource taskSource, FaultTolerantPartitioningScheme sinkPartitioningScheme, Exchange exchange, StageExecutionStats stageExecutionStats, PartitionMemoryEstimator partitionMemoryEstimator, OutputStatsEstimator outputStatsEstimator, int maxTaskExecutionAttempts, int schedulingPriority, boolean eager, boolean speculative, DynamicFilterService dynamicFilterService) {
            this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
            this.taskFailures = Objects.requireNonNull(taskFailures, "taskFailures is null");
            this.stage = Objects.requireNonNull(stage, "stage is null");
            this.taskSource = Objects.requireNonNull(taskSource, "taskSource is null");
            this.sinkPartitioningScheme = Objects.requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
            this.exchange = Objects.requireNonNull(exchange, "exchange is null");
            this.stageExecutionStats = Objects.requireNonNull(stageExecutionStats, "stageExecutionStats is null");
            this.partitionMemoryEstimator = Objects.requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
            this.outputStatsEstimator = Objects.requireNonNull(outputStatsEstimator, "outputStatsEstimator is null");
            this.maxTaskExecutionAttempts = maxTaskExecutionAttempts;
            this.schedulingPriority = schedulingPriority;
            this.eager = eager;
            this.speculative = speculative;
            this.nonSpeculativeSwitchTime = speculative ? OptionalLong.empty() : OptionalLong.of(System.currentTimeMillis());
            this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
            this.outputDataSize = new long[sinkPartitioningScheme.getPartitionCount()];
            this.sinkOutputSelectorBuilder = ExchangeSourceOutputSelector.builder((Set)ImmutableSet.of((Object)exchange.getId()));
            ImmutableMap.Builder remoteSources = ImmutableMap.builder();
            ImmutableSet.Builder remoteSourceIds = ImmutableSet.builder();
            for (RemoteSourceNode remoteSource : stage.getFragment().getRemoteSourceNodes()) {
                remoteSourceIds.add((Object)remoteSource.getId());
                remoteSource.getSourceFragmentIds().forEach(fragmentId -> remoteSources.put(fragmentId, (Object)remoteSource));
            }
            this.remoteSourceIds = remoteSourceIds.build();
            this.remoteSources = remoteSources.buildOrThrow();
            this.initialMemoryRequirements = this.computeCurrentInitialMemoryRequirements();
        }

        private PartitionMemoryEstimator.MemoryRequirements computeCurrentInitialMemoryRequirements() {
            return this.partitionMemoryEstimator.getInitialMemoryRequirements();
        }

        private void updateMemoryRequirements() {
            PartitionMemoryEstimator.MemoryRequirements newInitialMemoryRequirements = this.computeCurrentInitialMemoryRequirements();
            if (this.initialMemoryRequirements.equals(newInitialMemoryRequirements)) {
                return;
            }
            this.initialMemoryRequirements = newInitialMemoryRequirements;
            for (StagePartition partition : this.partitions.values()) {
                if (partition.isFinished()) continue;
                partition.updateInitialMemoryRequirements(this.initialMemoryRequirements);
            }
        }

        public StageId getStageId() {
            return this.stage.getStageId();
        }

        public PlanFragmentId getStageFragmentId() {
            return this.stage.getFragment().getId();
        }

        public PlanFragment getStageFragment() {
            return this.stage.getFragment();
        }

        public StageState getState() {
            return this.stage.getState();
        }

        public StageInfo getStageInfo() {
            return this.stage.getStageInfo();
        }

        public Exchange getExchange() {
            return this.exchange;
        }

        public boolean isExchangeClosed() {
            return this.exchangeClosed;
        }

        public void setSpeculative(boolean speculative) {
            Preconditions.checkArgument((!speculative || this.speculative ? 1 : 0) != 0, (Object)"cannot mark non-speculative stage as speculative");
            if (this.speculative && !speculative) {
                this.nonSpeculativeSwitchTime = OptionalLong.of(System.currentTimeMillis());
            }
            this.speculative = speculative;
        }

        public void addPartition(int partitionId, NodeRequirements nodeRequirements) {
            if (this.getState().isDone()) {
                return;
            }
            ExchangeSinkHandle exchangeSinkHandle = this.exchange.addSink(partitionId);
            StagePartition partition = new StagePartition(this.taskDescriptorStorage, this.stage.getStageId(), partitionId, exchangeSinkHandle, this.remoteSourceIds, nodeRequirements, this.initialMemoryRequirements, this.maxTaskExecutionAttempts);
            Preconditions.checkState((this.partitions.putIfAbsent(partitionId, (Object)partition) == null ? 1 : 0) != 0, (String)"partition with id %s already exist in stage %s", (int)partitionId, (Object)this.stage.getStageId());
            this.getSourceOutputSelectors().forEach(partition::updateExchangeSourceOutputSelector);
            this.remainingPartitions.add(partitionId);
        }

        public Optional<PrioritizedScheduledTask> updatePartition(int taskPartitionId, PlanNodeId planNodeId, boolean readyForScheduling, ListMultimap<Integer, Split> splits, boolean noMoreSplits) {
            if (this.getState().isDone()) {
                return Optional.empty();
            }
            StagePartition partition = this.getStagePartition(taskPartitionId);
            partition.addSplits(planNodeId, splits, noMoreSplits);
            if (readyForScheduling && !partition.isTaskScheduled()) {
                partition.setTaskScheduled(true);
                PrioritizedScheduledTask task = this.speculative ? PrioritizedScheduledTask.createSpeculative(this.stage.getStageId(), taskPartitionId, this.schedulingPriority, this.eager) : PrioritizedScheduledTask.create(this.stage.getStageId(), taskPartitionId, this.schedulingPriority);
                return Optional.of(task);
            }
            return Optional.empty();
        }

        public Optional<PrioritizedScheduledTask> sealPartition(int partitionId) {
            if (this.getState().isDone()) {
                return Optional.empty();
            }
            StagePartition partition = this.getStagePartition(partitionId);
            partition.seal();
            if (!partition.isRunning()) {
                return Optional.of(PrioritizedScheduledTask.create(this.stage.getStageId(), partitionId, this.schedulingPriority));
            }
            return Optional.empty();
        }

        public void noMorePartitions() {
            this.noMorePartitions = true;
            if (this.getState().isDone()) {
                return;
            }
            if (this.remainingPartitions.isEmpty()) {
                this.finish();
            }
        }

        public boolean isNoMorePartitions() {
            return this.noMorePartitions;
        }

        public int getPartitionsCount() {
            Preconditions.checkState((boolean)this.noMorePartitions, (Object)"noMorePartitions not set yet");
            return this.partitions.size();
        }

        public int getRemainingPartitionsCount() {
            Preconditions.checkState((boolean)this.noMorePartitions, (Object)"noMorePartitions not set yet");
            return this.remainingPartitions.size();
        }

        public void closeExchange() {
            if (this.exchangeClosed) {
                return;
            }
            this.exchange.close();
            this.exchangeClosed = true;
        }

        public Optional<GetExchangeSinkInstanceHandleResult> getExchangeSinkInstanceHandle(int partitionId) {
            if (this.getState().isDone()) {
                return Optional.empty();
            }
            StagePartition partition = this.getStagePartition(partitionId);
            Verify.verify((partition.getRemainingAttempts() >= 0 ? 1 : 0) != 0, (String)"remaining attempts is expected to be greater than or equal to zero: %s", (int)partition.getRemainingAttempts());
            if (partition.isFinished()) {
                return Optional.empty();
            }
            int attempt = this.maxTaskExecutionAttempts - partition.getRemainingAttempts();
            return Optional.of(new GetExchangeSinkInstanceHandleResult(this.exchange.instantiateSink(partition.getExchangeSinkHandle(), attempt), attempt));
        }

        public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle exchangeSinkInstanceHandle, int attempt, NodeAllocator.NodeLease nodeLease, boolean speculative) {
            InternalNode node;
            try {
                node = (InternalNode)Futures.getDone(nodeLease.getNode());
            }
            catch (ExecutionException e) {
                throw new UncheckedExecutionException((Throwable)e);
            }
            if (this.getState().isDone()) {
                return Optional.empty();
            }
            StagePartition partition = this.getStagePartition(partitionId);
            Verify.verify((partition.getRemainingAttempts() >= 0 ? 1 : 0) != 0, (String)"remaining attempts is expected to be greater than or equal to zero: %s", (int)partition.getRemainingAttempts());
            if (partition.isFinished()) {
                return Optional.empty();
            }
            Map<PlanNodeId, ExchangeSourceOutputSelector> outputSelectors = this.getSourceOutputSelectors();
            ArrayListMultimap splits = ArrayListMultimap.create();
            splits.putAll(partition.getSplits().getSplitsFlat());
            outputSelectors.forEach((arg_0, arg_1) -> StageExecution.lambda$schedule$1((ListMultimap)splits, arg_0, arg_1));
            HashSet<PlanNodeId> noMoreSplits = new HashSet<PlanNodeId>();
            for (RemoteSourceNode remoteSource : this.stage.getFragment().getRemoteSourceNodes()) {
                ExchangeSourceOutputSelector selector = outputSelectors.get(remoteSource.getId());
                if (selector == null || !selector.isFinal() || !partition.isNoMoreSplits(remoteSource.getId())) continue;
                noMoreSplits.add(remoteSource.getId());
            }
            for (PlanNodeId partitionedSource : this.stage.getFragment().getPartitionedSources()) {
                if (!partition.isNoMoreSplits(partitionedSource)) continue;
                noMoreSplits.add(partitionedSource);
            }
            SpoolingOutputBuffers outputBuffers = SpoolingOutputBuffers.createInitial(exchangeSinkInstanceHandle, this.sinkPartitioningScheme.getPartitionCount());
            Optional<RemoteTask> task = this.stage.createTask(node, partitionId, attempt, this.sinkPartitioningScheme.getBucketToPartitionMap(), outputBuffers, (Multimap<PlanNodeId, Split>)splits, noMoreSplits, Optional.of(partition.getMemoryRequirements().getRequiredMemory()), speculative);
            task.ifPresent(remoteTask -> {
                partition.addTask((RemoteTask)remoteTask, outputBuffers, nodeLease);
                this.runningPartitions.add(partitionId);
            });
            return task;
        }

        public boolean isEager() {
            return this.eager;
        }

        public boolean hasOpenTaskRunning() {
            if (this.getState().isDone()) {
                return false;
            }
            if (this.runningPartitions.isEmpty()) {
                return false;
            }
            IntIterator intIterator = this.runningPartitions.iterator();
            while (intIterator.hasNext()) {
                int partitionId = (Integer)intIterator.next();
                StagePartition partition = this.getStagePartition(partitionId);
                if (partition.isSealed()) continue;
                return true;
            }
            return false;
        }

        public Optional<ListenableFuture<SplitAssigner.AssignmentResult>> loadMoreTaskDescriptors() {
            if (this.getState().isDone() || this.taskDescriptorLoadingActive) {
                return Optional.empty();
            }
            Optional<ListenableFuture<SplitAssigner.AssignmentResult>> loadingFuture = this.taskSource.process();
            if (loadingFuture.isEmpty()) {
                return Optional.empty();
            }
            this.taskDescriptorLoadingActive = true;
            return loadingFuture;
        }

        public void taskDescriptorLoadingComplete() {
            this.taskDescriptorLoadingActive = false;
        }

        private Map<PlanNodeId, ExchangeSourceOutputSelector> getSourceOutputSelectors() {
            ImmutableMap.Builder result = ImmutableMap.builder();
            for (RemoteSourceNode remoteSource : this.stage.getFragment().getRemoteSourceNodes()) {
                ExchangeSourceOutputSelector mergedSelector = null;
                for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                    ExchangeSourceOutputSelector sourceFragmentSelector = this.sourceOutputSelectors.get(sourceFragmentId);
                    if (sourceFragmentSelector == null) continue;
                    if (mergedSelector == null) {
                        mergedSelector = sourceFragmentSelector;
                        continue;
                    }
                    mergedSelector = mergedSelector.merge(sourceFragmentSelector);
                }
                if (mergedSelector == null) continue;
                result.put((Object)remoteSource.getId(), mergedSelector);
            }
            return result.buildOrThrow();
        }

        public void initializeUpdateOfExchangeSinkInstanceHandle(TaskId taskId, BlockingQueue<Event> eventQueue) {
            if (this.getState().isDone()) {
                return;
            }
            StagePartition partition = this.getStagePartition(taskId.getPartitionId());
            CompletableFuture exchangeSinkInstanceHandleFuture = this.exchange.updateSinkInstanceHandle(partition.getExchangeSinkHandle(), taskId.getAttemptId());
            exchangeSinkInstanceHandleFuture.whenComplete((sinkInstanceHandle, throwable) -> {
                if (throwable != null) {
                    eventQueue.add(new StageFailureEvent(taskId.getStageId(), (Throwable)throwable));
                } else {
                    eventQueue.add(new RemoteTaskExchangeUpdatedSinkAcquired(taskId, (ExchangeSinkInstanceHandle)sinkInstanceHandle));
                }
            });
        }

        public void finalizeUpdateOfExchangeSinkInstanceHandle(TaskId taskId, ExchangeSinkInstanceHandle updatedExchangeSinkInstanceHandle) {
            if (this.getState().isDone()) {
                return;
            }
            StagePartition partition = this.getStagePartition(taskId.getPartitionId());
            partition.updateExchangeSinkInstanceHandle(taskId, updatedExchangeSinkInstanceHandle);
        }

        public void taskFinished(TaskId taskId, TaskStatus taskStatus) {
            if (this.getState().isDone()) {
                return;
            }
            int partitionId = taskId.getPartitionId();
            StagePartition partition = this.getStagePartition(partitionId);
            this.exchange.sinkFinished(partition.getExchangeSinkHandle(), taskId.getAttemptId());
            SpoolingOutputStats.Snapshot outputStats = partition.taskFinished(taskId);
            if (!partition.isRunning()) {
                this.runningPartitions.remove(partitionId);
            }
            if (!this.remainingPartitions.remove(partitionId)) {
                return;
            }
            this.updateOutputSize(outputStats);
            this.partitionMemoryEstimator.registerPartitionFinished(partition.getMemoryRequirements(), taskStatus.getPeakMemoryReservation(), true, Optional.empty());
            this.sinkOutputSelectorBuilder.include(this.exchange.getId(), taskId.getPartitionId(), taskId.getAttemptId());
            if (this.noMorePartitions && this.remainingPartitions.isEmpty() && !this.stage.getState().isDone()) {
                this.finish();
            }
        }

        private void finish() {
            this.dynamicFilterService.stageCannotScheduleMoreTasks(this.stage.getStageId(), 0, this.partitions.size());
            this.exchange.noMoreSinks();
            this.exchange.allRequiredSinksFinished();
            Verify.verify((this.finalSinkOutputSelector == null ? 1 : 0) != 0, (String)"finalOutputSelector is already set", (Object[])new Object[0]);
            this.sinkOutputSelectorBuilder.setPartitionCount(this.exchange.getId(), this.partitions.size());
            this.sinkOutputSelectorBuilder.setFinal();
            this.finalSinkOutputSelector = this.sinkOutputSelectorBuilder.build();
            this.sinkOutputSelectorBuilder = null;
            this.stage.finish();
            this.taskSource.close();
            this.recordFinishStats();
        }

        private void recordFinishStats() {
            long finishTime = System.currentTimeMillis();
            long nonSpeculativeSwitchTime = this.nonSpeculativeSwitchTime.orElse(finishTime);
            double speculativeExecutionFraction = ((double)nonSpeculativeSwitchTime - (double)this.startTime) / ((double)finishTime - (double)this.startTime);
            if (Double.isFinite(speculativeExecutionFraction)) {
                this.stageExecutionStats.recordStageSpeculativeExecutionFraction(speculativeExecutionFraction);
            } else {
                this.stageExecutionStats.recordStageSpeculativeExecutionFraction(1.0);
            }
        }

        private void updateOutputSize(SpoolingOutputStats.Snapshot taskOutputStats) {
            int partitionId = 0;
            while (partitionId < this.sinkPartitioningScheme.getPartitionCount()) {
                long partitionSizeInBytes = taskOutputStats.getPartitionSizeInBytes(partitionId);
                Preconditions.checkArgument((partitionSizeInBytes >= 0L ? 1 : 0) != 0, (String)"partitionSizeInBytes must be greater than or equal to zero: %s", (long)partitionSizeInBytes);
                int n = partitionId++;
                this.outputDataSize[n] = this.outputDataSize[n] + partitionSizeInBytes;
            }
            this.outputRowCount += taskOutputStats.getRowCount();
        }

        public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailureInfo failureInfo, TaskStatus taskStatus) {
            if (this.getState().isDone()) {
                return ImmutableList.of();
            }
            int partitionId = taskId.getPartitionId();
            StagePartition partition = this.getStagePartition(partitionId);
            partition.taskFailed(taskId);
            if (!partition.isRunning()) {
                this.runningPartitions.remove(partitionId);
            }
            if (!this.remainingPartitions.contains(partitionId)) {
                return ImmutableList.of();
            }
            RuntimeException failure = failureInfo.toException();
            ErrorCode errorCode = failureInfo.getErrorCode();
            this.partitionMemoryEstimator.registerPartitionFinished(partition.getMemoryRequirements(), taskStatus.getPeakMemoryReservation(), false, Optional.ofNullable(errorCode));
            PartitionMemoryEstimator.MemoryRequirements currentMemoryLimits = partition.getMemoryRequirements();
            PartitionMemoryEstimator.MemoryRequirements newMemoryLimits = this.partitionMemoryEstimator.getNextRetryMemoryRequirements(partition.getMemoryRequirements(), taskStatus.getPeakMemoryReservation(), errorCode);
            partition.setPostFailureMemoryRequirements(newMemoryLimits);
            log.debug("Computed next memory requirements for task from stage %s; previous=%s; new=%s; peak=%s; estimator=%s", new Object[]{this.stage.getStageId(), currentMemoryLimits, newMemoryLimits, taskStatus.getPeakMemoryReservation(), this.partitionMemoryEstimator});
            if (errorCode != null && ErrorCodes.isOutOfMemoryError(errorCode) && (double)newMemoryLimits.getRequiredMemory().toBytes() * 0.99 <= (double)taskStatus.getPeakMemoryReservation().toBytes()) {
                String message = String.format("Cannot allocate enough memory for task %s. Reported peak memory reservation: %s. Maximum possible reservation: %s.", taskId, taskStatus.getPeakMemoryReservation(), newMemoryLimits.getRequiredMemory());
                this.stage.fail(new TrinoException(() -> errorCode, message, (Throwable)failure));
                return ImmutableList.of();
            }
            if (partition.getRemainingAttempts() == 0 || errorCode != null && errorCode.getType() == ErrorType.USER_ERROR) {
                this.stage.fail(failure);
                return ImmutableList.of();
            }
            this.recordTaskFailureInLog(taskId, failure);
            if (!partition.isSealed()) {
                return ImmutableList.of();
            }
            return ImmutableList.of((Object)PrioritizedScheduledTask.create(this.stage.getStageId(), partitionId, this.schedulingPriority));
        }

        private void recordTaskFailureInLog(TaskId taskId, RuntimeException failure) {
            if (this.taskFailures.size() == 5) {
                this.taskFailures.remove();
            }
            this.taskFailures.add(Map.entry(taskId, failure));
        }

        public PartitionMemoryEstimator.MemoryRequirements getMemoryRequirements(int partitionId) {
            return this.getStagePartition(partitionId).getMemoryRequirements();
        }

        public Optional<NodeRequirements> getNodeRequirements(int partitionId) {
            return this.getStagePartition(partitionId).getNodeRequirements();
        }

        public Optional<OutputStatsEstimator.OutputStatsEstimateResult> getOutputStats(Function<StageId, StageExecution> stageExecutionLookup, boolean parentEager) {
            if (this.stage.getState() == StageState.FINISHED) {
                return Optional.of(new OutputStatsEstimator.OutputStatsEstimateResult(new OutputDataSizeEstimate(ImmutableLongArray.copyOf((long[])this.outputDataSize)), this.outputRowCount, "FINISHED", true));
            }
            return this.outputStatsEstimator.getEstimatedOutputStats(this, stageExecutionLookup, parentEager);
        }

        public boolean isSomeProgressMade() {
            return this.partitions.size() > 0 && this.remainingPartitions.size() < this.partitions.size();
        }

        public long getOutputRowCount() {
            return this.outputRowCount;
        }

        public ExchangeSourceOutputSelector getSinkOutputSelector() {
            if (this.finalSinkOutputSelector != null) {
                return this.finalSinkOutputSelector;
            }
            return this.sinkOutputSelectorBuilder.build();
        }

        public void setSourceOutputSelector(PlanFragmentId sourceFragmentId, ExchangeSourceOutputSelector selector) {
            this.sourceOutputSelectors.put(sourceFragmentId, selector);
            RemoteSourceNode remoteSourceNode = this.remoteSources.get(sourceFragmentId);
            Verify.verify((remoteSourceNode != null ? 1 : 0) != 0, (String)"remoteSourceNode is null for fragment: %s", (Object)sourceFragmentId);
            ExchangeSourceOutputSelector mergedSelector = selector;
            for (PlanFragmentId fragmentId : remoteSourceNode.getSourceFragmentIds()) {
                ExchangeSourceOutputSelector fragmentSelector;
                if (fragmentId.equals(sourceFragmentId) || (fragmentSelector = this.sourceOutputSelectors.get(fragmentId)) == null) continue;
                mergedSelector = mergedSelector.merge(fragmentSelector);
            }
            ExchangeSourceOutputSelector finalMergedSelector = mergedSelector;
            this.remainingPartitions.forEach(value -> {
                StagePartition partition = (StagePartition)this.partitions.get(value);
                Verify.verify((partition != null ? 1 : 0) != 0, (String)"partition not found: %s", (int)value);
                partition.updateExchangeSourceOutputSelector(remoteSourceNode.getId(), finalMergedSelector);
            });
        }

        public void abort() {
            Closer closer = this.createStageExecutionCloser();
            closer.register(this.stage::abort);
            try {
                closer.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        public void fail(Throwable t) {
            if (this.stage.getState().isDone()) {
                return;
            }
            Closer closer = this.createStageExecutionCloser();
            closer.register(() -> this.stage.fail(t));
            try {
                closer.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            this.taskDescriptorLoadingComplete();
        }

        private Closer createStageExecutionCloser() {
            Closer closer = Closer.create();
            closer.register((Closeable)this.taskSource);
            closer.register(this::closeExchange);
            return closer;
        }

        private StagePartition getStagePartition(int partitionId) {
            StagePartition partition = (StagePartition)this.partitions.get(partitionId);
            Preconditions.checkState((partition != null ? 1 : 0) != 0, (String)"partition with id %s does not exist in stage %s", (int)partitionId, (Object)this.stage.getStageId());
            return partition;
        }

        public long[] currentOutputDataSize() {
            return this.outputDataSize;
        }

        public FaultTolerantPartitioningScheme getSinkPartitioningScheme() {
            return this.sinkPartitioningScheme;
        }

        public void logDebugInfo() {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("StageExecution %s: %s", new Object[]{this.stage.getStageId(), MoreObjects.toStringHelper((Object)this).add("taskDescriptorStorage.getReservedBytes()", this.taskDescriptorStorage.getReservedBytes()).add("taskSource", (Object)this.taskSource.getDebugInfo()).add("sinkPartitioningScheme", (Object)this.sinkPartitioningScheme).add("exchange", (Object)this.exchange).add("schedulingPriority", this.schedulingPriority).add("eager", this.eager).add("outputDataSize", (Object)this.outputDataSize).add("noMorePartitions", this.noMorePartitions).add("runningPartitions", (Object)this.runningPartitions).add("remainingPartitions", (Object)this.remainingPartitions).add("sinkOutputSelectorBuilder", this.sinkOutputSelectorBuilder == null ? null : this.sinkOutputSelectorBuilder.build()).add("finalSinkOutputSelector", (Object)this.finalSinkOutputSelector).add("remoteSourceIds", this.remoteSourceIds).add("remoteSources", this.remoteSources).add("sourceOutputSelectors", this.sourceOutputSelectors).add("taskDescriptorLoadingActive", this.taskDescriptorLoadingActive).add("exchangeClosed", this.exchangeClosed).add("initialMemoryRequirements", (Object)this.initialMemoryRequirements).toString()});
            this.partitions.forEach((partitionId, stagePartition) -> log.debug("   StagePartition %s.%s: %s", new Object[]{this.stage.getStageId(), partitionId, stagePartition.getDebugInfo()}));
        }

        private static /* synthetic */ void lambda$schedule$1(ListMultimap splits, PlanNodeId planNodeId, ExchangeSourceOutputSelector outputSelector) {
            splits.put((Object)planNodeId, (Object)EventDrivenFaultTolerantQueryScheduler.createOutputSelectorSplit(outputSelector));
        }
    }

    private static class EventDebugInfos {
        private static final String GLOBAL_EVENTS_BUCKET = "GLOBAL";
        private static final EventListener<String> GET_BUCKET_LISTENER = new EventListener<String>(){

            @Override
            public String onRemoteTaskEvent(RemoteTaskEvent event) {
                return "task_" + event.getTaskStatus().getTaskId().getStageId().toString();
            }

            @Override
            public String onRemoteTaskExchangeUpdatedSinkAcquired(RemoteTaskExchangeUpdatedSinkAcquired event) {
                return "task_" + event.getTaskId().getStageId().toString();
            }

            @Override
            public String onStageEvent(StageEvent event) {
                return event.getStageId().toString();
            }

            @Override
            public String onEvent(Event event) {
                return EventDebugInfos.GLOBAL_EVENTS_BUCKET;
            }
        };
        private final String queryId;
        private final int eventsPerBucket;
        private long eventsCounter;
        private long filteredEventsCounter;
        private SoftReference<ListMultimap<String, String>> eventsDebugInfosReference;

        private EventDebugInfos(String queryId, int eventsPerBucket) {
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.eventsPerBucket = eventsPerBucket;
            this.eventsDebugInfosReference = new SoftReference<LinkedListMultimap>(LinkedListMultimap.create());
        }

        private boolean add(Event event) {
            ListMultimap<String, String> eventsDebugInfos = this.getEventsDebugInfos();
            String bucket = this.getBucket(event);
            Optional<String> debugInfo = this.getFullDebugInfo(this.eventsCounter, event);
            ++this.eventsCounter;
            if (debugInfo.isEmpty()) {
                ++this.filteredEventsCounter;
                return false;
            }
            List bucketDebugInfos = eventsDebugInfos.get((Object)bucket);
            bucketDebugInfos.add(debugInfo.get());
            if (bucketDebugInfos.size() > this.eventsPerBucket) {
                Iterator iterator = bucketDebugInfos.iterator();
                iterator.next();
                iterator.remove();
            }
            return true;
        }

        private ListMultimap<String, String> getEventsDebugInfos() {
            LinkedListMultimap eventsDebugInfos = this.eventsDebugInfosReference.get();
            if (eventsDebugInfos == null) {
                log.debug("eventsDebugInfos for %s has been cleared", new Object[]{this.queryId});
                eventsDebugInfos = LinkedListMultimap.create();
                this.eventsDebugInfosReference = new SoftReference<LinkedListMultimap>(eventsDebugInfos);
            }
            return eventsDebugInfos;
        }

        private String getBucket(Event event) {
            if (event == Event.WAKE_UP || event == Event.ABORT) {
                return GLOBAL_EVENTS_BUCKET;
            }
            return event.accept(GET_BUCKET_LISTENER);
        }

        private Optional<String> getFullDebugInfo(long eventId, Event event) {
            return EventDebugInfos.getEventDebugInfo(event).map(info -> "[" + eventId + "/" + System.currentTimeMillis() + "/" + info + "]");
        }

        private static Optional<String> getEventDebugInfo(Event event) {
            SplitAssignmentEvent splitAssignmentEvent;
            if (event == Event.WAKE_UP) {
                return Optional.of("WAKE_UP");
            }
            if (event == Event.ABORT) {
                return Optional.of("ABORT");
            }
            if (event instanceof SplitAssignmentEvent && (splitAssignmentEvent = (SplitAssignmentEvent)event).getAssignmentResult().isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(event.toString());
        }

        public void log() {
            if (!log.isDebugEnabled()) {
                return;
            }
            ListMultimap<String, String> eventsDebugInfos = this.getEventsDebugInfos();
            eventsDebugInfos.asMap().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEachOrdered(entry -> {
                log.debug("Recent events for " + (String)entry.getKey());
                for (String eventDebugInfo : (Collection)entry.getValue()) {
                    log.debug("   " + eventDebugInfo);
                }
            });
            log.debug("Filtered events count " + this.filteredEventsCounter);
        }
    }
}

