/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.execution.BasicStageExecutionStats;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.PartialResultQueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageExecutionInfo;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.CTEMaterializationTracker;
import com.facebook.presto.execution.scheduler.ExchangeLocationsConsumer;
import com.facebook.presto.execution.scheduler.ExecutionPolicy;
import com.facebook.presto.execution.scheduler.ExecutionSchedule;
import com.facebook.presto.execution.scheduler.PartialResultQueryTaskTracker;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SectionExecution;
import com.facebook.presto.execution.scheduler.SectionExecutionFactory;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.SqlQuerySchedulerInterface;
import com.facebook.presto.execution.scheduler.StageExecutionAndScheduler;
import com.facebook.presto.execution.scheduler.StreamingPlanSection;
import com.facebook.presto.execution.scheduler.StreamingSubPlan;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SchedulingOrderVisitor;
import com.facebook.presto.sql.planner.SplitSourceFactory;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.graph.Traverser;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.sun.management.ThreadMXBean;
import io.airlift.units.Duration;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.http.client.utils.URIBuilder;

public class SqlQueryScheduler
implements SqlQuerySchedulerInterface {
    private static final Logger log = Logger.get(SqlQueryScheduler.class);
    // Used by schedule() to measure the CPU time the scheduling thread spends on each stage.
    private static final ThreadMXBean THREAD_MX_BEAN = (ThreadMXBean)ManagementFactory.getThreadMXBean();
    private final LocationFactory locationFactory;
    private final ExecutionPolicy executionPolicy;
    private final SplitSchedulerStats schedulerStats;
    private final QueryStateMachine queryStateMachine;
    // Current plan; replaced atomically by updatePlan() when runtime optimization rewrites fragments.
    private final AtomicReference<SubPlan> plan = new AtomicReference();
    // The plan split into streaming sections; computed once in the constructor.
    private final StreamingPlanSection sectionedPlan;
    private final StageId rootStageId;
    private final boolean summarizeTaskInfo;
    // Read from the session in the constructor; caps how many sections getSectionsReadyForExecution() returns.
    private final int maxConcurrentMaterializations;
    private final Session session;
    private final FunctionAndTypeManager functionAndTypeManager;
    private final List<PlanOptimizer> runtimePlanOptimizers;
    private final WarningCollector warningCollector;
    private final PlanNodeIdAllocator idAllocator;
    private final VariableAllocator variableAllocator;
    private final SectionExecutionFactory sectionExecutionFactory;
    private final RemoteTaskFactory remoteTaskFactory;
    private final SplitSourceFactory splitSourceFactory;
    // Stage ids whose fragments were replaced by tryCostBasedOptimize().
    private final Set<StageId> runtimeOptimizedStages = Collections.synchronizedSet(new HashSet());
    private final PlanChecker planChecker;
    private final Metadata metadata;
    // Stage executions keyed by stage id; entries are overwritten by updateStageExecutions().
    private final Map<StageId, StageExecutionAndScheduler> stageExecutions = new ConcurrentHashMap<StageId, StageExecutionAndScheduler>();
    // Executor on which schedule() is submitted.
    private final ExecutorService executor;
    // Set by the first call to start(); later calls are no-ops.
    private final AtomicBoolean started = new AtomicBoolean();
    // True while a schedule() invocation owns the scheduling loop.
    private final AtomicBoolean scheduling = new AtomicBoolean();
    private final PartialResultQueryTaskTracker partialResultQueryTaskTracker;
    private final CTEMaterializationTracker cteMaterializationTracker = new CTEMaterializationTracker();

    /**
     * Creates a scheduler for {@code plan} and registers its stage and query state listeners.
     * <p>
     * The instance is first constructed and only then initialized; listener registration
     * happens in {@code initialize()}, not in the constructor.
     *
     * @return a fully initialized scheduler; call {@link #start()} to begin scheduling
     */
    public static SqlQueryScheduler createSqlQueryScheduler(LocationFactory locationFactory, ExecutionPolicy executionPolicy, ExecutorService queryExecutor, SplitSchedulerStats schedulerStats, SectionExecutionFactory sectionExecutionFactory, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, FunctionAndTypeManager functionAndTypeManager, QueryStateMachine queryStateMachine, SubPlan plan, OutputBuffers rootOutputBuffers, boolean summarizeTaskInfo, List<PlanOptimizer> runtimePlanOptimizers, WarningCollector warningCollector, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, PlanChecker planChecker, Metadata metadata, SqlParser sqlParser, PartialResultQueryManager partialResultQueryManager) {
        // Note: the constructor takes summarizeTaskInfo before rootOutputBuffers.
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                locationFactory,
                executionPolicy,
                queryExecutor,
                schedulerStats,
                sectionExecutionFactory,
                remoteTaskFactory,
                splitSourceFactory,
                session,
                functionAndTypeManager,
                queryStateMachine,
                plan,
                summarizeTaskInfo,
                rootOutputBuffers,
                runtimePlanOptimizers,
                warningCollector,
                idAllocator,
                variableAllocator,
                planChecker,
                metadata,
                sqlParser,
                partialResultQueryManager);
        scheduler.initialize();
        return scheduler;
    }

    /**
     * Captures the collaborators for one query and eagerly builds the stage executions for
     * every streaming section of {@code plan}. State listeners are not registered here; that
     * is done by {@code initialize()}.
     * <p>
     * {@code rootOutputBuffers} must contain exactly one buffer; its id is used to build the
     * query output locations.
     *
     * @throws NullPointerException if one of the null-checked arguments is null
     */
    private SqlQueryScheduler(LocationFactory locationFactory, ExecutionPolicy executionPolicy, ExecutorService queryExecutor, SplitSchedulerStats schedulerStats, SectionExecutionFactory sectionExecutionFactory, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session, FunctionAndTypeManager functionAndTypeManager, QueryStateMachine queryStateMachine, SubPlan plan, boolean summarizeTaskInfo, OutputBuffers rootOutputBuffers, List<PlanOptimizer> runtimePlanOptimizers, WarningCollector warningCollector, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, PlanChecker planChecker, Metadata metadata, SqlParser sqlParser, PartialResultQueryManager partialResultQueryManager) {
        this.locationFactory = Objects.requireNonNull(locationFactory, "locationFactory is null");
        this.executionPolicy = Objects.requireNonNull(executionPolicy, "executionPolicy is null");
        // Fail fast here instead of with an anonymous NPE when scheduling is first submitted.
        this.executor = Objects.requireNonNull(queryExecutor, "queryExecutor is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        this.plan.compareAndSet(null, Objects.requireNonNull(plan, "plan is null"));
        this.session = Objects.requireNonNull(session, "session is null");
        this.functionAndTypeManager = Objects.requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
        this.runtimePlanOptimizers = Objects.requireNonNull(runtimePlanOptimizers, "runtimePlanOptimizers is null");
        this.warningCollector = Objects.requireNonNull(warningCollector, "warningCollector is null");
        this.idAllocator = Objects.requireNonNull(idAllocator, "idAllocator is null");
        this.variableAllocator = Objects.requireNonNull(variableAllocator, "variableAllocator is null");
        this.planChecker = Objects.requireNonNull(planChecker, "planChecker is null");
        this.metadata = Objects.requireNonNull(metadata, "metadata is null");
        this.sectionExecutionFactory = Objects.requireNonNull(sectionExecutionFactory, "sectionExecutionFactory is null");
        this.remoteTaskFactory = Objects.requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
        this.splitSourceFactory = Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        Objects.requireNonNull(rootOutputBuffers, "rootOutputBuffers is null");
        this.sectionedPlan = StreamingPlanSection.extractStreamingSections(plan);
        // Must be assigned before createStageExecutions(), which reads this field.
        this.summarizeTaskInfo = summarizeTaskInfo;

        OutputBuffers.OutputBufferId rootBufferId = Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet());
        List<StageExecutionAndScheduler> createdStageExecutions = this.createStageExecutions(
                sectionExecutionFactory,
                (fragmentId, tasks, noMoreExchangeLocations) -> SqlQueryScheduler.updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
                this.sectionedPlan,
                Optional.of(new int[1]),
                rootOutputBuffers,
                remoteTaskFactory,
                splitSourceFactory,
                session);
        // The root section's stages are appended last, so the last element is taken as the root stage.
        this.rootStageId = Iterables.getLast(createdStageExecutions).getStageExecution().getStageExecutionId().getStageId();
        for (StageExecutionAndScheduler execution : createdStageExecutions) {
            this.stageExecutions.put(execution.getStageExecution().getStageExecutionId().getStageId(), execution);
        }

        this.maxConcurrentMaterializations = SystemSessionProperties.getMaxConcurrentMaterializations(session);
        this.partialResultQueryTaskTracker = new PartialResultQueryTaskTracker(partialResultQueryManager, SystemSessionProperties.getPartialResultsCompletionRatioThreshold(session), SystemSessionProperties.getPartialResultsMaxExecutionTimeMultiplier(session), warningCollector);
    }

    /**
     * Registers the listeners that tie stage state changes to the query state machine:
     * the root stage drives finishing/cancellation, CTE table-finish stages report
     * materialization, and every stage reports failure, abort, completion and first tasks.
     */
    private void initialize() {
        // The root stage decides when the query as a whole is finishing or canceled.
        this.stageExecutions.get(this.rootStageId).getStageExecution().addStateChangeListener(rootState -> {
            if (rootState == StageExecutionState.FINISHED) {
                this.queryStateMachine.transitionToFinishing();
                return;
            }
            if (rootState == StageExecutionState.CANCELED) {
                this.queryStateMachine.transitionToCanceled();
            }
        });

        for (StageExecutionAndScheduler entry : this.stageExecutions.values()) {
            SqlStageExecution stage = entry.getStageExecution();

            if (stage.isCTETableFinishStage()) {
                // Tell the tracker once the CTE writer stage has finished.
                stage.addStateChangeListener(state -> {
                    if (state != StageExecutionState.FINISHED) {
                        return;
                    }
                    String cteName = stage.getCTEWriterId();
                    log.debug("CTE write completed for: " + cteName);
                    this.cteMaterializationTracker.markCTEAsMaterialized(cteName);
                });
            }

            stage.addStateChangeListener(state -> {
                if (this.queryStateMachine.isDone()) {
                    return;
                }
                if (state == StageExecutionState.FAILED) {
                    this.queryStateMachine.transitionToFailed(stage.getStageExecutionInfo().getFailureCause().get().toException());
                    return;
                }
                if (state == StageExecutionState.ABORTED) {
                    this.queryStateMachine.transitionToFailed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Query stage was aborted"));
                    return;
                }
                if (state == StageExecutionState.FINISHED) {
                    // A finished stage may make further sections ready to run.
                    this.startScheduling();
                    return;
                }
                if (this.queryStateMachine.getQueryState() == QueryState.STARTING && stage.hasTasks()) {
                    this.queryStateMachine.transitionToRunning();
                }
            });

            stage.addFinalStageInfoListener(finalInfo -> this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo())));
        }

        // Publish the final stage info once the query reaches a terminal state.
        this.queryStateMachine.addStateChangeListener(queryState -> {
            if (!queryState.isDone()) {
                return;
            }
            this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo()));
        });
    }

    /**
     * Reports the result-buffer location of every given task to the query state machine.
     * Each task contributes one entry mapping its buffer URI to its task id.
     */
    private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBuffers.OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations) {
        queryStateMachine.updateOutputLocations(
                tasks.stream().collect(ImmutableMap.toImmutableMap(
                        remoteTask -> SqlQueryScheduler.getBufferLocation(remoteTask, rootBufferId),
                        RemoteTask::getTaskId)),
                noMoreExchangeLocations);
    }

    /**
     * Builds the URI of a task's result buffer by appending {@code results/<bufferId>} to the
     * task's own URI.
     *
     * @param remoteTask task whose status supplies the base URI
     * @param rootBufferId id of the output buffer to address
     * @return the task URI with the two extra path segments appended
     * @throws RuntimeException wrapping the {@link URISyntaxException} if the URI cannot be built
     */
    private static URI getBufferLocation(RemoteTask remoteTask, OutputBuffers.OutputBufferId rootBufferId) {
        URI location = remoteTask.getTaskStatus().getSelf();
        try {
            URIBuilder builder = new URIBuilder(location);
            // Copy before appending: the list returned by getPathSegments() is not documented as
            // mutable (it may be an immutable empty list for a path-less URI — TODO confirm for the
            // HttpClient version in use), so adding to it directly could throw.
            List<String> segments = new ArrayList<>(builder.getPathSegments());
            segments.add("results");
            segments.add(rootBufferId.toString());
            builder.setPathSegments(segments);
            return builder.build();
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Recursively creates the stage executions for {@code section} and all of its child sections.
     * Child sections are created first, with discarding output buffers and a no-op locations
     * consumer; the stages of {@code section} itself are appended last.
     *
     * @return stage executions of all descendants followed by those of {@code section}
     */
    private List<StageExecutionAndScheduler> createStageExecutions(SectionExecutionFactory sectionExecutionFactory, ExchangeLocationsConsumer locationsConsumer, StreamingPlanSection section, Optional<int[]> bucketToPartition, OutputBuffers outputBuffers, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, Session session) {
        ImmutableList.Builder<StageExecutionAndScheduler> collected = ImmutableList.builder();

        for (StreamingPlanSection child : section.getChildren()) {
            List<StageExecutionAndScheduler> childStages = this.createStageExecutions(
                    sectionExecutionFactory,
                    (fragmentId, tasks, noMoreExchangeLocations) -> {},
                    child,
                    Optional.empty(),
                    OutputBuffers.createDiscardingOutputBuffers(),
                    remoteTaskFactory,
                    splitSourceFactory,
                    session);
            collected.addAll(childStages);
        }

        SectionExecution ownExecution = sectionExecutionFactory.createSectionExecutions(session, section, locationsConsumer, bucketToPartition, outputBuffers, this.summarizeTaskInfo, remoteTaskFactory, splitSourceFactory, 0, this.cteMaterializationTracker);
        collected.addAll(ownExecution.getSectionStages());
        return collected.build();
    }

    /**
     * Kicks off scheduling. Only the first call has any effect.
     */
    @Override
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            // already started
            return;
        }
        this.startScheduling();
    }

    /**
     * Submits {@link #schedule()} to the executor unless a scheduling pass is already flagged
     * as running.
     */
    private void startScheduling() {
        Objects.requireNonNull(this.stageExecutions);
        if (!this.scheduling.get()) {
            this.executor.submit(this::schedule);
        }
    }

    /**
     * Runs the scheduling loop for this query on the calling thread.
     * <p>
     * Only one invocation may own the loop at a time; a concurrent call returns immediately.
     * The outer loop pulls sections that are ready for execution and builds an execution
     * schedule for each; the inner loop repeatedly asks each stage scheduler to schedule work
     * until one of the section schedules finishes or a stage reaches a done state, at which
     * point the outer loop looks for further ready sections.
     * <p>
     * On any failure the query is transitioned to failed and the throwable is rethrown.
     * Every stage scheduler that was started is closed before returning; close failures also
     * fail the query and are rethrown as suppressed exceptions of a {@link RuntimeException}.
     */
    private void schedule() {
        if (!this.scheduling.compareAndSet(false, true)) {
            // another invocation currently owns the scheduling loop
            return;
        }
        List<StageExecutionAndScheduler> scheduledStageExecutions = new ArrayList<>();
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            Set<StageId> completedStages = new HashSet<>();
            List<ExecutionSchedule> sectionExecutionSchedules = new LinkedList<>();

            while (!Thread.currentThread().isInterrupted()) {
                // drop the schedules of sections that have finished
                sectionExecutionSchedules.removeIf(ExecutionSchedule::isFinished);

                // pull further sections that can now run
                List<StreamingPlanSection> sectionsReadyForExecution = this.getSectionsReadyForExecution();

                // nothing left to start and nothing in flight
                if (sectionsReadyForExecution.isEmpty() && sectionExecutionSchedules.isEmpty()) {
                    break;
                }

                List<List<StageExecutionAndScheduler>> sectionStageExecutions = this.getStageExecutions(sectionsReadyForExecution);
                sectionStageExecutions.forEach(scheduledStageExecutions::addAll);
                for (List<StageExecutionAndScheduler> executionInfos : sectionStageExecutions) {
                    sectionExecutionSchedules.add(this.executionPolicy.createExecutionSchedule(this.session, ImmutableList.copyOf(executionInfos)));
                }

                while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
                    List<ListenableFuture<?>> blockedStages = new ArrayList<>();
                    List<StageExecutionAndScheduler> executionsToSchedule = sectionExecutionSchedules.stream()
                            .flatMap(executionSchedule -> executionSchedule.getStagesToSchedule().stream())
                            .collect(ImmutableList.toImmutableList());

                    for (StageExecutionAndScheduler stageExecutionAndScheduler : executionsToSchedule) {
                        long startCpuNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime();
                        long startWallNanos = System.nanoTime();

                        SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution();
                        stageExecution.beginScheduling();

                        // perform some scheduling work
                        ScheduleResult result = stageExecutionAndScheduler.getStageScheduler().schedule();

                        // track the tasks of leaf stages when partial results are enabled
                        if (SystemSessionProperties.isPartialResultsEnabled(this.session) && stageExecutionAndScheduler.getStageExecution().getFragment().isLeaf()) {
                            for (RemoteTask task : result.getNewTasks()) {
                                this.partialResultQueryTaskTracker.trackTask(task);
                                task.addFinalTaskInfoListener(this.partialResultQueryTaskTracker::recordTaskFinish);
                            }
                        }

                        if (result.isFinished()) {
                            stageExecution.schedulingComplete();
                        }
                        else if (!result.getBlocked().isDone()) {
                            blockedStages.add(result.getBlocked());
                        }
                        // propagate the new tasks and the stage state through the stage linkage
                        stageExecutionAndScheduler.getStageLinkage().processScheduleResults(stageExecution.getState(), result.getNewTasks());
                        this.schedulerStats.getSplitsScheduledPerIteration().add((long)result.getSplitsScheduled());

                        if (result.getBlockedReason().isPresent()) {
                            ScheduleResult.BlockedReason blockedReason = result.getBlockedReason().get();
                            switch (blockedReason) {
                                case WRITER_SCALING: {
                                    // no stat is recorded for this reason
                                    break;
                                }
                                case WAITING_FOR_CTE_MATERIALIZATION: {
                                    this.schedulerStats.getWaitingForCTEMaterialization().update(1L);
                                    break;
                                }
                                case WAITING_FOR_SOURCE: {
                                    this.schedulerStats.getWaitingForSource().update(1L);
                                    break;
                                }
                                case SPLIT_QUEUES_FULL: {
                                    this.schedulerStats.getSplitQueuesFull().update(1L);
                                    break;
                                }
                                case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE: {
                                    this.schedulerStats.getMixedSplitQueuesFullAndWaitingForSource().update(1L);
                                    break;
                                }
                                case NO_ACTIVE_DRIVER_GROUP: {
                                    this.schedulerStats.getNoActiveDriverGroup().update(1L);
                                    break;
                                }
                                default: {
                                    throw new UnsupportedOperationException("Unknown blocked reason: " + blockedReason);
                                }
                            }
                            if (!result.getBlocked().isDone()) {
                                // record how long the stage stays blocked, once the future completes
                                long startBlockedNanos = System.nanoTime();
                                result.getBlocked().addListener(() -> stageExecution.recordSchedulerBlockedTime(blockedReason, System.nanoTime() - startBlockedNanos), MoreExecutors.directExecutor());
                            }
                        }
                        stageExecution.recordSchedulerRunningTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - startCpuNanos, System.nanoTime() - startWallNanos);
                    }

                    // update the linkage of every stage that reached a done state since the last pass
                    boolean stageFinishedExecution = false;
                    for (StageExecutionAndScheduler stageExecutionInfo : scheduledStageExecutions) {
                        SqlStageExecution stageExecution = stageExecutionInfo.getStageExecution();
                        StageId stageId = stageExecution.getStageExecutionId().getStageId();
                        if (completedStages.contains(stageId) || !stageExecution.getState().isDone()) {
                            continue;
                        }
                        stageExecutionInfo.getStageLinkage().processScheduleResults(stageExecution.getState(), ImmutableSet.of());
                        completedStages.add(stageId);
                        stageFinishedExecution = true;
                    }

                    // a stage just finished: go back to the outer loop and look for more ready sections
                    if (stageFinishedExecution) {
                        break;
                    }

                    // wait (at most one second) for any blocked stage to unblock, then schedule again
                    if (!blockedStages.isEmpty()) {
                        try (TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time()) {
                            MoreFutures.tryGetFutureValue(MoreFutures.whenAnyComplete(blockedStages), 1, TimeUnit.SECONDS);
                        }
                        for (ListenableFuture<?> blockedStage : blockedStages) {
                            blockedStage.cancel(true);
                        }
                    }
                }
            }

            // every stage that was scheduled must now be SCHEDULED, RUNNING or done
            for (StageExecutionAndScheduler stageExecutionInfo : scheduledStageExecutions) {
                StageExecutionState state = stageExecutionInfo.getStageExecution().getState();
                if (state == StageExecutionState.SCHEDULED || state == StageExecutionState.RUNNING || state.isDone()) {
                    continue;
                }
                throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Scheduling is complete, but stage execution %s is in state %s", stageExecutionInfo.getStageExecution().getStageExecutionId(), state));
            }

            this.scheduling.set(false);
            this.partialResultQueryTaskTracker.completeTaskScheduling();

            // sections may have become ready after the loop exited; schedule again if so
            if (!this.getSectionsReadyForExecution().isEmpty()) {
                this.startScheduling();
            }
        }
        catch (Throwable t) {
            this.scheduling.set(false);
            this.queryStateMachine.transitionToFailed(t);
            throw t;
        }
        finally {
            RuntimeException closeError = new RuntimeException();
            for (StageExecutionAndScheduler stageExecutionInfo : scheduledStageExecutions) {
                try {
                    stageExecutionInfo.getStageScheduler().close();
                }
                catch (Throwable t) {
                    this.queryStateMachine.transitionToFailed(t);
                    // an exception cannot be added to itself as suppressed
                    if (closeError == t) {
                        continue;
                    }
                    closeError.addSuppressed(t);
                }
            }
            if (closeError.getSuppressed().length > 0) {
                throw closeError;
            }
        }
    }

    /**
     * Returns the sections that can be started now, in depth-first pre-order of the section tree.
     * <p>
     * Unless enhanced CTE scheduling is enabled for the session, the number of returned sections
     * is capped at {@code maxConcurrentMaterializations} minus the number of sections whose root
     * stage is neither done nor still {@code PLANNED}. Each returned section has been passed
     * through {@link #tryCostBasedOptimize(StreamingPlanSection)}.
     *
     * @return the sections to start; empty if none are ready or the cap is already reached
     */
    private List<StreamingPlanSection> getSectionsReadyForExecution() {
        long runningPlanSections = Streams.stream(Traverser.<StreamingPlanSection>forTree(StreamingPlanSection::getChildren).depthFirstPreOrder(this.sectionedPlan))
                .map(section -> this.getStageExecution(section.getPlan().getFragment().getId()).getState())
                .filter(state -> !state.isDone() && state != StageExecutionState.PLANNED)
                .count();

        // Stream.limit rejects negative sizes, so clamp in case more sections are running than the cap allows.
        long maxSectionsToStart = SystemSessionProperties.isEnhancedCTESchedulingEnabled(this.session)
                ? Long.MAX_VALUE
                : Math.max(0L, (long)this.maxConcurrentMaterializations - runningPlanSections);

        return Streams.stream(Traverser.<StreamingPlanSection>forTree(StreamingPlanSection::getChildren).depthFirstPreOrder(this.sectionedPlan))
                .filter(this::isReadyForExecution)
                .limit(maxSectionsToStart)
                .map(this::tryCostBasedOptimize)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Runs the runtime plan optimizers over every fragment of {@code section} when the runtime
     * optimizer is enabled for the session and the section has child sections.
     * <p>
     * For each fragment that changed, the new fragment is validated, the stage is recorded in
     * {@code runtimeOptimizedStages}, and both the stored plan and the stage executions of the
     * section are rebuilt from the new fragments.
     *
     * @param section section about to be scheduled
     * @return the {@code section} argument itself, whether or not anything was rewritten
     */
    private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section) {
        if (!SystemSessionProperties.isRuntimeOptimizerEnabled(this.session) || section.getChildren().isEmpty()) {
            return section;
        }

        Map<PlanFragment, PlanFragment> oldToNewFragment = new HashMap<>();
        for (StreamingSubPlan currentSubPlan : Traverser.<StreamingSubPlan>forTree(StreamingSubPlan::getChildren).depthFirstPreOrder(section.getPlan())) {
            Optional<PlanFragment> newPlanFragment = this.performRuntimeOptimizations(currentSubPlan);
            if (newPlanFragment.isPresent()) {
                this.planChecker.validatePlanFragment(newPlanFragment.get(), this.session, this.metadata, this.warningCollector);
                oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get());
            }
        }
        if (oldToNewFragment.isEmpty()) {
            return section;
        }

        for (PlanFragment oldFragment : oldToNewFragment.keySet()) {
            this.runtimeOptimizedStages.add(this.getStageId(oldFragment.getId()));
        }
        // keep the stored plan in step with the fragments that will actually run
        this.updatePlan(oldToNewFragment);
        // rebuild the stage executions of this section from the rewritten fragments
        this.updateStageExecutions(section, oldToNewFragment);
        log.debug("Invoked CBO during runtime, optimized stage IDs: " + oldToNewFragment.keySet().stream().map(PlanFragment::getId).map(PlanFragmentId::toString).collect(Collectors.joining(", ")));
        return section;
    }

    /**
     * Applies every runtime optimizer, in order, to the root of the sub-plan's fragment.
     *
     * @param subPlan sub-plan whose fragment should be optimized
     * @return a new fragment built around the optimized root, or empty when the optimizers
     *         handed back the very same root instance
     */
    private Optional<PlanFragment> performRuntimeOptimizations(StreamingSubPlan subPlan) {
        PlanFragment fragment = subPlan.getFragment();

        PlanNode optimizedRoot = fragment.getRoot();
        for (PlanOptimizer runtimeOptimizer : this.runtimePlanOptimizers) {
            optimizedRoot = runtimeOptimizer
                    .optimize(optimizedRoot, this.session, TypeProvider.viewOf(this.variableAllocator.getVariables()), this.variableAllocator, this.idAllocator, this.warningCollector)
                    .getPlanNode();
        }

        // Reference comparison: an unchanged plan is signalled by the same node instance.
        if (optimizedRoot == fragment.getRoot()) {
            return Optional.empty();
        }

        Optional<StatsAndCosts> estimatedStatsAndCosts = fragment.getStatsAndCosts();
        return Optional.of(new PlanFragment(
                fragment.getId(),
                optimizedRoot,
                fragment.getVariables(),
                fragment.getPartitioning(),
                SchedulingOrderVisitor.scheduleOrder(optimizedRoot),
                fragment.getPartitioningScheme(),
                fragment.getStageExecutionDescriptor(),
                fragment.isOutputTableWriterFragment(),
                estimatedStatsAndCosts,
                Optional.of(PlanPrinter.jsonFragmentPlan(optimizedRoot, fragment.getVariables(), estimatedStatsAndCosts.orElse(StatsAndCosts.empty()), this.functionAndTypeManager, this.session))));
    }

    /**
     * Rebuilds the stage executions of {@code section} from its rewritten fragments and stores
     * them in {@code stageExecutions}, replacing the entries with the same stage ids.
     * <p>
     * A section whose root fragment is the query's root fragment gets a single output buffer
     * wired to the query output locations; any other section gets discarding output buffers
     * and a no-op locations consumer.
     *
     * @param section the section whose sub-plan is rewritten
     * @param oldToNewFragment replacement fragments keyed by the fragment they replace
     */
    private void updateStageExecutions(StreamingPlanSection section, Map<PlanFragment, PlanFragment> oldToNewFragment) {
        StreamingSubPlan rewrittenPlan = this.rewriteStreamingSubPlan(section.getPlan(), oldToNewFragment);
        StreamingPlanSection newSection = new StreamingPlanSection(rewrittenPlan, section.getChildren());
        PlanFragment sectionRootFragment = newSection.getPlan().getFragment();

        Optional<int[]> bucketToPartition;
        OutputBuffers outputBuffers;
        ExchangeLocationsConsumer locationsConsumer;
        if (!SqlQueryScheduler.isRootFragment(sectionRootFragment)) {
            bucketToPartition = Optional.empty();
            outputBuffers = OutputBuffers.createDiscardingOutputBuffers();
            locationsConsumer = (fragmentId, tasks, noMoreExchangeLocations) -> {};
        }
        else {
            bucketToPartition = Optional.of(new int[1]);
            outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(sectionRootFragment.getPartitioningScheme().getPartitioning().getHandle())
                    .withBuffer(new OutputBuffers.OutputBufferId(0), 0)
                    .withNoMoreBufferIds();
            OutputBuffers.OutputBufferId rootBufferId = Iterables.getOnlyElement(outputBuffers.getBuffers().keySet());
            locationsConsumer = (fragmentId, tasks, noMoreExchangeLocations) -> SqlQueryScheduler.updateQueryOutputLocations(this.queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations);
        }

        SectionExecution sectionExecution = this.sectionExecutionFactory.createSectionExecutions(this.session, newSection, locationsConsumer, bucketToPartition, outputBuffers, this.summarizeTaskInfo, this.remoteTaskFactory, this.splitSourceFactory, 0, this.cteMaterializationTracker);
        this.addStateChangeListeners(sectionExecution);

        Map<StageId, StageExecutionAndScheduler> updatedStageExecutions = sectionExecution.getSectionStages().stream()
                .collect(ImmutableMap.toImmutableMap(execution -> execution.getStageExecution().getStageExecutionId().getStageId(), Function.identity()));
        synchronized (this) {
            this.stageExecutions.putAll(updatedStageExecutions);
        }
    }

    /**
     * Atomically replaces the stored plan with a copy in which every fragment that appears as a
     * key of {@code oldToNewFragments} is swapped for its mapped value.
     */
    private void updatePlan(Map<PlanFragment, PlanFragment> oldToNewFragments) {
        this.plan.getAndUpdate(currentPlan -> this.rewritePlan(currentPlan, oldToNewFragments));
    }

    /**
     * Returns a copy of the sub-plan tree rooted at {@code root}. Children are rewritten first;
     * a node whose fragment is a key of {@code oldToNewFragments} takes the mapped fragment,
     * every other node keeps its own.
     */
    private SubPlan rewritePlan(SubPlan root, Map<PlanFragment, PlanFragment> oldToNewFragments) {
        ImmutableList.Builder<SubPlan> rewrittenChildren = ImmutableList.builder();
        for (SubPlan childPlan : root.getChildren()) {
            rewrittenChildren.add(this.rewritePlan(childPlan, oldToNewFragments));
        }

        PlanFragment fragment = oldToNewFragments.containsKey(root.getFragment())
                ? oldToNewFragments.get(root.getFragment())
                : root.getFragment();
        return new SubPlan(fragment, rewrittenChildren.build());
    }

    /**
     * Registers query-state listeners on every stage of a freshly created section execution:
     * a stage running the root fragment drives finishing/cancellation of the query, and every
     * stage reports failure, abort, completion and first tasks to the query state machine.
     */
    private void addStateChangeListeners(SectionExecution sectionExecution) {
        for (StageExecutionAndScheduler entry : sectionExecution.getSectionStages()) {
            SqlStageExecution stage = entry.getStageExecution();

            if (SqlQueryScheduler.isRootFragment(stage.getFragment())) {
                stage.addStateChangeListener(rootState -> {
                    if (rootState == StageExecutionState.FINISHED) {
                        this.queryStateMachine.transitionToFinishing();
                        return;
                    }
                    if (rootState == StageExecutionState.CANCELED) {
                        this.queryStateMachine.transitionToCanceled();
                    }
                });
            }

            stage.addStateChangeListener(state -> {
                if (this.queryStateMachine.isDone()) {
                    return;
                }
                if (state == StageExecutionState.FAILED) {
                    this.queryStateMachine.transitionToFailed(stage.getStageExecutionInfo().getFailureCause().get().toException());
                    return;
                }
                if (state == StageExecutionState.ABORTED) {
                    this.queryStateMachine.transitionToFailed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Query stage was aborted"));
                    return;
                }
                if (state == StageExecutionState.FINISHED) {
                    // A finished stage may make further sections ready to run.
                    this.startScheduling();
                    return;
                }
                if (this.queryStateMachine.getQueryState() == QueryState.STARTING && stage.hasTasks()) {
                    this.queryStateMachine.transitionToRunning();
                }
            });

            stage.addFinalStageInfoListener(finalInfo -> this.queryStateMachine.updateQueryInfo(Optional.of(this.getStageInfo())));
        }
    }

    /**
     * Builds a new {@link StreamingSubPlan} tree mirroring {@code root}, swapping every
     * fragment that appears as a key in {@code oldToNewFragment} for its mapped value.
     *
     * @param root the streaming sub-plan to rewrite (recursively, children first)
     * @param oldToNewFragment mapping from original fragments to their replacements
     * @return a freshly constructed streaming sub-plan
     */
    private StreamingSubPlan rewriteStreamingSubPlan(StreamingSubPlan root, Map<PlanFragment, PlanFragment> oldToNewFragment) {
        ImmutableList.Builder<StreamingSubPlan> rewrittenChildren = ImmutableList.builder();
        for (StreamingSubPlan child : root.getChildren()) {
            rewrittenChildren.add(this.rewriteStreamingSubPlan(child, oldToNewFragment));
        }

        PlanFragment fragment = oldToNewFragment.containsKey(root.getFragment())
                ? oldToNewFragment.get(root.getFragment())
                : root.getFragment();
        return new StreamingSubPlan(fragment, rewrittenChildren.build());
    }

    /**
     * Tells whether {@code fragment} is the root fragment, i.e. its fragment id is {@code 0}.
     *
     * @param fragment the fragment to test
     * @return {@code true} when the fragment's numeric id equals zero
     */
    private static boolean isRootFragment(PlanFragment fragment) {
        PlanFragmentId fragmentId = fragment.getId();
        return fragmentId.getId() == 0;
    }

    /**
     * Decides whether {@code section} can be started.
     *
     * <p>The section's own root stage must still be in the PLANNED state. Unless enhanced
     * CTE scheduling is enabled for the session, the root stage of every child section
     * must additionally be FINISHED.
     *
     * @param section the section to check
     * @return {@code true} if the section may be scheduled now
     */
    private boolean isReadyForExecution(StreamingPlanSection section) {
        PlanFragmentId sectionRootId = section.getPlan().getFragment().getId();
        if (this.getStageExecution(sectionRootId).getState() != StageExecutionState.PLANNED) {
            return false;
        }

        // With enhanced CTE scheduling the child sections are not inspected here.
        if (SystemSessionProperties.isEnhancedCTESchedulingEnabled(this.session)) {
            return true;
        }

        for (StreamingPlanSection childSection : section.getChildren()) {
            PlanFragmentId childRootId = childSection.getPlan().getFragment().getId();
            if (this.getStageExecution(childRootId).getState() != StageExecutionState.FINISHED) {
                return false;
            }
        }
        return true;
    }

    /**
     * Resolves, for each section, the registered stage entries of all sub-plans in that section.
     *
     * <p>Each section's plan tree is walked depth-first in pre-order (following
     * {@link StreamingSubPlan#getChildren()}), and every visited sub-plan's fragment id is
     * looked up through {@link #getStageExecutionInfo(PlanFragmentId)}.
     *
     * @param sections the sections to resolve, in order
     * @return one immutable list per section, in the same order as {@code sections}
     */
    private List<List<StageExecutionAndScheduler>> getStageExecutions(List<StreamingPlanSection> sections) {
        ImmutableList.Builder<List<StageExecutionAndScheduler>> perSection = ImmutableList.builder();
        for (StreamingPlanSection section : sections) {
            ImmutableList.Builder<StageExecutionAndScheduler> sectionStages = ImmutableList.builder();
            Iterable<StreamingSubPlan> preOrder = Traverser.<StreamingSubPlan>forTree(StreamingSubPlan::getChildren)
                    .depthFirstPreOrder(section.getPlan());
            for (StreamingSubPlan subPlan : preOrder) {
                PlanFragmentId fragmentId = subPlan.getFragment().getId();
                sectionStages.add(this.getStageExecutionInfo(fragmentId));
            }
            perSection.add(sectionStages.build());
        }
        return perSection.build();
    }

    /**
     * Looks up the {@link SqlStageExecution} registered for the given plan fragment.
     *
     * @param planFragmentId id of the fragment whose stage execution is wanted
     * @return the stage execution held by the matching entry of {@code stageExecutions}
     */
    private SqlStageExecution getStageExecution(PlanFragmentId planFragmentId) {
        StageId stageId = this.getStageId(planFragmentId);
        StageExecutionAndScheduler entry = this.stageExecutions.get(stageId);
        return entry.getStageExecution();
    }

    /**
     * Looks up the {@link StageExecutionAndScheduler} entry registered for the given plan fragment.
     *
     * @param planFragmentId id of the fragment whose entry is wanted
     * @return whatever {@code stageExecutions} holds for the fragment's stage id
     */
    private StageExecutionAndScheduler getStageExecutionInfo(PlanFragmentId planFragmentId) {
        StageId stageId = this.getStageId(planFragmentId);
        return this.stageExecutions.get(stageId);
    }

    /**
     * Creates the {@link StageId} for a fragment of this query by combining the
     * query id from the query state machine with the fragment's numeric id.
     *
     * @param fragmentId the plan fragment id
     * @return a new stage id for that fragment
     */
    private StageId getStageId(PlanFragmentId fragmentId) {
        return new StageId(
                this.queryStateMachine.getQueryId(),
                fragmentId.getId());
    }

    /**
     * @return the sum of the user memory reservations reported by all registered stage executions
     */
    @Override
    public long getUserMemoryReservation() {
        return this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .mapToLong(SqlStageExecution::getUserMemoryReservation)
                .sum();
    }

    /**
     * @return the sum of the total memory reservations reported by all registered stage executions
     */
    @Override
    public long getTotalMemoryReservation() {
        return this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .mapToLong(SqlStageExecution::getTotalMemoryReservation)
                .sum();
    }

    /**
     * Adds up the CPU time of all registered stage executions.
     *
     * <p>Each stage's CPU time is converted to whole milliseconds before summing, and the
     * result is expressed in milliseconds.
     *
     * @return the summed CPU time as a millisecond-based {@link Duration}
     */
    @Override
    public Duration getTotalCpuTime() {
        long totalMillis = this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .mapToLong(stageExecution -> stageExecution.getTotalCpuTime().toMillis())
                .sum();
        return new Duration((double)totalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the sum of the raw input data sizes reported by all registered stage executions
     */
    @Override
    public long getRawInputDataSizeInBytes() {
        return this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .mapToLong(SqlStageExecution::getRawInputDataSize)
                .sum();
    }

    /**
     * @return the sum of the written intermediate data sizes reported by all registered stage executions
     */
    @Override
    public long getWrittenIntermediateDataSizeInBytes() {
        return this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .mapToLong(SqlStageExecution::getWrittenIntermediateDataSize)
                .sum();
    }

    /**
     * @return the output position count taken from the stats of the root stage
     *         (the entry registered under {@code rootStageId})
     */
    @Override
    public long getOutputPositions() {
        SqlStageExecution rootStage = this.stageExecutions.get(this.rootStageId).getStageExecution();
        StageExecutionInfo rootInfo = rootStage.getStageExecutionInfo();
        return rootInfo.getStats().getOutputPositions();
    }

    /**
     * @return the output data size in bytes taken from the stats of the root stage
     *         (the entry registered under {@code rootStageId})
     */
    @Override
    public long getOutputDataSizeInBytes() {
        SqlStageExecution rootStage = this.stageExecutions.get(this.rootStageId).getStageExecution();
        StageExecutionInfo rootInfo = rootStage.getStageExecutionInfo();
        return rootInfo.getStats().getOutputDataSizeInBytes();
    }

    /**
     * Collects the basic stats of every registered stage execution and aggregates them
     * with {@link BasicStageExecutionStats#aggregateBasicStageStats}.
     *
     * @return the aggregated basic stage stats
     */
    @Override
    public BasicStageExecutionStats getBasicStageStats() {
        List<BasicStageExecutionStats> perStageStats = this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .map(SqlStageExecution::getBasicStageStats)
                .collect(ImmutableList.toImmutableList());
        return BasicStageExecutionStats.aggregateBasicStageStats(perStageStats);
    }

    /**
     * Builds the {@link StageInfo} tree for the current plan.
     *
     * <p>The execution info of every registered stage is first captured into a map keyed by
     * stage id; the current plan is then read and converted via {@code buildStageInfo}.
     *
     * @return stage info for the root of the current plan, including its children
     */
    @Override
    public StageInfo getStageInfo() {
        Map<StageId, StageExecutionInfo> infosByStageId = this.stageExecutions.values().stream()
                .map(StageExecutionAndScheduler::getStageExecution)
                .collect(ImmutableMap.toImmutableMap(
                        stageExecution -> stageExecution.getStageExecutionId().getStageId(),
                        stageExecution -> stageExecution.getStageExecutionInfo()));
        SubPlan currentPlan = this.plan.get();
        return this.buildStageInfo(currentPlan, infosByStageId);
    }

    /**
     * Recursively converts {@code subPlan} into a {@link StageInfo} tree.
     *
     * @param subPlan the sub-plan to convert
     * @param stageExecutionInfos execution info for each stage, keyed by stage id
     * @return stage info for {@code subPlan} with one child per child sub-plan
     * @throws IllegalArgumentException if {@code stageExecutionInfos} has no entry for a stage in the plan
     */
    private StageInfo buildStageInfo(SubPlan subPlan, Map<StageId, StageExecutionInfo> stageExecutionInfos) {
        PlanFragment fragment = subPlan.getFragment();
        StageId stageId = this.getStageId(fragment.getId());

        StageExecutionInfo stageExecutionInfo = stageExecutionInfos.get(stageId);
        Preconditions.checkArgument(stageExecutionInfo != null, "No stageExecutionInfo for %s", stageId);

        return new StageInfo(
                stageId,
                this.locationFactory.createStageLocation(stageId),
                Optional.of(fragment),
                stageExecutionInfo,
                ImmutableList.of(),
                subPlan.getChildren().stream()
                        .map(child -> this.buildStageInfo(child, stageExecutionInfos))
                        .collect(ImmutableList.toImmutableList()),
                this.runtimeOptimizedStages.contains(stageId));
    }

    /**
     * Cancels the stage execution registered under {@code stageId}.
     *
     * <p>The thread is renamed to {@code Query-<queryId>} for the duration of the call.
     *
     * @param stageId id of the stage to cancel
     * @throws NullPointerException with the message "Stage &lt;id&gt; does not exist" if no
     *         stage is registered under {@code stageId}
     */
    @Override
    public void cancelStage(StageId stageId) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            // Validate the map entry itself: dereferencing a missing entry first would raise a
            // bare NullPointerException and the descriptive message would never be produced.
            StageExecutionAndScheduler entry = Objects.requireNonNull(
                    this.stageExecutions.get(stageId),
                    () -> String.format("Stage %s does not exist", stageId));
            SqlStageExecution stage = Objects.requireNonNull(
                    entry.getStageExecution(),
                    () -> String.format("Stage %s does not exist", stageId));
            stage.cancel();
        }
    }

    /**
     * Aborts every registered stage execution.
     *
     * <p>The thread is renamed to {@code Query-<queryId>} for the duration of the call.
     */
    @Override
    public void abort() {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            this.stageExecutions.values().stream()
                    .map(StageExecutionAndScheduler::getStageExecution)
                    .forEach(SqlStageExecution::abort);
        }
    }
}

