/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.exchange.ExchangeInput;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.BasicStageStats;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryState;
import io.trino.execution.QueryStateMachine;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.StageInfo;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.Exchanges;
import io.trino.execution.scheduler.FaultTolerantPartitioningScheme;
import io.trino.execution.scheduler.FaultTolerantPartitioningSchemeFactory;
import io.trino.execution.scheduler.FaultTolerantStageScheduler;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory;
import io.trino.execution.scheduler.QueryScheduler;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.StageManager;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.failuredetector.FailureDetector;
import io.trino.metadata.Metadata;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SubPlan;
import io.trino.sql.planner.plan.PlanFragmentId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;

@Deprecated
public class FaultTolerantQueryScheduler
implements QueryScheduler {
    // Class-wide logger.
    private static final Logger log = Logger.get(FaultTolerantQueryScheduler.class);
    // State machine of the query being scheduled; supplies the session and query id and
    // receives the state-change listeners registered by start() and createScheduler().
    private final QueryStateMachine queryStateMachine;
    // Executor the scheduling loop is submitted to in start(); also passed to StageManager.create.
    private final ExecutorService queryExecutor;
    // Passed to StageManager.create and to the nested Scheduler, which times its sleeps with it.
    private final SplitSchedulerStats schedulerStats;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final FailureDetector failureDetector;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final TaskSourceFactory taskSourceFactory;
    // Initialized for this query in createScheduler() and destroyed once the query is done.
    private final TaskDescriptorStorage taskDescriptorStorage;
    // Creates one Exchange per stage in createScheduler().
    private final ExchangeManager exchangeManager;
    // Used to construct the FaultTolerantPartitioningSchemeFactory in createScheduler().
    private final NodePartitioningManager nodePartitioningManager;
    // Initial value of the retry counter shared by all stage schedulers; createScheduler()
    // rejects negative values.
    private final int taskRetryAttemptsOverall;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final int taskRetryAttemptsPerTask;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final int maxTasksWaitingForNodePerStage;
    // Completes a future after a delay on behalf of the stage schedulers (see createScheduler()).
    private final ScheduledExecutorService scheduledExecutorService;
    // Source of the NodeAllocator obtained for the query's session.
    private final NodeAllocatorService nodeAllocatorService;
    // Produces a fresh partition memory estimator for each stage scheduler.
    private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final TaskExecutionStats taskExecutionStats;
    // Forwarded unchanged to every FaultTolerantStageScheduler.
    private final DynamicFilterService dynamicFilterService;
    // Built in the constructor; all stage lookups and the stats/info getters delegate to it.
    private final StageManager stageManager;
    // Set on the first call to start(); later calls return immediately.
    @GuardedBy(value="this")
    private boolean started;
    // Assigned in start(); taken and reset to null by the state-change listener once the
    // query reaches a done state.
    @GuardedBy(value="this")
    private Scheduler scheduler;

    /**
     * Creates the scheduler for a single query.
     *
     * <p>The session attached to {@code queryStateMachine} must use {@link RetryPolicy#TASK};
     * any other retry policy fails verification. The collaborators stored in fields are
     * null-checked here; {@code metadata}, {@code taskFactory}, {@code nodeTaskMap},
     * {@code planTree} and {@code summarizeTaskInfo} are only handed on to
     * {@code StageManager.create}.
     */
    public FaultTolerantQueryScheduler(
            QueryStateMachine queryStateMachine,
            ExecutorService queryExecutor,
            SplitSchedulerStats schedulerStats,
            FailureDetector failureDetector,
            TaskSourceFactory taskSourceFactory,
            TaskDescriptorStorage taskDescriptorStorage,
            ExchangeManager exchangeManager,
            NodePartitioningManager nodePartitioningManager,
            int taskRetryAttemptsOverall,
            int taskRetryAttemptsPerTask,
            int maxTasksWaitingForNodePerStage,
            ScheduledExecutorService scheduledExecutorService,
            NodeAllocatorService nodeAllocatorService,
            PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
            TaskExecutionStats taskExecutionStats,
            DynamicFilterService dynamicFilterService,
            Metadata metadata,
            RemoteTaskFactory taskFactory,
            NodeTaskMap nodeTaskMap,
            SubPlan planTree,
            boolean summarizeTaskInfo) {
        this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");

        // This scheduler only makes sense when task-level retries are enabled for the session.
        Session session = queryStateMachine.getSession();
        RetryPolicy retryPolicy = SystemSessionProperties.getRetryPolicy(session);
        boolean taskRetriesEnabled = retryPolicy == RetryPolicy.TASK;
        Verify.verify(taskRetriesEnabled, "unexpected retry policy: %s", retryPolicy);

        this.queryExecutor = Objects.requireNonNull(queryExecutor, "queryExecutor is null");
        this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
        this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.taskSourceFactory = Objects.requireNonNull(taskSourceFactory, "taskSourceFactory is null");
        this.taskDescriptorStorage = Objects.requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
        this.exchangeManager = Objects.requireNonNull(exchangeManager, "exchangeManager is null");
        this.nodePartitioningManager = Objects.requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");

        this.taskRetryAttemptsOverall = taskRetryAttemptsOverall;
        this.taskRetryAttemptsPerTask = taskRetryAttemptsPerTask;
        this.maxTasksWaitingForNodePerStage = maxTasksWaitingForNodePerStage;

        this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
        this.nodeAllocatorService = Objects.requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
        this.partitionMemoryEstimatorFactory = Objects.requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
        this.taskExecutionStats = Objects.requireNonNull(taskExecutionStats, "taskExecutionStats is null");
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");

        this.stageManager = StageManager.create(
                queryStateMachine,
                metadata,
                taskFactory,
                nodeTaskMap,
                queryExecutor,
                schedulerStats,
                planTree,
                summarizeTaskInfo);
    }

    /**
     * Starts scheduling the query.
     *
     * <p>Only the first call has any effect. If the query is already done nothing is scheduled.
     * Otherwise a listener is registered that tears the scheduler down when the query reaches a
     * done state, the scheduler is created, and its scheduling loop is submitted to the query
     * executor.
     */
    @Override
    public synchronized void start() {
        if (started) {
            return;
        }
        started = true;

        if (queryStateMachine.isDone()) {
            return;
        }

        queryStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                onQueryDone(state);
            }
        });

        Scheduler created = createScheduler();
        scheduler = created;
        queryExecutor.submit(created::schedule);
    }

    /**
     * Reacts to the query reaching a done state: detaches the current scheduler (if any), then
     * cancels it and finishes the stages when the query finished, or aborts both when the query
     * failed, and finally publishes the latest stage info to the query state machine.
     */
    private void onQueryDone(QueryState finalState) {
        // Take ownership of the scheduler under the lock so it is torn down at most once.
        Scheduler detached;
        synchronized (this) {
            detached = scheduler;
            scheduler = null;
        }

        if (finalState == QueryState.FINISHED) {
            if (detached != null) {
                detached.cancel();
            }
            stageManager.finish();
        }
        else if (finalState == QueryState.FAILED) {
            if (detached != null) {
                detached.abort();
            }
            stageManager.abort();
        }

        queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo()));
    }

    /**
     * Builds a {@link FaultTolerantStageScheduler} for every stage of the query and bundles them
     * into a {@link Scheduler}.
     *
     * <p>Task descriptor storage is initialized for the query up front and destroyed again by a
     * listener once the query is done. Stages are then visited in reverse topological order, so
     * the exchange and scheduler of every child stage already exist when its parent is wired up.
     * Each stage gets its own {@link Exchange}; for the output stage a callback additionally
     * publishes the exchange's source handles as the query-results input.
     *
     * <p>If anything fails while the stage schedulers are being built, every stage scheduler
     * created so far is aborted, the node allocator is closed and every exchange created so far
     * is closed before the failure is rethrown. Failures raised during that cleanup are attached
     * to the original failure as suppressed exceptions.
     *
     * @return the scheduler driving all stage schedulers of this query
     */
    private Scheduler createScheduler() {
        taskDescriptorStorage.initialize(queryStateMachine.getQueryId());
        queryStateMachine.addStateChangeListener(state -> {
            if (state.isDone()) {
                taskDescriptorStorage.destroy(queryStateMachine.getQueryId());
            }
        });

        Session session = queryStateMachine.getSession();
        FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory = new FaultTolerantPartitioningSchemeFactory(
                nodePartitioningManager,
                session,
                SystemSessionProperties.getFaultTolerantExecutionPartitionCount(session));

        Map<PlanFragmentId, FaultTolerantStageScheduler> schedulers = new HashMap<>();
        Map<PlanFragmentId, Exchange> exchanges = new HashMap<>();
        NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(session);

        try {
            List<SqlStage> stagesInTopologicalOrder = stageManager.getStagesInTopologicalOrder();
            // Children are processed before their parents.
            List<SqlStage> stagesInReverseTopologicalOrder = Lists.reverse(stagesInTopologicalOrder);

            Preconditions.checkArgument(taskRetryAttemptsOverall >= 0, "taskRetryAttemptsOverall must be greater than or equal to 0: %s", taskRetryAttemptsOverall);
            // A single counter instance is shared by all stage schedulers of the query.
            AtomicInteger remainingTaskRetryAttemptsOverall = new AtomicInteger(taskRetryAttemptsOverall);

            for (SqlStage stage : stagesInReverseTopologicalOrder) {
                PlanFragment fragment = stage.getFragment();
                boolean outputStage = stageManager.getOutputStage().getStageId().equals(stage.getStageId());

                ExchangeContext exchangeContext = new ExchangeContext(session.getQueryId(), new ExchangeId("external-exchange-" + stage.getStageId().getId()));
                FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(fragment.getPartitioningScheme().getPartitioning().getHandle());
                Exchange exchange = exchangeManager.createExchange(exchangeContext, sinkPartitioningScheme.getPartitionCount(), outputStage);
                exchanges.put(fragment.getId(), exchange);

                // Collect the already-created exchanges and schedulers of the child stages.
                ImmutableMap.Builder<PlanFragmentId, Exchange> sourceExchanges = ImmutableMap.builder();
                ImmutableMap.Builder<PlanFragmentId, FaultTolerantStageScheduler> sourceSchedulers = ImmutableMap.builder();
                for (SqlStage childStage : stageManager.getChildren(fragment.getId())) {
                    PlanFragmentId childFragmentId = childStage.getFragment().getId();

                    Exchange sourceExchange = exchanges.get(childFragmentId);
                    Verify.verify(sourceExchange != null, "exchange not found for fragment: %s", childFragmentId);
                    sourceExchanges.put(childFragmentId, sourceExchange);

                    FaultTolerantStageScheduler sourceScheduler = schedulers.get(childFragmentId);
                    Verify.verify(sourceScheduler != null, "scheduler not found for fragment: %s", childFragmentId);
                    sourceSchedulers.put(childFragmentId, sourceScheduler);
                }

                FaultTolerantPartitioningScheme sourcePartitioningScheme = partitioningSchemeFactory.get(fragment.getPartitioning());
                FaultTolerantStageScheduler scheduler = new FaultTolerantStageScheduler(
                        session,
                        stage,
                        failureDetector,
                        taskSourceFactory,
                        nodeAllocator,
                        taskDescriptorStorage,
                        partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(),
                        taskExecutionStats,
                        (future, delay) -> scheduledExecutorService.schedule(() -> future.set(null), delay.toMillis(), TimeUnit.MILLISECONDS),
                        Ticker.systemTicker(),
                        exchange,
                        sinkPartitioningScheme,
                        sourceSchedulers.buildOrThrow(),
                        sourceExchanges.buildOrThrow(),
                        sourcePartitioningScheme,
                        remainingTaskRetryAttemptsOverall,
                        taskRetryAttemptsPerTask,
                        maxTasksWaitingForNodePerStage,
                        dynamicFilterService);
                schedulers.put(fragment.getId(), scheduler);

                if (outputStage) {
                    publishQueryResultsWhenReady(exchange, scheduler);
                }
            }

            return new Scheduler(queryStateMachine, ImmutableList.copyOf(schedulers.values()), stageManager, schedulerStats, nodeAllocator);
        }
        catch (Throwable t) {
            releaseAfterFailedCreation(t, schedulers.values(), nodeAllocator, exchanges.values());
            throw t;
        }
    }

    /**
     * Registers callbacks on the output exchange's source handles. On success the handles are
     * published as the query-results input together with a final output selector that lists the
     * successful attempt of each task partition. Any failure, either of the future or while
     * building the input, transitions the query to failed.
     */
    private void publishQueryResultsWhenReady(Exchange exchange, FaultTolerantStageScheduler scheduler) {
        ListenableFuture<List<ExchangeSourceHandle>> sourceHandles = Exchanges.getAllSourceHandles(exchange.getSourceHandles());
        MoreFutures.addSuccessCallback(sourceHandles, handles -> {
            try {
                ExchangeSourceOutputSelector.Builder selector = ExchangeSourceOutputSelector.builder(ImmutableSet.of(exchange.getId()));
                Map<Integer, Integer> successfulAttempts = scheduler.getSuccessfulAttempts();
                successfulAttempts.forEach((taskPartitionId, attemptId) -> selector.include(exchange.getId(), taskPartitionId, attemptId));
                selector.setPartitionCount(exchange.getId(), successfulAttempts.size());
                selector.setFinal();
                SpoolingExchangeInput input = new SpoolingExchangeInput(handles, Optional.of(selector.build()));
                queryStateMachine.updateInputsForQueryResults(ImmutableList.of(input), true);
            }
            catch (Throwable t) {
                queryStateMachine.transitionToFailed(t);
            }
        });
        MoreFutures.addExceptionCallback(sourceHandles, queryStateMachine::transitionToFailed);
    }

    /**
     * Best-effort release of everything created before scheduler construction failed: aborts the
     * stage schedulers, closes the node allocator, then closes the exchanges. Every cleanup step
     * is attempted even if an earlier one throws; cleanup failures are recorded on
     * {@code failure} as suppressed exceptions.
     */
    private static void releaseAfterFailedCreation(
            Throwable failure,
            Collection<FaultTolerantStageScheduler> createdSchedulers,
            NodeAllocator nodeAllocator,
            Collection<Exchange> createdExchanges) {
        for (FaultTolerantStageScheduler scheduler : createdSchedulers) {
            try {
                scheduler.abort();
            }
            catch (Throwable closeFailure) {
                addSuppressedUnlessSame(failure, closeFailure);
            }
        }

        try {
            nodeAllocator.close();
        }
        catch (Throwable closeFailure) {
            addSuppressedUnlessSame(failure, closeFailure);
        }

        for (Exchange exchange : createdExchanges) {
            try {
                exchange.close();
            }
            catch (Throwable closeFailure) {
                addSuppressedUnlessSame(failure, closeFailure);
            }
        }
    }

    /**
     * Attaches {@code secondary} to {@code primary} as a suppressed exception, unless both are the
     * same instance (a throwable cannot suppress itself).
     */
    private static void addSuppressedUnlessSame(Throwable primary, Throwable secondary) {
        if (primary != secondary) {
            primary.addSuppressed(secondary);
        }
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void cancelStage(StageId stageId) {
        String reason = "partial cancel is not supported in fault tolerant mode";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Delegates to the stage manager to fail the given task remotely with {@code failureCause}.
     */
    @Override
    public void failTask(TaskId taskId, Throwable failureCause) {
        stageManager.failTaskRemotely(taskId, failureCause);
    }

    /**
     * Returns the basic stage statistics reported by the stage manager.
     */
    @Override
    public BasicStageStats getBasicStageStats() {
        return stageManager.getBasicStageStats();
    }

    /**
     * Returns the stage info reported by the stage manager.
     */
    @Override
    public StageInfo getStageInfo() {
        return stageManager.getStageInfo();
    }

    /**
     * Returns the user memory reservation reported by the stage manager.
     */
    @Override
    public long getUserMemoryReservation() {
        return stageManager.getUserMemoryReservation();
    }

    /**
     * Returns the total memory reservation reported by the stage manager.
     */
    @Override
    public long getTotalMemoryReservation() {
        return stageManager.getTotalMemoryReservation();
    }

    /**
     * Returns the total CPU time reported by the stage manager.
     */
    @Override
    public Duration getTotalCpuTime() {
        return stageManager.getTotalCpuTime();
    }

    /**
     * Tells whether the query is in the {@code FINISHING} state or in any done state.
     */
    private static boolean isFinishingOrDone(QueryStateMachine queryStateMachine) {
        QueryState current = queryStateMachine.getQueryState();
        if (current == QueryState.FINISHING) {
            return true;
        }
        return current.isDone();
    }

    private static class Scheduler {
        private final QueryStateMachine queryStateMachine;
        private final List<FaultTolerantStageScheduler> schedulers;
        private final StageManager stageManager;
        private final SplitSchedulerStats schedulerStats;
        private final NodeAllocator nodeAllocator;

        private Scheduler(QueryStateMachine queryStateMachine, List<FaultTolerantStageScheduler> schedulers, StageManager stageManager, SplitSchedulerStats schedulerStats, NodeAllocator nodeAllocator) {
            this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
            this.stageManager = Objects.requireNonNull(stageManager, "stageManager is null");
            this.schedulers = ImmutableList.copyOf((Collection)Objects.requireNonNull(schedulers, "schedulers is null"));
            this.schedulerStats = Objects.requireNonNull(schedulerStats, "schedulerStats is null");
            this.nodeAllocator = Objects.requireNonNull(nodeAllocator, "nodeAllocator is null");
        }

        public void schedule() {
            if (this.schedulers.isEmpty()) {
                this.queryStateMachine.transitionToFinishing();
                return;
            }
            this.queryStateMachine.transitionToRunning();
            try (SetThreadName ignored = new SetThreadName("Query-%s", new Object[]{this.queryStateMachine.getQueryId()});){
                ArrayList<ListenableFuture<Void>> blockedStages = new ArrayList<ListenableFuture<Void>>();
                while (!FaultTolerantQueryScheduler.isFinishingOrDone(this.queryStateMachine)) {
                    blockedStages.clear();
                    boolean atLeastOneStageIsNotBlocked = false;
                    boolean allFinished = true;
                    for (FaultTolerantStageScheduler scheduler : this.schedulers) {
                        if (scheduler.isFinished()) {
                            this.stageManager.get(scheduler.getStageId()).finish();
                            continue;
                        }
                        allFinished = false;
                        ListenableFuture<Void> blocked = scheduler.isBlocked();
                        if (!blocked.isDone()) {
                            blockedStages.add(blocked);
                            continue;
                        }
                        try {
                            scheduler.schedule();
                        }
                        catch (Throwable t) {
                            this.fail(t, Optional.of(scheduler.getStageId()));
                            ignored.close();
                            return;
                        }
                        blocked = scheduler.isBlocked();
                        if (!blocked.isDone()) {
                            blockedStages.add(blocked);
                            continue;
                        }
                        atLeastOneStageIsNotBlocked = true;
                    }
                    if (allFinished) {
                        this.queryStateMachine.transitionToFinishing();
                        return;
                    }
                    if (atLeastOneStageIsNotBlocked) continue;
                    Verify.verify((!blockedStages.isEmpty() ? 1 : 0) != 0, (String)"blockedStages is not expected to be empty here", (Object[])new Object[0]);
                    TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time();
                    try {
                        try {
                            MoreFutures.tryGetFutureValue((Future)MoreFutures.whenAnyComplete(blockedStages), (int)1, (TimeUnit)TimeUnit.SECONDS);
                        }
                        catch (CancellationException e) {
                            log.debug("Scheduling has been cancelled for query %s. Query state: %s", new Object[]{this.queryStateMachine.getQueryId(), this.queryStateMachine.getQueryState()});
                        }
                    }
                    finally {
                        if (timer == null) continue;
                        timer.close();
                    }
                }
            }
            catch (Throwable t) {
                this.fail(t, Optional.empty());
            }
        }

        public void cancel() {
            this.schedulers.forEach(FaultTolerantStageScheduler::cancel);
            this.closeNodeAllocator();
        }

        public void abort() {
            this.schedulers.forEach(FaultTolerantStageScheduler::abort);
            this.closeNodeAllocator();
        }

        private void fail(Throwable t, Optional<StageId> failedStageId) {
            this.abort();
            this.stageManager.getStagesInTopologicalOrder().forEach(stage -> {
                if (failedStageId.isPresent() && ((StageId)failedStageId.get()).equals(stage.getStageId())) {
                    stage.fail(t);
                } else {
                    stage.abort();
                }
            });
            this.queryStateMachine.transitionToFailed(t);
        }

        private void closeNodeAllocator() {
            try {
                this.nodeAllocator.close();
            }
            catch (Throwable t) {
                log.warn(t, "Error closing node allocator for query: %s", new Object[]{this.queryStateMachine.getQueryId()});
            }
        }
    }
}

