/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.TestRealtimeTask;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunner;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerTestUtils;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.ZkWorker;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mockito;

public class RemoteTaskRunnerTest {
    private static final Logger LOG = new Logger(RemoteTaskRunnerTest.class);
    private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER;
    private static final String WORKER_HOST = "worker";
    private static final String ANNOUCEMENTS_PATH = JOINER.join((Object)RemoteTaskRunnerTestUtils.ANNOUNCEMENTS_PATH, (Object)"worker", new Object[0]);
    private static final String STATUS_PATH = JOINER.join((Object)RemoteTaskRunnerTestUtils.STATUS_PATH, (Object)"worker", new Object[0]);
    private static final Period TIMEOUT_PERIOD = Period.millis((int)30000);
    private RemoteTaskRunner remoteTaskRunner;
    private HttpClient httpClient;
    private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
    private ObjectMapper jsonMapper;
    private CuratorFramework cf;
    private Task task;
    private Worker worker;
    @Rule
    public TestRule watcher = new TestWatcher(){

        protected void starting(Description description) {
            LOG.info("Starting test: " + description.getMethodName(), new Object[0]);
        }

        protected void finished(Description description) {
            LOG.info("Finishing test: " + description.getMethodName(), new Object[0]);
        }
    };
    @Rule
    public final TestRule timeout = new DeadlockDetectingTimeout(60L, TimeUnit.SECONDS);

    @Before
    public void setUp() throws Exception {
        this.rtrTestUtils.setUp();
        this.jsonMapper = this.rtrTestUtils.getObjectMapper();
        this.cf = this.rtrTestUtils.getCuratorFramework();
        this.task = TestTasks.unending("task id with spaces");
        EmittingLogger.registerEmitter((ServiceEmitter)new NoopServiceEmitter());
    }

    @After
    public void tearDown() throws Exception {
        if (this.remoteTaskRunner != null) {
            this.remoteTaskRunner.stop();
        }
        this.rtrTestUtils.tearDown();
    }

