/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution.executor.dedicated;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.testing.TestingTicker;
import io.airlift.tracing.Tracing;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.execution.SplitRunner;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor.dedicated.ThreadPerDriverTaskExecutor;
import io.trino.execution.executor.scheduler.FairScheduler;
import io.trino.spi.VersionEmbedder;
import io.trino.util.EmbedVersion;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestThreadPerDriverTaskExecutor {
    /**
     * Checks that a split which is blocked inside {@code processFor} completes once its task is removed.
     *
     * <p>The split's only invocation counts down {@code started} and then joins its own thread, so it
     * can only return after that thread has been interrupted. Once the split is known to be running,
     * the test removes the task, waits for the split's completion future and expects the split to
     * report itself as finished.
     *
     * <p>Decompiler note (CFR), kept from the recovered source: "Removed try catching itself -
     * possible behaviour change."
     */
    @Test
    @Timeout(10)
    public void testCancellationWhileProcessing() throws ExecutionException, InterruptedException {
        ThreadPerDriverTaskExecutor executor = new ThreadPerDriverTaskExecutor(new TaskManagerConfig(), Tracing.noopTracer(), (VersionEmbedder)EmbedVersion.testingVersionEmbedder());
        executor.start();
        try {
            TaskId taskId = new TaskId(new StageId("query", 1), 1, 1);
            TaskHandle task = executor.addTask(taskId, () -> 0.0, 10, new Duration(1.0, TimeUnit.MILLISECONDS), OptionalInt.empty());
            CountDownLatch started = new CountDownLatch(1);
            // The list is passed straight to the constructor (no cast) so the lambda receives its
            // functional-interface target type from the constructor parameter.
            TestingSplitRunner split = new TestingSplitRunner(ImmutableList.of(duration -> {
                started.countDown();
                try {
                    // Joining the current thread never returns normally; only an interrupt ends it.
                    Thread.currentThread().join();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever runs this thread.
                    Thread.currentThread().interrupt();
                }
                return Futures.immediateVoidFuture();
            }));
            ListenableFuture<?> splitDone = executor.enqueueSplits(task, false, ImmutableList.of(split)).get(0);
            // Make sure the split is actually inside processFor before removing the task.
            started.await();
            executor.removeTask(task);
            splitDone.get();
            Assertions.assertThat(split.isFinished()).isTrue();
        }
        finally {
            executor.stop();
        }
    }

    /**
     * Checks that a split which returns an unfinished future is resumed after that future completes.
     *
     * <p>The first invocation returns {@code blocked}, a future the test controls; the second
     * invocation returns an already-completed future. The test waits until a listener has been
     * registered on {@code blocked}, completes it, then waits for the split's completion future and
     * expects the split to report itself as finished.
     *
     * <p>Decompiler note (CFR), kept from the recovered source: "Removed try catching itself -
     * possible behaviour change."
     */
    @Test
    @Timeout(10)
    public void testBlocking() throws ExecutionException, InterruptedException {
        ThreadPerDriverTaskExecutor executor = new ThreadPerDriverTaskExecutor(new TaskManagerConfig(), Tracing.noopTracer(), (VersionEmbedder)EmbedVersion.testingVersionEmbedder());
        executor.start();
        try {
            TaskId taskId = new TaskId(new StageId("query", 1), 1, 1);
            TaskHandle task = executor.addTask(taskId, () -> 0.0, 10, new Duration(1.0, TimeUnit.MILLISECONDS), OptionalInt.empty());
            TestFuture blocked = new TestFuture();
            // The list is passed straight to the constructor (no cast) so the lambdas receive their
            // functional-interface target type from the constructor parameter.
            TestingSplitRunner split = new TestingSplitRunner(ImmutableList.of(
                    duration -> blocked,
                    duration -> Futures.immediateVoidFuture()));
            ListenableFuture<?> splitDone = executor.enqueueSplits(task, false, ImmutableList.of(split)).get(0);
            // Only complete the future once something is listening on it.
            blocked.awaitListenerAdded();
            blocked.set(null);
            splitDone.get();
            Assertions.assertThat(split.isFinished()).isTrue();
        }
        finally {
            executor.stop();
        }
    }

    /**
     * Drives a two-invocation split in lock-step with the test thread while advancing a fake clock.
     *
     * <p>The executor is built on a {@link FairScheduler} that reads time from a {@link TestingTicker}.
     * A two-party {@link Phaser} synchronises the test thread with the split: the first invocation
     * arrives twice and the second invocation arrives once. Between the first two rendezvous the test
     * advances the ticker by twice {@code FairScheduler.QUANTUM_NANOS}.
     * NOTE(review): presumably this makes the split exceed its scheduling quantum so that it yields
     * before the second invocation - confirm against FairScheduler.
     *
     * <p>Decompiler note (CFR), kept from the recovered source: "Removed try catching itself -
     * possible behaviour change."
     */
    @Test
    @Timeout(10)
    public void testYielding() throws ExecutionException, InterruptedException {
        TestingTicker ticker = new TestingTicker();
        FairScheduler scheduler = new FairScheduler(1, "Runner-%d", (Ticker)ticker);
        ThreadPerDriverTaskExecutor executor = new ThreadPerDriverTaskExecutor(Tracing.noopTracer(), (VersionEmbedder)EmbedVersion.testingVersionEmbedder(), scheduler);
        executor.start();
        try {
            TaskId taskId = new TaskId(new StageId("query", 1), 1, 1);
            TaskHandle task = executor.addTask(taskId, () -> 0.0, 10, new Duration(1.0, TimeUnit.MILLISECONDS), OptionalInt.empty());
            // Two parties: the test thread and whichever thread runs the split.
            Phaser phaser = new Phaser(2);
            // The list is passed straight to the constructor (no cast) so the lambdas receive their
            // functional-interface target type from the constructor parameter.
            TestingSplitRunner split = new TestingSplitRunner(ImmutableList.of(duration -> {
                phaser.arriveAndAwaitAdvance();
                phaser.arriveAndAwaitAdvance();
                return Futures.immediateVoidFuture();
            }, duration -> {
                phaser.arriveAndAwaitAdvance();
                return Futures.immediateVoidFuture();
            }));
            ListenableFuture<?> splitDone = executor.enqueueSplits(task, false, ImmutableList.of(split)).get(0);
            // First rendezvous: the split is inside its first invocation.
            phaser.arriveAndAwaitAdvance();
            ticker.increment(FairScheduler.QUANTUM_NANOS * 2L, TimeUnit.NANOSECONDS);
            // Second rendezvous: lets the first invocation return.
            phaser.arriveAndAwaitAdvance();
            // Third rendezvous happens inside the second invocation; the phaser is then in phase 3.
            Assertions.assertThat(phaser.arriveAndAwaitAdvance()).isEqualTo(3);
            splitDone.get();
            Assertions.assertThat(split.isFinished()).isTrue();
        }
        finally {
            executor.stop();
        }
    }

    /**
     * Scripted {@link SplitRunner} for tests.
     *
     * <p>Each call to {@link #processFor(Duration)} runs the next function from the list given at
     * construction and returns whatever future that function produces. The runner reports itself
     * finished once every function has been run, or as soon as {@link #close()} is called.
     */
    private static class TestingSplitRunner
    implements SplitRunner {
        /** Functions to run, one per {@code processFor} call, in order. */
        private final List<Function<Duration, ListenableFuture<Void>>> steps;
        /**
         * Index of the function the next {@code processFor} call will run.
         * NOTE(review): not volatile - assumes processFor is never entered concurrently; confirm.
         */
        private int nextStep;
        private volatile boolean finished;
        /** Thread currently inside {@code processFor}, or {@code null} when none is. */
        private volatile Thread activeThread;

        public TestingSplitRunner(List<Function<Duration, ListenableFuture<Void>>> invocations) {
            steps = invocations;
        }

        public final int getPipelineId() {
            return 0;
        }

        public final Span getPipelineSpan() {
            return Span.getInvalid();
        }

        public final boolean isFinished() {
            return finished;
        }

        /**
         * Runs the next scripted function on the calling thread.
         *
         * @param duration passed through unchanged to the scripted function
         * @return the future produced by the scripted function
         */
        public final ListenableFuture<Void> processFor(Duration duration) {
            // Publish the running thread so close() can interrupt it while the function executes.
            activeThread = Thread.currentThread();
            ListenableFuture<Void> outcome;
            try {
                outcome = steps.get(nextStep).apply(duration);
            }
            finally {
                activeThread = null;
            }
            // Only advance when the function returned normally.
            nextStep++;
            boolean exhausted = nextStep == steps.size();
            if (exhausted) {
                finished = true;
            }
            return outcome;
        }

        public final String getInfo() {
            return "";
        }

        /** Marks the runner finished and interrupts the thread inside {@code processFor}, if any. */
        public final void close() {
            finished = true;
            // Read the volatile field once so the null check and the interrupt see the same thread.
            Thread running = activeThread;
            if (running == null) {
                return;
            }
            running.interrupt();
        }
    }

    /**
     * Future that a test completes by hand and that records when a listener is first attached.
     *
     * <p>{@link #awaitListenerAdded()} blocks until {@link #addListener(Runnable, Executor)} has been
     * called at least once, which lets a test wait for another component to start listening before
     * it completes the future through {@link #set(Void)}.
     */
    private static class TestFuture
    extends AbstractFuture<Void> {
        private final CountDownLatch listenerAdded = new CountDownLatch(1);

        private TestFuture() {
        }

        /** Registers the listener with the superclass, then signals that a listener is present. */
        public void addListener(Runnable listener, Executor executor) {
            super.addListener(listener, executor);
            listenerAdded.countDown();
        }

        /**
         * Public entry point that delegates to the superclass {@code set}.
         *
         * @param value the value to complete the future with
         * @return the result of the superclass {@code set} call
         */
        public boolean set(Void value) {
            // Pass the value with its declared type; casting it to Object does not match set(Void).
            return super.set(value);
        }

        /**
         * Blocks until a listener has been added to this future.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void awaitListenerAdded() throws InterruptedException {
            listenerAdded.await();
        }
    }
}

