/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord.http;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.http.OverlordResource;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.http.TaskStatusResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.utils.CloseableUtils;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class OverlordTest {
    private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
    private TestingServer server;
    private Timing timing;
    private CuratorFramework curator;
    private TaskMaster taskMaster;
    private TaskLockbox taskLockbox;
    private TaskStorage taskStorage;
    private TaskActionClientFactory taskActionClientFactory;
    private CountDownLatch announcementLatch;
    private DruidNode druidNode;
    private OverlordResource overlordResource;
    private Map<String, CountDownLatch> taskCompletionCountDownLatches;
    private Map<String, CountDownLatch> runTaskCountDownLatches;
    private HttpServletRequest req;
    private SupervisorManager supervisorManager;
    private final String goodTaskId = "aaa";
    private final String badTaskId = "zzz";

    private void setupServerAndCurator() throws Exception {
        this.server = new TestingServer();
        this.timing = new Timing();
        this.curator = CuratorFrameworkFactory.builder().connectString(this.server.getConnectString()).sessionTimeoutMs(this.timing.session()).connectionTimeoutMs(this.timing.connection()).retryPolicy((RetryPolicy)new RetryOneTime(1)).compressionProvider((CompressionProvider)new PotentiallyGzippedCompressionProvider(true)).build();
    }

    private void tearDownServerAndCurator() {
        CloseableUtils.closeAndWrapExceptions((Closeable)this.curator);
        CloseableUtils.closeAndWrapExceptions((Closeable)this.server);
    }

    @Before
    public void setUp() throws Exception {
        this.req = (HttpServletRequest)EasyMock.createMock(HttpServletRequest.class);
        EasyMock.expect((Object)this.req.getAttribute("Druid-Allow-Unsecured-Path")).andReturn(null).anyTimes();
        EasyMock.expect((Object)this.req.getAttribute("Druid-Authorization-Checked")).andReturn(null).anyTimes();
        EasyMock.expect((Object)this.req.getAttribute("Druid-Authentication-Result")).andReturn((Object)new AuthenticationResult("druid", "druid", null, null)).anyTimes();
        this.req.setAttribute("Druid-Authorization-Checked", (Object)true);
        EasyMock.expectLastCall().anyTimes();
        this.supervisorManager = (SupervisorManager)EasyMock.createMock(SupervisorManager.class);
        this.taskActionClientFactory = (TaskActionClientFactory)EasyMock.createStrictMock(TaskActionClientFactory.class);
        EasyMock.expect((Object)this.taskActionClientFactory.create((Task)EasyMock.anyObject())).andReturn(null).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.taskActionClientFactory, this.req});
        this.taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
        TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        this.taskLockbox = new TaskLockbox(this.taskStorage, (IndexerMetadataStorageCoordinator)mdc);
        this.runTaskCountDownLatches = new HashMap<String, CountDownLatch>();
        this.runTaskCountDownLatches.put("0", new CountDownLatch(1));
        this.runTaskCountDownLatches.put("1", new CountDownLatch(1));
        this.taskCompletionCountDownLatches = new HashMap<String, CountDownLatch>();
        this.taskCompletionCountDownLatches.put("0", new CountDownLatch(1));
        this.taskCompletionCountDownLatches.put("1", new CountDownLatch(1));
        this.announcementLatch = new CountDownLatch(1);
        this.setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
        this.druidNode = new DruidNode("hey", "what", false, Integer.valueOf(1234), null, true, false);
        NoopServiceEmitter serviceEmitter = new NoopServiceEmitter();
        NoopTask badTask = new NoopTask("zzz", "zzz", "datasource", 10000L, 0L, null, null, null);
        TimeChunkLock badLock = new TimeChunkLock(null, "zzz", "datasource", Intervals.ETERNITY, "version1", 50);
        NoopTask goodTask = new NoopTask("aaa", "aaa", "datasource", 0L, 0L, null, null, null);
        TimeChunkLock goodLock = new TimeChunkLock(null, "aaa", "datasource", Intervals.ETERNITY, "version0", 50);
        this.taskStorage.insert((Task)goodTask, TaskStatus.running((String)"aaa"));
        this.taskStorage.insert((Task)badTask, TaskStatus.running((String)"zzz"));
        this.taskStorage.addLock("zzz", (TaskLock)badLock);
        this.taskStorage.addLock("aaa", (TaskLock)goodLock);
        this.runTaskCountDownLatches.put("zzz", new CountDownLatch(1));
        this.runTaskCountDownLatches.put("aaa", new CountDownLatch(1));
        this.taskCompletionCountDownLatches.put("zzz", new CountDownLatch(1));
        this.taskCompletionCountDownLatches.put("aaa", new CountDownLatch(1));
        TaskRunnerFactory taskRunnerFactory = () -> new MockTaskRunner(this.runTaskCountDownLatches, this.taskCompletionCountDownLatches);
        taskRunnerFactory.build().run((Task)badTask);
        taskRunnerFactory.build().run((Task)goodTask);
        this.taskMaster = new TaskMaster(new TaskLockConfig(), new TaskQueueConfig(null, new Period(1L), null, new Period(10L)), new DefaultTaskConfig(), this.taskLockbox, this.taskStorage, this.taskActionClientFactory, this.druidNode, taskRunnerFactory, (ServiceAnnouncer)new NoopServiceAnnouncer(){

            public void announce(DruidNode node) {
                OverlordTest.this.announcementLatch.countDown();
            }
        }, new CoordinatorOverlordServiceConfig(null, null), (ServiceEmitter)serviceEmitter, this.supervisorManager, (OverlordHelperManager)EasyMock.createNiceMock(OverlordHelperManager.class), (DruidLeaderSelector)new TestDruidLeaderSelector());
        EmittingLogger.registerEmitter((ServiceEmitter)serviceEmitter);
    }

    @Test(timeout=60000L)
    public void testOverlordRun() throws Exception {
        this.taskMaster.start();
        this.announcementLatch.await();
        while (!this.taskMaster.isLeader()) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)this.taskMaster.getCurrentLeader(), (Object)this.druidNode.getHostAndPort());
        Assert.assertEquals((Object)Optional.absent(), (Object)this.taskMaster.getRedirectLocation());
        TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(this.taskStorage, this.taskLockbox);
        WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(this.taskMaster, null);
        this.overlordResource = new OverlordResource(this.taskMaster, taskStorageQueryAdapter, new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null), null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, workerTaskRunnerQueryAdapter, null);
        Response response = this.overlordResource.getLeader();
        Assert.assertEquals((Object)this.druidNode.getHostAndPort(), (Object)response.getEntity());
        this.waitForTaskStatus("zzz", TaskState.FAILED);
        this.taskCompletionCountDownLatches.get("aaa").countDown();
        this.waitForTaskStatus("aaa", TaskState.SUCCESS);
        String taskId_0 = "0";
        NoopTask task_0 = NoopTask.create((String)"0", (int)0);
        response = this.overlordResource.taskPost((Task)task_0, this.req);
        Assert.assertEquals((long)200L, (long)response.getStatus());
        Assert.assertEquals((Object)ImmutableMap.of((Object)"task", (Object)"0"), (Object)response.getEntity());
        response = this.overlordResource.taskPost((Task)task_0, this.req);
        Assert.assertEquals((long)400L, (long)response.getStatus());
        response = this.overlordResource.getTaskPayload("0");
        Assert.assertEquals((Object)task_0, (Object)((TaskPayloadResponse)response.getEntity()).getPayload());
        response = this.overlordResource.getTaskPayload("whatever");
        Assert.assertEquals((long)404L, (long)response.getStatus());
        response = this.overlordResource.getTaskStatus("0");
        Assert.assertEquals((Object)"0", (Object)((TaskStatusResponse)response.getEntity()).getTask());
        Assert.assertEquals((Object)TaskStatus.running((String)"0").getStatusCode(), (Object)((TaskStatusResponse)response.getEntity()).getStatus().getStatusCode());
        this.taskCompletionCountDownLatches.get("0").countDown();
        this.waitForTaskStatus("0", TaskState.SUCCESS);
        String taskId_1 = "1";
        NoopTask task_1 = NoopTask.create((String)"1", (int)0);
        this.taskStorage.insert((Task)task_1, TaskStatus.running((String)"1"));
        this.runTaskCountDownLatches.get("1").await();
        response = this.overlordResource.getRunningTasks(null, this.req);
        Assert.assertEquals((long)1L, (long)((List)response.getEntity()).size());
        TaskStatusPlus taskResponseObject = (TaskStatusPlus)((List)response.getEntity()).get(0);
        Assert.assertEquals((Object)"1", (Object)taskResponseObject.getId());
        Assert.assertEquals((Object)TASK_LOCATION, (Object)taskResponseObject.getLocation());
        this.taskCompletionCountDownLatches.get("1").countDown();
        this.waitForTaskStatus("1", TaskState.SUCCESS);
        response = this.overlordResource.getCompleteTasks(null, this.req);
        Assert.assertEquals((long)4L, (long)((List)response.getEntity()).size());
        response = this.overlordResource.getCompleteTasks(Integer.valueOf(1), this.req);
        Assert.assertEquals((long)1L, (long)((List)response.getEntity()).size());
        this.taskMaster.stop();
        Assert.assertFalse((boolean)this.taskMaster.isLeader());
        EasyMock.verify((Object[])new Object[]{this.taskActionClientFactory});
    }

    private void waitForTaskStatus(String taskId, TaskState status) throws InterruptedException {
        Response response;
        while (!status.equals((Object)((TaskStatusResponse)(response = this.overlordResource.getTaskStatus(taskId)).getEntity()).getStatus().getStatusCode())) {
            Thread.sleep(10L);
        }
    }

    @After
    public void tearDown() {
        this.tearDownServerAndCurator();
    }

    private static class TestDruidLeaderSelector
    implements DruidLeaderSelector {
        private volatile DruidLeaderSelector.Listener listener;
        private volatile String leader;

        private TestDruidLeaderSelector() {
        }

        public String getCurrentLeader() {
            return this.leader;
        }

        public boolean isLeader() {
            return this.leader != null;
        }

        public int localTerm() {
            return 0;
        }

        public void registerListener(DruidLeaderSelector.Listener listener) {
            this.listener = listener;
            this.leader = "what:1234";
            listener.becomeLeader();
        }

        public void unregisterListener() {
            this.leader = null;
            this.listener.stopBeingLeader();
        }
    }

    public static class MockTaskRunner
    implements TaskRunner {
        private Map<String, CountDownLatch> completionLatches;
        private Map<String, CountDownLatch> runLatches;
        private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
        private List<String> runningTasks;

        public MockTaskRunner(Map<String, CountDownLatch> runLatches, Map<String, CountDownLatch> completionLatches) {
            this.runLatches = runLatches;
            this.completionLatches = completionLatches;
            this.taskRunnerWorkItems = new ConcurrentHashMap();
            this.runningTasks = new ArrayList<String>();
        }

        public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
            return ImmutableList.of();
        }

        public void registerListener(TaskRunnerListener listener, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public void unregisterListener(String listenerId) {
            throw new UnsupportedOperationException();
        }

        public void stop() {
        }

        public synchronized ListenableFuture<TaskStatus> run(final Task task) {
            final String taskId = task.getId();
            ListenableFuture future = MoreExecutors.listeningDecorator((ExecutorService)Execs.singleThreaded((String)"noop_test_task_exec_%s")).submit((Callable)new Callable<TaskStatus>(){

                @Override
                public TaskStatus call() throws Exception {
                    runningTasks.add(taskId);
                    if (runLatches != null) {
                        ((CountDownLatch)runLatches.get(taskId)).countDown();
                    }
                    if (completionLatches != null) {
                        ((CountDownLatch)completionLatches.get(taskId)).await();
                    }
                    taskRunnerWorkItems.remove(taskId);
                    runningTasks.remove(taskId);
                    return TaskStatus.success((String)taskId);
                }
            });
            TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future){

                public TaskLocation getLocation() {
                    return TASK_LOCATION;
                }

                public String getTaskType() {
                    return task.getType();
                }

                public String getDataSource() {
                    return task.getDataSource();
                }
            };
            this.taskRunnerWorkItems.put(taskId, taskRunnerWorkItem);
            return future;
        }

        public void shutdown(String taskid, String reason) {
            this.runningTasks.remove(taskid);
        }

        public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks() {
            return Lists.transform(this.runningTasks, (Function)new Function<String, TaskRunnerWorkItem>(){

                @Nullable
                public TaskRunnerWorkItem apply(String input) {
                    return (TaskRunnerWorkItem)taskRunnerWorkItems.get(input);
                }
            });
        }

        public Collection<TaskRunnerWorkItem> getPendingTasks() {
            return ImmutableList.of();
        }

        public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
            return this.taskRunnerWorkItems.values();
        }

        public Optional<ScalingStats> getScalingStats() {
            return Optional.absent();
        }

        public Map<String, Long> getTotalTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getIdleTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getUsedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getLazyTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public Map<String, Long> getBlacklistedTaskSlotCount() {
            throw new UnsupportedOperationException();
        }

        public void start() {
        }
    }
}

