/*
 * Decompiled with CFR 0.152.
 */
package io.trino.execution;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.node.NodeInfo;
import io.airlift.stats.GcMonitor;
import io.airlift.stats.TestingGcMonitor;
import io.airlift.testing.TestingTicker;
import io.airlift.tracing.Tracing;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.connector.CatalogProperties;
import io.trino.connector.ConnectorServices;
import io.trino.connector.ConnectorServicesProvider;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.BaseTestSqlTaskManager;
import io.trino.execution.LocationFactory;
import io.trino.execution.SplitRunner;
import io.trino.execution.SqlTaskManager;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskManagementExecutor;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.TaskState;
import io.trino.execution.TaskTestUtils;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor.timesharing.TimeSharingTaskExecutor;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.NodeMemoryConfig;
import io.trino.metadata.LanguageFunctionProvider;
import io.trino.metadata.WorkerLanguageFunctionProvider;
import io.trino.spi.VersionEmbedder;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spiller.LocalSpillManager;
import io.trino.spiller.NodeSpillConfig;
import io.trino.version.EmbedVersion;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestTaskExecutorStuckSplits {
    /**
     * Verifies that a task whose split never yields is failed by the stuck-split detection.
     *
     * <p>A {@link MockSplitRunner}, which sleeps until its thread is interrupted, is enqueued on a
     * {@link TimeSharingTaskExecutor} driven by a {@link TestingTicker}. Stuck-split interruption is
     * enabled with a 10 second detection interval, warning threshold and timeout; the ticker is then
     * advanced by 30 seconds and {@code failStuckSplitTasks()} is invoked directly. The test waits for
     * the runner to finish and expects exactly one task, whose state must end up {@code FAILED}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if one of the awaited futures fails
     * @throws TimeoutException if the runner does not start/finish, or the task info does not arrive, in time
     */
    @Test
    public void testFailStuckSplitTasks() throws InterruptedException, ExecutionException, TimeoutException {
        TestingTicker ticker = new TestingTicker();
        TaskManagementExecutor taskManagementExecutor = new TaskManagementExecutor();
        TaskId taskId = new TaskId(new StageId("query", 0), 1, 0);
        TimeSharingTaskExecutor taskExecutor = new TimeSharingTaskExecutor(4, 8, 3, 4, ticker);
        TaskHandle taskHandle = taskExecutor.addTask(taskId, () -> 1.0, 1, new Duration(1.0, TimeUnit.SECONDS), OptionalInt.of(1));

        // Enqueue a split that keeps running until the thread executing it is interrupted.
        MockSplitRunner mockSplitRunner = new MockSplitRunner();
        taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(mockSplitRunner));
        taskExecutor.start();
        try {
            // Make sure the executor has actually picked up the split before checking for stuck splits.
            mockSplitRunner.waitForStart();

            TaskManagerConfig taskManagerConfig = new TaskManagerConfig()
                    .setInterruptStuckSplitTasksEnabled(true)
                    .setInterruptStuckSplitTasksDetectionInterval(new Duration(10.0, TimeUnit.SECONDS))
                    .setInterruptStuckSplitTasksWarningThreshold(new Duration(10.0, TimeUnit.SECONDS))
                    .setInterruptStuckSplitTasksTimeout(new Duration(10.0, TimeUnit.SECONDS));

            // The stack-trace predicate accepts everything, so any long-running split qualifies as stuck.
            try (SqlTaskManager sqlTaskManager = createSqlTaskManager(taskManagerConfig, new NodeMemoryConfig(), taskExecutor, taskManagementExecutor, stackTraceElements -> true)) {
                // Remove the task from the executor once its state is terminating or done.
                sqlTaskManager.addStateChangeListener(taskId, state -> lambda$testFailStuckSplitTasks$2(taskHandle, taskExecutor, state));

                // Move the clock well past the 10 second timeout, then run the check by hand.
                ticker.increment(30L, TimeUnit.SECONDS);
                sqlTaskManager.failStuckSplitTasks();

                mockSplitRunner.waitForFinish();
                List<TaskInfo> taskInfos = sqlTaskManager.getAllTaskInfo();
                Assertions.assertThat(taskInfos).hasSize(1);

                TaskInfo taskInfo = pollTerminatingTaskInfoUntilDone(sqlTaskManager, taskInfos.get(0));
                Assertions.assertThat(taskInfo.getTaskStatus().getState()).isEqualTo(TaskState.FAILED);
            }
        }
        finally {
            taskExecutor.stop();
            taskManagementExecutor.close();
        }
    }

    /**
     * Builds a {@link SqlTaskManager} for the test, wiring the supplied executors and configuration
     * together with freshly created testing collaborators.
     *
     * @param taskManagerConfig task manager settings passed to the {@link SqlTaskManager}
     * @param nodeMemoryConfig memory settings, used both for the {@link LocalMemoryManager} and for the
     *        {@link SqlTaskManager} itself so the two always agree
     * @param taskExecutor executor the task manager runs splits on
     * @param taskManagementExecutor management executor passed to the task manager
     * @param stuckSplitStackTracePredicate predicate over a split thread's stack trace, passed through
     *        to the task manager
     * @return the new task manager; the caller is responsible for closing it
     */
    private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig, NodeMemoryConfig nodeMemoryConfig, TaskExecutor taskExecutor, TaskManagementExecutor taskManagementExecutor, Predicate<List<StackTraceElement>> stuckSplitStackTracePredicate) {
        return new SqlTaskManager(
                new EmbedVersion("testversion"),
                new NoConnectorServicesProvider(),
                TaskTestUtils.createTestingPlanner(),
                new WorkerLanguageFunctionProvider(),
                new BaseTestSqlTaskManager.MockLocationFactory(),
                taskExecutor,
                TaskTestUtils.createTestSplitMonitor(),
                new NodeInfo("test"),
                // Use the caller's memory config here too, rather than an unrelated default instance.
                new LocalMemoryManager(nodeMemoryConfig),
                taskManagementExecutor,
                taskManagerConfig,
                nodeMemoryConfig,
                new LocalSpillManager(new NodeSpillConfig()),
                new NodeSpillConfig(),
                new TestingGcMonitor(),
                Tracing.noopTracer(),
                new ExchangeManagerRegistry(),
                stuckSplitStackTracePredicate);
    }

    /**
     * Re-fetches a task's info while the task is still terminating, giving up after three polls.
     *
     * <p>The supplied info must already be in a terminating-or-done state; this is asserted up front.
     * Each poll asks the task manager for the info using the task id and version of the most recent
     * info, and waits at most 5 seconds for the answer.
     *
     * @param taskManager task manager to query
     * @param taskInfo starting task info, expected to be terminating or done
     * @return the last info obtained; it may still be terminating if all three polls were used up
     * @throws InterruptedException if interrupted while waiting for a poll result
     * @throws ExecutionException if a poll future fails
     * @throws TimeoutException if a poll does not complete within 5 seconds
     */
    private static TaskInfo pollTerminatingTaskInfoUntilDone(SqlTaskManager taskManager, TaskInfo taskInfo) throws InterruptedException, ExecutionException, TimeoutException {
        Assertions.assertThat(taskInfo.getTaskStatus().getState().isTerminatingOrDone()).isTrue();

        TaskInfo latest = taskInfo;
        int remainingPolls = 3;
        while (remainingPolls > 0) {
            if (!latest.getTaskStatus().getState().isTerminating()) {
                break;
            }
            latest = (TaskInfo) taskManager
                    .getTaskInfo(latest.getTaskStatus().getTaskId(), latest.getTaskStatus().getVersion())
                    .get(5L, TimeUnit.SECONDS);
            remainingPolls--;
        }
        return latest;
    }

    /**
     * State-change callback body: removes the task from the executor once the task state is
     * terminating or done, unless the task handle has already been destroyed.
     *
     * <p>The compiler-generated name is kept because {@code testFailStuckSplitTasks} refers to this
     * method by that name.
     *
     * @param taskHandle handle of the task to remove
     * @param taskExecutor executor the task was added to
     * @param state the newly reported task state
     */
    private static void lambda$testFailStuckSplitTasks$2(TaskHandle taskHandle, TaskExecutor taskExecutor, TaskState state) {
        // Nothing to do while the task is still live, or when the handle is already gone.
        if (!state.isTerminatingOrDone() || taskHandle.isDestroyed()) {
            return;
        }
        taskExecutor.removeTask(taskHandle);
    }

    private static class MockSplitRunner
    implements SplitRunner {
        private final SettableFuture<Void> startedFuture = SettableFuture.create();
        private final SettableFuture<Void> finishedFuture = SettableFuture.create();
        @GuardedBy(value="this")
        private Thread runnerThread;
        @GuardedBy(value="this")
        private boolean closed;

        private MockSplitRunner() {
        }

        public void waitForStart() throws ExecutionException, InterruptedException, TimeoutException {
            this.startedFuture.get(10L, TimeUnit.SECONDS);
        }

        public void waitForFinish() throws ExecutionException, InterruptedException, TimeoutException {
            this.finishedFuture.get(10L, TimeUnit.SECONDS);
        }

        public int getPipelineId() {
            return 0;
        }

        public Span getPipelineSpan() {
            return Span.getInvalid();
        }

        public synchronized boolean isFinished() {
            return this.closed;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ListenableFuture<Void> processFor(Duration duration) {
            this.startedFuture.set(null);
            MockSplitRunner mockSplitRunner = this;
            synchronized (mockSplitRunner) {
                this.runnerThread = Thread.currentThread();
                if (this.closed) {
                    this.finishedFuture.set(null);
                    return Futures.immediateVoidFuture();
                }
            }
            try {
                while (true) {
                    Thread.sleep(100000L);
                }
            }
            catch (InterruptedException e) {
                MockSplitRunner mockSplitRunner2 = this;
                synchronized (mockSplitRunner2) {
                    this.closed = true;
                }
                this.finishedFuture.set(null);
                return Futures.immediateVoidFuture();
            }
        }

        public String getInfo() {
            return "MockSplitRunner";
        }

        public synchronized void close() {
            this.closed = true;
            if (this.runnerThread != null) {
                this.runnerThread.interrupt();
            }
        }
    }

    private static class NoConnectorServicesProvider
    implements ConnectorServicesProvider {
        private NoConnectorServicesProvider() {
        }

        public void loadInitialCatalogs() {
        }

        public void ensureCatalogsLoaded(Session session, List<CatalogProperties> catalogs) {
        }

        public void pruneCatalogs(Set<CatalogHandle> catalogsInUse) {
            throw new UnsupportedOperationException();
        }

        public ConnectorServices getConnectorServices(CatalogHandle catalogHandle) {
            throw new UnsupportedOperationException();
        }
    }
}

