/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.PartitionIdAllocator;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.execution.scheduler.SourcePartitionedScheduler;
import io.trino.execution.scheduler.SourceScheduler;
import io.trino.execution.scheduler.SplitPlacementPolicy;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.StageScheduler;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.server.DynamicFilterService;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class MultiSourcePartitionedScheduler
implements StageScheduler {
    private static final Logger log = Logger.get(MultiSourcePartitionedScheduler.class);
    private final StageExecution stageExecution;
    private final Queue<SourceScheduler> sourceSchedulers;
    private final Map<InternalNode, RemoteTask> scheduledTasks = new HashMap<InternalNode, RemoteTask>();
    private final DynamicFilterService dynamicFilterService;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final PartitionIdAllocator partitionIdAllocator = new PartitionIdAllocator();

    public MultiSourcePartitionedScheduler(StageExecution stageExecution, Map<PlanNodeId, SplitSource> splitSources, SplitPlacementPolicy splitPlacementPolicy, int splitBatchSize, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier anySourceTaskBlocked) {
        Objects.requireNonNull(splitSources, "splitSources is null");
        Preconditions.checkArgument((splitSources.size() > 1 ? 1 : 0) != 0, (Object)"It is expected that there will be more than one split sources");
        ImmutableList.Builder sourceSchedulers = ImmutableList.builder();
        for (PlanNodeId planNodeId : splitSources.keySet()) {
            SplitSource splitSource = splitSources.get(planNodeId);
            SourceScheduler sourceScheduler = SourcePartitionedScheduler.newSourcePartitionedSchedulerAsSourceScheduler(stageExecution, planNodeId, splitSource, splitPlacementPolicy, splitBatchSize, dynamicFilterService, tableExecuteContextManager, anySourceTaskBlocked, this.partitionIdAllocator, this.scheduledTasks);
            sourceSchedulers.add((Object)sourceScheduler);
        }
        this.stageExecution = Objects.requireNonNull(stageExecution, "stageExecution is null");
        this.sourceSchedulers = new ArrayDeque<SourceScheduler>((Collection<SourceScheduler>)sourceSchedulers.build());
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.splitPlacementPolicy = Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
    }

    @Override
    public synchronized void start() {
        if (this.dynamicFilterService.isCollectingTaskNeeded(this.stageExecution.getStageId().getQueryId(), this.stageExecution.getFragment())) {
            this.stageExecution.beginScheduling();
            this.scheduleTaskOnRandomNode();
        }
    }

    @Override
    public synchronized ScheduleResult schedule() {
        ImmutableSet.Builder newScheduledTasks = ImmutableSet.builder();
        ListenableFuture<Void> blocked = Futures.immediateVoidFuture();
        Optional<Object> blockedReason = Optional.empty();
        int splitsScheduled = 0;
        while (!this.sourceSchedulers.isEmpty()) {
            SourceScheduler scheduler = this.sourceSchedulers.peek();
            ScheduleResult scheduleResult = scheduler.schedule();
            splitsScheduled += scheduleResult.getSplitsScheduled();
            newScheduledTasks.addAll(scheduleResult.getNewTasks());
            blocked = scheduleResult.getBlocked();
            blockedReason = scheduleResult.getBlockedReason();
            if (!blocked.isDone() || !scheduleResult.isFinished()) break;
            this.stageExecution.schedulingComplete(scheduler.getPlanNodeId());
            this.sourceSchedulers.remove().close();
        }
        if (blockedReason.isPresent()) {
            return new ScheduleResult(this.sourceSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newScheduledTasks.build(), blocked, (ScheduleResult.BlockedReason)((Object)blockedReason.get()), splitsScheduled);
        }
        return new ScheduleResult(this.sourceSchedulers.isEmpty(), (Iterable<? extends RemoteTask>)newScheduledTasks.build(), splitsScheduled);
    }

    @Override
    public void close() {
        for (SourceScheduler sourceScheduler : this.sourceSchedulers) {
            try {
                sourceScheduler.close();
            }
            catch (Throwable t) {
                log.warn(t, "Error closing split source");
            }
        }
        this.sourceSchedulers.clear();
    }

    private void scheduleTaskOnRandomNode() {
        Preconditions.checkState((boolean)this.scheduledTasks.isEmpty(), (Object)"Stage task is already scheduled on node");
        List<InternalNode> allNodes = this.splitPlacementPolicy.allNodes();
        Preconditions.checkState((allNodes.size() > 0 ? 1 : 0) != 0, (Object)"No nodes available");
        InternalNode node = allNodes.get(ThreadLocalRandom.current().nextInt(0, allNodes.size()));
        Optional<RemoteTask> remoteTask = this.stageExecution.scheduleTask(node, this.partitionIdAllocator.getNextId(), (Multimap<PlanNodeId, Split>)ImmutableMultimap.of());
        remoteTask.ifPresent(task -> this.scheduledTasks.put(node, (RemoteTask)task));
    }
}

