/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.execution.BasicStageExecutionStats;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.PartialResultQueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageExecutionInfo;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.ExchangeLocationsConsumer;
import com.facebook.presto.execution.scheduler.ExecutionPolicy;
import com.facebook.presto.execution.scheduler.ExecutionSchedule;
import com.facebook.presto.execution.scheduler.PartialResultQueryTaskTracker;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SectionExecution;
import com.facebook.presto.execution.scheduler.SectionExecutionFactory;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.SqlQuerySchedulerInterface;
import com.facebook.presto.execution.scheduler.StageExecutionAndScheduler;
import com.facebook.presto.execution.scheduler.StreamingPlanSection;
import com.facebook.presto.execution.scheduler.StreamingSubPlan;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.PlanFragmenterUtils;
import com.facebook.presto.sql.planner.SchedulingOrderVisitor;
import com.facebook.presto.sql.planner.SplitSourceFactory;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Streams;
import com.google.common.graph.Traverser;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SqlQueryScheduler
implements SqlQuerySchedulerInterface {
    private static final Logger log = Logger.get(SqlQueryScheduler.class);
    // Collaborators supplied through the constructor; each is null-checked there.
    private final LocationFactory locationFactory;
    private final ExecutionPolicy executionPolicy;
    // Executor on which schedule() passes are submitted (see startScheduling()).
    private final ExecutorService executor;
    private final SplitSchedulerStats schedulerStats;
    private final SectionExecutionFactory sectionExecutionFactory;
    private final RemoteTaskFactory remoteTaskFactory;
    private final SplitSourceFactory splitSourceFactory;
    private final InternalNodeManager nodeManager;
    private final Session session;
    private final QueryStateMachine queryStateMachine;
    // Current query plan; swapped by updatePlan() when fragments are rewritten at runtime.
    private final AtomicReference<SubPlan> plan = new AtomicReference();
    private final FunctionAndTypeManager functionAndTypeManager;
    // Optimizers applied, in list order, to each fragment root by performRuntimeOptimizations().
    private final List<PlanOptimizer> runtimePlanOptimizers;
    private final WarningCollector warningCollector;
    private final PlanNodeIdAllocator idAllocator;
    private final VariableAllocator variableAllocator;
    // Ids of stages whose fragment was replaced by tryCostBasedOptimize().
    private final Set<StageId> runtimeOptimizedStages = Collections.synchronizedSet(new HashSet());
    private final PlanChecker planChecker;
    private final Metadata metadata;
    private final SqlParser sqlParser;
    // Streaming sections extracted from the initial plan in the constructor.
    private final StreamingPlanSection sectionedPlan;
    private final boolean summarizeTaskInfo;
    // Both limits are read from the session's system properties in the constructor.
    private final int maxConcurrentMaterializations;
    private final int maxStageRetries;
    // Execution attempts per section (keyed by the section root's stage id), in creation order;
    // the last element is the latest attempt.
    private final Map<StageId, List<SectionExecution>> sectionExecutions = new ConcurrentHashMap<StageId, List<SectionExecution>>();
    // Flipped once by start() so scheduling is kicked off only on the first call.
    private final AtomicBoolean started = new AtomicBoolean();
    // True while a schedule() pass is in progress; guards against overlapping passes.
    private final AtomicBoolean scheduling = new AtomicBoolean();
    // Number of section retries triggered so far; compared against maxStageRetries.
    private final AtomicInteger retriedSections = new AtomicInteger();
    private final PartialResultQueryTaskTracker partialResultQueryTaskTracker;

    /**
     * Builds a scheduler and registers its query-state listener.
     *
     * <p>Construction and {@code initialize()} are separated so that the listener, which
     * captures {@code this}, is only registered once the constructor has returned.
     *
     * @return the initialized scheduler; {@link #start()} must still be called to begin scheduling
     */
    public static SqlQueryScheduler createSqlQueryScheduler(LocationFactory locationFactory, ExecutionPolicy executionPolicy, ExecutorService executor, SplitSchedulerStats schedulerStats, SectionExecutionFactory sectionExecutionFactory, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, InternalNodeManager nodeManager, Session session, QueryStateMachine queryStateMachine, SubPlan plan, boolean summarizeTaskInfo, FunctionAndTypeManager functionAndTypeManager, List<PlanOptimizer> runtimePlanOptimizers, WarningCollector warningCollector, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, PlanChecker planChecker, Metadata metadata, SqlParser sqlParser, PartialResultQueryManager partialResultQueriesHandler) {
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                locationFactory,
                executionPolicy,
                executor,
                schedulerStats,
                sectionExecutionFactory,
                remoteTaskFactory,
                splitSourceFactory,
                nodeManager,
                session,
                queryStateMachine,
                plan,
                summarizeTaskInfo,
                functionAndTypeManager,
                runtimePlanOptimizers,
                warningCollector,
                idAllocator,
                variableAllocator,
                planChecker,
                metadata,
                sqlParser,
                partialResultQueriesHandler);
        scheduler.initialize();
        return scheduler;
    }

    /**
     * Stores the collaborators, splits the plan into streaming sections and reads the
     * scheduling limits from the session.
     *
     * <p>Every reference argument except {@code partialResultQueryManager} is null-checked here;
     * that one is handed unchanged to the {@link PartialResultQueryTaskTracker} constructor.
     *
     * @throws NullPointerException if a null-checked argument is null; the message names the argument
     */
    private SqlQueryScheduler(LocationFactory locationFactory, ExecutionPolicy executionPolicy, ExecutorService executor, SplitSchedulerStats schedulerStats, SectionExecutionFactory sectionExecutionFactory, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, InternalNodeManager nodeManager, Session session, QueryStateMachine queryStateMachine, SubPlan plan, boolean summarizeTaskInfo, FunctionAndTypeManager functionAndTypeManager, List<PlanOptimizer> runtimePlanOptimizers, WarningCollector warningCollector, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, PlanChecker planChecker, Metadata metadata, SqlParser sqlParser, PartialResultQueryManager partialResultQueryManager) {
        this.locationFactory = Objects.requireNonNull(locationFactory, "locationFactory is null");
        // Message previously said "schedulerPolicyFactory", which is not the name of any argument.
        this.executionPolicy = Objects.requireNonNull(executionPolicy, "executionPolicy is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.sectionExecutionFactory = Objects.requireNonNull(sectionExecutionFactory, "sectionExecutionFactory is null");
        this.remoteTaskFactory = Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
        this.splitSourceFactory = Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.session = Objects.requireNonNull(session, "session is null");
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        // Message previously said "functionManager", which is not the name of any argument.
        this.functionAndTypeManager = Objects.requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
        this.runtimePlanOptimizers = Objects.requireNonNull(runtimePlanOptimizers, "runtimePlanOptimizers is null");
        this.warningCollector = Objects.requireNonNull(warningCollector, "warningCollector is null");
        this.idAllocator = Objects.requireNonNull(idAllocator, "idAllocator is null");
        this.variableAllocator = Objects.requireNonNull(variableAllocator, "variableAllocator is null");
        this.planChecker = Objects.requireNonNull(planChecker, "planChecker is null");
        this.metadata = Objects.requireNonNull(metadata, "metadata is null");
        this.sqlParser = Objects.requireNonNull(sqlParser, "sqlParser is null");
        // The reference starts out null, so this installs the initial plan.
        this.plan.compareAndSet(null, Objects.requireNonNull(plan, "plan is null"));
        this.sectionedPlan = StreamingPlanSection.extractStreamingSections(plan);
        this.summarizeTaskInfo = summarizeTaskInfo;
        this.maxConcurrentMaterializations = SystemSessionProperties.getMaxConcurrentMaterializations(session);
        this.maxStageRetries = SystemSessionProperties.getMaxStageRetries(session);
        this.partialResultQueryTaskTracker = new PartialResultQueryTaskTracker(partialResultQueryManager, SystemSessionProperties.getPartialResultsCompletionRatioThreshold(session), SystemSessionProperties.getPartialResultsMaxExecutionTimeMultiplier(session), warningCollector);
    }

    /**
     * Registers a query-state listener that publishes the current stage info to the
     * state machine whenever the query enters a done state.
     */
    private void initialize() {
        this.queryStateMachine.addStateChangeListener(queryState -> {
            if (!queryState.isDone()) {
                return;
            }
            this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo()));
        });
    }

    /**
     * Kicks off scheduling. Only the first call has any effect; later calls return immediately.
     */
    @Override
    public void start() {
        boolean firstCall = this.started.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.startScheduling();
    }

    /**
     * Submits a {@code schedule()} pass to the executor unless one is currently flagged as running.
     *
     * <p>The check here is only a cheap early-out; {@code schedule()} itself re-checks the
     * flag with a compare-and-set before doing any work.
     */
    private void startScheduling() {
        if (!this.scheduling.get()) {
            this.executor.submit(this::schedule);
        }
    }

    /**
     * Runs one scheduling pass: repeatedly picks the sections that are ready, creates their
     * stage executions, and drives the stage schedulers until nothing is left to schedule,
     * the thread is interrupted, or the query is done.
     *
     * <p>Only one pass runs at a time; a concurrent call returns immediately. Any failure
     * clears the {@code scheduling} flag, fails the query and is rethrown. Stage schedulers
     * created during the pass are always closed on the way out; failures while closing also
     * fail the query and are rethrown as suppressed exceptions of a {@link RuntimeException}.
     */
    private void schedule() {
        if (!this.scheduling.compareAndSet(false, true)) {
            // A previous pass is still running.
            return;
        }
        List<StageExecutionAndScheduler> scheduledStageExecutions = new ArrayList<>();
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            Set<StageId> completedStages = new HashSet<>();
            List<ExecutionSchedule> executionSchedules = new LinkedList<>();

            while (!Thread.currentThread().isInterrupted()) {
                // Drop schedules that have finished and look for sections that can start now.
                executionSchedules.removeIf(ExecutionSchedule::isFinished);
                List<StreamingPlanSection> sectionsReadyForExecution = this.getSectionsReadyForExecution();
                if (sectionsReadyForExecution.isEmpty() && executionSchedules.isEmpty()) {
                    break;
                }

                // Runtime optimization is attempted on each ready section before its executions are created.
                List<StreamingPlanSection> optimizedSections = sectionsReadyForExecution.stream()
                        .map(this::tryCostBasedOptimize)
                        .collect(ImmutableList.toImmutableList());
                List<SectionExecution> sectionExecutions = this.createStageExecutions(optimizedSections);
                if (this.queryStateMachine.isDone()) {
                    sectionExecutions.forEach(SectionExecution::abort);
                    break;
                }

                for (SectionExecution sectionExecution : sectionExecutions) {
                    scheduledStageExecutions.addAll(sectionExecution.getSectionStages());
                }
                for (SectionExecution sectionExecution : sectionExecutions) {
                    executionSchedules.add(this.executionPolicy.createExecutionSchedule(this.session, sectionExecution.getSectionStages()));
                }

                while (!executionSchedules.isEmpty() && executionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
                    List<ListenableFuture<?>> blockedStages = new ArrayList<>();
                    List<StageExecutionAndScheduler> executionsToSchedule = executionSchedules.stream()
                            .flatMap(schedule -> schedule.getStagesToSchedule().stream())
                            .collect(ImmutableList.toImmutableList());

                    for (StageExecutionAndScheduler executionAndScheduler : executionsToSchedule) {
                        SqlStageExecution stageExecution = executionAndScheduler.getStageExecution();
                        stageExecution.beginScheduling();
                        ScheduleResult scheduleResult = executionAndScheduler.getStageScheduler().schedule();

                        // New tasks of leaf stages are handed to the partial-result tracker when the feature is on.
                        if (SystemSessionProperties.isPartialResultsEnabled(this.session) && stageExecution.getFragment().isLeaf()) {
                            for (RemoteTask task : scheduleResult.getNewTasks()) {
                                this.partialResultQueryTaskTracker.trackTask(task);
                                task.addFinalTaskInfoListener(this.partialResultQueryTaskTracker::recordTaskFinish);
                            }
                        }

                        if (scheduleResult.isFinished()) {
                            stageExecution.schedulingComplete();
                        }
                        else if (!scheduleResult.getBlocked().isDone()) {
                            blockedStages.add(scheduleResult.getBlocked());
                        }
                        executionAndScheduler.getStageLinkage().processScheduleResults(stageExecution.getState(), scheduleResult.getNewTasks());
                        this.schedulerStats.getSplitsScheduledPerIteration().add((long) scheduleResult.getSplitsScheduled());

                        if (scheduleResult.getBlockedReason().isPresent()) {
                            switch (scheduleResult.getBlockedReason().get()) {
                                case WRITER_SCALING:
                                    // No statistic is recorded for this reason.
                                    break;
                                case WAITING_FOR_SOURCE:
                                    this.schedulerStats.getWaitingForSource().update(1L);
                                    break;
                                case SPLIT_QUEUES_FULL:
                                    this.schedulerStats.getSplitQueuesFull().update(1L);
                                    break;
                                case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                                    this.schedulerStats.getMixedSplitQueuesFullAndWaitingForSource().update(1L);
                                    break;
                                case NO_ACTIVE_DRIVER_GROUP:
                                    this.schedulerStats.getNoActiveDriverGroup().update(1L);
                                    break;
                                default:
                                    throw new UnsupportedOperationException("Unknown blocked reason: " + scheduleResult.getBlockedReason().get());
                            }
                        }
                    }

                    // Propagate the final state of every stage that became done since the last check, exactly once per stage id.
                    boolean stageFinishedExecution = false;
                    for (StageExecutionAndScheduler stageExecutionAndScheduler : scheduledStageExecutions) {
                        SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution();
                        StageId stageId = stageExecution.getStageExecutionId().getStageId();
                        if (!completedStages.contains(stageId) && stageExecution.getState().isDone()) {
                            stageExecutionAndScheduler.getStageLinkage().processScheduleResults(stageExecution.getState(), ImmutableSet.of());
                            completedStages.add(stageId);
                            stageFinishedExecution = true;
                        }
                    }

                    // A finished stage may have unblocked further sections: go back to the outer loop.
                    if (stageFinishedExecution) {
                        break;
                    }

                    // Otherwise wait (at most one second) for any blocked stage to make progress, then retry.
                    if (!blockedStages.isEmpty()) {
                        try (TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time()) {
                            MoreFutures.tryGetFutureValue(MoreFutures.whenAnyComplete(blockedStages), 1, TimeUnit.SECONDS);
                        }
                        for (ListenableFuture<?> blockedStage : blockedStages) {
                            blockedStage.cancel(true);
                        }
                    }
                }
            }

            for (StageExecutionAndScheduler stageExecutionAndScheduler : scheduledStageExecutions) {
                StageExecutionState state = stageExecutionAndScheduler.getStageExecution().getState();
                if (state != StageExecutionState.SCHEDULED && state != StageExecutionState.RUNNING && !state.isDone()) {
                    throw new PrestoException((ErrorCodeSupplier) StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Scheduling is complete, but stage execution %s is in state %s", stageExecutionAndScheduler.getStageExecution().getStageExecutionId(), state));
                }
            }

            this.scheduling.set(false);
            this.partialResultQueryTaskTracker.completeTaskScheduling();
            // Sections may have become ready while this pass was winding down; start another pass for them.
            if (!this.getSectionsReadyForExecution().isEmpty()) {
                this.startScheduling();
            }
        }
        catch (Throwable t) {
            this.scheduling.set(false);
            this.queryStateMachine.transitionToFailed(t);
            throw t;
        }
        finally {
            RuntimeException closeError = new RuntimeException();
            for (StageExecutionAndScheduler stageExecutionAndScheduler : scheduledStageExecutions) {
                try {
                    stageExecutionAndScheduler.getStageScheduler().close();
                }
                catch (Throwable t) {
                    this.queryStateMachine.transitionToFailed(t);
                    // An exception cannot be added to itself as suppressed.
                    if (closeError != t) {
                        closeError.addSuppressed(t);
                    }
                }
            }
            if (closeError.getSuppressed().length > 0) {
                throw closeError;
            }
        }
    }

    /**
     * Applies the runtime plan optimizers to every sub-plan of {@code section} and returns a
     * section built from the rewritten fragments.
     *
     * <p>The section is returned unchanged when the runtime optimizer is disabled for the
     * session, when the section has no child sections, or when no fragment was changed.
     * Otherwise the changed stages are recorded in {@code runtimeOptimizedStages} and the
     * query plan held by this scheduler is updated as well.
     *
     * @param section section about to be executed
     * @return the original section, or a new section with the same children and a rewritten plan
     */
    private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section) {
        if (!SystemSessionProperties.isRuntimeOptimizerEnabled(this.session) || section.getChildren().isEmpty()) {
            return section;
        }
        Map<PlanFragment, PlanFragment> oldToNewFragment = new HashMap<>();
        Iterable<StreamingSubPlan> subPlans = Traverser.<StreamingSubPlan>forTree(StreamingSubPlan::getChildren).depthFirstPreOrder(section.getPlan());
        for (StreamingSubPlan currentSubPlan : subPlans) {
            Optional<PlanFragment> newPlanFragment = this.performRuntimeOptimizations(currentSubPlan);
            if (newPlanFragment.isPresent()) {
                // A rewritten fragment is validated before it is accepted.
                this.planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), this.session, this.metadata, this.sqlParser, TypeProvider.viewOf(this.variableAllocator.getVariables()), this.warningCollector);
                oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get());
            }
        }
        if (oldToNewFragment.isEmpty()) {
            return section;
        }
        for (PlanFragment oldFragment : oldToNewFragment.keySet()) {
            this.runtimeOptimizedStages.add(this.getStageId(oldFragment.getId()));
        }
        this.updatePlan(oldToNewFragment);
        log.debug("Invoked CBO during runtime, optimized stage IDs: " + oldToNewFragment.keySet().stream().map(PlanFragment::getId).map(PlanFragmentId::toString).collect(Collectors.joining(", ")));
        return new StreamingPlanSection(this.rewriteStreamingSubPlan(section.getPlan(), oldToNewFragment), section.getChildren());
    }

    /**
     * Runs every runtime optimizer, in list order, over the root of the sub-plan's fragment.
     *
     * @param subPlan sub-plan whose fragment should be optimized
     * @return a fragment with the new root (same id, variables, partitioning and stats as the
     *         original) when the optimizers produced a different root node instance; empty otherwise
     */
    private Optional<PlanFragment> performRuntimeOptimizations(StreamingSubPlan subPlan) {
        PlanFragment original = subPlan.getFragment();
        PlanNode optimizedRoot = original.getRoot();
        for (PlanOptimizer runtimeOptimizer : this.runtimePlanOptimizers) {
            optimizedRoot = runtimeOptimizer
                    .optimize(optimizedRoot, this.session, TypeProvider.viewOf(this.variableAllocator.getVariables()), this.variableAllocator, this.idAllocator, this.warningCollector)
                    .getPlanNode();
        }
        // Identity comparison: a change is detected only if a different node instance came back.
        if (optimizedRoot == original.getRoot()) {
            return Optional.empty();
        }
        StatsAndCosts statsAndCosts = original.getStatsAndCosts();
        PlanFragment rewritten = new PlanFragment(
                original.getId(),
                optimizedRoot,
                original.getVariables(),
                original.getPartitioning(),
                SchedulingOrderVisitor.scheduleOrder(optimizedRoot),
                original.getPartitioningScheme(),
                original.getStageExecutionDescriptor(),
                original.isOutputTableWriterFragment(),
                statsAndCosts,
                Optional.of(PlanPrinter.jsonFragmentPlan(optimizedRoot, original.getVariables(), statsAndCosts, this.functionAndTypeManager, this.session)));
        return Optional.of(rewritten);
    }

    /**
     * Atomically replaces the stored query plan with a copy in which the given fragments are substituted.
     *
     * @param oldToNewFragments replacement fragment for each fragment to swap out
     */
    private void updatePlan(Map<PlanFragment, PlanFragment> oldToNewFragments) {
        this.plan.getAndUpdate(currentPlan -> this.rewritePlan(currentPlan, oldToNewFragments));
    }

    /**
     * Rebuilds the sub-plan tree rooted at {@code root}, swapping in the replacement for
     * every fragment that has an entry in {@code oldToNewFragments}.
     *
     * @return a new tree; children are rewritten before their parent
     */
    private SubPlan rewritePlan(SubPlan root, Map<PlanFragment, PlanFragment> oldToNewFragments) {
        List<SubPlan> rewrittenChildren = root.getChildren().stream()
                .map(child -> this.rewritePlan(child, oldToNewFragments))
                .collect(ImmutableList.toImmutableList());
        PlanFragment fragment = oldToNewFragments.containsKey(root.getFragment())
                ? oldToNewFragments.get(root.getFragment())
                : root.getFragment();
        return new SubPlan(fragment, rewrittenChildren);
    }

    /**
     * Rebuilds the streaming sub-plan tree rooted at {@code root}, swapping in the replacement
     * for every fragment that has an entry in {@code oldToNewFragment}.
     *
     * @return a new tree; children are rewritten before their parent
     */
    private StreamingSubPlan rewriteStreamingSubPlan(StreamingSubPlan root, Map<PlanFragment, PlanFragment> oldToNewFragment) {
        List<StreamingSubPlan> rewrittenChildren = root.getChildren().stream()
                .map(child -> this.rewriteStreamingSubPlan(child, oldToNewFragment))
                .collect(ImmutableList.toImmutableList());
        PlanFragment fragment = oldToNewFragment.containsKey(root.getFragment())
                ? oldToNewFragment.get(root.getFragment())
                : root.getFragment();
        return new StreamingSubPlan(fragment, rewrittenChildren);
    }

    /**
     * Returns the sections that can be started now, in depth-first pre-order of the section
     * tree, capped so that running sections plus returned sections do not exceed
     * {@code maxConcurrentMaterializations}.
     *
     * @return an immutable, possibly empty list of sections for which {@code isReadyForExecution} holds
     */
    private List<StreamingPlanSection> getSectionsReadyForExecution() {
        // The traversal is lazy and re-iterable, so it is walked once per stream below.
        Iterable<StreamingPlanSection> allSections = Traverser.<StreamingPlanSection>forTree(StreamingPlanSection::getChildren).depthFirstPreOrder(this.sectionedPlan);
        long runningPlanSections = Streams.stream(allSections)
                .map(section -> this.getLatestSectionExecution(this.getStageId(section.getPlan().getFragment().getId())))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .filter(SectionExecution::isRunning)
                .count();
        return Streams.stream(allSections)
                .filter(this::isReadyForExecution)
                .limit((long) this.maxConcurrentMaterializations - runningPlanSections)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Tells whether {@code section} can be started: its latest attempt (if any) is neither
     * running nor finished, and the latest attempt of every child section has finished.
     */
    private boolean isReadyForExecution(StreamingPlanSection section) {
        Optional<SectionExecution> latest = this.getLatestSectionExecution(this.getStageId(section.getPlan().getFragment().getId()));
        if (latest.isPresent()) {
            SectionExecution execution = latest.get();
            if (execution.isRunning() || execution.isFinished()) {
                return false;
            }
        }
        for (StreamingPlanSection child : section.getChildren()) {
            Optional<SectionExecution> latestChild = this.getLatestSectionExecution(this.getStageId(child.getPlan().getFragment().getId()));
            boolean childFinished = latestChild.isPresent() && latestChild.get().isFinished();
            if (!childFinished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Looks up the most recent execution attempt recorded for the given stage id.
     *
     * @return the last recorded attempt, or empty if none has been recorded
     */
    private Optional<SectionExecution> getLatestSectionExecution(StageId stageId) {
        List<SectionExecution> attempts = this.sectionExecutions.get(stageId);
        boolean hasAttempts = attempts != null && !attempts.isEmpty();
        if (hasAttempts) {
            return Optional.of(Iterables.getLast(attempts));
        }
        return Optional.empty();
    }

    /**
     * Builds the stage id for a plan fragment from this session's query id and the fragment's id.
     */
    private StageId getStageId(PlanFragmentId fragmentId) {
        StageId stageId = new StageId(this.session.getQueryId(), fragmentId.getId());
        return stageId;
    }

    /**
     * Creates a new execution attempt for each given section and records it in
     * {@code sectionExecutions}.
     *
     * <p>The root section gets a single output buffer whose task locations are forwarded to
     * the query state machine; every other section gets discarding output buffers and a
     * no-op locations consumer.
     *
     * @param sections sections to start; each must have no previous attempt or a failed latest attempt
     * @return the newly created executions, in the order of {@code sections}
     */
    private List<SectionExecution> createStageExecutions(List<StreamingPlanSection> sections) {
        ImmutableList.Builder<SectionExecution> created = ImmutableList.builder();
        for (StreamingPlanSection section : sections) {
            StageId sectionId = this.getStageId(section.getPlan().getFragment().getId());
            List<SectionExecution> attempts = this.sectionExecutions.computeIfAbsent(sectionId, ignored -> new CopyOnWriteArrayList<>());
            boolean mayStartNewAttempt = attempts.isEmpty() || Iterables.getLast(attempts).isFailed();
            Verify.verify(mayStartNewAttempt, "Non-failed sectionExecutions already exists");

            PlanFragment sectionRootFragment = section.getPlan().getFragment();
            Optional<int[]> bucketToPartition;
            OutputBuffers outputBuffers;
            ExchangeLocationsConsumer locationsConsumer;
            if (!PlanFragmenterUtils.isRootFragment(sectionRootFragment)) {
                bucketToPartition = Optional.empty();
                outputBuffers = OutputBuffers.createDiscardingOutputBuffers();
                locationsConsumer = (fragmentId, tasks, noMoreExchangeLocations) -> {};
            }
            else {
                bucketToPartition = Optional.of(new int[1]);
                outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(sectionRootFragment.getPartitioningScheme().getPartitioning().getHandle())
                        .withBuffer(new OutputBuffers.OutputBufferId(0), 0)
                        .withNoMoreBufferIds();
                OutputBuffers.OutputBufferId rootBufferId = Iterables.getOnlyElement(outputBuffers.getBuffers().keySet());
                locationsConsumer = (fragmentId, tasks, noMoreExchangeLocations) -> SqlQueryScheduler.updateQueryOutputLocations(this.queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations);
            }

            // The attempt id is the number of attempts made before this one.
            int attemptId = attempts.size();
            SectionExecution sectionExecution = this.sectionExecutionFactory.createSectionExecutions(this.session, section, locationsConsumer, bucketToPartition, outputBuffers, this.summarizeTaskInfo, this.remoteTaskFactory, this.splitSourceFactory, attemptId);
            this.addStateChangeListeners(sectionExecution);
            attempts.add(sectionExecution);
            created.add(sectionExecution);
        }
        return created.build();
    }

    /**
     * Publishes the result-buffer location of each task, mapped to the task's id, to the query state machine.
     *
     * @param rootBufferId output buffer whose location is computed for every task
     * @param noMoreExchangeLocations passed through to the state machine unchanged
     */
    private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBuffers.OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations) {
        queryStateMachine.updateOutputLocations(
                tasks.stream().collect(ImmutableMap.toImmutableMap(
                        remoteTask -> SqlQueryScheduler.getBufferLocation(remoteTask, rootBufferId),
                        RemoteTask::getTaskId)),
                noMoreExchangeLocations);
    }

    /**
     * Builds the URI of a task's output buffer: the task's own URI with the path segments
     * {@code results} and the buffer id appended.
     */
    private static URI getBufferLocation(RemoteTask remoteTask, OutputBuffers.OutputBufferId rootBufferId) {
        HttpUriBuilder builder = HttpUriBuilder.uriBuilderFrom(remoteTask.getTaskStatus().getSelf());
        builder = builder.appendPath("results");
        builder = builder.appendPath(rootBufferId.toString());
        return builder.build();
    }

    /**
     * Wires every stage of {@code sectionExecution} to the query state machine.
     *
     * <ul>
     * <li>The root-fragment stage moves the query to finishing or canceled when it finishes or is canceled.</li>
     * <li>A failed stage either fails the query or, for a recoverable failure in a non-root
     *     section with retries left, aborts the section and triggers rescheduling.</li>
     * <li>A finished stage triggers another scheduling pass.</li>
     * <li>Any other state change moves a STARTING query to running once the stage has tasks.</li>
     * <li>Final stage info updates the query info.</li>
     * </ul>
     */
    private void addStateChangeListeners(SectionExecution sectionExecution) {
        for (StageExecutionAndScheduler stageAndScheduler : sectionExecution.getSectionStages()) {
            SqlStageExecution stageExecution = stageAndScheduler.getStageExecution();

            if (PlanFragmenterUtils.isRootFragment(stageExecution.getFragment())) {
                stageExecution.addStateChangeListener(state -> {
                    if (state == StageExecutionState.FINISHED) {
                        this.queryStateMachine.transitionToFinishing();
                        return;
                    }
                    if (state == StageExecutionState.CANCELED) {
                        this.queryStateMachine.transitionToCanceled();
                    }
                });
            }

            stageExecution.addStateChangeListener(state -> {
                if (this.queryStateMachine.isDone()) {
                    return;
                }
                if (state == StageExecutionState.FINISHED) {
                    this.startScheduling();
                    return;
                }
                if (state != StageExecutionState.FAILED) {
                    if (this.queryStateMachine.getQueryState() == QueryState.STARTING && stageExecution.hasTasks()) {
                        this.queryStateMachine.transitionToRunning();
                    }
                    return;
                }

                // The stage failed: decide between failing the query and retrying the section.
                ExecutionFailureInfo failureInfo = stageExecution.getStageExecutionInfo().getFailureCause().orElseThrow(() -> new VerifyException(String.format("stage execution failed, but the failure info is missing: %s", stageExecution.getStageExecutionId())));
                RuntimeException failureException = failureInfo.toException();
                boolean isRootSection = PlanFragmenterUtils.isRootFragment(sectionExecution.getRootStage().getStageExecution().getFragment());
                boolean notRetriable = isRootSection
                        || this.retriedSections.get() >= this.maxStageRetries
                        || !SqlStageExecution.RECOVERABLE_ERROR_CODES.contains(failureInfo.getErrorCode());
                if (notRetriable) {
                    this.queryStateMachine.transitionToFailed(failureException);
                    return;
                }
                try {
                    // The retry is counted and rescheduling triggered only when abort() returns true.
                    if (sectionExecution.abort()) {
                        this.retriedSections.incrementAndGet();
                        this.nodeManager.refreshNodes();
                        this.startScheduling();
                    }
                }
                catch (Throwable t) {
                    // An exception cannot be added to itself as suppressed.
                    if (failureException != t) {
                        failureException.addSuppressed(t);
                    }
                    this.queryStateMachine.transitionToFailed(failureException);
                }
            });

            stageExecution.addFinalStageInfoListener(status -> this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo())));
        }
    }

    /**
     * Adds up the user memory reservation reported by every stage execution known to this scheduler.
     *
     * @return the sum of {@code getUserMemoryReservation()} over all stage executions
     */
    @Override
    public long getUserMemoryReservation() {
        long totalUserMemory = this.getAllStagesExecutions()
                .mapToLong(stage -> stage.getUserMemoryReservation())
                .sum();
        return totalUserMemory;
    }

    /**
     * Adds up the total memory reservation reported by every stage execution known to this scheduler.
     *
     * @return the sum of {@code getTotalMemoryReservation()} over all stage executions
     */
    @Override
    public long getTotalMemoryReservation() {
        long totalMemory = this.getAllStagesExecutions()
                .mapToLong(stage -> stage.getTotalMemoryReservation())
                .sum();
        return totalMemory;
    }

    /**
     * Computes the combined CPU time of all stage executions.
     *
     * <p>Each stage's CPU time is converted to a millisecond count via {@code toMillis()}
     * before summing; the result is reported in milliseconds.
     *
     * @return the summed CPU time as a {@link Duration} in {@link TimeUnit#MILLISECONDS}
     */
    @Override
    public Duration getTotalCpuTime() {
        long totalMillis = this.getAllStagesExecutions()
                .mapToLong(stage -> stage.getTotalCpuTime().toMillis())
                .sum();
        return new Duration(totalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the combined raw input size of all stage executions.
     *
     * <p>Each stage's raw input size is converted to bytes before summing.
     *
     * @return the summed size, built with {@code DataSize.succinctBytes}
     */
    @Override
    public DataSize getRawInputDataSize() {
        long totalBytes = this.getAllStagesExecutions()
                .mapToLong(stage -> stage.getRawInputDataSize().toBytes())
                .sum();
        return DataSize.succinctBytes(totalBytes);
    }

    /**
     * Reads the output position count from the latest attempt of the stage returned by
     * {@link #getStageInfo()}.
     *
     * @return the output positions recorded in that attempt's stats
     */
    @Override
    public long getOutputPositions() {
        StageExecutionInfo latestAttempt = this.getStageInfo().getLatestAttemptExecutionInfo();
        return latestAttempt.getStats().getOutputPositions();
    }

    /**
     * Reads the output data size from the latest attempt of the stage returned by
     * {@link #getStageInfo()}.
     *
     * @return the output data size recorded in that attempt's stats
     */
    @Override
    public DataSize getOutputDataSize() {
        StageExecutionInfo latestAttempt = this.getStageInfo().getLatestAttemptExecutionInfo();
        return latestAttempt.getStats().getOutputDataSize();
    }

    /**
     * Collects the basic stats of every stage execution and aggregates them into a single value.
     *
     * @return the result of {@code BasicStageExecutionStats.aggregateBasicStageStats} applied to
     *         the per-stage basic stats
     */
    @Override
    public BasicStageExecutionStats getBasicStageStats() {
        // Typed list instead of a raw List with an unchecked cast.
        List<BasicStageExecutionStats> stageStats = this.getAllStagesExecutions()
                .map(SqlStageExecution::getBasicStageStats)
                .collect(ImmutableList.toImmutableList());
        return BasicStageExecutionStats.aggregateBasicStageStats(stageStats);
    }

    /**
     * Streams every stage execution held in {@code sectionExecutions}: each map value is
     * flattened into its section executions, each section execution into its stages, and each
     * stage is mapped to its {@link SqlStageExecution}.
     *
     * @return a stream over all stage executions currently tracked
     */
    private Stream<SqlStageExecution> getAllStagesExecutions() {
        return this.sectionExecutions.values().stream()
                .flatMap(attempts -> attempts.stream())
                .flatMap(section -> section.getSectionStages().stream())
                .map(stageAndScheduler -> stageAndScheduler.getStageExecution());
    }

    /**
     * Builds the {@link StageInfo} tree for the current plan.
     *
     * <p>The stage executions are gathered first, then the current plan is read, and both are
     * handed to {@code buildStageInfo}.
     *
     * @return the stage info for the plan's root, including its sub-stages
     */
    @Override
    public StageInfo getStageInfo() {
        ListMultimap<StageId, SqlStageExecution> executionsByStage = this.getStageExecutions();
        SubPlan currentPlan = this.plan.get();
        return this.buildStageInfo(currentPlan, executionsByStage);
    }

    /**
     * Recursively builds the {@link StageInfo} for {@code subPlan} and all of its children.
     *
     * <p>The executions registered for the stage are treated as an ordered list of attempts:
     * the last one supplies the "latest attempt" info and all earlier ones supply the
     * "previous attempts" infos. A stage with no executions gets an unscheduled placeholder
     * info from {@code StageExecutionInfo.unscheduledExecutionInfo}.
     *
     * @param subPlan the plan node whose fragment identifies the stage
     * @param stageExecutions all known stage executions, keyed by stage id
     * @return the stage info for {@code subPlan}, with child stage infos in child order
     */
    private StageInfo buildStageInfo(SubPlan subPlan, ListMultimap<StageId, SqlStageExecution> stageExecutions) {
        StageId stageId = this.getStageId(subPlan.getFragment().getId());
        List<SqlStageExecution> attempts = stageExecutions.get(stageId);

        StageExecutionInfo latestAttemptInfo = attempts.isEmpty()
                ? StageExecutionInfo.unscheduledExecutionInfo(stageId.getId(), this.queryStateMachine.isDone())
                : Iterables.getLast(attempts).getStageExecutionInfo();

        // Everything except the last attempt counts as a previous attempt.
        List<StageExecutionInfo> previousAttemptInfos = attempts.size() < 2
                ? ImmutableList.of()
                : attempts.subList(0, attempts.size() - 1).stream()
                        .map(SqlStageExecution::getStageExecutionInfo)
                        .collect(ImmutableList.toImmutableList());

        return new StageInfo(
                stageId,
                this.locationFactory.createStageLocation(stageId),
                Optional.of(subPlan.getFragment()),
                latestAttemptInfo,
                previousAttemptInfos,
                subPlan.getChildren().stream()
                        .map(child -> this.buildStageInfo(child, stageExecutions))
                        .collect(ImmutableList.toImmutableList()),
                this.runtimeOptimizedStages.contains(stageId));
    }

    /**
     * Indexes every tracked stage execution by its stage id.
     *
     * <p>Entries are inserted in iteration order of {@code sectionExecutions}, then of each
     * section's stages, so multiple executions of the same stage keep that relative order in
     * the returned multimap.
     *
     * @return an immutable multimap from stage id to the executions of that stage
     */
    private ListMultimap<StageId, SqlStageExecution> getStageExecutions() {
        ImmutableListMultimap.Builder<StageId, SqlStageExecution> result = ImmutableListMultimap.builder();
        for (Collection<SectionExecution> sectionAttempts : this.sectionExecutions.values()) {
            for (SectionExecution sectionExecution : sectionAttempts) {
                for (StageExecutionAndScheduler stageAndScheduler : sectionExecution.getSectionStages()) {
                    SqlStageExecution stageExecution = stageAndScheduler.getStageExecution();
                    result.put(stageExecution.getStageExecutionId().getStageId(), stageExecution);
                }
            }
        }
        return result.build();
    }

    /**
     * Cancels every stage execution whose stage id equals {@code stageId}.
     *
     * <p>The work runs inside a {@code SetThreadName} resource formatted with the query id
     * (presumably renaming the current thread for the duration - confirm against SetThreadName).
     *
     * @param stageId the stage whose executions should be cancelled
     */
    @Override
    public void cancelStage(StageId stageId) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            this.getAllStagesExecutions().forEach(execution -> {
                if (execution.getStageExecutionId().getStageId().equals(stageId)) {
                    execution.cancel();
                }
            });
        }
    }

    /**
     * Aborts every stage execution tracked by this scheduler.
     *
     * <p>May only be called once the query state machine reports the query as done; otherwise
     * the state check fails with an {@link IllegalStateException} that includes the current
     * query state. The work runs inside a {@code SetThreadName} resource formatted with the
     * query id.
     *
     * @throws IllegalStateException if the query is not done yet
     */
    @Override
    public void abort() {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            boolean queryDone = this.queryStateMachine.isDone();
            Preconditions.checkState(queryDone, "query scheduler is expected to be aborted only if the query is finished: %s", this.queryStateMachine.getQueryState());
            this.getAllStagesExecutions().forEach(stage -> stage.abort());
        }
    }
}

