/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.base.VerifyException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.SpoolingOutputBuffers;
import io.trino.execution.scheduler.ErrorCodes;
import io.trino.execution.scheduler.Exchanges;
import io.trino.execution.scheduler.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.NodeRequirements;
import io.trino.execution.scheduler.PartitionMemoryEstimator;
import io.trino.execution.scheduler.TaskDescriptor;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskSource;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.failuredetector.FailureDetector;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.server.DynamicFilterService;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.split.RemoteSplit;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.util.Failures;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;

@Deprecated
public class FaultTolerantStageScheduler {
    private static final Logger log = Logger.get(FaultTolerantStageScheduler.class);

    // Collaborators and configuration supplied through the constructor.
    private final Session session;
    private final SqlStage stage;
    private final FailureDetector failureDetector;
    private final TaskSourceFactory taskSourceFactory;
    private final NodeAllocator nodeAllocator;
    private final TaskDescriptorStorage taskDescriptorStorage;
    private final PartitionMemoryEstimator partitionMemoryEstimator;
    private final TaskExecutionStats taskExecutionStats;
    // Initial per-partition retry budget (the constructor's taskRetryAttemptsPerTask argument).
    private final int maxRetryAttemptsPerTask;
    // Upper bound on the size of pendingPartitions enforced by schedule().
    private final int maxTasksWaitingForNodePerStage;
    // Exchange this stage writes to; one sink is registered per partition.
    private final Exchange sinkExchange;
    private final FaultTolerantPartitioningScheme sinkPartitioningScheme;
    // Upstream schedulers and exchanges, keyed by source fragment id (immutable copies of the constructor arguments).
    private final Map<PlanFragmentId, FaultTolerantStageScheduler> sourceSchedulers;
    private final Map<PlanFragmentId, Exchange> sourceExchanges;
    private final FaultTolerantPartitioningScheme sourcePartitioningScheme;
    // Used to complete a retry-delay future after a given duration.
    private final DelayedFutureCompletor futureCompletor;
    // Future exposed (wrapped) by isBlocked(); schedule() does nothing while it is not done.
    @GuardedBy(value="this")
    private ListenableFuture<Void> blocked = Futures.immediateVoidFuture();
    // Result of the most recent taskSource.getMoreTasks() transformation started by schedule().
    @GuardedBy(value="this")
    private ListenableFuture<Void> tasksPopulatedFuture = Futures.immediateVoidFuture();
    // Created by startTask() when absent; null while no task is running.
    @GuardedBy(value="this")
    private SettableFuture<Void> taskFinishedFuture;
    // Retry back-off settings read from the session in the constructor.
    private final Duration minRetryDelay;
    private final Duration maxRetryDelay;
    private final double retryDelayScaleFactor;
    // Current retry delay; empty when no delay is in effect.
    @GuardedBy(value="this")
    private Optional<Duration> delaySchedulingDuration = Optional.empty();
    // Measures time since the current delay started; built on the constructor's ticker.
    @GuardedBy(value="this")
    private final Stopwatch delayStopwatch;
    // While non-null and not done, schedule() blocks on this future instead of scheduling.
    @GuardedBy(value="this")
    private SettableFuture<Void> delaySchedulingFuture;
    // Created lazily by schedule() once all source exchange handles are available.
    @GuardedBy(value="this")
    private TaskSource taskSource;
    // Partition id -> sink handle returned by sinkExchange.addSink for that partition.
    @GuardedBy(value="this")
    private final Map<Integer, ExchangeSinkHandle> partitionToExchangeSinkHandleMap = new HashMap<Integer, ExchangeSinkHandle>();
    // Partition id -> every task attempt created for that partition.
    @GuardedBy(value="this")
    private final Multimap<Integer, RemoteTask> partitionToRemoteTaskMap = ArrayListMultimap.create();
    // Tasks started by startTask() that have not yet reached a done state.
    @GuardedBy(value="this")
    private final Map<TaskId, RemoteTask> runningTasks = new HashMap<TaskId, RemoteTask>();
    // Node lease held by each running task; released when the task reaches a done state.
    @GuardedBy(value="this")
    private final Map<TaskId, NodeAllocator.NodeLease> runningNodes = new HashMap<TaskId, NodeAllocator.NodeLease>();
    // Every partition id received from the task source so far.
    @GuardedBy(value="this")
    private final Set<Integer> allPartitions = new HashSet<Integer>();
    // Set once the task source reports it is finished.
    @GuardedBy(value="this")
    private boolean noMorePartitions;
    // Partitions waiting for a node lease to be requested (new partitions and retries).
    @GuardedBy(value="this")
    private final Queue<Integer> queuedPartitions = new ArrayDeque<Integer>();
    // Partitions whose node lease has been requested but whose task has not been started yet.
    @GuardedBy(value="this")
    private final Queue<PendingPartition> pendingPartitions = new ArrayDeque<PendingPartition>();
    // Partition id -> attempt id of the attempt that finished successfully.
    @GuardedBy(value="this")
    private final Map<Integer, Integer> finishedPartitions = new HashMap<Integer, Integer>();
    // Retry budget supplied by the caller; decremented each time a failed task is retried.
    @GuardedBy(value="this")
    private final AtomicInteger remainingRetryAttemptsOverall;
    // Partition id -> retries still allowed for that partition (absent until its first retry).
    @GuardedBy(value="this")
    private final Map<Integer, Integer> remainingAttemptsPerTask = new HashMap<Integer, Integer>();
    // Partition id -> memory requirements to use for that partition's next attempt.
    @GuardedBy(value="this")
    private final Map<Integer, PartitionMemoryEstimator.MemoryRequirements> partitionMemoryRequirements = new HashMap<Integer, PartitionMemoryEstimator.MemoryRequirements>();
    // Built lazily by createOutputSelectorSplitsIfNecessary(); null until the first task is started.
    @GuardedBy(value="this")
    private Multimap<PlanNodeId, Split> outputSelectorSplits;
    private final DynamicFilterService dynamicFilterService;
    // First failure recorded by fail(); rethrown by schedule().
    @GuardedBy(value="this")
    private Throwable failure;
    // Set by close(); ensures the shutdown sequence runs only once.
    @GuardedBy(value="this")
    private boolean closed;

