/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class TaskQueueScaleTest {
    private static final String DATASOURCE = "ds";
    private final int numTasks = 1000;
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private TaskQueue taskQueue;
    private TaskStorage taskStorage;
    private TestTaskRunner taskRunner;
    private Closer closer;

    @Before
    public void setUp() {
        EmittingLogger.registerEmitter((ServiceEmitter)new NoopServiceEmitter());
        this.closer = Closer.create();
        this.taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(Period.hours((int)1)));
        this.taskRunner = new TestTaskRunner();
        this.closer.register(this.taskRunner::stop);
        ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
        IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator(jsonMapper, (MetadataStorageTablesConfig)this.derbyConnectorRule.metadataTablesConfigSupplier().get(), (SQLMetadataConnector)this.derbyConnectorRule.getConnector());
        TaskActionClientFactory unsupportedTaskActionFactory = task -> new TaskActionClient(){

            public <RetType> RetType submit(TaskAction<RetType> taskAction) {
                throw new UnsupportedOperationException();
            }
        };
        this.taskQueue = new TaskQueue(new TaskLockConfig(), new TaskQueueConfig(null, Period.millis((int)1), null, null), new DefaultTaskConfig(), this.taskStorage, (TaskRunner)this.taskRunner, unsupportedTaskActionFactory, new TaskLockbox(this.taskStorage, (IndexerMetadataStorageCoordinator)storageCoordinator), (ServiceEmitter)new NoopServiceEmitter());
        this.taskQueue.start();
        this.closer.register(() -> ((TaskQueue)this.taskQueue).stop());
    }

    @After
    public void tearDown() throws Exception {
        this.closer.close();
    }

    @Test(timeout=60000L)
    public void doMassLaunchAndExit() throws Exception {
        Assert.assertEquals((String)"no tasks should be running", (long)0L, (long)this.taskRunner.getKnownTasks().size());
        Assert.assertEquals((String)"no tasks should be known", (long)0L, (long)this.taskQueue.getTasks().size());
        Assert.assertEquals((String)"no tasks should be running", (long)0L, (long)this.taskQueue.getRunningTaskCount().size());
        for (int i = 0; i < 1000; ++i) {
            TestTask testTask = new TestTask(i, 2000L);
            this.taskQueue.add((Task)testTask);
        }
        Assert.assertEquals((String)"all tasks should be known", (long)1000L, (long)this.taskQueue.getTasks().size());
        long runningTasks = this.taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum();
        long pendingTasks = this.taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum();
        long waitingTasks = this.taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum();
        Assert.assertEquals((String)"all tasks should be known", (long)1000L, (long)(runningTasks + pendingTasks + waitingTasks));
        TaskLookup.CompleteTaskLookup completeTaskLookup = TaskLookup.CompleteTaskLookup.of((Integer)1000, (Duration)Duration.standardHours((long)1L));
        while (this.taskStorage.getTaskInfos((TaskLookup)completeTaskLookup, DATASOURCE).size() < 1000) {
            Thread.sleep(100L);
        }
        Thread.sleep(100L);
        Assert.assertEquals((String)"no tasks should be active", (long)0L, (long)this.taskStorage.getActiveTasks().size());
        runningTasks = this.taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum();
        pendingTasks = this.taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum();
        waitingTasks = this.taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum();
        Assert.assertEquals((String)"no tasks should be running", (long)0L, (long)runningTasks);
        Assert.assertEquals((String)"no tasks should be pending", (long)0L, (long)pendingTasks);
        Assert.assertEquals((String)"no tasks should be waiting", (long)0L, (long)waitingTasks);
    }

    @Test(timeout=60000L)
    public void doMassLaunchAndShutdown() throws Exception {
        Assert.assertEquals((String)"no tasks should be running", (long)0L, (long)this.taskRunner.getKnownTasks().size());
        ArrayList<String> taskIds = new ArrayList<String>();
        for (int i = 0; i < 1000; ++i) {
            TestTask testTask = new TestTask(i, Duration.standardHours((long)1L).getMillis());
            this.taskQueue.add((Task)testTask);
            taskIds.add(testTask.getId());
        }
        while (this.taskStorage.getActiveTasks().size() < 1000) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((String)"all tasks should be running", (long)1000L, (long)this.taskStorage.getActiveTasks().size());
        for (String taskId : taskIds) {
            this.taskQueue.shutdown(taskId, "test shutdown", new Object[0]);
        }
        while (!this.taskStorage.getActiveTasks().isEmpty()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((String)"no tasks should be running", (long)0L, (long)this.taskStorage.getActiveTasks().size());
        int completed = this.taskStorage.getTaskInfos((TaskLookup)TaskLookup.CompleteTaskLookup.of((Integer)1000, (Duration)Duration.standardHours((long)1L)), DATASOURCE).size();
        Assert.assertEquals((String)"all tasks should have completed", (long)1000L, (long)completed);
    }

    private static class TestTaskRunnerWorkItem
    extends TaskRunnerWorkItem {
        private final RunnerTaskState state;

        public TestTaskRunnerWorkItem(String taskId) {
            this(taskId, (ListenableFuture<TaskStatus>)SettableFuture.create(), RunnerTaskState.PENDING);
        }

        private TestTaskRunnerWorkItem(String taskId, ListenableFuture<TaskStatus> result, RunnerTaskState state) {
            super(taskId, result);
            this.state = state;
        }

        public RunnerTaskState getState() {
            return this.state;
        }

        public TaskLocation getLocation() {
            return TaskLocation.unknown();
        }

        @Nullable
        public String getTaskType() {
            throw new UnsupportedOperationException();
        }

        public String getDataSource() {
            throw new UnsupportedOperationException();
        }

        public void setResult(TaskStatus result) {
            ((SettableFuture)this.getResult()).set((Object)result);
        }

        public TestTaskRunnerWorkItem withState(RunnerTaskState newState) {
            return new TestTaskRunnerWorkItem(this.getTaskId(), (ListenableFuture<TaskStatus>)this.getResult(), newState);
        }
    }

    private static class TestTaskRunner
    implements TaskRunner {
        private static final Logger log = new Logger(TestTaskRunner.class);
        private static final Duration T_PENDING_TO_RUNNING = Duration.standardSeconds((long)2L);
        private static final Duration T_SHUTDOWN_ACK = Duration.millis((long)8L);
        private static final Duration T_SHUTDOWN_COMPLETE = Duration.standardSeconds((long)2L);
        @GuardedBy(value="knownTasks")
        private final Map<String, TestTaskRunnerWorkItem> knownTasks = new HashMap<String, TestTaskRunnerWorkItem>();
        private final ScheduledExecutorService exec = ScheduledExecutors.fixed((int)8, (String)"TaskQueueScaleTest-%s");

        private TestTaskRunner() {
        }

        public void start() {
            throw new UnsupportedOperationException();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ListenableFuture<TaskStatus> run(Task task) {
            Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
            synchronized (map) {
                TestTaskRunnerWorkItem item = this.knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new);
                this.exec.schedule(() -> {
                    try {
                        Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
                        synchronized (map) {
                            TestTaskRunnerWorkItem item2 = this.knownTasks.get(task.getId());
                            if (item2.getState() == RunnerTaskState.PENDING) {
                                this.knownTasks.put(task.getId(), item2.withState(RunnerTaskState.RUNNING));
                            }
                        }
                        this.exec.schedule(() -> {
                            try {
                                TestTaskRunnerWorkItem item2;
                                Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
                                synchronized (map) {
                                    item2 = this.knownTasks.get(task.getId());
                                    this.knownTasks.put(task.getId(), item2.withState(RunnerTaskState.NONE));
                                }
                                if (item2 != null) {
                                    item2.setResult(TaskStatus.success((String)task.getId()));
                                }
                            }
                            catch (Throwable e) {
                                log.error(e, "Error in scheduled executor", new Object[0]);
                            }
                        }, ((TestTask)task).getRuntimeMillis(), TimeUnit.MILLISECONDS);
                    }
                    catch (Throwable e) {
                        log.error(e, "Error in scheduled executor", new Object[0]);
                    }
                }, T_PENDING_TO_RUNNING.getMillis(), TimeUnit.MILLISECONDS);
                return item.getResult();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void shutdown(String taskid, String reason) {
            TestTaskRunnerWorkItem existingTask;
            Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
            synchronized (map) {
                if (!this.knownTasks.containsKey(taskid)) {
                    return;
                }
            }
            TestTaskRunner.threadSleep(T_SHUTDOWN_ACK);
            Map<String, TestTaskRunnerWorkItem> map2 = this.knownTasks;
            synchronized (map2) {
                existingTask = this.knownTasks.get(taskid);
            }
            if (!existingTask.getResult().isDone()) {
                this.exec.schedule(() -> {
                    existingTask.setResult(TaskStatus.failure((String)"taskId", (String)"stopped"));
                    Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
                    synchronized (map) {
                        this.knownTasks.remove(taskid);
                    }
                }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS);
            }
        }

        static void threadSleep(Duration duration) {
            try {
                Thread.sleep(duration.getMillis());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public void registerListener(TaskRunnerListener listener, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public void unregisterListener(String listenerId) {
            throw new UnsupportedOperationException();
        }

        public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
            return null;
        }

        public void stop() {
            this.exec.shutdownNow();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Collection<? extends TaskRunnerWorkItem> getRunningTasks() {
            Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
            synchronized (map) {
                return this.knownTasks.values().stream().filter(item -> item.getState() == RunnerTaskState.RUNNING).collect(Collectors.toList());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Collection<? extends TaskRunnerWorkItem> getPendingTasks() {
            Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
            synchronized (map) {
                return this.knownTasks.values().stream().filter(item -> item.getState() == RunnerTaskState.PENDING).collect(Collectors.toList());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
            Map<String, TestTaskRunnerWorkItem> map = this.knownTasks;
            synchronized (map) {
                return ImmutableList.copyOf(this.knownTasks.values());
            }
        }

        public Optional<ScalingStats> getScalingStats() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getTotalTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getIdleTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getUsedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getLazyTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getBlacklistedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }
    }

    private static class TestTask
    extends NoopTask {
        private final int number;
        private final long runtime;

        public TestTask(int number, long runtime) {
            super(null, null, TaskQueueScaleTest.DATASOURCE, 0L, 0L, null, null, Collections.emptyMap());
            this.number = number;
            this.runtime = runtime;
        }

        public int getNumber() {
            return this.number;
        }

        public long getRuntimeMillis() {
            return this.runtime;
        }
    }
}

