/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.executor.dedicated;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.opentelemetry.api.trace.Tracer;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor.dedicated.ConcurrencyController;
import io.trino.execution.executor.dedicated.SplitProcessor;
import io.trino.execution.executor.scheduler.FairScheduler;
import io.trino.execution.executor.scheduler.Group;
import io.trino.execution.executor.scheduler.Schedulable;
import io.trino.execution.executor.scheduler.SchedulerContext;
import io.trino.spi.VersionEmbedder;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.DoubleSupplier;

/**
 * Per-task bookkeeping for the dedicated executor.
 *
 * <p>Each entry owns one scheduler {@link Group} (created from the task id), a FIFO queue of
 * leaf splits that have been enqueued but not yet started, the set of splits that are
 * currently submitted to the scheduler, and a {@link ConcurrencyController} that is fed the
 * observed utilization and the number of running leaf splits.
 *
 * <p>All mutable state is guarded by the monitor of this object; {@code destroyed} is
 * additionally {@code volatile} so {@link #isDestroyed()} can be read without locking.
 */
class TaskEntry
        implements TaskHandle {
    private final TaskId taskId;
    private final Group group;
    private final FairScheduler scheduler;
    private final VersionEmbedder versionEmbedder;
    private final Tracer tracer;
    private final DoubleSupplier utilization;
    private final AtomicInteger nextSplitId = new AtomicInteger();

    @GuardedBy("this")
    private final ConcurrencyController concurrency;

    private volatile boolean destroyed;

    @GuardedBy("this")
    private int runningLeafSplits;

    @GuardedBy("this")
    private final Queue<QueuedSplit> pending = new LinkedList<>();

    @GuardedBy("this")
    private final Set<SplitRunner> running = new HashSet<>();

    /**
     * Creates the entry and registers a scheduler group named after {@code taskId}.
     *
     * @param taskId identifier of the task this entry tracks
     * @param scheduler scheduler that split work is submitted to
     * @param versionEmbedder wraps each scheduled run via {@code embedVersion}
     * @param tracer passed through to every {@link SplitProcessor}
     * @param initialConcurrency starting value handed to the {@link ConcurrencyController}
     * @param utilization supplier sampled on every {@link #updateConcurrency()} call
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public TaskEntry(TaskId taskId, FairScheduler scheduler, VersionEmbedder versionEmbedder, Tracer tracer, int initialConcurrency, DoubleSupplier utilization) {
        this.taskId = Objects.requireNonNull(taskId, "taskId is null");
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler is null");
        this.versionEmbedder = Objects.requireNonNull(versionEmbedder, "versionEmbedder is null");
        this.tracer = Objects.requireNonNull(tracer, "tracer is null");
        this.utilization = Objects.requireNonNull(utilization, "utilization is null");
        this.group = scheduler.createGroup(taskId.toString());
        this.concurrency = new ConcurrencyController(initialConcurrency);
    }

    /** Returns the id of the task this entry belongs to. */
    public TaskId taskId() {
        return taskId;
    }

    /**
     * Tears the task down: removes its group from the scheduler, marks the entry destroyed,
     * closes every running split, and closes every still-pending split while completing the
     * future that was handed out for it by {@link #enqueueLeafSplit(SplitRunner)}.
     */
    public synchronized void destroy() {
        scheduler.removeGroup(group);
        destroyed = true;

        for (SplitRunner split : running) {
            split.close();
        }
        running.clear();

        for (QueuedSplit queued : pending) {
            queued.split().close();
            // Complete the future so callers waiting on a never-started split are released.
            queued.done().set(null);
        }
        pending.clear();
    }

    /**
     * Appends a leaf split to the pending queue without starting it.
     *
     * @param split the split to run later via {@link #dequeueAndRunLeafSplit(Runnable)}
     * @return a future completed with {@code null} once the split has finished, or when the
     *         entry is destroyed while the split is still pending
     */
    public synchronized ListenableFuture<Void> enqueueLeafSplit(SplitRunner split) {
        SettableFuture<Void> done = SettableFuture.create();
        pending.add(new QueuedSplit(split, done));
        return done;
    }

    /**
     * Starts the oldest pending leaf split, if there is one.
     *
     * @param doneCallback invoked after the started split has finished and its bookkeeping
     *        has been updated
     * @return {@code true} if a split was started, {@code false} if the queue was empty
     */
    public synchronized boolean dequeueAndRunLeafSplit(Runnable doneCallback) {
        QueuedSplit next = pending.poll();
        if (next == null) {
            return false;
        }

        ListenableFuture<Void> finished = runSplit(next.split());
        // Count the split BEFORE attaching the listener: with a direct executor the listener
        // runs inline if the future is already complete, and doneCallback must never observe
        // a counter that has been decremented ahead of its increment.
        runningLeafSplits++;
        finished.addListener(() -> {
            leafSplitDone(next);
            doneCallback.run();
        }, MoreExecutors.directExecutor());
        return true;
    }

    /** Records completion of a leaf split and completes the future returned at enqueue time. */
    private synchronized void leafSplitDone(QueuedSplit split) {
        runningLeafSplits--;
        split.done().set(null);
    }

    /**
     * Submits a split to the scheduler under this task's group and tracks it as running.
     * The split is closed and untracked when the returned future completes.
     *
     * @param split the split to execute
     * @return the scheduler's completion future for the submitted work
     */
    public synchronized ListenableFuture<Void> runSplit(SplitRunner split) {
        int splitId = nextSplitId();
        ListenableFuture<Void> done = scheduler.submit(group, splitId, new VersionEmbedderBridge(versionEmbedder, new SplitProcessor(taskId, splitId, split, tracer)));
        // Track the split BEFORE attaching the listener: with a direct executor an already
        // completed future would run splitDone() inline, and adding afterwards would leave a
        // closed split stranded in the running set.
        running.add(split);
        done.addListener(() -> splitDone(split), MoreExecutors.directExecutor());
        return done;
    }

    /** Closes a finished split and stops tracking it. */
    private synchronized void splitDone(SplitRunner split) {
        split.close();
        running.remove(split);
    }

    /** Returns the next split id; ids start at 1 because the counter is incremented first. */
    private int nextSplitId() {
        return nextSplitId.incrementAndGet();
    }

    /** Returns the number of leaf splits started via the queue that have not yet finished. */
    public synchronized int runningLeafSplits() {
        return runningLeafSplits;
    }

    /** Returns {@code true} once {@link #destroy()} has been called. */
    @Override
    public boolean isDestroyed() {
        return destroyed;
    }

    /**
     * Feeds the current utilization sample and the running leaf split count to the
     * concurrency controller.
     */
    public synchronized void updateConcurrency() {
        concurrency.update(utilization.getAsDouble(), runningLeafSplits);
    }

    /** Returns {@code true} if at least one enqueued leaf split has not been started yet. */
    public synchronized boolean hasPendingLeafSplits() {
        return !pending.isEmpty();
    }

    /** Returns the concurrency target currently reported by the concurrency controller. */
    public synchronized int targetConcurrency() {
        return concurrency.targetConcurrency();
    }

    /** A leaf split waiting to be started, paired with the future handed to its submitter. */
    private record QueuedSplit(SplitRunner split, SettableFuture<Void> done) {
    }

    /** Runs the delegate through {@code VersionEmbedder.embedVersion} on every invocation. */
    private record VersionEmbedderBridge(VersionEmbedder versionEmbedder, Schedulable delegate) implements Schedulable
    {
        @Override
        public void run(SchedulerContext context) {
            Runnable adapter = () -> delegate.run(context);
            versionEmbedder.embedVersion(adapter).run();
        }
    }
}

