/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link TaskMonitor}.
 *
 * <p>Each test submits {@link #SPLIT_NUM} sub task specs to a monitor that is backed by
 * {@link TestIndexingServiceClient}. That client runs the submitted tasks on a local thread pool and
 * records their state in {@link #tasks}, which is what the client reports back when asked for a
 * task status.
 */
public class TaskMonitorTest {
    /** Number of sub task specs each test submits. */
    private static final int SPLIT_NUM = 10;

    /** Pool on which {@link TestIndexingServiceClient#runTask} executes the submitted test tasks. */
    private final ExecutorService taskRunner = Execs.multiThreaded(5, "task-monitor-test-%d");

    /** Latest known state per task id; written by the fake client and the task runner threads. */
    private final ConcurrentMap<String, TaskState> tasks = new ConcurrentHashMap<>();

    // NOTE(review): the meaning of the trailing arguments (3, 10) is defined by the TaskMonitor
    // constructor, which is not visible here. testRetry expects exactly 3 attempts per spec, so 3
    // presumably is the retry limit, and 10 presumably matches SPLIT_NUM -- confirm against TaskMonitor.
    private final TaskMonitor<TestTask, SimpleSubTaskReport> monitor =
        new TaskMonitor<>(new TestIndexingServiceClient(), 3, 10);

    /** Resets the recorded task states and starts the monitor. */
    @Before
    public void setup() {
        tasks.clear();
        // NOTE(review): 100 is presumably the status polling period in milliseconds -- confirm.
        monitor.start(100L);
    }

    /** Stops the monitor and always shuts the task runner down, even if stopping the monitor fails. */
    @After
    public void teardown() {
        try {
            monitor.stop();
        } finally {
            taskRunner.shutdownNow();
        }
    }

    /** Submits specs whose tasks never fail and expects every one of them to complete successfully. */
    @Test
    public void testBasic() throws InterruptedException, ExecutionException, TimeoutException {
        final List<ListenableFuture<TaskMonitor.SubTaskCompleteEvent<TestTask>>> futures = submitAll(newSpecs(0, false));
        for (int i = 0; i < futures.size(); i++) {
            assertSucceeded(futures.get(i).get(1L, TimeUnit.SECONDS), "specId" + i);
        }
    }

    /**
     * Submits specs whose first two attempts fail and expects each to eventually succeed with three
     * recorded attempts, the first two of which are {@link TaskState#FAILED}.
     */
    @Test
    public void testRetry() throws InterruptedException, ExecutionException, TimeoutException {
        final List<TestTaskSpec> specs = newSpecs(2, false);
        final List<ListenableFuture<TaskMonitor.SubTaskCompleteEvent<TestTask>>> futures = submitAll(specs);
        for (int i = 0; i < futures.size(); i++) {
            assertSucceeded(futures.get(i).get(2L, TimeUnit.SECONDS), "specId" + i);

            final List<TaskStatusPlus> attempts = attemptHistoryOf(specs.get(i).getId());
            Assert.assertEquals(3, attempts.size());
            Assert.assertEquals(TaskState.FAILED, attempts.get(0).getStatusCode());
            Assert.assertEquals(TaskState.FAILED, attempts.get(1).getStatusCode());
        }
    }

    /**
     * Submits specs whose regular task makes the fake client throw a "Could not resolve type id"
     * error, and expects each spec to still succeed with exactly one recorded attempt.
     *
     * <p>Only the task built by {@link TestTaskSpec#newSubTaskWithBackwardCompatibleType} can be run
     * by the fake client here, so success implies the monitor fell back to that factory method.
     */
    @Test
    public void testResubmitWithOldType() throws InterruptedException, ExecutionException, TimeoutException {
        final List<TestTaskSpec> specs = newSpecs(0, true);
        final List<ListenableFuture<TaskMonitor.SubTaskCompleteEvent<TestTask>>> futures = submitAll(specs);
        for (int i = 0; i < futures.size(); i++) {
            assertSucceeded(futures.get(i).get(2L, TimeUnit.SECONDS), "specId" + i);

            final List<TaskStatusPlus> attempts = attemptHistoryOf(specs.get(i).getId());
            Assert.assertEquals(1, attempts.size());
            Assert.assertEquals(TaskState.SUCCESS, attempts.get(0).getStatusCode());
        }
    }

    /** Builds {@link #SPLIT_NUM} specs with ids {@code specId0..specId9}, each with a 100 run time. */
    private List<TestTaskSpec> newSpecs(int numMaxFails, boolean throwUnknownTypeIdError) {
        return IntStream.range(0, SPLIT_NUM)
            .mapToObj(i -> new TestTaskSpec(
                "specId" + i,
                "groupId",
                "supervisorId",
                null,
                new IntegerInputSplit(i),
                100L,
                numMaxFails,
                throwUnknownTypeIdError))
            .collect(Collectors.toList());
    }

    /** Submits every spec to the monitor, returning the completion futures in spec order. */
    private List<ListenableFuture<TaskMonitor.SubTaskCompleteEvent<TestTask>>> submitAll(List<TestTaskSpec> specs) {
        return specs.stream().map(monitor::submit).collect(Collectors.toList());
    }

    /** Asserts that a completion event belongs to the expected spec and reports success. */
    private static void assertSucceeded(TaskMonitor.SubTaskCompleteEvent<TestTask> result, String expectedSpecId) {
        Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId());
        Assert.assertEquals(expectedSpecId, result.getSpec().getId());
        Assert.assertNotNull(result.getLastStatus());
        Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
        Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
    }

    /** Returns the non-null attempt history the monitor recorded for a completed spec. */
    private List<TaskStatusPlus> attemptHistoryOf(String specId) {
        final TaskHistory<TestTask> history = monitor.getCompleteSubTaskSpecHistory(specId);
        Assert.assertNotNull(history);
        final List<TaskStatusPlus> attempts = history.getAttemptHistory();
        Assert.assertNotNull(attempts);
        return attempts;
    }

    /** Minimal report that carries only the id of the task that produced it. */
    private static class SimpleSubTaskReport
    implements SubTaskReport {
        private final String taskId;

        private SimpleSubTaskReport(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public String getTaskId() {
            return taskId;
        }
    }

    /** Input split wrapping a single integer. */
    private static class IntegerInputSplit
    extends InputSplit<Integer> {
        IntegerInputSplit(int split) {
            super(split);
        }
    }

    /**
     * Fake indexing service client that runs tasks on {@link #taskRunner} and serves task states
     * from {@link #tasks}.
     */
    private class TestIndexingServiceClient
    extends NoopIndexingServiceClient {
        private TestIndexingServiceClient() {
        }

        /**
         * Marks the task as RUNNING and schedules it on the task runner; the task's final state
         * replaces RUNNING once {@code run} returns.
         *
         * @throws RuntimeException wrapping an {@link ISE} when the task is flagged with
         *                          {@code throwUnknownTypeIdError}; the task is not scheduled then
         */
        @Override
        public String runTask(String taskId, Object taskObject) {
            final TestTask task = (TestTask) taskObject;
            tasks.put(task.getId(), TaskState.RUNNING);
            if (task.throwUnknownTypeIdError) {
                throw new RuntimeException(new ISE("Could not resolve type id 'test_task_id'"));
            }
            taskRunner.submit(() -> tasks.put(task.getId(), task.run(null).getStatusCode()));
            return task.getId();
        }

        /**
         * Reports the state recorded in {@link #tasks} for the given id. The runner state is always
         * reported as RUNNING, whatever the recorded task state is.
         */
        @Override
        public TaskStatusResponse getTaskStatus(String taskId) {
            final TaskStatusPlus status = new TaskStatusPlus(
                taskId,
                "groupId",
                "testTask",
                DateTimes.EPOCH,
                DateTimes.EPOCH,
                tasks.get(taskId),
                RunnerTaskState.RUNNING,
                -1L,
                TaskLocation.unknown(),
                "testDataSource",
                null);
            return new TaskStatusResponse(taskId, status);
        }
    }

    /** Noop task that reports itself to the monitor and can be told to fail. */
    private class TestTask
    extends NoopTask {
        private final boolean shouldFail;
        private final boolean throwUnknownTypeIdError;

        TestTask(String id, long runTime, boolean shouldFail, boolean throwUnknownTypeIdError) {
            super(id, null, "testDataSource", runTime, 0L, null, null, null);
            this.shouldFail = shouldFail;
            this.throwUnknownTypeIdError = throwUnknownTypeIdError;
        }

        /**
         * Hands a {@link SimpleSubTaskReport} to the monitor, then either sleeps for the run time
         * and returns a failure status (when {@code shouldFail}) or delegates to {@link NoopTask}.
         */
        @Override
        public TaskStatus run(TaskToolbox toolbox) throws Exception {
            monitor.collectReport(new SimpleSubTaskReport(getId()));
            if (shouldFail) {
                Thread.sleep(getRunTime());
                return TaskStatus.failure(getId(), "Dummy task status failure for testing");
            }
            return super.run(toolbox);
        }
    }

    /**
     * Spec whose first {@code numMaxFails} created tasks are set up to fail. The failure counter is
     * shared by both factory methods.
     */
    private class TestTaskSpec
    extends SubTaskSpec<TestTask> {
        private final long runTime;
        private final int numMaxFails;
        private final boolean throwUnknownTypeIdError;
        private int numFails;

        // The raw InputSplit parameter is kept as-is so it is passed to the SubTaskSpec constructor unchanged.
        TestTaskSpec(String id, String groupId, String supervisorTaskId, Map<String, Object> context, InputSplit inputSplit, long runTime, int numMaxFails, boolean throwUnknownTypeIdError) {
            super(id, groupId, supervisorTaskId, context, inputSplit);
            this.runTime = runTime;
            this.numMaxFails = numMaxFails;
            this.throwUnknownTypeIdError = throwUnknownTypeIdError;
        }

        /** Creates a task that carries this spec's {@code throwUnknownTypeIdError} flag. */
        @Override
        public TestTask newSubTask(int numAttempts) {
            return new TestTask(getId(), runTime, numFails++ < numMaxFails, throwUnknownTypeIdError);
        }

        /** Creates a task that never triggers the unknown-type-id error in the fake client. */
        @Override
        public TestTask newSubTaskWithBackwardCompatibleType(int numAttempts) {
            return new TestTask(getId(), runTime, numFails++ < numMaxFails, false);
        }
    }
}

