/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.executor.dedicated;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.executor.RunningSplitInfo;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor.dedicated.SplitProcessor;
import io.trino.execution.executor.scheduler.FairScheduler;
import io.trino.execution.executor.scheduler.Group;
import io.trino.execution.executor.scheduler.Schedulable;
import io.trino.execution.executor.scheduler.SchedulerContext;
import io.trino.spi.VersionEmbedder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.DoubleSupplier;
import java.util.function.Predicate;

/**
 * {@link TaskExecutor} implementation that hands every enqueued split to a
 * {@link FairScheduler}, using one scheduler {@link Group} per task.
 *
 * <p>All public mutating operations are {@code synchronized} on this executor.
 * Per-task bookkeeping lives in {@link TaskEntry}, which has its own monitor.
 */
@ThreadSafe
public class ThreadPerDriverTaskExecutor
        implements TaskExecutor {
    private final FairScheduler scheduler;
    private final Tracer tracer;
    private final VersionEmbedder versionEmbedder;
    // Set once by stop(); checked by addTask/enqueueSplits to reject new work.
    private volatile boolean closed;

    /**
     * Creates an executor backed by a new {@link FairScheduler} sized from
     * {@link TaskManagerConfig#getMaxWorkerThreads()}.
     *
     * @param config task manager configuration supplying the worker thread count
     * @param tracer tracer passed to every {@link SplitProcessor}
     * @param versionEmbedder wrapper applied around every split execution
     */
    @Inject
    public ThreadPerDriverTaskExecutor(TaskManagerConfig config, Tracer tracer, VersionEmbedder versionEmbedder) {
        this(tracer, versionEmbedder, new FairScheduler(config.getMaxWorkerThreads(), "SplitRunner-%d", Ticker.systemTicker()));
    }

    /**
     * Creates an executor on top of an externally supplied scheduler.
     *
     * @param tracer tracer passed to every {@link SplitProcessor}
     * @param versionEmbedder wrapper applied around every split execution
     * @param scheduler scheduler that runs the splits
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    public ThreadPerDriverTaskExecutor(Tracer tracer, VersionEmbedder versionEmbedder, FairScheduler scheduler) {
        // Fail fast here instead of with an anonymous NPE on the first start()/addTask() call.
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler is null");
        this.tracer = Objects.requireNonNull(tracer, "tracer is null");
        this.versionEmbedder = Objects.requireNonNull(versionEmbedder, "versionEmbedder is null");
    }

    /** Starts the underlying scheduler. */
    @Override
    @PostConstruct
    public synchronized void start() {
        this.scheduler.start();
    }

    /** Marks this executor as closed and closes the underlying scheduler. */
    @Override
    @PreDestroy
    public synchronized void stop() {
        this.closed = true;
        this.scheduler.close();
    }

    /**
     * Registers a task by creating a scheduler group named after the task id.
     *
     * <p>{@code utilizationSupplier}, {@code initialSplitConcurrency},
     * {@code splitConcurrencyAdjustFrequency} and {@code maxDriversPerTask} are
     * not used by this implementation.
     *
     * @return the handle to pass to {@link #enqueueSplits} and {@link #removeTask}
     * @throws IllegalArgumentException if the executor has been stopped
     */
    @Override
    public synchronized TaskHandle addTask(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency, OptionalInt maxDriversPerTask) {
        Preconditions.checkArgument(!this.closed, "Executor is already closed");
        Group group = this.scheduler.createGroup(taskId.toString());
        return new TaskEntry(taskId, group);
    }

    /**
     * Removes the task's scheduler group and closes the splits still tracked for it.
     * Calling this again for an already destroyed handle is a no-op.
     *
     * @param handle a handle previously returned by {@link #addTask}
     */
    @Override
    public synchronized void removeTask(TaskHandle handle) {
        TaskEntry entry = (TaskEntry) handle;
        if (!entry.isDestroyed()) {
            this.scheduler.removeGroup(entry.group());
            entry.destroy();
        }
    }

    /**
     * Submits each split to the scheduler under the task's group.
     *
     * <p>The {@code intermediate} flag is not used by this implementation.
     *
     * @param handle a handle previously returned by {@link #addTask}
     * @param splits the splits to run
     * @return one future per split, in iteration order of {@code splits}
     * @throws IllegalArgumentException if the executor has been stopped or the task was destroyed
     */
    @Override
    public synchronized List<ListenableFuture<Void>> enqueueSplits(TaskHandle handle, boolean intermediate, List<? extends SplitRunner> splits) {
        Preconditions.checkArgument(!this.closed, "Executor is already closed");
        TaskEntry entry = (TaskEntry) handle;
        List<ListenableFuture<Void>> futures = new ArrayList<>(splits.size());
        for (SplitRunner splitRunner : splits) {
            entry.addSplit(splitRunner);
            int splitId = entry.nextSplitId();
            ListenableFuture<Void> done = this.scheduler.submit(entry.group(), splitId, new VersionEmbedderBridge(this.versionEmbedder, new SplitProcessor(entry.taskId(), splitId, splitRunner, this.tracer)));
            // Close the split once its future completes and stop tracking it, so a task
            // that processes many splits does not keep every finished runner reachable.
            done.addListener(() -> entry.finishSplit(splitRunner), MoreExecutors.directExecutor());
            futures.add(done);
        }
        return futures;
    }

    /**
     * Stuck-split detection is not implemented by this executor.
     *
     * @return always an empty set
     */
    @Override
    public Set<TaskId> getStuckSplitTaskIds(Duration processingDurationThreshold, Predicate<RunningSplitInfo> filter) {
        return ImmutableSet.of();
    }

    /**
     * Per-task state: the scheduler group, a split id counter, and the set of
     * splits that have been enqueued and have not finished yet.
     */
    private static class TaskEntry
            implements TaskHandle {
        private final TaskId taskId;
        private final Group group;
        private final AtomicInteger nextSplitId = new AtomicInteger();
        private volatile boolean destroyed;
        @GuardedBy(value="this")
        private final Set<SplitRunner> splits = new HashSet<>();

        public TaskEntry(TaskId taskId, Group group) {
            this.taskId = taskId;
            this.group = group;
        }

        public TaskId taskId() {
            return this.taskId;
        }

        public Group group() {
            return this.group;
        }

        /**
         * Marks the task destroyed and closes every split still being tracked.
         */
        public void destroy() {
            List<SplitRunner> toClose;
            synchronized (this) {
                this.destroyed = true;
                // Snapshot and clear under the lock, then close outside it: close() is
                // foreign code, and completion listeners also take this monitor.
                toClose = new ArrayList<>(this.splits);
                this.splits.clear();
            }
            for (SplitRunner split : toClose) {
                split.close();
            }
        }

        /**
         * Starts tracking a split.
         *
         * @throws IllegalArgumentException if the task has already been destroyed
         */
        public synchronized void addSplit(SplitRunner split) {
            Preconditions.checkArgument(!this.destroyed, "Task already destroyed: %s", this.taskId);
            this.splits.add(split);
        }

        /**
         * Closes a split whose future has completed and stops tracking it.
         * The split is dropped from the tracked set even if {@code close()} throws.
         */
        void finishSplit(SplitRunner split) {
            try {
                split.close();
            }
            finally {
                synchronized (this) {
                    this.splits.remove(split);
                }
            }
        }

        /** Returns the next split id; the first value handed out is 1. */
        public int nextSplitId() {
            return this.nextSplitId.incrementAndGet();
        }

        @Override
        public boolean isDestroyed() {
            return this.destroyed;
        }
    }

    /**
     * {@link Schedulable} that runs its delegate through
     * {@link VersionEmbedder#embedVersion(Runnable)}.
     */
    private record VersionEmbedderBridge(VersionEmbedder versionEmbedder, Schedulable delegate) implements Schedulable
    {
        @Override
        public void run(SchedulerContext context) {
            Runnable adapter = () -> this.delegate.run(context);
            this.versionEmbedder.embedVersion(adapter).run();
        }
    }
}

