/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.executor.dedicated;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.executor.RunningSplitInfo;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor.dedicated.TaskEntry;
import io.trino.execution.executor.scheduler.FairScheduler;
import io.trino.spi.VersionEmbedder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.function.Predicate;

@ThreadSafe
public class ThreadPerDriverTaskExecutor
implements TaskExecutor {
    private final FairScheduler scheduler;
    private final Tracer tracer;
    private final VersionEmbedder versionEmbedder;
    private final ScheduledThreadPoolExecutor backgroundTasks = new ScheduledThreadPoolExecutor(2);
    @GuardedBy(value="this")
    private final Map<TaskId, TaskEntry> tasks = new HashMap<TaskId, TaskEntry>();
    @GuardedBy(value="this")
    private boolean closed;

    @Inject
    public ThreadPerDriverTaskExecutor(TaskManagerConfig config, Tracer tracer, VersionEmbedder versionEmbedder) {
        this(tracer, versionEmbedder, new FairScheduler(config.getMaxWorkerThreads(), "SplitRunner-%d", Ticker.systemTicker()));
    }

    @VisibleForTesting
    public ThreadPerDriverTaskExecutor(Tracer tracer, VersionEmbedder versionEmbedder, FairScheduler scheduler) {
        this.scheduler = scheduler;
        this.tracer = Objects.requireNonNull(tracer, "tracer is null");
        this.versionEmbedder = Objects.requireNonNull(versionEmbedder, "versionEmbedder is null");
    }

    @Override
    @PostConstruct
    public synchronized void start() {
        this.scheduler.start();
        this.backgroundTasks.scheduleWithFixedDelay(this::scheduleMoreLeafSplits, 0L, 100L, TimeUnit.MILLISECONDS);
        this.backgroundTasks.scheduleWithFixedDelay(this::adjustConcurrency, 0L, 10L, TimeUnit.MILLISECONDS);
    }

    @Override
    @PreDestroy
    public synchronized void stop() {
        this.closed = true;
        this.tasks.values().forEach(TaskEntry::destroy);
        this.backgroundTasks.shutdownNow();
        this.scheduler.close();
    }

    @Override
    public synchronized TaskHandle addTask(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency, OptionalInt maxDriversPerTask) {
        Preconditions.checkArgument((!this.closed ? 1 : 0) != 0, (Object)"Executor is already closed");
        TaskEntry task = new TaskEntry(taskId, this.scheduler, this.versionEmbedder, this.tracer, initialSplitConcurrency, utilizationSupplier);
        this.tasks.put(taskId, task);
        return task;
    }

    @Override
    public synchronized void removeTask(TaskHandle handle) {
        TaskEntry entry = (TaskEntry)handle;
        this.tasks.remove(entry.taskId());
        if (!entry.isDestroyed()) {
            entry.destroy();
        }
    }

    @Override
    public synchronized List<ListenableFuture<Void>> enqueueSplits(TaskHandle handle, boolean intermediate, List<? extends SplitRunner> splits) {
        Preconditions.checkArgument((!this.closed ? 1 : 0) != 0, (Object)"Executor is already closed");
        TaskEntry entry = (TaskEntry)handle;
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        for (SplitRunner splitRunner : splits) {
            futures.add(entry.addSplit(splitRunner, intermediate));
        }
        return futures;
    }

    private synchronized void scheduleMoreLeafSplits() {
        for (TaskEntry task : this.tasks.values()) {
            task.scheduleMoreLeafSplits();
        }
    }

    private void adjustConcurrency() {
        for (TaskEntry task : this.tasks.values()) {
            task.updateConcurrency();
        }
    }

    @Override
    public Set<TaskId> getStuckSplitTaskIds(Duration processingDurationThreshold, Predicate<RunningSplitInfo> filter) {
        return ImmutableSet.of();
    }
}

