/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.PartitionIdAllocator;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.execution.scheduler.SourceScheduler;
import io.trino.execution.scheduler.SplitPlacementPolicy;
import io.trino.execution.scheduler.SplitPlacementResult;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.StageScheduler;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.server.DynamicFilterService;
import io.trino.split.EmptySplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

/**
 * Schedules the splits produced by one {@link SplitSource} (for the plan node
 * returned by {@link #getPlanNodeId()}) onto nodes chosen by a
 * {@link SplitPlacementPolicy}, creating remote tasks on demand.
 *
 * <p>Instances are created through the two static factories. The scheduler moves
 * through the states {@code INITIALIZED -> SPLITS_ADDED -> SPLITS_SCHEDULED -> FINISHED}
 * (see {@link State}); {@link #start()} and {@link #schedule()} are {@code synchronized}
 * on this instance.
 */
public class SourcePartitionedScheduler
implements SourceScheduler {
    private static final Logger log = Logger.get(SourcePartitionedScheduler.class);
    private final StageExecution stageExecution;
    private final SplitSource splitSource;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final int splitBatchSize;
    private final PlanNodeId partitionedNode;
    private final DynamicFilterService dynamicFilterService;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final BooleanSupplier anySourceTaskBlocked;
    private final PartitionIdAllocator partitionIdAllocator;
    // Tasks created so far, keyed by the node they run on. When built through
    // newSourcePartitionedSchedulerAsSourceScheduler this map is supplied by the caller.
    private final Map<InternalNode, RemoteTask> scheduledTasks;
    // Splits fetched from the source that have not yet been placed on a node (insertion-ordered).
    private final Set<Split> pendingSplits = new LinkedHashSet<>();
    // In-flight request for the next batch of splits; null when no request is outstanding.
    private ListenableFuture<SplitSource.SplitBatch> nextSplitBatchFuture;
    // Future returned by the last placement attempt that could not place every pending split.
    private ListenableFuture<Void> placementFuture = Futures.immediateVoidFuture();
    private State state = State.INITIALIZED;

    private SourcePartitionedScheduler(StageExecution stageExecution, PlanNodeId partitionedNode, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier anySourceTaskBlocked, PartitionIdAllocator partitionIdAllocator, Map<InternalNode, RemoteTask> scheduledTasks) {
        this.stageExecution = Objects.requireNonNull(stageExecution, "stageExecution is null");
        this.splitSource = Objects.requireNonNull(splitSource, "splitSource is null");
        this.splitPlacementPolicy = Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
        Preconditions.checkArgument(splitBatchSize > 0, "splitBatchSize must be at least one");
        this.splitBatchSize = splitBatchSize;
        this.partitionedNode = Objects.requireNonNull(partitionedNode, "partitionedNode is null");
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.tableExecuteContextManager = Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.anySourceTaskBlocked = Objects.requireNonNull(anySourceTaskBlocked, "anySourceTaskBlocked is null");
        this.partitionIdAllocator = Objects.requireNonNull(partitionIdAllocator, "partitionIdAllocator is null");
        this.scheduledTasks = Objects.requireNonNull(scheduledTasks, "scheduledTasks is null");
    }

    /**
     * Returns the id of the plan node whose splits this scheduler assigns.
     */
    @Override
    public PlanNodeId getPlanNodeId() {
        return this.partitionedNode;
    }

    /**
     * Creates a scheduler with its own {@link PartitionIdAllocator} and task map and
     * exposes it as a {@link StageScheduler} that delegates {@code start}, {@code schedule}
     * and {@code close} to it.
     *
     * @param splitBatchSize maximum batch size requested from {@code splitSource}; must be positive
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code splitBatchSize} is not positive
     */
    public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(StageExecution stageExecution, PlanNodeId partitionedNode, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier anySourceTaskBlocked) {
        final SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(stageExecution, partitionedNode, splitSource, splitPlacementPolicy, splitBatchSize, dynamicFilterService, tableExecuteContextManager, anySourceTaskBlocked, new PartitionIdAllocator(), new HashMap<>());
        return new StageScheduler() {
            @Override
            public void start() {
                sourcePartitionedScheduler.start();
            }

            @Override
            public ScheduleResult schedule() {
                return sourcePartitionedScheduler.schedule();
            }

            @Override
            public void close() {
                sourcePartitionedScheduler.close();
            }
        };
    }

    /**
     * Creates a scheduler that uses the caller-supplied {@code partitionIdAllocator} and
     * {@code scheduledTasks} map (the map is stored by reference and mutated as tasks are created).
     *
     * @param splitBatchSize maximum batch size requested from {@code splitSource}; must be positive
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code splitBatchSize} is not positive
     */
    public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(StageExecution stageExecution, PlanNodeId partitionedNode, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier anySourceTaskBlocked, PartitionIdAllocator partitionIdAllocator, Map<InternalNode, RemoteTask> scheduledTasks) {
        return new SourcePartitionedScheduler(stageExecution, partitionedNode, splitSource, splitPlacementPolicy, splitBatchSize, dynamicFilterService, tableExecuteContextManager, anySourceTaskBlocked, partitionIdAllocator, scheduledTasks);
    }

    /**
     * If the dynamic filter service reports that a collecting task is needed for this
     * stage's fragment, begins scheduling on the stage and creates one task, with no
     * splits, on a randomly chosen node. Otherwise does nothing.
     *
     * @throws IllegalStateException if a task is already scheduled or no nodes are available
     */
    @Override
    public synchronized void start() {
        if (this.dynamicFilterService.isCollectingTaskNeeded(this.stageExecution.getStageId().getQueryId(), this.stageExecution.getFragment())) {
            this.stageExecution.beginScheduling();
            this.createTaskOnRandomNode();
        }
    }

    /**
     * Performs one scheduling step: fetches the next split batch if nothing is pending,
     * asks the placement policy to assign pending splits, hands the assigned splits to
     * existing or newly created tasks, and reports progress.
     *
     * @return a finished result once the last batch has been fetched and every pending
     *         split is assigned (the split source is closed at that point); otherwise an
     *         unfinished result that is either not blocked, or blocked with reason
     *         {@code WAITING_FOR_SOURCE} (waiting for the next batch) or
     *         {@code SPLIT_QUEUES_FULL} (waiting for placement)
     */
    @Override
    public synchronized ScheduleResult schedule() {
        if (this.state == State.FINISHED) {
            return new ScheduleResult(true, ImmutableSet.of(), 0);
        }
        int overallSplitAssignmentCount = 0;
        Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
        ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
        Optional<ListenableFuture<Void>> blockedFuture = Optional.empty();
        boolean blockedOnPlacements = false;
        boolean blockedOnNextSplitBatch = false;

        if (this.state == State.SPLITS_SCHEDULED) {
            // The last batch was already consumed, so no fetch may be outstanding.
            Verify.verify(this.nextSplitBatchFuture == null);
        } else if (this.pendingSplits.isEmpty()) {
            // Nothing left to place: request (or keep waiting for) the next batch.
            if (this.nextSplitBatchFuture == null) {
                this.nextSplitBatchFuture = this.splitSource.getNextBatch(this.splitBatchSize);
                long start = System.nanoTime();
                MoreFutures.addSuccessCallback(this.nextSplitBatchFuture, () -> this.stageExecution.recordSplitSourceMetrics(this.partitionedNode, this.splitSource.getMetrics(), start));
            }
            if (this.nextSplitBatchFuture.isDone()) {
                SplitSource.SplitBatch nextSplits = MoreFutures.getFutureValue(this.nextSplitBatchFuture);
                this.nextSplitBatchFuture = null;
                this.pendingSplits.addAll(nextSplits.getSplits());
                if (nextSplits.isLastBatch()) {
                    if (this.state == State.INITIALIZED && this.pendingSplits.isEmpty()) {
                        // The source finished without ever producing a split: schedule a single
                        // EmptySplit instead. NOTE(review): presumably so that a task is still
                        // created for this source - confirm against the execution engine.
                        this.pendingSplits.add(new Split(this.splitSource.getCatalogHandle(), new EmptySplit(this.splitSource.getCatalogHandle())));
                    }
                    log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", this.stageExecution.getStageId(), this.partitionedNode);
                    this.state = State.SPLITS_SCHEDULED;
                }
            } else {
                blockedFuture = Optional.of(SourcePartitionedScheduler.asVoid(this.nextSplitBatchFuture));
                blockedOnNextSplitBatch = true;
                log.debug("stage id: %s, node: %s; blocked on next split batch", this.stageExecution.getStageId(), this.partitionedNode);
            }
        }

        if (!this.pendingSplits.isEmpty() && this.state == State.INITIALIZED) {
            log.debug("stage id: %s, node: %s; transitioning to SPLITS_ADDED", this.stageExecution.getStageId(), this.partitionedNode);
            this.state = State.SPLITS_ADDED;
        }

        if (blockedFuture.isEmpty() && !this.pendingSplits.isEmpty()) {
            if (!this.placementFuture.isDone()) {
                // The previous placement attempt is still blocked; do not recompute yet.
                blockedFuture = Optional.of(this.placementFuture);
                blockedOnPlacements = true;
            } else {
                SplitPlacementResult splitPlacementResult = this.splitPlacementPolicy.computeAssignments(this.pendingSplits);
                splitAssignment = splitPlacementResult.getAssignments();
                // Drop the placed splits one by one; AbstractSet.removeAll may instead probe the
                // argument collection with contains() for every element of the set.
                splitAssignment.values().forEach(this.pendingSplits::remove);
                overallSplitAssignmentCount += splitAssignment.size();
                if (!this.pendingSplits.isEmpty()) {
                    // Some splits could not be placed: remember what we are waiting on.
                    this.placementFuture = splitPlacementResult.getBlocked();
                    blockedFuture = Optional.of(this.placementFuture);
                    blockedOnPlacements = true;
                }
            }
        }
        if (blockedOnPlacements) {
            log.debug("stage id: %s, node: %s; blocked on placements", this.stageExecution.getStageId(), this.partitionedNode);
        }

        overallNewTasks.addAll(this.assignSplits(splitAssignment));

        if (this.pendingSplits.isEmpty() && this.state == State.SPLITS_SCHEDULED) {
            // Last batch fetched and everything assigned: publish table-execute info and finish.
            log.debug("stage id: %s, node: %s; transitioning to FINISHED", this.stageExecution.getStageId(), this.partitionedNode);
            this.state = State.FINISHED;
            Optional<List<Object>> tableExecuteSplitsInfo = this.splitSource.getTableExecuteSplitsInfo();
            tableExecuteSplitsInfo.ifPresent(info -> {
                TableExecuteContext tableExecuteContext = this.tableExecuteContextManager.getTableExecuteContextForQuery(this.stageExecution.getStageId().getQueryId());
                tableExecuteContext.setSplitsInfo(info);
            });
            this.splitSource.close();
            return new ScheduleResult(true, overallNewTasks.build(), overallSplitAssignmentCount);
        }

        if (blockedFuture.isEmpty()) {
            log.debug("stage id: %s, node: %s; assigned %s splits (not blocked)", this.stageExecution.getStageId(), this.partitionedNode, overallSplitAssignmentCount);
            return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
        }

        if (this.anySourceTaskBlocked.getAsBoolean()) {
            // A source task is blocked while this scheduler is blocked too: release the stage's
            // dynamic filters and, when waiting on placement, create the remaining tasks.
            log.debug("stage id: %s, node: %s; unblocking dynamic filters", this.stageExecution.getStageId(), this.partitionedNode);
            this.dynamicFilterService.unblockStageDynamicFilters(this.stageExecution.getStageId().getQueryId(), this.stageExecution.getAttemptId(), this.stageExecution.getFragment());
            if (blockedOnPlacements) {
                log.debug("stage id: %s, node: %s; finalize task creation if necessary", this.stageExecution.getStageId(), this.partitionedNode);
                overallNewTasks.addAll(this.finalizeTaskCreationIfNecessary());
            }
        }

        ScheduleResult.BlockedReason blockedReason = blockedOnNextSplitBatch ? ScheduleResult.BlockedReason.WAITING_FOR_SOURCE : ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
        log.debug("stage id: %s, node: %s; assigned %s splits (blocked reason %s)", this.stageExecution.getStageId(), this.partitionedNode, overallSplitAssignmentCount, blockedReason);
        // Wrap the future so that a caller cancelling the result does not cancel our internal future.
        return new ScheduleResult(false, overallNewTasks.build(), Futures.nonCancellationPropagating(blockedFuture.orElseThrow()), blockedReason, overallSplitAssignmentCount);
    }

    /**
     * Returns a future that completes when {@code future} does, with its value replaced by {@code null}.
     */
    private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future) {
        return Futures.transform(future, v -> null, MoreExecutors.directExecutor());
    }

    /**
     * Closes the underlying split source.
     *
     * <p>NOTE(review): unlike {@link #start()} and {@link #schedule()}, this method is not synchronized.
     */
    @Override
    public void close() {
        this.splitSource.close();
    }

    /**
     * Delivers the assigned splits to the task on each node, creating the task when the
     * node has none yet.
     *
     * @return the tasks that were newly created by this call
     */
    private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignment) {
        ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
        // Iterate over a snapshot of the node set.
        Set<InternalNode> nodes = ImmutableSet.copyOf(splitAssignment.keySet());
        for (InternalNode node : nodes) {
            // Re-key the node's splits by the plan node they belong to.
            ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                    .putAll(this.partitionedNode, splitAssignment.get(node))
                    .build();
            RemoteTask task = this.scheduledTasks.get(node);
            if (task != null) {
                task.addSplits(splits);
                continue;
            }
            this.scheduleTask(node, splits).ifPresent(newTasks::add);
        }
        return newTasks.build();
    }

    /**
     * Creates a task without splits on a node picked uniformly at random from the placement policy's nodes.
     *
     * @throws IllegalStateException if a task is already scheduled or no nodes are available
     */
    private void createTaskOnRandomNode() {
        Preconditions.checkState(this.scheduledTasks.isEmpty(), "Stage task is already scheduled on node");
        List<InternalNode> allNodes = this.splitPlacementPolicy.allNodes();
        Preconditions.checkState(!allNodes.isEmpty(), "No nodes available");
        InternalNode node = allNodes.get(ThreadLocalRandom.current().nextInt(0, allNodes.size()));
        this.scheduleTask(node, ImmutableMultimap.of());
    }

    /**
     * For non-leaf fragments, locks down the placement policy's node set, creates a task
     * without splits on every node that does not have one yet, and transitions the stage
     * to scheduling splits. Leaf fragments are left untouched.
     *
     * @return the tasks that were newly created by this call (empty for leaf fragments)
     */
    private Set<RemoteTask> finalizeTaskCreationIfNecessary() {
        if (this.stageExecution.getFragment().isLeaf()) {
            return ImmutableSet.of();
        }
        this.splitPlacementPolicy.lockDownNodes();
        Set<RemoteTask> newTasks = this.splitPlacementPolicy.allNodes().stream()
                .filter(node -> !this.scheduledTasks.containsKey(node))
                .map(node -> this.scheduleTask(node, ImmutableMultimap.of()))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(ImmutableSet.toImmutableSet());
        this.stageExecution.transitionToSchedulingSplits();
        return newTasks;
    }

    /**
     * Asks the stage to create a task on {@code node} with the next partition id and the
     * given initial splits, and records the task in {@code scheduledTasks} if one was created.
     *
     * @return the created task, or empty if the stage did not create one
     */
    private Optional<RemoteTask> scheduleTask(InternalNode node, Multimap<PlanNodeId, Split> initialSplits) {
        Optional<RemoteTask> remoteTask = this.stageExecution.scheduleTask(node, this.partitionIdAllocator.getNextId(), initialSplits);
        remoteTask.ifPresent(task -> this.scheduledTasks.put(node, task));
        return remoteTask;
    }

    /**
     * Lifecycle of the scheduler, in the order the states are entered by {@link #schedule()}.
     */
    private enum State {
        /** No split has been received from the source yet. */
        INITIALIZED,
        /** At least one split has been received; more batches may follow. */
        SPLITS_ADDED,
        /** The last batch has been received; pending splits may still await placement. */
        SPLITS_SCHEDULED,
        /** All splits have been assigned and the split source has been closed. */
        FINISHED
    }
}