    @Test
    public void testRun() throws Exception {
        this.doSetup();
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getTotalTaskSlotCount().get("_default_worker_category")));
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getIdleTaskSlotCount().get("_default_worker_category")));
        Assert.assertEquals((long)0L, (long)((Long)this.remoteTaskRunner.getUsedTaskSlotCount().get("_default_worker_category")));
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
        ((ChildrenDeletable)this.cf.delete().guaranteed()).forPath(JOINER.join((Object)STATUS_PATH, (Object)this.task.getId(), new Object[0]));
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getTotalTaskSlotCount().get("_default_worker_category")));
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getIdleTaskSlotCount().get("_default_worker_category")));
        Assert.assertEquals((long)0L, (long)((Long)this.remoteTaskRunner.getUsedTaskSlotCount().get("_default_worker_category")));
    }

    @Test
    public void testRunTaskThatAlreadyPending() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.addPendingTask(this.task);
        this.remoteTaskRunner.runPendingTasks();
        Assert.assertFalse((boolean)this.workerRunningTask(this.task.getId()));
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
    }

    @Test
    public void testStartWithNoWorker() {
        this.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));
    }

    @Test
    public void testRunExistingTaskThatHasntStartedRunning() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertFalse((boolean)result.isDone());
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
    }

    @Test
    public void testRunExistingTaskThatHasStartedRunning() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertFalse((boolean)result.isDone());
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
    }

    @Test
    public void testRunTooMuchZKData() throws Exception {
        ServiceEmitter emitter = (ServiceEmitter)EasyMock.createMock(ServiceEmitter.class);
        EmittingLogger.registerEmitter((ServiceEmitter)emitter);
        EasyMock.replay((Object[])new Object[]{emitter});
        this.doSetup();
        this.remoteTaskRunner.run(TestTasks.unending(new String(new char[5000])));
        EasyMock.verify((Object[])new Object[]{emitter});
    }

    @Test
    public void testRunSameAvailabilityGroup() throws Exception {
        this.doSetup();
        TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running((String)"rt1"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running((String)"rt2"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task2);
        TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running((String)"rt3"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task3);
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));
        Assert.assertTrue((boolean)((RemoteTaskRunnerWorkItem)this.remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId().equals("rt2"));
    }

    @Test
    public void testRunWithCapacity() throws Exception {
        this.doSetup();
        TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running((String)"rt1"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running((String)"rt2"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task2);
        TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running((String)"rt3"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task3);
        Assert.assertTrue((boolean)this.taskAnnounced(task3.getId()));
        this.mockWorkerRunningTask((Task)task3);
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));
        Assert.assertTrue((boolean)((RemoteTaskRunnerWorkItem)this.remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId().equals("rt2"));
    }

    @Test
    public void testStatusRemoved() throws Exception {
        this.doSetup();
        ListenableFuture future = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        Assert.assertTrue((boolean)((RemoteTaskRunnerWorkItem)this.remoteTaskRunner.getRunningTasks().iterator().next()).getTaskId().equals(this.task.getId()));
        this.cf.delete().forPath(JOINER.join((Object)STATUS_PATH, (Object)this.task.getId(), new Object[0]));
        TaskStatus status = (TaskStatus)future.get();
        Assert.assertEquals((Object)status.getStatusCode(), (Object)TaskState.FAILED);
        Assert.assertNotNull((Object)status.getErrorMsg());
        Assert.assertTrue((boolean)status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
    }

    @Test
    public void testBootstrap() throws Exception {
        this.makeWorker();
        TestRemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        rtrConfig.setMaxPercentageBlacklistWorkers(100);
        this.makeRemoteTaskRunner(rtrConfig);
        TestRealtimeTask task1 = new TestRealtimeTask("first", new TaskResource("first", 1), "foo", TaskStatus.running((String)"first"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        TestRealtimeTask task = new TestRealtimeTask("second", new TaskResource("task", 2), "foo", TaskStatus.running((String)"task"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task);
        TestRealtimeTask task2 = new TestRealtimeTask("second", new TaskResource("second", 2), "foo", TaskStatus.running((String)"second"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task2);
        Assert.assertTrue((boolean)this.taskAnnounced(task2.getId()));
        this.mockWorkerRunningTask((Task)task2);
        HashSet runningTasks = Sets.newHashSet((Iterable)Iterables.transform((Iterable)this.remoteTaskRunner.getRunningTasks(), (Function)new Function<RemoteTaskRunnerWorkItem, String>(){

            public String apply(RemoteTaskRunnerWorkItem input) {
                return input.getTaskId();
            }
        }));
        Assert.assertEquals((String)"runningTasks", (Object)ImmutableSet.of((Object)"first", (Object)"second"), (Object)runningTasks);
    }

    @Test
    public void testRunWithTaskComplete() throws Exception {
        this.doSetup();
        TestRealtimeTask task1 = new TestRealtimeTask("testTask", new TaskResource("testTask", 2), "foo", TaskStatus.success((String)"testTask"), this.jsonMapper);
        this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        this.mockWorkerCompleteSuccessfulTask((Task)task1);
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)this.remoteTaskRunner.run((Task)task1).get()).getStatusCode());
    }

    @Test
    public void testWorkerRemoved() throws Exception {
        this.doSetup();
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getTotalTaskSlotCount().get("_default_worker_category")));
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getIdleTaskSlotCount().get("_default_worker_category")));
        ListenableFuture future = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.cf.delete().forPath(ANNOUCEMENTS_PATH);
        TaskStatus status = (TaskStatus)future.get();
        Assert.assertEquals((Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertNotNull((Object)status.getErrorMsg());
        Assert.assertTrue((boolean)status.getErrorMsg().contains("Canceled for worker cleanup"));
        RemoteTaskRunnerConfig config = this.remoteTaskRunner.getRemoteTaskRunnerConfig();
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
            }
        }, config.getTaskCleanupTimeout().toStandardDuration().getMillis() * 2L));
        Assert.assertNull((Object)this.cf.checkExists().forPath(STATUS_PATH));
        Assert.assertFalse((boolean)this.remoteTaskRunner.getTotalTaskSlotCount().containsKey("_default_worker_category"));
        Assert.assertFalse((boolean)this.remoteTaskRunner.getIdleTaskSlotCount().containsKey("_default_worker_category"));
    }

    @Test
    public void testWorkerDisabled() throws Exception {
        this.doSetup();
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.disableWorker();
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
        Assert.assertEquals((Object)"", (Object)((ImmutableWorkerInfo)Iterables.getOnlyElement((Iterable)this.remoteTaskRunner.getWorkers())).getWorker().getVersion());
    }

    @Test
    public void testRestartRemoteTaskRunner() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        this.remoteTaskRunner.stop();
        this.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));
        RemoteTaskRunnerWorkItem newWorkItem = this.remoteTaskRunner.getKnownTasks().stream().filter(workItem -> workItem.getTaskId().equals(this.task.getId())).findFirst().orElse(null);
        ListenableFuture result = newWorkItem.getResult();
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskStatus)result.get()).getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskStatus)result.get()).getStatusCode());
    }

    @Test
    public void testRunPendingTaskFailToAssignTask() throws Exception {
        this.doSetup();
        Thread.sleep(100L);
        RemoteTaskRunnerWorkItem originalItem = this.remoteTaskRunner.addPendingTask(this.task);
        RemoteTaskRunnerWorkItem wankyItem = (RemoteTaskRunnerWorkItem)Mockito.mock(RemoteTaskRunnerWorkItem.class);
        Mockito.when((Object)wankyItem.getTaskId()).thenReturn((Object)originalItem.getTaskId()).thenReturn((Object)"wrongId");
        this.remoteTaskRunner.runPendingTask(wankyItem);
        TaskStatus taskStatus = (TaskStatus)originalItem.getResult().get(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((Object)TaskState.FAILED, (Object)taskStatus.getStatusCode());
        Assert.assertEquals((Object)"Failed to assign this task. See overlord logs for more details.", (Object)taskStatus.getErrorMsg());
    }

    @Test
    public void testRunPendingTaskTimeoutToAssign() throws Exception {
        this.makeWorker();
        this.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));
        RemoteTaskRunnerWorkItem workItem = this.remoteTaskRunner.addPendingTask(this.task);
        this.remoteTaskRunner.runPendingTask(workItem);
        TaskStatus taskStatus = (TaskStatus)workItem.getResult().get(0L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((Object)TaskState.FAILED, (Object)taskStatus.getStatusCode());
        Assert.assertNotNull((Object)taskStatus.getErrorMsg());
        Assert.assertTrue((boolean)taskStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout"));
    }

    private void doSetup() throws Exception {
        this.makeWorker();
        this.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD));
    }

    private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) {
        this.httpClient = (HttpClient)EasyMock.createMock(HttpClient.class);
        this.remoteTaskRunner = this.rtrTestUtils.makeRemoteTaskRunner(config, this.httpClient);
    }

    private void makeWorker() throws Exception {
        this.worker = this.rtrTestUtils.makeWorker(WORKER_HOST, 3);
    }

    private void disableWorker() throws Exception {
        this.rtrTestUtils.disableWorker(this.worker);
    }

    private boolean taskAnnounced(String taskId) {
        return this.rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
    }

    private boolean workerRunningTask(String taskId) {
        return this.rtrTestUtils.workerRunningTask(WORKER_HOST, taskId);
    }

    private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result) {
        return TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return result.isDone();
            }
        });
    }

    private void mockWorkerRunningTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerRunningTask(WORKER_HOST, task);
    }

    private void mockWorkerCompleteSuccessfulTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerCompleteSuccessfulTask(WORKER_HOST, task);
    }

    private void mockWorkerCompleteFailedTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerCompleteFailedTask(WORKER_HOST, task);
    }

    @Test
    public void testFindLazyWorkerTaskRunning() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.start();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Collection lazyworkers = this.remoteTaskRunner.markWorkersLazy((Predicate)new Predicate<ImmutableWorkerInfo>(){

            public boolean apply(ImmutableWorkerInfo input) {
                return true;
            }
        }, 1);
        Assert.assertTrue((boolean)lazyworkers.isEmpty());
        Assert.assertTrue((boolean)this.remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getWorkers().size());
    }

    @Test
    public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception {
        this.doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        Collection lazyworkers = this.remoteTaskRunner.markWorkersLazy((Predicate)new Predicate<ImmutableWorkerInfo>(){

            public boolean apply(ImmutableWorkerInfo input) {
                return true;
            }
        }, 1);
        Assert.assertTrue((boolean)lazyworkers.isEmpty());
        Assert.assertTrue((boolean)this.remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getWorkers().size());
    }

    @Test
    public void testFindLazyWorkerNotRunningAnyTask() throws Exception {
        this.doSetup();
        Collection lazyworkers = this.remoteTaskRunner.markWorkersLazy((Predicate)new Predicate<ImmutableWorkerInfo>(){

            public boolean apply(ImmutableWorkerInfo input) {
                return true;
            }
        }, 1);
        Assert.assertEquals((long)1L, (long)lazyworkers.size());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getLazyWorkers().size());
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getTotalTaskSlotCount().get("_default_worker_category")));
        Assert.assertFalse((boolean)this.remoteTaskRunner.getIdleTaskSlotCount().containsKey("_default_worker_category"));
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getLazyTaskSlotCount().get("_default_worker_category")));
    }

    @Test
    public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception {
        this.doSetup();
        Collection lazyworkers = this.remoteTaskRunner.markWorkersLazy((Predicate)new Predicate<ImmutableWorkerInfo>(){

            public boolean apply(ImmutableWorkerInfo input) {
                return true;
            }
        }, 0);
        Assert.assertEquals((long)0L, (long)lazyworkers.size());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getLazyWorkers().size());
    }

    @Test
    public void testWorkerZKReconnect() throws Exception {
        this.makeWorker();
        this.makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period((Object)"PT5M")));
        ListenableFuture future = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)this.workerRunningTask(this.task.getId()));
        byte[] bytes = (byte[])this.cf.getData().forPath(ANNOUCEMENTS_PATH);
        this.cf.delete().forPath(ANNOUCEMENTS_PATH);
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().containsKey(RemoteTaskRunnerTest.this.worker.getHost());
            }
        }));
        this.cf.create().forPath(ANNOUCEMENTS_PATH, bytes);
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                return !RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().containsKey(RemoteTaskRunnerTest.this.worker.getHost());
            }
        }));
        this.mockWorkerCompleteSuccessfulTask(this.task);
        TaskStatus status = (TaskStatus)future.get();
        Assert.assertEquals((Object)status.getStatusCode(), (Object)TaskState.SUCCESS);
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)status.getStatusCode());
    }

    @Test
    public void testSortByInsertionTime() {
        RemoteTaskRunnerWorkItem item1 = new RemoteTaskRunnerWorkItem("b", "t", null, null, "ds_test").withQueueInsertionTime(DateTimes.of((String)"2015-01-01T00:00:03Z"));
        RemoteTaskRunnerWorkItem item2 = new RemoteTaskRunnerWorkItem("a", "t", null, null, "ds_test").withQueueInsertionTime(DateTimes.of((String)"2015-01-01T00:00:02Z"));
        RemoteTaskRunnerWorkItem item3 = new RemoteTaskRunnerWorkItem("c", "t", null, null, "ds_test").withQueueInsertionTime(DateTimes.of((String)"2015-01-01T00:00:01Z"));
        ArrayList workItems = Lists.newArrayList((Object[])new RemoteTaskRunnerWorkItem[]{item1, item2, item3});
        RemoteTaskRunner.sortByInsertionTime((List)workItems);
        Assert.assertEquals((Object)item3, workItems.get(0));
        Assert.assertEquals((Object)item2, workItems.get(1));
        Assert.assertEquals((Object)item1, workItems.get(2));
    }

    @Test
    public void testBlacklistZKWorkers() throws Exception {
        this.makeWorker();
        TestRemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        rtrConfig.setMaxPercentageBlacklistWorkers(100);
        this.makeRemoteTaskRunner(rtrConfig);
        TestRealtimeTask task1 = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success((String)"realtime1"), this.jsonMapper);
        ListenableFuture taskFuture1 = this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        this.mockWorkerCompleteFailedTask((Task)task1);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture1.get()).isFailure());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.findWorkerRunningTask(task1.getId()).getContinuouslyFailedTasksCount());
        TestRealtimeTask task2 = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.running((String)"realtime2"), this.jsonMapper);
        ListenableFuture taskFuture2 = this.remoteTaskRunner.run((Task)task2);
        Assert.assertTrue((boolean)this.taskAnnounced(task2.getId()));
        this.mockWorkerRunningTask((Task)task2);
        this.mockWorkerCompleteFailedTask((Task)task2);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture2.get()).isFailure());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals((long)2L, (long)this.remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount());
        ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner)this.remoteTaskRunner).setCurrentTimeMillis(System.currentTimeMillis());
        this.remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner)this.remoteTaskRunner).setCurrentTimeMillis(System.currentTimeMillis() + 2L * TIMEOUT_PERIOD.toStandardDuration().getMillis());
        this.remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount());
        TestRealtimeTask task3 = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.running((String)"realtime3"), this.jsonMapper);
        ListenableFuture taskFuture3 = this.remoteTaskRunner.run((Task)task3);
        Assert.assertTrue((boolean)this.taskAnnounced(task3.getId()));
        this.mockWorkerRunningTask((Task)task3);
        this.mockWorkerCompleteSuccessfulTask((Task)task3);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture3.get()).isSuccess());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.findWorkerRunningTask(task3.getId()).getContinuouslyFailedTasksCount());
    }

    @Test
    public void testBlacklistZKWorkers25Percent() throws Exception {
        this.rtrTestUtils.makeWorker(WORKER_HOST, 10);
        this.rtrTestUtils.makeWorker("worker2", 10);
        TestRemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        rtrConfig.setMaxPercentageBlacklistWorkers(25);
        this.makeRemoteTaskRunner(rtrConfig);
        String firstWorker = null;
        String secondWorker = null;
        for (int i = 1; i < 13; ++i) {
            String taskId = StringUtils.format((String)"rt-%d", (Object[])new Object[]{i});
            TestRealtimeTask task = new TestRealtimeTask(taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success((String)taskId), this.jsonMapper);
            ListenableFuture taskFuture = this.remoteTaskRunner.run((Task)task);
            if (i == 1) {
                if (this.rtrTestUtils.taskAnnounced("worker2", task.getId())) {
                    firstWorker = "worker2";
                    secondWorker = WORKER_HOST;
                } else {
                    firstWorker = WORKER_HOST;
                    secondWorker = "worker2";
                }
            }
            String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;
            Assert.assertTrue((boolean)this.rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
            this.rtrTestUtils.mockWorkerRunningTask(expectedWorker, (Task)task);
            this.rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, (Task)task);
            Assert.assertTrue((boolean)((TaskStatus)taskFuture.get()).isFailure());
            Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
            Assert.assertEquals((long)((i + 1) / 2), (long)this.remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount());
        }
    }

    @Test
    public void testBlacklistZKWorkers50Percent() throws Exception {
        this.rtrTestUtils.makeWorker(WORKER_HOST, 10);
        this.rtrTestUtils.makeWorker("worker2", 10);
        TestRemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        rtrConfig.setMaxPercentageBlacklistWorkers(50);
        this.makeRemoteTaskRunner(rtrConfig);
        String firstWorker = null;
        String secondWorker = null;
        for (int i = 1; i < 13; ++i) {
            String taskId = StringUtils.format((String)"rt-%d", (Object[])new Object[]{i});
            TestRealtimeTask task = new TestRealtimeTask(taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success((String)taskId), this.jsonMapper);
            ListenableFuture taskFuture = this.remoteTaskRunner.run((Task)task);
            if (i == 1) {
                if (this.rtrTestUtils.taskAnnounced("worker2", task.getId())) {
                    firstWorker = "worker2";
                    secondWorker = WORKER_HOST;
                } else {
                    firstWorker = WORKER_HOST;
                    secondWorker = "worker2";
                }
            }
            String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker;
            Assert.assertTrue((boolean)this.rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
            this.rtrTestUtils.mockWorkerRunningTask(expectedWorker, (Task)task);
            this.rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, (Task)task);
            Assert.assertTrue((boolean)((TaskStatus)taskFuture.get()).isFailure());
            Assert.assertEquals((long)(i > 2 ? 1L : 0L), (long)this.remoteTaskRunner.getBlackListedWorkers().size());
            Assert.assertEquals((long)(i > 4 ? (long)(i - 2) : (long)((i + 1) / 2)), (long)this.remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount());
        }
    }

    @Test
    public void testSuccessfulTaskOnBlacklistedWorker() throws Exception {
        this.makeWorker();
        TestRemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD);
        rtrConfig.setMaxPercentageBlacklistWorkers(100);
        this.makeRemoteTaskRunner(rtrConfig);
        TestRealtimeTask task1 = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success((String)"realtime1"), this.jsonMapper);
        TestRealtimeTask task2 = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success((String)"realtime2"), this.jsonMapper);
        TestRealtimeTask task3 = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success((String)"realtime3"), this.jsonMapper);
        ListenableFuture taskFuture1 = this.remoteTaskRunner.run((Task)task1);
        Assert.assertTrue((boolean)this.taskAnnounced(task1.getId()));
        this.mockWorkerRunningTask((Task)task1);
        this.mockWorkerCompleteFailedTask((Task)task1);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture1.get()).isFailure());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertFalse((boolean)this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey("_default_worker_category"));
        ListenableFuture taskFuture2 = this.remoteTaskRunner.run((Task)task2);
        Assert.assertTrue((boolean)this.taskAnnounced(task2.getId()));
        this.mockWorkerRunningTask((Task)task2);
        Assert.assertFalse((boolean)this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey("_default_worker_category"));
        ListenableFuture taskFuture3 = this.remoteTaskRunner.run((Task)task3);
        Assert.assertTrue((boolean)this.taskAnnounced(task3.getId()));
        this.mockWorkerRunningTask((Task)task3);
        this.mockWorkerCompleteFailedTask((Task)task3);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture3.get()).isFailure());
        Assert.assertEquals((long)1L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals((long)3L, (long)((Long)this.remoteTaskRunner.getBlacklistedTaskSlotCount().get("_default_worker_category")));
        this.mockWorkerCompleteSuccessfulTask((Task)task2);
        Assert.assertTrue((boolean)((TaskStatus)taskFuture2.get()).isSuccess());
        Assert.assertEquals((long)0L, (long)this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertFalse((boolean)this.remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey("_default_worker_category"));
    }

    @Test
    public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception {
        Worker worker = (Worker)EasyMock.createMock(Worker.class);
        EasyMock.expect((Object)worker.getHost()).andReturn((Object)"host").atLeastOnce();
        EasyMock.replay((Object[])new Object[]{worker});
        ServiceEmitter emitter = (ServiceEmitter)EasyMock.createMock(ServiceEmitter.class);
        Capture capturedArgument = Capture.newInstance();
        emitter.emit((ServiceEventBuilder)EasyMock.capture((Capture)capturedArgument));
        EasyMock.expectLastCall().atLeastOnce();
        EmittingLogger.registerEmitter((ServiceEmitter)emitter);
        EasyMock.replay((Object[])new Object[]{emitter});
        PathChildrenCache cache = new PathChildrenCache(this.cf, "/test", true);
        this.testStartWithNoWorker();
        cache.getListenable().addListener((Object)this.remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, this.jsonMapper), null));
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        Assert.assertTrue((boolean)TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1));
        EasyMock.verify((Object[])new Object[]{worker});
        EasyMock.verify((Object[])new Object[]{emitter});
        Map alertDataMap = ((EmittingLogger.EmittingAlertBuilder)capturedArgument.getValue()).build(null).getDataMap();
        Assert.assertTrue((boolean)alertDataMap.containsKey("znode"));
        Assert.assertNull(alertDataMap.get("znode"));
    }

    @Test
    public void testStreamTaskReportsUnknownTask() throws Exception {
        this.doSetup();
        Assert.assertEquals((Object)Optional.absent(), (Object)this.remoteTaskRunner.streamTaskReports("foo"));
    }

    @Test
    public void testStreamTaskReportsKnownTask() throws Exception {
        this.doSetup();
        Capture capturedRequest = Capture.newInstance();
        String reportString = "my report!";
        ByteArrayInputStream reportResponse = new ByteArrayInputStream(StringUtils.toUtf8((String)"my report!"));
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject())).andReturn((Object)Futures.immediateFuture((Object)reportResponse));
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        ListenableFuture result = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue((boolean)this.taskAnnounced(this.task.getId()));
        this.mockWorkerRunningTask(this.task);
        Assert.assertTrue((boolean)TestUtils.conditionValid(() -> !this.remoteTaskRunner.getRunningTasks().isEmpty() && !((RemoteTaskRunnerWorkItem)Iterables.getOnlyElement((Iterable)this.remoteTaskRunner.getRunningTasks())).getLocation().equals((Object)TaskLocation.unknown())));
        InputStream in = (InputStream)this.remoteTaskRunner.streamTaskReports(this.task.getId()).get();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteStreams.copy((InputStream)in, (OutputStream)baos);
        Assert.assertEquals((Object)"my report!", (Object)StringUtils.fromUtf8((byte[])baos.toByteArray()));
        this.mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue((boolean)this.workerCompletedTask((ListenableFuture<TaskStatus>)result));
        Assert.assertEquals((Object)Optional.absent(), (Object)this.remoteTaskRunner.streamTaskReports(this.task.getId()));
        EasyMock.verify((Object[])new Object[]{this.httpClient});
        Assert.assertEquals((Object)"http://dummy:9000/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports", (Object)((Request)capturedRequest.getValue()).getUrl().toString());
    }
}

