/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler.policy;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MoreCollectors;
import com.google.common.collect.Ordering;
import com.google.common.graph.Graph;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.Graphs;
import com.google.common.graph.MutableGraph;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.policy.ExecutionSchedule;
import io.trino.execution.scheduler.policy.StagesScheduleResult;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.IndexJoinNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SpatialJoinNode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;

public class PhasedExecutionSchedule
implements ExecutionSchedule {
    private static final Logger log = Logger.get(PhasedExecutionSchedule.class);

    /**
     * Scheduling-order graph. An edge {@code a -> b} records that fragment {@code b} is
     * released for execution once the stage of fragment {@code a} has been observed as
     * scheduled. A node is removed when its stage is scheduled, so an empty graph means
     * the schedule is finished.
     */
    private final MutableGraph<PlanFragmentId> fragmentDependency;

    /**
     * Data-flow graph. An edge {@code source -> consumer} is added for every remote source
     * fragment that a fragment reads from.
     */
    private final MutableGraph<PlanFragmentId> fragmentTopology;

    /**
     * Fragments in the order the plan visitor finished processing them: a fragment is
     * appended after the fragments it reads from, and a join's build side is visited
     * before its probe side.
     */
    private final List<PlanFragmentId> sortedFragments = new ArrayList<>();

    /** Stage lookup by the id of the fragment the stage executes. */
    private final Map<PlanFragmentId, StageExecution> stagesByFragmentId;

    /** Stages selected for execution and not yet removed as scheduled; keeps selection order. */
    private final Set<StageExecution> schedulingStages = new LinkedHashSet<>();

    private final DynamicFilterService dynamicFilterService;

    /** Ordering derived from {@code sortedFragments}; assigned by {@code init}. */
    private Ordering<PlanFragmentId> fragmentOrdering;

    /** Future completed (and then replaced) whenever a watched stage becomes scheduled. */
    @GuardedBy("this")
    private SettableFuture<Void> rescheduleFuture = SettableFuture.create();

    /**
     * Creates a fully initialised schedule for the given stages.
     *
     * <p>The instance is constructed first and then {@code init} is run on it, which derives
     * the fragment dependencies and selects the initial set of stages for execution.
     *
     * @param stages stages the schedule has to order
     * @param dynamicFilterService service consulted while deriving join dependencies
     * @return the initialised schedule
     */
    public static PhasedExecutionSchedule forStages(Collection<StageExecution> stages, DynamicFilterService dynamicFilterService) {
        PhasedExecutionSchedule phasedSchedule = new PhasedExecutionSchedule(stages, dynamicFilterService);
        phasedSchedule.init(stages);
        return phasedSchedule;
    }

    /**
     * Creates the schedule with empty dependency and topology graphs and indexes the
     * stages by fragment id. The graphs are populated afterwards by {@code init}.
     *
     * @param stages stages to index; each must expose a fragment with a distinct id
     * @param dynamicFilterService must not be null
     * @throws NullPointerException if {@code dynamicFilterService} is null
     */
    private PhasedExecutionSchedule(Collection<StageExecution> stages, DynamicFilterService dynamicFilterService) {
        fragmentDependency = GraphBuilder.directed().build();
        fragmentTopology = GraphBuilder.directed().build();
        // The collector already yields a correctly typed map, so no raw cast is needed.
        stagesByFragmentId = stages.stream()
                .collect(ImmutableMap.toImmutableMap(stage -> stage.getFragment().getId(), Function.identity()));
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
    }

    /**
     * Builds the fragment graphs and selects the initial stages for execution.
     *
     * <p>The initial selection is the union of the non-lazy fragments reported by the plan
     * visitor and every fragment that has no incoming dependency edge.
     *
     * @param stages stages whose fragments are analysed
     */
    private void init(Collection<StageExecution> stages) {
        ImmutableSet.Builder<PlanFragmentId> fragmentsToExecute = ImmutableSet.builder();
        fragmentsToExecute.addAll(extractDependenciesAndReturnNonLazyFragments(stages));
        // Fragments that nothing has to precede can start right away.
        fragmentDependency.nodes().stream()
                .filter(fragmentId -> fragmentDependency.inDegree(fragmentId) == 0)
                .forEach(fragmentsToExecute::add);
        // sortedFragments is filled by the extraction above, so the ordering is built only now.
        fragmentOrdering = Ordering.explicit(sortedFragments);
        selectForExecution(fragmentsToExecute.build());
        log.debug(
                "fragmentDependency: %s, fragmentTopology: %s, sortedFragments: %s, stagesByFragmentId: %s",
                fragmentDependency,
                fragmentTopology,
                sortedFragments,
                stagesByFragmentId);
    }

    /**
     * Runs one scheduling pass and returns the stages currently selected for execution
     * together with a future that signals when another pass is worthwhile.
     */
    @Override
    public StagesScheduleResult getStagesToSchedule() {
        // Grab the future before the pass: notifyReschedule swaps the field, so a notification
        // arriving while schedule() runs still completes the instance handed to the caller.
        Optional<ListenableFuture<Void>> pendingReschedule = getRescheduleFuture();
        schedule();
        return new StagesScheduleResult(schedulingStages, pendingReschedule);
    }

    /**
     * Reports whether nothing is left to schedule, i.e. every fragment has been removed
     * from the dependency graph.
     */
    @Override
    public boolean isFinished() {
        Set<PlanFragmentId> remainingFragments = fragmentDependency.nodes();
        return remainingFragments.isEmpty();
    }

    /**
     * Returns the reschedule future that is current at the time of the call. The read is
     * synchronized on this instance because {@code notifyReschedule} replaces the field.
     */
    @VisibleForTesting
    synchronized Optional<ListenableFuture<Void>> getRescheduleFuture() {
        ListenableFuture<Void> current = rescheduleFuture;
        return Optional.of(current);
    }

    /**
     * Performs one scheduling pass: drops stages that are already scheduled, collects the
     * fragments this releases, adds the consumers of stages with blocked tasks, and
     * selects all of them for execution.
     */
    @VisibleForTesting
    void schedule() {
        ImmutableSet.Builder<PlanFragmentId> fragmentsToExecute = ImmutableSet.builder();
        fragmentsToExecute.addAll(removeScheduledStages());
        fragmentsToExecute.addAll(unblockStagesWithFullOutputBuffer());
        selectForExecution(fragmentsToExecute.build());
    }

    /** Returns the live list of fragments in visitor processing order (not a copy). */
    @VisibleForTesting
    List<PlanFragmentId> getSortedFragments() {
        return sortedFragments;
    }

    /** Returns the dependency graph through its read-only {@code Graph} interface. */
    @VisibleForTesting
    Graph<PlanFragmentId> getFragmentDependency() {
        return fragmentDependency;
    }

    /** Returns the live set of stages currently selected for execution (not a copy). */
    @VisibleForTesting
    Set<StageExecution> getSchedulingStages() {
        return schedulingStages;
    }

    /**
     * Removes every stage that is already scheduled from the dependency graph.
     *
     * @return fragments whose last remaining dependency was removed by this call
     */
    private Set<PlanFragmentId> removeScheduledStages() {
        // All known stages are inspected, not only the ones currently selected for execution.
        Set<StageExecution> scheduledStages = stagesByFragmentId.values().stream()
                .filter(this::isStageScheduled)
                .collect(ImmutableSet.toImmutableSet());
        log.debug("scheduledStages: %s", scheduledStages);
        // The snapshot above is complete before any graph mutation starts.
        return scheduledStages.stream()
                .flatMap(stage -> removeScheduledStage(stage).stream())
                .collect(ImmutableSet.toImmutableSet());
    }

    /**
     * Removes a single scheduled stage from the dependency graph and from the set of
     * stages selected for execution.
     *
     * @param stage stage that has been observed as scheduled
     * @return dependent fragments for which this stage was the only remaining dependency;
     *         empty if the stage's fragment was already removed
     */
    private Set<PlanFragmentId> removeScheduledStage(StageExecution stage) {
        PlanFragmentId fragmentId = stage.getFragment().getId();
        if (!fragmentDependency.nodes().contains(fragmentId)) {
            // Already removed by an earlier pass.
            return ImmutableSet.of();
        }
        // In-degree is read before the node is removed, so "== 1" means this fragment is the
        // dependent's sole remaining predecessor.
        Set<PlanFragmentId> fragmentsToExecute = fragmentDependency.successors(fragmentId).stream()
                .filter(dependentFragmentId -> fragmentDependency.inDegree(dependentFragmentId) == 1)
                .collect(ImmutableSet.toImmutableSet());
        fragmentDependency.removeNode(fragmentId);
        schedulingStages.remove(stage);
        return fragmentsToExecute;
    }

    /**
     * Finds stages reporting a blocked task and returns the fragments that directly
     * consume their output according to the topology graph.
     *
     * <p>NOTE(review): judging by the method name, the blocking of interest is a full task
     * output buffer, relieved by starting the consumer; confirm against
     * {@code StageExecution#isAnyTaskBlocked}.
     *
     * @return immediate downstream fragments of all stages with a blocked task
     */
    private Set<PlanFragmentId> unblockStagesWithFullOutputBuffer() {
        Set<PlanFragmentId> blockedFragments = stagesByFragmentId.values().stream()
                .filter(StageExecution::isAnyTaskBlocked)
                .map(stage -> stage.getFragment().getId())
                .collect(ImmutableSet.toImmutableSet());
        log.debug("blockedFragments: %s", blockedFragments);
        return blockedFragments.stream()
                .flatMap(fragmentId -> fragmentTopology.successors(fragmentId).stream())
                .collect(ImmutableSet.toImmutableSet());
    }

    /**
     * Selects the stages of the given fragments for execution, in {@code fragmentOrdering}
     * order.
     *
     * @param fragmentIds fragments to start; every id must be known to this schedule
     * @throws NullPointerException if called before {@code init} assigned the ordering
     */
    private void selectForExecution(Set<PlanFragmentId> fragmentIds) {
        Objects.requireNonNull(fragmentOrdering, "fragmentOrdering is null");
        List<StageExecution> selectedForExecution = fragmentIds.stream()
                .sorted(fragmentOrdering)
                .map(stagesByFragmentId::get)
                .collect(ImmutableList.toImmutableList());
        log.debug("selectedForExecution: %s", selectedForExecution);
        selectedForExecution.forEach(this::selectForExecution);
    }

    /**
     * Adds a stage to the set of stages selected for execution unless it is already
     * scheduled. The first time a stage with dependent fragments is added, a state listener
     * is registered that requests a reschedule once the stage becomes scheduled.
     *
     * @param stage stage to select
     */
    private void selectForExecution(StageExecution stage) {
        if (isStageScheduled(stage)) {
            // Nothing to start; the stage has already been scheduled.
            return;
        }

        boolean newlySelected = schedulingStages.add(stage);
        if (!newlySelected) {
            // Selected by an earlier pass, so the listener (if needed) is already in place.
            return;
        }

        boolean hasDependents = fragmentDependency.outDegree(stage.getFragment().getId()) > 0;
        if (hasDependents) {
            stage.addStateChangeListener(state -> {
                if (isStageScheduled(stage)) {
                    notifyReschedule(stage);
                }
            });
        }
    }

    /**
     * Wakes up whoever waits on the current reschedule future and installs a fresh future
     * for the next round.
     *
     * <p>The swap happens while holding this instance's monitor; the old future is completed
     * only after the monitor has been released.
     *
     * @param stage stage whose state change triggered the notification (used for logging)
     */
    private void notifyReschedule(StageExecution stage) {
        SettableFuture<Void> futureToComplete;
        synchronized (this) {
            futureToComplete = rescheduleFuture;
            rescheduleFuture = SettableFuture.create();
        }
        log.debug("notifyReschedule by %s", stage);
        futureToComplete.set(null);
    }

    /**
     * Tells whether a stage counts as scheduled: its state is {@code SCHEDULED},
     * {@code RUNNING} or {@code FLUSHING}, or the state reports itself as done.
     *
     * @param stage stage to inspect
     * @return true if the stage needs no further scheduling
     */
    private boolean isStageScheduled(StageExecution stage) {
        StageExecution.State current = stage.getState();
        boolean scheduledOrActive = current == StageExecution.State.SCHEDULED
                || current == StageExecution.State.RUNNING
                || current == StageExecution.State.FLUSHING;
        return scheduledOrActive || current.isDone();
    }

    /**
     * Walks the plan fragments of all stages, filling {@code fragmentDependency},
     * {@code fragmentTopology} and {@code sortedFragments} as a side effect.
     *
     * @param stages stages to analyse; all must belong to the same query
     * @return fragments the visitor classified as non-lazy; empty if there are no stages
     * @throws com.google.common.base.VerifyException if the derived dependencies contain a cycle
     */
    private Set<PlanFragmentId> extractDependenciesAndReturnNonLazyFragments(Collection<StageExecution> stages) {
        if (stages.isEmpty()) {
            return ImmutableSet.of();
        }

        // onlyElement() fails if the stages span more than one query.
        QueryId queryId = stages.stream()
                .map(stage -> stage.getStageId().getQueryId())
                .distinct()
                .collect(MoreCollectors.onlyElement());
        List<PlanFragment> fragments = stages.stream()
                .map(StageExecution::getFragment)
                .collect(ImmutableList.toImmutableList());

        Visitor visitor = new Visitor(queryId, fragments);
        visitor.processAllFragments();
        Verify.verify(!Graphs.hasCycle(fragmentDependency), "circular dependency between stages");
        return visitor.getNonLazyFragments();
    }

    /**
     * Plan visitor that derives the scheduling graphs of the enclosing schedule.
     *
     * <p>While walking the fragments it adds edges to {@code fragmentDependency} and
     * {@code fragmentTopology}, appends each fragment to {@code sortedFragments} once the
     * fragment has been fully processed, and collects the fragments classified as non-lazy.
     * Non-lazy fragments are the ones the schedule selects for execution right at
     * initialisation. The visitor context is the id of the fragment being visited.
     */
    private class Visitor
            extends PlanVisitor<FragmentSubGraph, PlanFragmentId> {
        private final QueryId queryId;
        private final Map<PlanFragmentId, PlanFragment> fragments;
        private final ImmutableSet.Builder<PlanFragmentId> nonLazyFragments = ImmutableSet.builder();
        /** Results of already processed fragments, so each fragment is visited once. */
        private final Map<PlanFragmentId, FragmentSubGraph> fragmentSubGraphs = new HashMap<>();

        /**
         * @param queryId query the fragments belong to
         * @param fragments all fragments of the query; ids must be distinct
         */
        public Visitor(QueryId queryId, Collection<PlanFragment> fragments) {
            this.queryId = queryId;
            this.fragments = fragments.stream()
                    .collect(ImmutableMap.toImmutableMap(PlanFragment::getId, Function.identity()));
        }

        /** Returns the fragments classified as non-lazy so far. */
        public Set<PlanFragmentId> getNonLazyFragments() {
            return nonLazyFragments.build();
        }

        /**
         * Registers every fragment as a node in both graphs and then processes the plan
         * starting from the fragments that no other fragment reads from.
         */
        public void processAllFragments() {
            fragments.forEach((fragmentId, fragment) -> {
                fragmentDependency.addNode(fragmentId);
                fragmentTopology.addNode(fragmentId);
            });

            // Fragments that act as a remote source of some other fragment.
            Set<PlanFragmentId> remoteSources = fragments.values().stream()
                    .map(PlanFragment::getRemoteSourceNodes)
                    .flatMap(Collection::stream)
                    .map(RemoteSourceNode::getSourceFragmentIds)
                    .flatMap(Collection::stream)
                    .collect(ImmutableSet.toImmutableSet());

            // The remaining fragments are the roots; sources are reached recursively from them.
            fragments.keySet().stream()
                    .filter(fragmentId -> !remoteSources.contains(fragmentId))
                    .forEach(this::processFragment);
        }

        /**
         * Processes the fragment with the given id unless that was done before, and records
         * it in {@code sortedFragments} after its sub-plan has been handled.
         *
         * @param planFragmentId id of the fragment to process
         * @return the (possibly cached) result for the fragment
         */
        public FragmentSubGraph processFragment(PlanFragmentId planFragmentId) {
            if (fragmentSubGraphs.containsKey(planFragmentId)) {
                return fragmentSubGraphs.get(planFragmentId);
            }

            FragmentSubGraph subGraph = processFragment(fragments.get(planFragmentId));
            Verify.verify(fragmentSubGraphs.put(planFragmentId, subGraph) == null, "fragment %s was already processed", planFragmentId);
            sortedFragments.add(planFragmentId);
            return subGraph;
        }

        /**
         * Visits the fragment's plan and folds the fragment itself into the result: it is
         * always added to the upstream set, and added to the lazy set only if the visit
         * left it lazy; otherwise it is recorded as non-lazy.
         */
        private FragmentSubGraph processFragment(PlanFragment fragment) {
            FragmentSubGraph subGraph = fragment.getRoot().accept(this, fragment.getId());

            Set<PlanFragmentId> upstreamFragments = ImmutableSet.<PlanFragmentId>builder()
                    .addAll(subGraph.getUpstreamFragments())
                    .add(fragment.getId())
                    .build();

            Set<PlanFragmentId> lazyUpstreamFragments;
            if (subGraph.isCurrentFragmentLazy()) {
                lazyUpstreamFragments = ImmutableSet.<PlanFragmentId>builder()
                        .addAll(subGraph.getLazyUpstreamFragments())
                        .add(fragment.getId())
                        .build();
            } else {
                lazyUpstreamFragments = subGraph.getLazyUpstreamFragments();
                nonLazyFragments.add(fragment.getId());
            }

            // The "current fragment" flag is meaningless once the fragment is finished.
            return new FragmentSubGraph(upstreamFragments, lazyUpstreamFragments, false);
        }

        @Override
        public FragmentSubGraph visitJoin(JoinNode node, PlanFragmentId currentFragmentId) {
            return processJoin(
                    node.getDistributionType().orElseThrow() == JoinNode.DistributionType.REPLICATED,
                    node.getLeft(),
                    node.getRight(),
                    currentFragmentId);
        }

        @Override
        public FragmentSubGraph visitSpatialJoin(SpatialJoinNode node, PlanFragmentId currentFragmentId) {
            return processJoin(
                    node.getDistributionType() == SpatialJoinNode.DistributionType.REPLICATED,
                    node.getLeft(),
                    node.getRight(),
                    currentFragmentId);
        }

        @Override
        public FragmentSubGraph visitSemiJoin(SemiJoinNode node, PlanFragmentId currentFragmentId) {
            return processJoin(
                    node.getDistributionType().orElseThrow() == SemiJoinNode.DistributionType.REPLICATED,
                    node.getSource(),
                    node.getFilteringSource(),
                    currentFragmentId);
        }

        @Override
        public FragmentSubGraph visitIndexJoin(IndexJoinNode node, PlanFragmentId currentFragmentId) {
            // Index joins are always handled like replicated joins.
            return processJoin(true, node.getProbeSource(), node.getIndexSource(), currentFragmentId);
        }

        /**
         * Shared handling for all join flavours.
         *
         * @param replicated whether the join is treated as replicated
         * @param probe probe-side sub-plan
         * @param build build-side sub-plan
         * @param currentFragmentId fragment containing the join
         */
        private FragmentSubGraph processJoin(boolean replicated, PlanNode probe, PlanNode build, PlanFragmentId currentFragmentId) {
            // Build side first: the visiting order determines sortedFragments.
            FragmentSubGraph buildSubGraph = build.accept(this, currentFragmentId);
            FragmentSubGraph probeSubGraph = probe.accept(this, currentFragmentId);

            // Lazy probe-side fragments wait for every build-side fragment.
            addDependencyEdges(buildSubGraph.getUpstreamFragments(), probeSubGraph.getLazyUpstreamFragments());

            boolean currentFragmentLazy = probeSubGraph.isCurrentFragmentLazy() && buildSubGraph.isCurrentFragmentLazy();
            if (replicated
                    && currentFragmentLazy
                    && !dynamicFilterService.isStageSchedulingNeededToCollectDynamicFilters(queryId, fragments.get(currentFragmentId))) {
                // The join fragment itself also waits for the build side.
                addDependencyEdges(buildSubGraph.getUpstreamFragments(), ImmutableSet.of(currentFragmentId));
            } else {
                // Otherwise the join fragment is started without waiting.
                currentFragmentLazy = false;
            }

            Set<PlanFragmentId> upstreamFragments = ImmutableSet.<PlanFragmentId>builder()
                    .addAll(probeSubGraph.getUpstreamFragments())
                    .addAll(buildSubGraph.getUpstreamFragments())
                    .build();
            // Only probe-side fragments stay lazy; build-side ones are not propagated as lazy.
            return new FragmentSubGraph(upstreamFragments, probeSubGraph.getLazyUpstreamFragments(), currentFragmentLazy);
        }

        @Override
        public FragmentSubGraph visitAggregation(AggregationNode node, PlanFragmentId currentFragmentId) {
            FragmentSubGraph subGraph = node.getSource().accept(this, currentFragmentId);
            if (node.getStep() != AggregationNode.Step.FINAL && node.getStep() != AggregationNode.Step.SINGLE) {
                return subGraph;
            }

            // FINAL/SINGLE aggregation: the current fragment is non-lazy and no upstream
            // fragment is reported as lazy.
            // NOTE(review): presumably because such an aggregation consumes all input before
            // emitting output - confirm.
            return new FragmentSubGraph(subGraph.getUpstreamFragments(), ImmutableSet.of(), false);
        }

        @Override
        public FragmentSubGraph visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId) {
            List<FragmentSubGraph> subGraphs = node.getSourceFragmentIds().stream()
                    .map(this::processFragment)
                    .collect(ImmutableList.toImmutableList());
            node.getSourceFragmentIds()
                    .forEach(sourceFragmentId -> fragmentTopology.putEdge(sourceFragmentId, currentFragmentId));

            Set<PlanFragmentId> upstreamFragments = subGraphs.stream()
                    .flatMap(source -> source.getUpstreamFragments().stream())
                    .collect(ImmutableSet.toImmutableSet());
            Set<PlanFragmentId> lazyUpstreamFragments = subGraphs.stream()
                    .flatMap(source -> source.getLazyUpstreamFragments().stream())
                    .collect(ImmutableSet.toImmutableSet());
            // A remote source alone leaves the current fragment lazy.
            return new FragmentSubGraph(upstreamFragments, lazyUpstreamFragments, true);
        }

        @Override
        public FragmentSubGraph visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId) {
            Preconditions.checkArgument(
                    node.getScope() == ExchangeNode.Scope.LOCAL,
                    "Only local exchanges are supported in the phased execution scheduler");
            return visitPlan(node, currentFragmentId);
        }

        @Override
        protected FragmentSubGraph visitPlan(PlanNode node, PlanFragmentId currentFragmentId) {
            List<FragmentSubGraph> sourceSubGraphs = node.getSources().stream()
                    .map(subPlanNode -> subPlanNode.accept(this, currentFragmentId))
                    .collect(ImmutableList.toImmutableList());

            Set<PlanFragmentId> upstreamFragments = sourceSubGraphs.stream()
                    .flatMap(source -> source.getUpstreamFragments().stream())
                    .collect(ImmutableSet.toImmutableSet());
            Set<PlanFragmentId> lazyUpstreamFragments = sourceSubGraphs.stream()
                    .flatMap(source -> source.getLazyUpstreamFragments().stream())
                    .collect(ImmutableSet.toImmutableSet());
            // Lazy only if every source is lazy; a node without sources therefore counts as lazy.
            boolean currentFragmentLazy = sourceSubGraphs.stream().allMatch(FragmentSubGraph::isCurrentFragmentLazy);
            return new FragmentSubGraph(upstreamFragments, lazyUpstreamFragments, currentFragmentLazy);
        }

        /** Adds a dependency edge from every source fragment to every target fragment. */
        private void addDependencyEdges(Set<PlanFragmentId> sourceFragments, Set<PlanFragmentId> targetFragments) {
            for (PlanFragmentId targetFragment : targetFragments) {
                for (PlanFragmentId sourceFragment : sourceFragments) {
                    fragmentDependency.putEdge(sourceFragment, targetFragment);
                }
            }
        }
    }

    /**
     * Immutable result of visiting a part of the plan: the fragments found upstream, the
     * subset of those still considered lazy, and whether the fragment being visited is
     * itself considered lazy so far.
     */
    private static class FragmentSubGraph {
        private final Set<PlanFragmentId> upstreamFragments;
        private final Set<PlanFragmentId> lazyUpstreamFragments;
        private final boolean currentFragmentLazy;

        /**
         * @param upstreamFragments fragments found upstream; must not be null
         * @param lazyUpstreamFragments upstream fragments considered lazy; must not be null
         * @param currentFragmentLazy whether the fragment being visited is considered lazy
         * @throws NullPointerException if either set is null
         */
        public FragmentSubGraph(Set<PlanFragmentId> upstreamFragments, Set<PlanFragmentId> lazyUpstreamFragments, boolean currentFragmentLazy) {
            Objects.requireNonNull(upstreamFragments, "upstreamFragments is null");
            Objects.requireNonNull(lazyUpstreamFragments, "lazyUpstreamFragments is null");
            this.upstreamFragments = upstreamFragments;
            this.lazyUpstreamFragments = lazyUpstreamFragments;
            this.currentFragmentLazy = currentFragmentLazy;
        }

        /** Returns the fragments found upstream. */
        public Set<PlanFragmentId> getUpstreamFragments() {
            return upstreamFragments;
        }

        /** Returns the upstream fragments considered lazy. */
        public Set<PlanFragmentId> getLazyUpstreamFragments() {
            return lazyUpstreamFragments;
        }

        /** Returns whether the fragment being visited is considered lazy. */
        public boolean isCurrentFragmentLazy() {
            return currentFragmentLazy;
        }
    }
}

