/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import io.trino.execution.RemoteTask;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.StageScheduler;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.util.Failures;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class ScaledWriterScheduler
implements StageScheduler {
    private static final double BUFFER_FULL_THRESHOLD = 0.5;
    private final StageExecution stage;
    private final Supplier<Collection<TaskStatus>> sourceTasksProvider;
    private final Supplier<Collection<TaskStatus>> writerTasksProvider;
    private final NodeSelector nodeSelector;
    private final ScheduledExecutorService executor;
    private final long writerScalingMinDataProcessed;
    private final Set<InternalNode> scheduledNodes = new HashSet<InternalNode>();
    private final AtomicBoolean done = new AtomicBoolean();
    private final int maxWriterNodeCount;
    private volatile SettableFuture<Void> future = SettableFuture.create();

    public ScaledWriterScheduler(StageExecution stage, Supplier<Collection<TaskStatus>> sourceTasksProvider, Supplier<Collection<TaskStatus>> writerTasksProvider, NodeSelector nodeSelector, ScheduledExecutorService executor, DataSize writerScalingMinDataProcessed, int maxWriterNodeCount) {
        this.stage = Objects.requireNonNull(stage, "stage is null");
        this.sourceTasksProvider = Objects.requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
        this.writerTasksProvider = Objects.requireNonNull(writerTasksProvider, "writerTasksProvider is null");
        this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.writerScalingMinDataProcessed = writerScalingMinDataProcessed.toBytes();
        this.maxWriterNodeCount = maxWriterNodeCount;
    }

    public void finish() {
        this.done.set(true);
        this.future.set(null);
    }

    @Override
    public ScheduleResult schedule() {
        List<RemoteTask> writers = this.scheduleTasks(this.getNewTaskCount());
        this.future.set(null);
        this.future = SettableFuture.create();
        this.executor.schedule(() -> this.future.set(null), 200L, TimeUnit.MILLISECONDS);
        return new ScheduleResult(this.done.get(), (Iterable<? extends RemoteTask>)writers, (ListenableFuture<Void>)this.future, ScheduleResult.BlockedReason.WRITER_SCALING, 0);
    }

    private int getNewTaskCount() {
        if (this.scheduledNodes.isEmpty()) {
            return 1;
        }
        Collection<TaskStatus> writerTasks = this.writerTasksProvider.get();
        if (writerTasks.size() != this.scheduledNodes.size() || writerTasks.stream().map(TaskStatus::getMaxWriterCount).anyMatch(Optional::isEmpty)) {
            return 0;
        }
        if (this.isSourceTasksBufferFull() && this.isWriteThroughputSufficient() && this.scheduledNodes.size() < this.maxWriterNodeCount) {
            return 1;
        }
        return 0;
    }

    private boolean isSourceTasksBufferFull() {
        return this.isAverageBufferFull() || this.isWeightedBufferFull();
    }

    private boolean isWriteThroughputSufficient() {
        long minWriterInputBytesToScaleUp;
        Collection<TaskStatus> writerTasks = this.writerTasksProvider.get();
        long writerInputBytes = writerTasks.stream().map(TaskStatus::getWriterInputDataSize).mapToLong(DataSize::toBytes).sum();
        return writerInputBytes >= (minWriterInputBytesToScaleUp = writerTasks.stream().map(TaskStatus::getMaxWriterCount).map(Optional::get).mapToLong(writerCount -> this.writerScalingMinDataProcessed * (long)writerCount.intValue()).sum());
    }

    private boolean isWeightedBufferFull() {
        double totalOutputSize = 0.0;
        double overutilizedOutputSize = 0.0;
        for (TaskStatus task : this.sourceTasksProvider.get()) {
            if (task.getState().isTerminatingOrDone()) continue;
            long outputDataSize = task.getOutputDataSize().toBytes();
            totalOutputSize += (double)outputDataSize;
            if (!task.getOutputBufferStatus().isOverutilized()) continue;
            overutilizedOutputSize += (double)outputDataSize;
        }
        return totalOutputSize > 0.0 && overutilizedOutputSize / totalOutputSize >= 0.5;
    }

    private boolean isAverageBufferFull() {
        return this.sourceTasksProvider.get().stream().filter(task -> !task.getState().isTerminatingOrDone()).map(TaskStatus::getOutputBufferStatus).map(OutputBufferStatus::isOverutilized).mapToDouble(full -> full != false ? 1.0 : 0.0).average().orElse(0.0) >= 0.5;
    }

    private List<RemoteTask> scheduleTasks(int count) {
        if (count == 0) {
            return ImmutableList.of();
        }
        List<InternalNode> nodes = this.nodeSelector.selectRandomNodes(count, this.scheduledNodes);
        Failures.checkCondition(!this.scheduledNodes.isEmpty() || !nodes.isEmpty(), (ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query", new Object[0]);
        ImmutableList.Builder tasks = ImmutableList.builder();
        for (InternalNode node : nodes) {
            Optional<RemoteTask> remoteTask = this.stage.scheduleTask(node, this.scheduledNodes.size(), (Multimap<PlanNodeId, Split>)ImmutableMultimap.of());
            remoteTask.ifPresent(task -> {
                tasks.add(task);
                this.scheduledNodes.add(node);
            });
        }
        return tasks.build();
    }
}