    /**
     * Creates a scheduler for a single stage.
     *
     * <p>All object arguments except {@code ticker} are null-checked here; {@code ticker} is passed directly to
     * {@link Stopwatch#createUnstarted(Ticker)}. The retry delay bounds and scale factor are read from
     * {@code session}.
     *
     * @param sourceSchedulers schedulers of upstream stages; must contain every fragment id referenced by the
     *        stage's remote source nodes
     * @param sourceExchanges exchanges of upstream stages; must contain every fragment id referenced by the
     *        stage's remote source nodes
     * @param taskRetryAttemptsPerTask initial retry budget for each partition
     * @param maxTasksWaitingForNodePerStage maximum number of partitions allowed to wait for a node at once
     * @throws NullPointerException if a null-checked argument is null
     * @throws IllegalArgumentException if {@code sourceSchedulers} or {@code sourceExchanges} is missing a
     *         source fragment
     */
    public FaultTolerantStageScheduler(Session session, SqlStage stage, FailureDetector failureDetector, TaskSourceFactory taskSourceFactory, NodeAllocator nodeAllocator, TaskDescriptorStorage taskDescriptorStorage, PartitionMemoryEstimator partitionMemoryEstimator, TaskExecutionStats taskExecutionStats, DelayedFutureCompletor futureCompletor, Ticker ticker, Exchange sinkExchange, FaultTolerantPartitioningScheme sinkPartitioningScheme, Map<PlanFragmentId, FaultTolerantStageScheduler> sourceSchedulers, Map<PlanFragmentId, Exchange> sourceExchanges, FaultTolerantPartitioningScheme sourcePartitioningScheme, AtomicInteger remainingRetryAttemptsOverall, int taskRetryAttemptsPerTask, int maxTasksWaitingForNodePerStage, DynamicFilterService dynamicFilterService) {
        this.session = Objects.requireNonNull(session, "session is null");
        this.stage = Objects.requireNonNull(stage, "stage is null");
        this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.taskSourceFactory = Objects.requireNonNull(taskSourceFactory, "taskSourceFactory is null");
        this.nodeAllocator = Objects.requireNonNull(nodeAllocator, "nodeAllocator is null");
        this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
        this.partitionMemoryEstimator = Objects.requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
        this.taskExecutionStats = Objects.requireNonNull(taskExecutionStats, "taskExecutionStats is null");
        this.futureCompletor = Objects.requireNonNull(futureCompletor, "futureCompletor is null");
        this.sinkExchange = Objects.requireNonNull(sinkExchange, "sinkExchange is null");
        this.sinkPartitioningScheme = Objects.requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");

        // Every fragment feeding one of this stage's remote sources must have a scheduler and an exchange.
        Set<PlanFragmentId> sourceFragments = stage.getFragment().getRemoteSourceNodes().stream()
                .flatMap(remoteSource -> remoteSource.getSourceFragmentIds().stream())
                .collect(ImmutableSet.toImmutableSet());
        Objects.requireNonNull(sourceSchedulers, "sourceSchedulers is null");
        Preconditions.checkArgument(sourceSchedulers.keySet().containsAll(sourceFragments), "sourceSchedulers map is incomplete");
        this.sourceSchedulers = ImmutableMap.copyOf(sourceSchedulers);
        Objects.requireNonNull(sourceExchanges, "sourceExchanges is null");
        Preconditions.checkArgument(sourceExchanges.keySet().containsAll(sourceFragments), "sourceExchanges map is incomplete");
        this.sourceExchanges = ImmutableMap.copyOf(sourceExchanges);

        this.sourcePartitioningScheme = Objects.requireNonNull(sourcePartitioningScheme, "sourcePartitioningScheme is null");
        this.remainingRetryAttemptsOverall = Objects.requireNonNull(remainingRetryAttemptsOverall, "remainingRetryAttemptsOverall is null");
        this.maxRetryAttemptsPerTask = taskRetryAttemptsPerTask;
        this.maxTasksWaitingForNodePerStage = maxTasksWaitingForNodePerStage;
        // Session properties expose their own duration type; convert to java.time.Duration via milliseconds.
        this.minRetryDelay = Duration.ofMillis(SystemSessionProperties.getRetryInitialDelay(session).toMillis());
        this.maxRetryDelay = Duration.ofMillis(SystemSessionProperties.getRetryMaxDelay(session).toMillis());
        this.retryDelayScaleFactor = SystemSessionProperties.getRetryDelayScaleFactor(session);
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.delayStopwatch = Stopwatch.createUnstarted(ticker);
    }

