/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.RemoteTaskRunner;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerTestUtils;
import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.zookeeper.ZooKeeper;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RemoteTaskRunnerRunPendingTasksConcurrencyTest {
    private RemoteTaskRunner remoteTaskRunner;
    private final RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();

    @Before
    public void setUp() throws Exception {
        this.rtrTestUtils.setUp();
    }

    @After
    public void tearDown() throws Exception {
        if (this.remoteTaskRunner != null) {
            this.remoteTaskRunner.stop();
        }
        this.rtrTestUtils.tearDown();
    }

    @Test(timeout=60000L)
    public void testConcurrency() throws Exception {
        int i;
        this.rtrTestUtils.makeWorker("worker0", 3);
        this.rtrTestUtils.makeWorker("worker1", 3);
        this.remoteTaskRunner = this.rtrTestUtils.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period((Object)"PT3600S")){

            public int getPendingTasksRunnerNumThreads() {
                return 2;
            }
        });
        int numTasks = 6;
        ListenableFuture[] results = new ListenableFuture[numTasks];
        Task[] tasks = new Task[numTasks];
        for (i = 0; i < 2; ++i) {
            tasks[i] = TestTasks.unending("task" + i);
            results[i] = this.remoteTaskRunner.run(tasks[i]);
        }
        this.waitForBothWorkersToHaveUnackedTasks();
        for (i = 2; i < 5; ++i) {
            tasks[i] = TestTasks.unending("task" + i);
            results[i] = this.remoteTaskRunner.run(tasks[i]);
        }
        this.mockWorkerRunningAndCompletionSuccessfulTasks(tasks[0], tasks[1]);
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[0].get()).getStatusCode());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[1].get()).getStatusCode());
        this.waitForBothWorkersToHaveUnackedTasks();
        if (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId()) && this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) {
            this.remoteTaskRunner.shutdown("task4", "test");
            this.mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]);
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[3].get()).getStatusCode());
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[2].get()).getStatusCode());
        } else if (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId()) && this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) {
            this.remoteTaskRunner.shutdown("task2", "test");
            this.mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]);
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[4].get()).getStatusCode());
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[3].get()).getStatusCode());
        } else if (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId()) && this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) {
            this.remoteTaskRunner.shutdown("task3", "test");
            this.mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]);
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[4].get()).getStatusCode());
            Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[2].get()).getStatusCode());
        } else {
            throw new ISE("two out of three tasks 2,3 and 4 must be waiting for ack.", new Object[0]);
        }
        tasks[5] = TestTasks.unending("task5");
        results[5] = this.remoteTaskRunner.run(tasks[5]);
        this.waitForOneWorkerToHaveUnackedTasks();
        if (this.rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
            this.rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
        } else {
            this.rtrTestUtils.mockWorkerRunningTask("worker1", tasks[5]);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[5]);
        }
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)results[5].get()).getStatusCode());
    }

    private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception {
        if (this.rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
            this.rtrTestUtils.mockWorkerRunningTask("worker0", t1);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
            this.rtrTestUtils.mockWorkerRunningTask("worker1", t2);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t2);
        } else {
            this.rtrTestUtils.mockWorkerRunningTask("worker1", t1);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t1);
            this.rtrTestUtils.mockWorkerRunningTask("worker0", t2);
            this.rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t2);
        }
    }

    private void waitForOneWorkerToHaveUnackedTasks() throws Exception {
        while (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) {
            Thread.sleep(5L);
        }
        ZooKeeper zk = this.rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
        while (zk.getChildren(RemoteTaskRunnerTestUtils.TASKS_PATH + "/worker0", false).size() < 1 && zk.getChildren(RemoteTaskRunnerTestUtils.TASKS_PATH + "/worker1", false).size() < 1) {
            Thread.sleep(5L);
        }
    }

    private void waitForBothWorkersToHaveUnackedTasks() throws Exception {
        while (this.remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
            Thread.sleep(5L);
        }
        ZooKeeper zk = this.rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
        while (zk.getChildren(RemoteTaskRunnerTestUtils.TASKS_PATH + "/worker0", false).size() < 1 || zk.getChildren(RemoteTaskRunnerTestUtils.TASKS_PATH + "/worker1", false).size() < 1) {
            Thread.sleep(5L);
        }
    }
}