    /**
     * Returns the id of the stage this scheduler is responsible for.
     */
    public StageId getStageId() {
        return stage.getStageId();
    }

    /**
     * Returns a view of the future this scheduler is currently waiting on.
     *
     * <p>The view is produced by {@link Futures#nonCancellationPropagating}, so cancelling the returned future
     * does not cancel the scheduler's internal one.
     */
    public synchronized ListenableFuture<Void> isBlocked() {
        ListenableFuture<Void> current = blocked;
        return Futures.nonCancellationPropagating(current);
    }

    /**
     * Runs one scheduling pass for this stage.
     *
     * <p>The pass rethrows a previously recorded failure, and returns without doing anything when the scheduler
     * is closed, finished, still blocked, or waiting out a retry delay. Otherwise it lazily creates the task
     * source, pulls task descriptors into {@code queuedPartitions}, starts tasks for pending partitions whose
     * node future is done, and requests node leases for queued partitions (at most
     * {@code maxTasksWaitingForNodePerStage} outstanding). On normal completion it points {@code blocked} at a
     * future built from {@code taskFinishedFuture} and the node futures of the still-pending partitions.
     *
     * @throws Exception the recorded {@code failure}; rethrown directly when it is an {@link Exception} or an
     *         {@link Error}, otherwise wrapped in a {@link RuntimeException}
     */
    public synchronized void schedule() throws Exception {
        if (this.failure != null) {
            Throwables.propagateIfPossible((Throwable)this.failure, Exception.class);
            throw new RuntimeException(this.failure);
        }
        if (this.closed) {
            return;
        }
        if (this.isFinished()) {
            return;
        }
        // The previously published blocked future has not completed yet; nothing to do.
        if (!this.blocked.isDone()) {
            return;
        }
        // A retry delay is in effect: block on it instead of scheduling.
        if (this.delaySchedulingFuture != null && !this.delaySchedulingFuture.isDone()) {
            this.blocked = this.delaySchedulingFuture;
            return;
        }
        // Lazily create the task source once the source handles of every source exchange are available.
        if (this.taskSource == null) {
            Map sourceHandles = (Map)this.sourceExchanges.entrySet().stream().collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, entry -> Exchanges.getAllSourceHandles(((Exchange)entry.getValue()).getSourceHandles())));
            List blockedFutures = (List)sourceHandles.values().stream().filter(future -> !future.isDone()).collect(ImmutableList.toImmutableList());
            if (!blockedFutures.isEmpty()) {
                // Some handles are not ready: block until all outstanding futures complete.
                this.blocked = MoreFutures.asVoid((ListenableFuture)Futures.allAsList((Iterable)blockedFutures));
                return;
            }
            // All futures are done; flatten them into fragment id -> source handles.
            Multimap exchangeSources = (Multimap)sourceHandles.entrySet().stream().collect(ImmutableListMultimap.flatteningToImmutableListMultimap(Map.Entry::getKey, entry -> ((List)MoreFutures.getFutureValue((Future)((Future)entry.getValue()))).stream()));
            this.taskSource = this.taskSourceFactory.create(this.session, this.stage.getFragment(), (Multimap<PlanFragmentId, ExchangeSourceHandle>)exchangeSources, this.stage::recordGetSplitTime, this.sourcePartitioningScheme);
        }
        // Keep going while there is anything pending, queued, or still to be produced by the task source.
        while (!(this.pendingPartitions.isEmpty() && this.queuedPartitions.isEmpty() && this.taskSource.isFinished())) {
            // Refill queuedPartitions from the task source when the queue is empty and there is room for more pending partitions.
            while (this.queuedPartitions.isEmpty() && this.pendingPartitions.size() < this.maxTasksWaitingForNodePerStage && !this.taskSource.isFinished()) {
                // The callback runs on a direct executor and takes the scheduler lock itself.
                this.tasksPopulatedFuture = Futures.transform(this.taskSource.getMoreTasks(), tasks -> {
                    FaultTolerantStageScheduler faultTolerantStageScheduler = this;
                    synchronized (faultTolerantStageScheduler) {
                        for (TaskDescriptor task : tasks) {
                            // Queue the partition, persist its descriptor and register its exchange sink.
                            this.queuedPartitions.add(task.getPartitionId());
                            this.allPartitions.add(task.getPartitionId());
                            this.taskDescriptorStorage.put(this.stage.getStageId(), task);
                            ExchangeSinkHandle exchangeSinkHandle = this.sinkExchange.addSink(task.getPartitionId());
                            this.partitionToExchangeSinkHandleMap.put(task.getPartitionId(), exchangeSinkHandle);
                        }
                        if (this.taskSource.isFinished()) {
                            // No further partitions will appear: tell the dynamic filter service and the sink exchange.
                            this.dynamicFilterService.stageCannotScheduleMoreTasks(this.stage.getStageId(), 0, this.allPartitions.size());
                            this.sinkExchange.noMoreSinks();
                            this.noMorePartitions = true;
                        }
                        // Covers the case where every known partition is already finished (including no partitions at all).
                        if (this.noMorePartitions && this.finishedPartitions.keySet().containsAll(this.allPartitions)) {
                            this.sinkExchange.allRequiredSinksFinished();
                        }
                        return null;
                    }
                }, (Executor)MoreExecutors.directExecutor());
                if (this.tasksPopulatedFuture.isDone()) continue;
                // Task descriptors are not available yet: block on them.
                this.blocked = this.tasksPopulatedFuture;
                return;
            }
            // Start a task for every pending partition whose node future has completed.
            Iterator pendingPartitionsIterator = this.pendingPartitions.iterator();
            boolean startedTask = false;
            while (pendingPartitionsIterator.hasNext()) {
                PendingPartition pendingPartition = (PendingPartition)pendingPartitionsIterator.next();
                if (!pendingPartition.getNodeLease().getNode().isDone()) continue;
                PartitionMemoryEstimator.MemoryRequirements memoryRequirements = this.partitionMemoryRequirements.get(pendingPartition.getPartition());
                Verify.verify((memoryRequirements != null ? 1 : 0) != 0, (String)"no entry for %s.%s in partitionMemoryRequirements", (Object)this.stage.getStageId(), (int)pendingPartition.getPartition());
                this.startTask(pendingPartition.getPartition(), pendingPartition.getNodeLease(), memoryRequirements);
                startedTask = true;
                pendingPartitionsIterator.remove();
            }
            // No task was started and no new lease can be requested: leave the loop and compute the blocked future.
            if (!startedTask && (this.queuedPartitions.isEmpty() || this.pendingPartitions.size() >= this.maxTasksWaitingForNodePerStage)) break;
            // Request node leases for queued partitions until the pending limit is reached.
            while (this.pendingPartitions.size() < this.maxTasksWaitingForNodePerStage && !this.queuedPartitions.isEmpty()) {
                int partition = this.queuedPartitions.poll();
                Optional<TaskDescriptor> taskDescriptorOptional = this.taskDescriptorStorage.get(this.stage.getStageId(), partition);
                if (taskDescriptorOptional.isEmpty()) {
                    // NOTE(review): presumably the descriptor storage was cleared because the query ended - confirm.
                    // The polled partition is dropped and blocked is left unchanged on this path.
                    return;
                }
                TaskDescriptor taskDescriptor = taskDescriptorOptional.get();
                // Coordinator-distributed stages use a separate default task memory setting.
                DataSize defaultTaskMemory = this.stage.getFragment().getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION) ? SystemSessionProperties.getFaultTolerantExecutionDefaultCoordinatorTaskMemory(this.session) : SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory(this.session);
                // Retried partitions already have an entry; only first attempts ask the estimator for initial requirements.
                PartitionMemoryEstimator.MemoryRequirements memoryRequirements = this.partitionMemoryRequirements.computeIfAbsent(partition, ignored -> this.partitionMemoryEstimator.getInitialMemoryRequirements(this.session, defaultTaskMemory));
                log.debug("Computed initial memory requirements for task from stage %s; requirements=%s; estimator=%s", new Object[]{this.stage.getStageId(), memoryRequirements, this.partitionMemoryEstimator});
                NodeRequirements nodeRequirements = taskDescriptor.getNodeRequirements();
                NodeAllocator.NodeLease nodeLease = this.nodeAllocator.acquire(nodeRequirements, memoryRequirements.getRequiredMemory());
                this.pendingPartitions.add(new PendingPartition(partition, nodeLease));
            }
        }
        // Publish a blocked future built from the task-finished future and all pending node futures.
        ArrayList<Object> futures = new ArrayList<Object>();
        if (this.taskFinishedFuture != null && !this.taskFinishedFuture.isDone()) {
            futures.add(this.taskFinishedFuture);
        }
        for (PendingPartition pendingPartition : this.pendingPartitions) {
            futures.add(pendingPartition.getNodeLease().getNode());
        }
        // With nothing to wait for, blocked keeps its current (already done) value.
        if (!futures.isEmpty()) {
            this.blocked = MoreFutures.asVoid((ListenableFuture)MoreFutures.whenAnyComplete(futures));
        }
    }

    /**
     * Creates and starts a new task attempt for {@code partition} on the node held by {@code nodeLease}.
     *
     * <p>Does nothing when no task descriptor is stored for the partition. Otherwise the new task is recorded
     * in {@code partitionToRemoteTaskMap}, {@code runningTasks} and {@code runningNodes}, and
     * {@code taskFinishedFuture} is created if it does not exist yet.
     *
     * @param partition partition id to run
     * @param nodeLease lease whose node future must already be complete
     * @param memoryRequirements memory requirements passed to the stage when creating the task
     * @throws VerifyException if the stage does not return a task
     */
    @GuardedBy(value="this")
    private void startTask(int partition, NodeAllocator.NodeLease nodeLease, PartitionMemoryEstimator.MemoryRequirements memoryRequirements) {
        Optional<TaskDescriptor> storedDescriptor = taskDescriptorStorage.get(stage.getStageId(), partition);
        if (storedDescriptor.isEmpty()) {
            // NOTE(review): presumably the descriptor storage was cleared because the query ended - confirm.
            // The lease is not released on this path; verify the allocator reclaims it elsewhere.
            return;
        }
        TaskDescriptor descriptor = storedDescriptor.get();
        InternalNode node = MoreFutures.getFutureValue(nodeLease.getNode());

        // Each attempt gets its own sink instance, identified by the attempt id.
        int attemptId = getNextAttemptIdForPartition(partition);
        ExchangeSinkHandle sinkHandle = partitionToExchangeSinkHandleMap.get(partition);
        ExchangeSinkInstanceHandle sinkInstanceHandle = sinkExchange.instantiateSink(sinkHandle, attemptId);
        SpoolingOutputBuffers outputBuffers = SpoolingOutputBuffers.createInitial(sinkInstanceHandle, sinkPartitioningScheme.getPartitionCount());

        // Partitioned sources plus the ids of all remote source nodes.
        Set<PlanNodeId> allSourcePlanNodeIds = ImmutableSet.<PlanNodeId>builder()
                .addAll(stage.getFragment().getPartitionedSources())
                .addAll(stage.getFragment().getRemoteSourceNodes().stream().map(PlanNode::getId).iterator())
                .build();

        // The output selector splits go first, followed by the splits from the task descriptor.
        createOutputSelectorSplitsIfNecessary();
        Multimap<PlanNodeId, Split> initialSplits = ImmutableListMultimap.<PlanNodeId, Split>builder()
                .putAll(outputSelectorSplits)
                .putAll(descriptor.getSplits())
                .build();

        RemoteTask task = stage.createTask(node, partition, attemptId, sinkPartitioningScheme.getBucketToPartitionMap(), outputBuffers, initialSplits, allSourcePlanNodeIds, Optional.of(memoryRequirements.getRequiredMemory()))
                .orElseThrow(() -> new VerifyException("stage execution is expected to be active"));

        nodeLease.attachTaskId(task.getTaskId());
        partitionToRemoteTaskMap.put(partition, task);
        runningTasks.put(task.getTaskId(), task);
        runningNodes.put(task.getTaskId(), nodeLease);
        if (taskFinishedFuture == null) {
            taskFinishedFuture = SettableFuture.create();
        }

        // Register listeners before starting so no state change is missed.
        task.addStateChangeListener(taskStatus -> updateTaskStatus(taskStatus, sinkHandle));
        task.addFinalTaskInfoListener(taskExecutionStats::update);
        task.start();
    }

    /**
     * Builds {@code outputSelectorSplits} on first use; later calls return immediately.
     *
     * <p>For each remote source node of the stage's fragment, one split is produced. It carries an output
     * selector that includes, for every source fragment of that node, the (partition, attempt) pairs reported
     * by the source scheduler's {@link #getSuccessfulAttempts()}, and no source handles.
     */
    @GuardedBy(value="this")
    private void createOutputSelectorSplitsIfNecessary() {
        if (outputSelectorSplits != null) {
            return;
        }
        ImmutableListMultimap.Builder<PlanNodeId, Split> selectorSplits = ImmutableListMultimap.builder();
        for (RemoteSourceNode remoteSource : stage.getFragment().getRemoteSourceNodes()) {
            List<PlanFragmentId> sourceFragmentIds = remoteSource.getSourceFragmentIds();
            // The selector is scoped to the ids of the exchanges that feed this remote source.
            ExchangeSourceOutputSelector.Builder selector = ExchangeSourceOutputSelector.builder(
                    sourceExchanges.entrySet().stream()
                            .filter(entry -> sourceFragmentIds.contains(entry.getKey()))
                            .map(entry -> entry.getValue().getId())
                            .collect(ImmutableSet.toImmutableSet()));
            for (PlanFragmentId sourceFragment : sourceFragmentIds) {
                FaultTolerantStageScheduler sourceScheduler = sourceSchedulers.get(sourceFragment);
                Exchange sourceExchange = sourceExchanges.get(sourceFragment);
                Map<Integer, Integer> successfulAttempts = sourceScheduler.getSuccessfulAttempts();
                for (Map.Entry<Integer, Integer> attempt : successfulAttempts.entrySet()) {
                    selector.include(sourceExchange.getId(), attempt.getKey(), attempt.getValue());
                }
                selector.setPartitionCount(sourceExchange.getId(), successfulAttempts.size());
            }
            selector.setFinal();
            SpoolingExchangeInput selectorOnlyInput = new SpoolingExchangeInput(ImmutableList.of(), Optional.of(selector.build()));
            selectorSplits.put(remoteSource.getId(), new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(selectorOnlyInput)));
        }
        outputSelectorSplits = selectorSplits.build();
    }

    /**
     * Reports whether the stage has been fully scheduled and every known partition has a finished attempt.
     *
     * @return {@code false} when a failure is recorded, the task source is missing or unfinished, the latest
     *         task-population future is not done, partitions are still queued, or some partition has no
     *         finished attempt; {@code true} otherwise
     */
    public synchronized boolean isFinished() {
        return failure == null
                && taskSource != null
                && taskSource.isFinished()
                && tasksPopulatedFuture.isDone()
                && queuedPartitions.isEmpty()
                && finishedPartitions.keySet().containsAll(allPartitions);
    }

    /**
     * Returns an immutable snapshot of partition id to the attempt id that finished successfully.
     */
    public synchronized Map<Integer, Integer> getSuccessfulAttempts() {
        Map<Integer, Integer> snapshot = ImmutableMap.copyOf(finishedPartitions);
        return snapshot;
    }

    /**
     * Closes the scheduler; tasks still running are cancelled rather than aborted.
     */
    public void cancel() {
        close(false);
    }

    /**
     * Closes the scheduler; tasks still running are aborted.
     */
    public void abort() {
        close(true);
    }

    /**
     * Records {@code t} as the scheduler's failure, unless one is already recorded, then closes the scheduler
     * with running tasks aborted. The close happens outside the lock.
     */
    private void fail(Throwable t) {
        synchronized (this) {
            // Only the first failure is kept.
            if (failure == null) {
                failure = t;
            }
        }
        close(true);
    }

    /**
     * Marks the scheduler closed and, on the first call only, runs the shutdown sequence: stop running tasks,
     * cancel the blocked future, release pending node leases, close the task source and close the sink
     * exchange. The sequence runs outside the lock.
     *
     * @param abort {@code true} to abort running tasks, {@code false} to cancel them
     */
    private void close(boolean abort) {
        boolean alreadyClosed;
        synchronized (this) {
            alreadyClosed = closed;
            closed = true;
        }
        if (alreadyClosed) {
            return;
        }
        cancelRunningTasks(abort);
        cancelBlockedFuture();
        releasePendingNodes();
        closeTaskSource();
        closeSinkExchange();
    }

    /**
     * Stops every task currently in {@code runningTasks}.
     *
     * <p>The task list is copied under the lock; the tasks themselves are aborted or cancelled outside it.
     *
     * @param abort {@code true} to abort each task, {@code false} to cancel it
     */
    private void cancelRunningTasks(boolean abort) {
        List<RemoteTask> tasks;
        synchronized (this) {
            tasks = ImmutableList.copyOf(runningTasks.values());
        }
        for (RemoteTask task : tasks) {
            if (abort) {
                task.abort();
            }
            else {
                task.cancel();
            }
        }
    }

    /**
     * Cancels the current {@code blocked} future if it has not completed.
     *
     * <p>Must be called without holding the scheduler lock; the future is read under the lock and cancelled
     * outside it.
     */
    private void cancelBlockedFuture() {
        Verify.verify(!Thread.holdsLock(this));
        ListenableFuture<Void> current;
        synchronized (this) {
            current = blocked;
        }
        if (current == null || current.isDone()) {
            return;
        }
        current.cancel(true);
    }

    /**
     * Releases the node leases of all pending partitions and empties {@code pendingPartitions}.
     *
     * <p>Must be called without holding the scheduler lock; leases are collected under the lock and released
     * outside it.
     */
    private void releasePendingNodes() {
        Verify.verify(!Thread.holdsLock(this));
        List<NodeAllocator.NodeLease> leases = new ArrayList<>();
        synchronized (this) {
            pendingPartitions.forEach(pending -> leases.add(pending.getNodeLease()));
            pendingPartitions.clear();
        }
        leases.forEach(NodeAllocator.NodeLease::release);
    }

    /**
     * Closes the task source if one has been created. A {@link RuntimeException} thrown by the close is
     * logged as a warning and not rethrown.
     */
    private void closeTaskSource() {
        TaskSource source;
        synchronized (this) {
            source = taskSource;
        }
        if (source == null) {
            return;
        }
        try {
            source.close();
        }
        catch (RuntimeException closeFailure) {
            log.warn(closeFailure, "Error closing task source for stage: %s", stage.getStageId());
        }
    }

    /**
     * Closes the sink exchange. A {@link RuntimeException} thrown by the close is logged as a warning and
     * not rethrown.
     */
    private void closeSinkExchange() {
        try {
            sinkExchange.close();
        }
        catch (RuntimeException closeFailure) {
            log.warn(closeFailure, "Error closing sink exchange for stage: %s", stage.getStageId());
        }
    }

    /**
     * Returns the attempt id to use for the next task of {@code partition}: one more than the highest attempt
     * id among the tasks already recorded for it, or {@code 0} when none is recorded.
     */
    private int getNextAttemptIdForPartition(int partition) {
        int latestAttemptId = partitionToRemoteTaskMap.get(partition).stream()
                .mapToInt(task -> task.getTaskId().getAttemptId())
                .max()
                .orElse(-1);
        return latestAttemptId + 1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTaskStatus(TaskStatus taskStatus, ExchangeSinkHandle exchangeSinkHandle) {
        TaskState state = taskStatus.getState();
        if (!state.isDone()) {
            return;
        }
        try {
            SettableFuture<Void> previousTaskFinishedFuture;
            Throwable failure = null;
            SettableFuture<Void> previousDelaySchedulingFuture = null;
            FaultTolerantStageScheduler faultTolerantStageScheduler = this;
            synchronized (faultTolerantStageScheduler) {
                TaskId taskId = taskStatus.getTaskId();
                this.runningTasks.remove(taskId);
                previousTaskFinishedFuture = this.taskFinishedFuture;
                this.taskFinishedFuture = !this.runningTasks.isEmpty() ? SettableFuture.create() : null;
                NodeAllocator.NodeLease nodeLease = Objects.requireNonNull(this.runningNodes.remove(taskId), () -> "node not found for task id: " + taskId);
                nodeLease.release();
                int partitionId = taskId.getPartitionId();
                if (!this.finishedPartitions.containsKey(partitionId) && !this.closed) {
                    PartitionMemoryEstimator.MemoryRequirements memoryLimits = this.partitionMemoryRequirements.get(partitionId);
                    Verify.verify((memoryLimits != null ? 1 : 0) != 0);
                    switch (state) {
                        case FINISHED: {
                            this.finishedPartitions.put(partitionId, taskId.getAttemptId());
                            this.sinkExchange.sinkFinished(exchangeSinkHandle, taskId.getAttemptId());
                            if (this.noMorePartitions && this.finishedPartitions.keySet().containsAll(this.allPartitions)) {
                                this.sinkExchange.allRequiredSinksFinished();
                            }
                            this.partitionToRemoteTaskMap.get((Object)partitionId).forEach(RemoteTask::abort);
                            this.partitionMemoryEstimator.registerPartitionFinished(this.session, memoryLimits, taskStatus.getPeakMemoryReservation(), true, Optional.empty());
                            if (this.delayStopwatch.isRunning() && this.delayStopwatch.elapsed().compareTo(this.delaySchedulingDuration.get()) > 0) {
                                previousDelaySchedulingFuture = this.delaySchedulingFuture;
                                this.delayStopwatch.reset();
                                this.delaySchedulingDuration = Optional.empty();
                                this.delaySchedulingFuture = null;
                            }
                            this.taskDescriptorStorage.remove(this.stage.getStageId(), partitionId);
                            break;
                        }
                        case CANCELED: {
                            log.debug("Task cancelled: %s", new Object[]{taskId});
                            break;
                        }
                        case ABORTED: {
                            log.debug("Task aborted: %s", new Object[]{taskId});
                            break;
                        }
                        case FAILED: {
                            ExecutionFailureInfo failureInfo = taskStatus.getFailures().stream().findFirst().map(this::rewriteTransportFailure).orElse(Failures.toFailure(new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason")));
                            log.warn((Throwable)failureInfo.toException(), "Task failed: %s", new Object[]{taskId});
                            ErrorCode errorCode = failureInfo.getErrorCode();
                            this.partitionMemoryEstimator.registerPartitionFinished(this.session, memoryLimits, taskStatus.getPeakMemoryReservation(), false, Optional.ofNullable(errorCode));
                            boolean coordinatorStage = this.stage.getFragment().getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION);
                            int taskRemainingAttempts = this.remainingAttemptsPerTask.getOrDefault(partitionId, coordinatorStage ? 0 : this.maxRetryAttemptsPerTask);
                            if (this.remainingRetryAttemptsOverall.get() > 0 && taskRemainingAttempts > 0 && (errorCode == null || errorCode.getType() != ErrorType.USER_ERROR)) {
                                this.remainingRetryAttemptsOverall.decrementAndGet();
                                this.remainingAttemptsPerTask.put(partitionId, taskRemainingAttempts - 1);
                                PartitionMemoryEstimator.MemoryRequirements newMemoryLimits = this.partitionMemoryEstimator.getNextRetryMemoryRequirements(this.session, memoryLimits, taskStatus.getPeakMemoryReservation(), errorCode);
                                log.debug("Computed next memory requirements for task from stage %s; previous=%s; new=%s; peak=%s; estimator=%s", new Object[]{this.stage.getStageId(), memoryLimits, newMemoryLimits, taskStatus.getPeakMemoryReservation(), this.partitionMemoryEstimator});
                                if (errorCode != null && ErrorCodes.isOutOfMemoryError(errorCode) && (double)newMemoryLimits.getRequiredMemory().toBytes() * 0.99 <= (double)taskStatus.getPeakMemoryReservation().toBytes()) {
                                    String message = String.format("Cannot allocate enough memory for task %s. Reported peak memory reservation: %s. Maximum possible reservation: %s.", taskId, taskStatus.getPeakMemoryReservation(), newMemoryLimits.getRequiredMemory());
                                    failure = new TrinoException(() -> errorCode, message, (Throwable)failureInfo.toException());
                                    break;
                                }
                                this.partitionMemoryRequirements.put(partitionId, newMemoryLimits);
                                this.queuedPartitions.add(partitionId);
                                log.debug("Retrying partition %s for stage %s", new Object[]{partitionId, this.stage.getStageId()});
                                if (errorCode == null || !this.shouldDelayScheduling(errorCode)) break;
                                if (this.delayStopwatch.isRunning()) {
                                    SettableFuture newDelaySchedulingFuture;
                                    Preconditions.checkState((boolean)this.delaySchedulingDuration.isPresent());
                                    if (this.delayStopwatch.elapsed().compareTo(this.delaySchedulingDuration.get()) <= 0) break;
                                    this.delayStopwatch.reset().start();
                                    this.delaySchedulingDuration = this.delaySchedulingDuration.map(duration -> (Duration)Ordering.natural().min((Object)Duration.ofMillis((long)((double)duration.toMillis() * this.retryDelayScaleFactor)), (Object)this.maxRetryDelay));
                                    previousDelaySchedulingFuture = this.delaySchedulingFuture;
                                    this.delaySchedulingFuture = newDelaySchedulingFuture = SettableFuture.create();
                                    this.futureCompletor.completeFuture((SettableFuture<Void>)newDelaySchedulingFuture, this.delaySchedulingDuration.get());
                                    break;
                                }
                                this.delayStopwatch.start();
                                this.delaySchedulingDuration = Optional.of(this.minRetryDelay);
                                this.delaySchedulingFuture = SettableFuture.create();
                                this.futureCompletor.completeFuture(this.delaySchedulingFuture, this.delaySchedulingDuration.get());
                                break;
                            }
                            failure = failureInfo.toException();
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Unexpected task state: " + state);
                        }
                    }
                }
            }
            if (failure != null) {
                this.fail(failure);
            }
            if (previousTaskFinishedFuture != null && !previousTaskFinishedFuture.isDone()) {
                previousTaskFinishedFuture.set(null);
            }
            if (previousDelaySchedulingFuture != null && !previousDelaySchedulingFuture.isDone()) {
                previousDelaySchedulingFuture.set(null);
            }
        }
        catch (Throwable t) {
            this.fail(t);
        }
    }

    /**
     * Decides whether a retry triggered by the given error should be delayed.
     *
     * @param errorCode error code of the failed task; must not be null
     * @return {@code true} when the error type is {@code INTERNAL_ERROR} or
     *         {@code EXTERNAL}, {@code false} for every other type
     */
    private boolean shouldDelayScheduling(ErrorCode errorCode) {
        ErrorType type = errorCode.getType();
        if (type == ErrorType.INTERNAL_ERROR) {
            return true;
        }
        return type == ErrorType.EXTERNAL;
    }

    /**
     * Re-labels a failure as {@code REMOTE_HOST_GONE} when the host it refers to
     * is reported as gone by the failure detector.
     *
     * @param executionFailureInfo the failure reported for a task
     * @return the same instance when no remote host is recorded or the failure
     *         detector does not report that host as {@code GONE}; otherwise a new
     *         {@link ExecutionFailureInfo} carrying the same details but with the
     *         error code replaced by {@code REMOTE_HOST_GONE}
     */
    private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo) {
        // The failure detector is consulted only when a remote host is actually recorded.
        boolean remoteHostGone = executionFailureInfo.getRemoteHost() != null
                && failureDetector.getState(executionFailureInfo.getRemoteHost()) == FailureDetector.State.GONE;
        if (!remoteHostGone) {
            return executionFailureInfo;
        }
        // Copy every field unchanged except the error code.
        return new ExecutionFailureInfo(
                executionFailureInfo.getType(),
                executionFailureInfo.getMessage(),
                executionFailureInfo.getCause(),
                executionFailureInfo.getSuppressed(),
                executionFailureInfo.getStack(),
                executionFailureInfo.getErrorLocation(),
                StandardErrorCode.REMOTE_HOST_GONE.toErrorCode(),
                executionFailureInfo.getRemoteHost());
    }

    public static interface DelayedFutureCompletor {
        public void completeFuture(SettableFuture<Void> var1, Duration var2);
    }

    /**
     * Immutable pair of a partition id and the node lease associated with it.
     */
    private static class PendingPartition {
        private final int partition;
        private final NodeAllocator.NodeLease nodeLease;

        /**
         * @param partition id of the partition
         * @param nodeLease lease associated with the partition; must not be null
         * @throws NullPointerException if {@code nodeLease} is null
         */
        public PendingPartition(int partition, NodeAllocator.NodeLease nodeLease) {
            Objects.requireNonNull(nodeLease, "nodeLease is null");
            this.nodeLease = nodeLease;
            this.partition = partition;
        }

        /** @return the partition id */
        public int getPartition() {
            return partition;
        }

        /** @return the node lease, never null */
        public NodeAllocator.NodeLease getNodeLease() {
            return nodeLease;
        }
    }
}

