/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord.hrtr;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;

public class HttpRemoteTaskRunner
implements WorkerTaskRunner,
TaskLogStreamer {
    // Class-wide logger; also used to emit alerts through makeAlert(...).
    private static final EmittingLogger log = new EmittingLogger(HttpRemoteTaskRunner.class);
    // Consulted by start() (canStart / started / exitStart) so startup runs only when allowed.
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    // Created in the constructor, sized by config.getPendingTasksRunnerNumThreads().
    private final ExecutorService pendingTasksExec;
    // Work items keyed by task id.
    @GuardedBy(value="statusLock")
    private final ConcurrentMap<String, HttpRemoteTaskRunnerWorkItem> tasks = new ConcurrentHashMap<String, HttpRemoteTaskRunnerWorkItem>();
    // NOTE(review): presumably ids of tasks not yet handed to a worker; populated outside the visible code - confirm.
    @GuardedBy(value="statusLock")
    private final List<String> pendingTaskIds = new ArrayList<String>();
    // Known workers keyed by worker host; mutated by addWorker/removeWorker while synchronized on this map.
    private final ConcurrentMap<String, WorkerHolder> workers = new ConcurrentHashMap<String, WorkerHolder>();
    // Passed to every WorkerHolder and also runs the periodic sync monitoring.
    private final ScheduledExecutorService workersSyncExec;
    // Workers marked lazy by markWorkersLazy, keyed by host; excluded from task assignment.
    private final ConcurrentMap<String, WorkerHolder> lazyWorkers = new ConcurrentHashMap<String, WorkerHolder>();
    // Blacklisted workers keyed by host; excluded from task assignment until removed from this map.
    private final ConcurrentHashMap<String, WorkerHolder> blackListedWorkers = new ConcurrentHashMap();
    // Keyed by worker host; a host present here is not eligible for new tasks.
    // NOTE(review): the value is presumably the id of the unacknowledged task - confirm.
    @GuardedBy(value="statusLock")
    private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<String, String>();
    // Single-threaded executor for blacklist cleanup and delayed cleanup of removed workers' tasks.
    private final ListeningScheduledExecutorService cleanupExec;
    // Pending cleanup futures for removed workers, keyed by worker host.
    private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<String, ScheduledFuture>();
    // Monitor used for the @GuardedBy state above and for wait/notifyAll signalling.
    private final Object statusLock = new Object();
    // Listeners notified of task status changes, each paired with the executor registered with it.
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList();
    private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
    // Created in start() from provisioningStrategy.
    private ProvisioningService provisioningService;
    private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
    private final HttpClient httpClient;
    private final ObjectMapper smileMapper;
    // Supplies the current worker behavior config; may return null (findWorkerToRunTask handles that).
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    private final HttpRemoteTaskRunnerConfig config;
    private final TaskStorage taskStorage;
    private final ServiceEmitter emitter;
    // Joins ZooKeeper path segments with "/".
    private static final Joiner JOINER = Joiner.on((String)"/");
    // Null when no Curator client was supplied; ZK status cleanup is skipped in that case.
    @Nullable
    private final CuratorFramework cf;
    // Non-null only when cf is non-null.
    @Nullable
    private final ScheduledExecutorService zkCleanupExec;
    private final IndexerZkConfig indexerZkConfig;
    // Assigned in startWorkersHandling() and registered with the worker node discovery.
    private volatile DruidNodeDiscovery.Listener nodeDiscoveryListener;

    /**
     * Stores the injected collaborators and creates the executors used by this runner.
     *
     * @param cf optional Curator client; when {@code null}, no ZK cleanup executor is created
     *           and {@code zkCleanupExec} stays {@code null}
     */
    public HttpRemoteTaskRunner(
        ObjectMapper smileMapper,
        HttpRemoteTaskRunnerConfig config,
        HttpClient httpClient,
        Supplier<WorkerBehaviorConfig> workerConfigRef,
        ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
        DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
        TaskStorage taskStorage,
        @Nullable CuratorFramework cf,
        IndexerZkConfig indexerZkConfig,
        ServiceEmitter emitter
    ) {
        // Plain collaborator wiring.
        this.smileMapper = smileMapper;
        this.config = config;
        this.httpClient = httpClient;
        this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
        this.taskStorage = taskStorage;
        this.workerConfigRef = workerConfigRef;
        this.emitter = emitter;
        this.indexerZkConfig = indexerZkConfig;
        this.provisioningStrategy = provisioningStrategy;

        // Executors, created in the same order as before.
        this.pendingTasksExec = Execs.multiThreaded(
            config.getPendingTasksRunnerNumThreads(),
            "hrtr-pending-tasks-runner-%d"
        );
        this.workersSyncExec = ScheduledExecutors.fixed(
            config.getWorkerSyncNumThreads(),
            "HttpRemoteTaskRunner-worker-sync-%d"
        );
        this.cleanupExec = MoreExecutors.listeningDecorator(
            ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d")
        );

        // The ZK cleanup executor exists only when a Curator client was supplied.
        this.cf = cf;
        this.zkCleanupExec = cf == null
            ? null
            : ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-zk-cleanup-%d");
    }

    /**
     * Starts the runner: schedules ZK status cleanup, discovers and syncs workers, schedules
     * blacklist cleanup, creates the provisioning service, schedules sync monitoring and
     * starts pending-task handling.
     *
     * <p>Returns immediately when {@code lifecycleLock.canStart()} is false.
     *
     * @throws RuntimeException wrapping any exception raised during startup
     */
    @Override
    @LifecycleStart
    public void start() {
        if (!this.lifecycleLock.canStart()) {
            return;
        }
        try {
            log.info("Starting...");
            this.scheduleCompletedTaskStatusCleanupFromZk();
            this.startWorkersHandling();
            ScheduledExecutors.scheduleAtFixedRate(
                this.cleanupExec,
                Period.ZERO.toStandardDuration(),
                this.config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
                this::checkAndRemoveWorkersFromBlackList
            );
            this.provisioningService = this.provisioningStrategy.makeProvisioningService(this);
            this.scheduleSyncMonitoring();
            this.startPendingTaskHandling();
            this.lifecycleLock.started();
            log.info("Started.");
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before wrapping, as the other interrupt handlers in this class do.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            // Always leave the "starting" phase, whether or not startup succeeded.
            this.lifecycleLock.exitStart();
        }
    }

    /**
     * Schedules a recurring job (first run after 1 minute, then every 5 minutes) that deletes
     * task status znodes under {@code indexerZkConfig.getStatusPath()} for every task id that
     * is not among {@code taskStorage.getActiveTasks()}.
     *
     * <p>Does nothing when no Curator client was supplied ({@code cf == null}).
     */
    private void scheduleCompletedTaskStatusCleanupFromZk() {
        if (this.cf == null) {
            return;
        }
        this.zkCleanupExec.scheduleAtFixedRate(() -> {
            try {
                // A missing status root is treated as "no workers".
                List<String> workerIds;
                try {
                    workerIds = this.cf.getChildren().forPath(this.indexerZkConfig.getStatusPath());
                }
                catch (KeeperException.NoNodeException e) {
                    workerIds = ImmutableList.of();
                }

                // Active tasks are only looked up when there is something to clean.
                final HashSet<String> knownActiveTaskIds = new HashSet<String>();
                if (!workerIds.isEmpty()) {
                    for (Task task : this.taskStorage.getActiveTasks()) {
                        knownActiveTaskIds.add(task.getId());
                    }
                }

                for (String workerId : workerIds) {
                    final String workerStatusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), workerId);

                    // The worker znode may vanish between the listing above and this call.
                    List<String> taskIds;
                    try {
                        taskIds = this.cf.getChildren().forPath(workerStatusPath);
                    }
                    catch (KeeperException.NoNodeException e) {
                        taskIds = ImmutableList.of();
                    }

                    for (String taskId : taskIds) {
                        if (knownActiveTaskIds.contains(taskId)) {
                            continue;
                        }
                        final String taskStatusPath = JOINER.join(workerStatusPath, taskId);
                        try {
                            this.cf.delete().guaranteed().forPath(taskStatusPath);
                        }
                        catch (KeeperException.NoNodeException e) {
                            // The node is already gone; nothing left to delete.
                            log.info("Failed to delete taskStatusPath[%s].", taskStatusPath);
                        }
                    }
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            catch (Exception ex) {
                // Log and swallow so one failed run does not cancel the recurring schedule.
                log.error(ex, "Unknown error while doing task status cleanup in ZK.");
            }
        }, 1L, 5L, TimeUnit.MINUTES);
    }

    /**
     * Returns the workers that may currently be assigned a task, keyed by the same key as
     * {@code workers} and mapped to their immutable snapshot.
     *
     * <p>A worker is eligible when it is not lazy, has no unacknowledged task, is not
     * blacklisted, is initialized and is enabled. The result is built with Guava's
     * {@code Maps.filterEntries} / {@code Maps.transformEntries}, which Guava documents as
     * views over the backing map, so callers that need a stable snapshot should copy it.
     */
    Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks() {
        return Maps.transformEntries(
            Maps.filterEntries(
                this.workers,
                entry -> {
                    final String host = entry.getKey();
                    final WorkerHolder holder = entry.getValue();
                    return !this.lazyWorkers.containsKey(host)
                        && !this.workersWithUnacknowledgedTask.containsKey(host)
                        && !this.blackListedWorkers.containsKey(host)
                        && holder.isInitialized()
                        && holder.isEnabled();
                }
            ),
            (String host, WorkerHolder holder) -> holder.toImmutable()
        );
    }

    /**
     * Picks a worker for {@code task} using the configured selection strategy, falling back to
     * {@code WorkerBehaviorConfig.DEFAULT_STRATEGY} when no config or no strategy is set.
     *
     * @return whatever the strategy's {@code findWorkerForTask} returns for a copy of the
     *         currently eligible workers
     */
    private ImmutableWorkerInfo findWorkerToRunTask(Task task) {
        final WorkerBehaviorConfig behaviorConfig = workerConfigRef.get();
        final WorkerSelectStrategy configured = behaviorConfig == null ? null : behaviorConfig.getSelectStrategy();

        final WorkerSelectStrategy selectStrategy;
        if (configured != null) {
            selectStrategy = configured;
        } else {
            selectStrategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
            log.debug("No worker selection strategy set. Using default of [%s]", selectStrategy.getClass().getSimpleName());
        }

        // The strategy receives an immutable copy rather than the eligibility map itself.
        final ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = ImmutableMap.copyOf(getWorkersEligibleToRunTasks());
        return selectStrategy.findWorkerForTask(config, eligibleWorkers, task);
    }

    /**
     * Assigns {@code workItem}'s task to the worker at {@code workerHost} and then waits, up to
     * {@code config.getTaskAssignmentTimeout()}, for the task to leave the pending state.
     *
     * <p>If the wait times out, an alert is emitted and the task is completed as failed.
     *
     * @return {@code false} when the worker is unknown, lazy or blacklisted, or when
     *         {@code assignTask} returns false; {@code true} otherwise (including on timeout)
     * @throws InterruptedException if interrupted while waiting on {@code statusLock}
     */
    private boolean runTaskOnWorker(HttpRemoteTaskRunnerWorkItem workItem, String workerHost) throws InterruptedException {
        final String taskId = workItem.getTaskId();
        final WorkerHolder holder = workers.get(workerHost);

        final boolean workerUnusable = holder == null
            || lazyWorkers.containsKey(workerHost)
            || blackListedWorkers.containsKey(workerHost);
        if (workerUnusable) {
            log.info("Not assigning task[%s] to removed or marked lazy/blacklisted worker[%s]", taskId, workerHost);
            return false;
        }

        log.info("Assigning task [%s] to worker [%s]", taskId, workerHost);
        if (!holder.assignTask(workItem.getTask())) {
            return false;
        }

        final long timeoutMillis = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
        final long startMillis = System.currentTimeMillis();
        boolean timedOut = false;

        synchronized (statusLock) {
            // Re-check after every wake-up: notifyAll() on statusLock is used for many unrelated events.
            while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) {
                final long remainingMillis = timeoutMillis - (System.currentTimeMillis() - startMillis);
                if (remainingMillis <= 0L) {
                    timedOut = true;
                    break;
                }
                statusLock.wait(remainingMillis);
            }
        }

        if (timedOut) {
            // taskComplete() must run outside statusLock; it checks that the lock is not held.
            log.makeAlert(
                "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!",
                workerHost,
                taskId,
                config.getTaskAssignmentTimeout()
            ).emit();
            taskComplete(
                workItem,
                holder,
                TaskStatus.failure(
                    taskId,
                    StringUtils.format(
                        "The worker that this task is assigned did not start it in timeout[%s]. See overlord and middleManager/indexer logs for more details.",
                        config.getTaskAssignmentTimeout()
                    )
                )
            );
        }
        return true;
    }

    /**
     * Records the final {@code taskStatus} for a work item and wakes every thread waiting on
     * {@code statusLock}.
     *
     * <p>If the item already has a result, the new status is only compared with it (a warning
     * is logged on mismatch). Otherwise the result is set, listeners are notified and, when a
     * worker holder is given, the blacklist bookkeeping is updated.
     *
     * @param workerHolder worker that ran the task, or {@code null} when none is available
     * @throws IllegalStateException if the calling thread holds {@code statusLock}
     * @throws NullPointerException  if {@code taskRunnerWorkItem} or {@code taskStatus} is null
     */
    private void taskComplete(HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem, WorkerHolder workerHolder, TaskStatus taskStatus) {
        Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock.");
        Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkNotNull(taskStatus, "taskStatus");

        final boolean hasWorker = workerHolder != null;
        if (hasWorker) {
            log.info(
                "Worker[%s] completed task[%s] with status[%s]",
                workerHolder.getWorker().getHost(),
                taskStatus.getId(),
                taskStatus.getStatusCode()
            );
            workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
        }

        if (!taskRunnerWorkItem.getResult().isDone()) {
            // First completion event for this item: publish it.
            taskRunnerWorkItem.setResult(taskStatus);
            TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
            if (hasWorker) {
                blacklistWorkerIfNeeded(taskStatus, workerHolder);
            }
        } else {
            // A result already exists: keep it and only report a disagreement.
            try {
                final TaskState previousState = taskRunnerWorkItem.getResult().get().getStatusCode();
                if (taskStatus.getStatusCode() != previousState) {
                    log.warn(
                        "The state of the new task complete event is different from its last known state. New state[%s], last known state[%s]",
                        taskStatus.getStatusCode(),
                        previousState
                    );
                }
            } catch (InterruptedException e) {
                log.warn(e, "Interrupted while getting the last known task status.");
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
            }
        }

        synchronized (statusLock) {
            statusLock.notifyAll();
        }
    }

    /**
     * Registers a discovery listener that adds/removes workers, waits for the initial worker
     * view, and then waits for every discovered worker to finish initialization.
     *
     * @throws ISE                  if the worker view is not initialized once more than five
     *                              minutes have elapsed (checked after each 30-second wait)
     * @throws InterruptedException if interrupted while waiting
     */
    private void startWorkersHandling() throws InterruptedException {
        final CountDownLatch viewInitializedLatch = new CountDownLatch(1);
        final DruidNodeDiscovery discovery = druidNodeDiscoveryProvider.getForService("workerNodeService");

        nodeDiscoveryListener = new DruidNodeDiscovery.Listener() {

            public void nodesAdded(Collection<DiscoveryDruidNode> nodes) {
                for (DiscoveryDruidNode node : nodes) {
                    HttpRemoteTaskRunner.this.addWorker(HttpRemoteTaskRunner.this.toWorker(node));
                }
            }

            public void nodesRemoved(Collection<DiscoveryDruidNode> nodes) {
                for (DiscoveryDruidNode node : nodes) {
                    HttpRemoteTaskRunner.this.removeWorker(HttpRemoteTaskRunner.this.toWorker(node));
                }
            }

            public void nodeViewInitialized() {
                viewInitializedLatch.countDown();
            }
        };
        discovery.registerListener(nodeDiscoveryListener);

        // Poll the latch in 30-second slices so progress is logged while waiting.
        final long discoveryStartMillis = System.currentTimeMillis();
        final long maxWaitMillis = TimeUnit.MINUTES.toMillis(5L);
        while (!viewInitializedLatch.await(30L, TimeUnit.SECONDS)) {
            final long elapsedMillis = System.currentTimeMillis() - discoveryStartMillis;
            if (elapsedMillis > maxWaitMillis) {
                throw new ISE("Couldn't discover workers.");
            }
            log.info("Waiting for worker discovery...");
        }
        log.info("[%s] Workers are discovered.", workers.size());

        for (WorkerHolder holder : workers.values()) {
            log.info("Waiting for worker[%s] to sync state...", holder.getWorker().getHost());
            holder.waitForInitialization();
        }
        log.info("Workers have sync'd state successfully.");
    }

    /**
     * Builds a {@link Worker} from a discovered node: scheme and host/port come from its
     * {@code DruidNode}; ip, capacity, version and category come from its
     * {@code "workerNodeService"} service entry.
     */
    private Worker toWorker(DiscoveryDruidNode node) {
        final String scheme = node.getDruidNode().getServiceScheme();
        final String hostAndPort = node.getDruidNode().getHostAndPortToUse();
        final WorkerNodeService workerService = (WorkerNodeService) node.getServices().get("workerNodeService");
        return new Worker(
            scheme,
            hostAndPort,
            workerService.getIp(),
            workerService.getCapacity(),
            workerService.getVersion(),
            workerService.getCategory()
        );
    }

    /**
     * Registers {@code worker} unless a holder for its host already exists, cancelling any
     * pending cleanup for that host first, then wakes threads waiting on {@code statusLock}.
     *
     * <p>A new holder is seeded with announcements for tasks this runner already tracks as
     * RUNNING on that host.
     */
    private void addWorker(Worker worker) {
        final String host = worker.getHost();

        synchronized (workers) {
            log.info("Worker[%s] reportin' for duty!", host);
            cancelWorkerCleanup(host);

            if (workers.get(host) != null) {
                log.info("Worker[%s] already exists.", host);
            } else {
                final List<TaskAnnouncement> expectedAnnouncements = new ArrayList<TaskAnnouncement>();
                synchronized (statusLock) {
                    for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> taskEntry : tasks.entrySet()) {
                        final HttpRemoteTaskRunnerWorkItem item = taskEntry.getValue();
                        if (item.getState() != HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                            continue;
                        }
                        final Worker assignedWorker = item.getWorker();
                        if (assignedWorker == null || !assignedWorker.getHost().equals(host)) {
                            continue;
                        }
                        if (item.getTask() == null) {
                            continue;
                        }
                        expectedAnnouncements.add(
                            TaskAnnouncement.create(
                                item.getTask(),
                                TaskStatus.running(taskEntry.getKey()),
                                item.getLocation()
                            )
                        );
                    }
                }

                // The holder is started before it is published in the workers map.
                final WorkerHolder created = createWorkerHolder(
                    smileMapper,
                    httpClient,
                    config,
                    workersSyncExec,
                    this::taskAddedOrUpdated,
                    worker,
                    expectedAnnouncements
                );
                created.start();
                workers.put(host, created);
            }
        }

        synchronized (statusLock) {
            statusLock.notifyAll();
        }
    }

    /**
     * Factory for {@link WorkerHolder} instances; {@code protected} so subclasses can supply
     * their own holder.
     */
    protected WorkerHolder createWorkerHolder(
        ObjectMapper smileMapper,
        HttpClient httpClient,
        HttpRemoteTaskRunnerConfig config,
        ScheduledExecutorService workersSyncExec,
        WorkerHolder.Listener listener,
        Worker worker,
        List<TaskAnnouncement> knownAnnouncements
    ) {
        final WorkerHolder holder = new WorkerHolder(
            smileMapper,
            httpClient,
            config,
            workersSyncExec,
            listener,
            worker,
            knownAnnouncements
        );
        return holder;
    }

    /**
     * Unregisters {@code worker}: stops its holder, schedules delayed cleanup of its tasks,
     * re-evaluates the blacklist and drops the host from the lazy set.
     *
     * @throws RuntimeException wrapping any exception thrown while stopping the holder or
     *                          scheduling the cleanup
     */
    private void removeWorker(Worker worker) {
        final String host = worker.getHost();

        synchronized (workers) {
            log.info("Kaboom! Worker[%s] removed!", host);

            final WorkerHolder removed = workers.remove(host);
            if (removed != null) {
                try {
                    removed.stop();
                    scheduleTasksCleanupForWorker(host);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // Runs even when stop() fails.
                    checkAndRemoveWorkersFromBlackList();
                }
            }
            lazyWorkers.remove(host);
        }
    }

    /**
     * Cancels (without interrupting) and forgets the pending task cleanup for
     * {@code workerHost}, if any.
     *
     * @return {@code true} if a cleanup was registered for the host, {@code false} otherwise
     */
    private boolean cancelWorkerCleanup(String workerHost) {
        final ScheduledFuture<?> pendingCleanup = removedWorkerCleanups.remove(workerHost);
        if (pendingCleanup == null) {
            return false;
        }
        log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost);
        pendingCleanup.cancel(false);
        return true;
    }

    /**
     * Schedules, after {@code config.getTaskCleanupTimeout()}, a job that fails every task
     * still tracked as RUNNING on {@code workerHostAndPort} whose result is not yet done.
     *
     * <p>Any previously scheduled cleanup for the same host is cancelled first. The future is
     * registered in {@code removedWorkerCleanups} and unregistered again once it completes,
     * successfully or not.
     */
    private void scheduleTasksCleanupForWorker(final String workerHostAndPort) {
        this.cancelWorkerCleanup(workerHostAndPort);

        final ListenableScheduledFuture<?> cleanupTask = this.cleanupExec.schedule(() -> {
            log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
            try {
                // Collect under the lock, fail outside it: taskComplete() must not hold statusLock.
                final Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
                synchronized (this.statusLock) {
                    for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : this.tasks.entrySet()) {
                        final HttpRemoteTaskRunnerWorkItem item = e.getValue();
                        if (item.getState() != HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                            continue;
                        }
                        final Worker w = item.getWorker();
                        if (w != null && w.getHost().equals(workerHostAndPort)) {
                            tasksToFail.add(item);
                        }
                    }
                }

                for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
                    if (taskItem.getResult().isDone()) {
                        continue;
                    }
                    // Argument order matches the placeholders: task id first, then worker host.
                    log.warn(
                        "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
                        taskItem.getTaskId(),
                        workerHostAndPort,
                        this.config.getTaskCleanupTimeout()
                    );
                    this.taskComplete(
                        taskItem,
                        null,
                        TaskStatus.failure(
                            taskItem.getTaskId(),
                            StringUtils.format(
                                "The worker that this task was assigned disappeared and did not report cleanup within timeout[%s]. See overlord and middleManager/indexer logs for more details.",
                                this.config.getTaskCleanupTimeout()
                            )
                        )
                    );
                }
            }
            catch (Exception e) {
                // Attach the cause so the alert carries the failure details.
                log.makeAlert(e, "Exception while cleaning up worker[%s]", workerHostAndPort).emit();
                throw new RuntimeException(e);
            }
        }, this.config.getTaskCleanupTimeout().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS);

        this.removedWorkerCleanups.put(workerHostAndPort, cleanupTask);

        // Two-argument remove: only unregister if the map still holds THIS future, not a newer one.
        Futures.addCallback(cleanupTask, new FutureCallback<Object>() {

            public void onSuccess(Object result) {
                HttpRemoteTaskRunner.this.removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
            }

            public void onFailure(Throwable t) {
                HttpRemoteTaskRunner.this.removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
            }
        });
    }

    /**
     * Schedules a recurring check (first run after 1 minute, then every 5 minutes) that resets
     * any worker whose syncer reports it is not OK by removing and re-adding it.
     */
    private void scheduleSyncMonitoring() {
        this.workersSyncExec.scheduleAtFixedRate(() -> {
            log.debug("Running the Sync Monitoring.");
            try {
                for (Map.Entry<String, WorkerHolder> e : this.workers.entrySet()) {
                    final WorkerHolder workerHolder = e.getValue();
                    if (workerHolder.getUnderlyingSyncer().isOK()) {
                        continue;
                    }
                    synchronized (this.workers) {
                        // Re-check under the lock: the worker may have been removed in the meantime.
                        if (this.workers.containsKey(e.getKey())) {
                            log.makeAlert(
                                "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.",
                                workerHolder.getWorker().getHost(),
                                workerHolder.getUnderlyingSyncer().getDebugInfo()
                            ).emit();
                            this.removeWorker(workerHolder.getWorker());
                            this.addWorker(workerHolder.getWorker());
                        }
                    }
                }
            }
            catch (Exception ex) {
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Alert and swallow so a failed run does not cancel the recurring schedule.
                log.makeAlert(ex, "Exception in sync monitoring.").emit();
            }
        }, 1L, 5L, TimeUnit.MINUTES);
    }

    /**
     * Returns the debug info reported by each worker's syncer, keyed by the worker's key in
     * {@code workers}.
     *
     * @throws IllegalArgumentException if the runner has not started within 1 ms
     */
    Map<String, Object> getWorkerSyncerDebugInfo() {
        Preconditions.checkArgument(lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS));

        final Map<String, Object> debugInfoByWorker = Maps.newHashMapWithExpectedSize(workers.size());
        workers.forEach(
            (host, holder) -> debugInfoByWorker.put(host, holder.getUnderlyingSyncer().getDebugInfo())
        );
        return debugInfoByWorker;
    }

    /**
     * Removes from the blacklist every worker for which {@link #shouldRemoveNodeFromBlackList}
     * returns true, resetting its failure count and blacklist expiry. If anything was removed,
     * threads waiting on {@code statusLock} are woken.
     */
    private void checkAndRemoveWorkersFromBlackList() {
        boolean anyRemoved = false;

        for (Iterator<Map.Entry<String, WorkerHolder>> it = blackListedWorkers.entrySet().iterator(); it.hasNext(); ) {
            final Map.Entry<String, WorkerHolder> entry = it.next();
            if (shouldRemoveNodeFromBlackList(entry.getValue())) {
                it.remove();
                entry.getValue().resetContinuouslyFailedTasksCount();
                entry.getValue().setBlacklistedUntil(null);
                anyRemoved = true;
            }
        }

        if (!anyRemoved) {
            return;
        }
        synchronized (statusLock) {
            statusLock.notifyAll();
        }
    }

    /**
     * Decides whether a blacklisted worker should leave the blacklist.
     *
     * @return {@code true} when the worker is no longer known, when the blacklist exceeds the
     *         configured percentage of known workers, or when its backoff time has elapsed;
     *         {@code false} otherwise
     */
    private boolean shouldRemoveNodeFromBlackList(WorkerHolder workerHolder) {
        // 1. The worker is gone: nothing left to blacklist.
        final boolean workerStillKnown = workers.containsKey(workerHolder.getWorker().getHost());
        if (!workerStillKnown) {
            return true;
        }

        // 2. Too large a share of the workers is blacklisted.
        final double allowedBlacklisted =
            (double) workers.size() * ((double) config.getMaxPercentageBlacklistWorkers() / 100.0);
        if ((double) blackListedWorkers.size() > allowedBlacklisted) {
            log.info(
                "Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]",
                workerHolder.getWorker(),
                config.getMaxPercentageBlacklistWorkers()
            );
            return true;
        }

        // 3. The backoff period is over.
        final long millisLeft = workerHolder.getBlacklistedUntil().getMillis() - System.currentTimeMillis();
        if (millisLeft > 0L) {
            log.info("[%s] still blacklisted for [%,ds]", workerHolder.getWorker(), millisLeft / 1000L);
            return false;
        }
        log.info("Removing [%s] from blacklist because backoff time elapsed", workerHolder.getWorker());
        return true;
    }

    /**
     * Updates a worker's consecutive-failure count from {@code taskStatus} and blacklists or
     * un-blacklists it accordingly, while synchronized on {@code blackListedWorkers}.
     *
     * <p>A success resets the count and removes the worker from the blacklist. A failure
     * increments the count. The worker is then blacklisted when its count exceeds
     * {@code config.getMaxRetriesBeforeBlacklist()} and the blacklist size is at most the
     * configured percentage of known workers minus one.
     */
    private void blacklistWorkerIfNeeded(TaskStatus taskStatus, WorkerHolder workerHolder) {
        synchronized (blackListedWorkers) {
            final String host = workerHolder.getWorker().getHost();

            if (taskStatus.isSuccess()) {
                workerHolder.resetContinuouslyFailedTasksCount();
                final boolean wasBlacklisted = blackListedWorkers.remove(host) != null;
                if (wasBlacklisted) {
                    workerHolder.setBlacklistedUntil(null);
                    log.info("[%s] removed from blacklist because a task finished with SUCCESS", workerHolder.getWorker());
                }
            } else if (taskStatus.isFailure()) {
                workerHolder.incrementContinuouslyFailedTasksCount();
            }

            if (workerHolder.getContinuouslyFailedTasksCount() <= config.getMaxRetriesBeforeBlacklist()) {
                return;
            }
            final double blacklistCeiling =
                (double) workers.size() * ((double) config.getMaxPercentageBlacklistWorkers() / 100.0) - 1.0;
            if ((double) blackListedWorkers.size() > blacklistCeiling) {
                return;
            }

            workerHolder.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
            final boolean newlyBlacklisted = blackListedWorkers.put(host, workerHolder) == null;
            if (newlyBlacklisted) {
                log.info(
                    "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
                    workerHolder.getWorker(),
                    workerHolder.getBlacklistedUntil(),
                    workerHolder.getContinuouslyFailedTasksCount()
                );
            }
        }
    }

    /**
     * Returns an immutable snapshot of every known worker, collected into a new list.
     */
    @Override
    public Collection<ImmutableWorkerInfo> getWorkers() {
        return workers.values()
                      .stream()
                      .map(WorkerHolder::toImmutable)
                      .collect(Collectors.toList());
    }

    /**
     * Exposes the internal worker map (not a copy) for tests; callers are expected to treat
     * it as read-only.
     */
    @VisibleForTesting
    ConcurrentMap<String, WorkerHolder> getWorkersForTestingReadOnly() {
        final ConcurrentMap<String, WorkerHolder> liveWorkers = workers;
        return liveWorkers;
    }

    /**
     * Returns the {@code Worker} of every holder currently present in {@code lazyWorkers}.
     */
    @Override
    public Collection<Worker> getLazyWorkers() {
        return lazyWorkers.values()
                          .stream()
                          .map(WorkerHolder::getWorker)
                          .collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks workers as lazy until at most {@code maxWorkers} workers are in the lazy set.
     *
     * <p>While holding {@code statusLock}, every known worker is examined; a worker is added to
     * {@code lazyWorkers} when {@link #isWorkerOkForMarkingLazy} accepts it and the supplied
     * predicate matches its immutable view.
     *
     * <p>The limit is checked <em>before</em> each insertion with {@code >=}. Checking for exact
     * equality after the insertion would never stop the loop when the lazy set already holds
     * {@code maxWorkers} or more entries (for example {@code maxWorkers == 0}), and every eligible
     * worker would be marked lazy.
     *
     * @param isLazyWorker predicate deciding whether a worker may be treated as lazy
     * @param maxWorkers   upper bound on the total number of lazy workers
     * @return the workers that are lazy after this call
     * @throws RuntimeException wrapping any exception raised while evaluating a worker
     */
    @Override
    public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers) {
        synchronized (statusLock) {
            for (Map.Entry<String, WorkerHolder> entry : workers.entrySet()) {
                if (lazyWorkers.size() >= maxWorkers) {
                    // Limit already reached (possibly before this call): mark nothing further.
                    break;
                }
                final WorkerHolder workerHolder = entry.getValue();
                try {
                    if (isWorkerOkForMarkingLazy(workerHolder.getWorker())
                        && isLazyWorker.apply(workerHolder.toImmutable())) {
                        log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
                        lazyWorkers.put(entry.getKey(), workerHolder);
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return getLazyWorkers();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tells whether {@code worker} may be marked lazy. Evaluated while holding {@code statusLock}.
     *
     * @return {@code false} when the worker's host has an entry in
     *         {@code workersWithUnacknowledgedTask} or when some task in state {@code RUNNING} is
     *         assigned to a worker with the same host; {@code true} otherwise
     */
    private boolean isWorkerOkForMarkingLazy(Worker worker) {
        synchronized (statusLock) {
            if (workersWithUnacknowledgedTask.containsKey(worker.getHost())) {
                return false;
            }
            for (HttpRemoteTaskRunnerWorkItem item : tasks.values()) {
                if (item.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                    final Worker assigned = item.getWorker();
                    if (assigned != null && assigned.getHost().equals(worker.getHost())) {
                        return false;
                    }
                }
            }
            return true;
        }
    }

    /**
     * Returns the configuration object this runner was set up with.
     */
    @Override
    public WorkerTaskRunnerConfig getConfig() {
        return config;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Collects, under {@code statusLock}, the {@code Task} payload of every work item whose state
     * reports {@code isPending()}.
     */
    @Override
    public Collection<Task> getPendingTaskPayloads() {
        synchronized (statusLock) {
            final List<Task> payloads = new ArrayList<>();
            for (HttpRemoteTaskRunnerWorkItem item : tasks.values()) {
                if (item.getState().isPending()) {
                    payloads.add(item.getTask());
                }
            }
            return payloads;
        }
    }

    /**
     * Streams the log of a task from the worker it is assigned to.
     *
     * @param taskId id of the task whose log is requested
     * @param offset offset passed to the worker as the {@code offset} query parameter
     * @return the response stream, or {@code Optional.absent()} when the task is unknown, is
     *         already {@code COMPLETE}, has no worker, or its worker's host is not in
     *         {@code workers}
     * @throws IOException      when the HTTP request failed with an {@code IOException}
     * @throws RuntimeException when the request failed for another reason or the calling thread
     *                          was interrupted while waiting; in the latter case the thread's
     *                          interrupt flag is restored before throwing
     */
    public Optional<InputStream> streamTaskLog(String taskId, long offset) throws IOException {
        HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
        Worker worker = null;
        if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
            worker = taskRunnerWorkItem.getWorker();
        }
        if (worker == null || !workers.containsKey(worker.getHost())) {
            return Optional.absent();
        }
        URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/log?offset=%s", taskId, Long.toString(offset));
        try {
            return Optional.of(httpClient.go(new Request(HttpMethod.GET, url), new InputStreamResponseHandler()).get());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            // Rethrows the cause as-is when it is an IOException, RuntimeException or Error.
            Throwables.propagateIfPossible(e.getCause(), IOException.class);
            throw new RuntimeException(e);
        }
    }

    /**
     * Streams the live reports of a task from the task's own location.
     *
     * @param taskId id of the task whose live reports are requested
     * @return the response stream, or {@code Optional.absent()} when the task is unknown, is
     *         already {@code COMPLETE}, has no worker, its worker's host is not in
     *         {@code workers}, or its location equals {@code TaskLocation.unknown()}
     * @throws IOException      when the HTTP request failed with an {@code IOException}
     * @throws RuntimeException when the request failed for another reason or the calling thread
     *                          was interrupted while waiting; in the latter case the thread's
     *                          interrupt flag is restored before throwing
     */
    public Optional<InputStream> streamTaskReports(String taskId) throws IOException {
        HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
        Worker worker = null;
        if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
            worker = taskRunnerWorkItem.getWorker();
        }
        if (worker == null || !workers.containsKey(worker.getHost())) {
            return Optional.absent();
        }
        // worker != null implies taskRunnerWorkItem != null here.
        TaskLocation taskLocation = taskRunnerWorkItem.getLocation();
        if (TaskLocation.unknown().equals(taskLocation)) {
            return Optional.absent();
        }
        URL url = TaskRunnerUtils.makeTaskLocationURL(taskLocation, "/druid/worker/v1/chat/%s/liveReports", taskId);
        try {
            return Optional.of(httpClient.go(new Request(HttpMethod.GET, url), new InputStreamResponseHandler()).get());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            // Rethrows the cause as-is when it is an IOException, RuntimeException or Error.
            Throwables.propagateIfPossible(e.getCause(), IOException.class);
            throw new RuntimeException(e);
        }
    }

    /**
     * Always returns an empty list; this implementation restores nothing here.
     */
    @Override
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        final List<Pair<Task, ListenableFuture<TaskStatus>>> nothingToRestore = ImmutableList.of();
        return nothingToRestore;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a listener together with the executor paired with it.
     *
     * <p>Under {@code statusLock} the new listener is first told the current location of every
     * task in state {@code RUNNING}, and only then appended to {@code listeners}.
     *
     * @param listener listener to add
     * @param executor executor stored alongside the listener
     * @throws ISE when a listener with the same id is already registered
     */
    @Override
    public void registerListener(TaskRunnerListener listener, Executor executor) {
        for (Pair<TaskRunnerListener, Executor> registered : listeners) {
            if (registered.lhs.getListenerId().equals(listener.getListenerId())) {
                throw new ISE("Listener [%s] already registered", listener.getListenerId());
            }
        }

        final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);

        synchronized (statusLock) {
            for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> taskEntry : tasks.entrySet()) {
                final HttpRemoteTaskRunnerWorkItem workItem = taskEntry.getValue();
                if (workItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                    TaskRunnerUtils.notifyLocationChanged(
                        ImmutableList.of(listenerPair),
                        taskEntry.getKey(),
                        workItem.getLocation()
                    );
                }
            }
            log.info("Registered listener [%s]", listener.getListenerId());
            listeners.add(listenerPair);
        }
    }

    /**
     * Removes the first registered listener whose id equals {@code listenerId}; does nothing when
     * no such listener exists.
     */
    @Override
    public void unregisterListener(String listenerId) {
        for (Pair<TaskRunnerListener, Executor> registered : listeners) {
            final boolean idMatches = registered.lhs.getListenerId().equals(listenerId);
            if (idMatches) {
                listeners.remove(registered);
                log.info("Unregistered listener [%s]", listenerId);
                return;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a task to this runner.
     *
     * <p>A task id that is already known is not queued again: its existing result future is
     * returned, and the payload is attached to the existing work item if that item had none. An
     * unknown task is stored as {@code PENDING}, appended to {@code pendingTaskIds}, and waiters
     * on {@code statusLock} are woken up.
     *
     * @param task task to run
     * @return future of the task's final status
     * @throws IllegalStateException when the runner's lifecycle has not started
     */
    @Override
    public ListenableFuture<TaskStatus> run(Task task) {
        Preconditions.checkState(lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS), "not started");

        synchronized (statusLock) {
            final HttpRemoteTaskRunnerWorkItem known = tasks.get(task.getId());
            if (known == null) {
                log.info("Adding pending task[%s].", task.getId());
                final HttpRemoteTaskRunnerWorkItem added = new HttpRemoteTaskRunnerWorkItem(
                    task.getId(),
                    null,
                    null,
                    task,
                    task.getType(),
                    HttpRemoteTaskRunnerWorkItem.State.PENDING
                );
                tasks.put(task.getId(), added);
                pendingTaskIds.add(task.getId());
                statusLock.notifyAll();
                return added.getResult();
            }

            log.info("Assigned a task[%s] that is known already. Ignored.", task.getId());
            if (known.getTask() == null) {
                known.setTask(task);
            }
            return known.getResult();
        }
    }

    /**
     * Submits {@code config.getPendingTasksRunnerNumThreads()} jobs to {@code pendingTasksExec}.
     * Each job waits for the lifecycle to start and then runs
     * {@link #pendingTasksExecutionLoop()}; failures are reported through an alert.
     */
    private void startPendingTaskHandling() {
        final Runnable pendingLoopJob = () -> {
            try {
                if (lifecycleLock.awaitStarted()) {
                    pendingTasksExecutionLoop();
                } else {
                    log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
                }
            }
            catch (Throwable t) {
                log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run").emit();
            }
            finally {
                log.info("PendingTaskExecution loop exited.");
            }
        };

        int submitted = 0;
        while (submitted < config.getPendingTasksRunnerNumThreads()) {
            pendingTasksExec.submit(pendingLoopJob);
            submitted++;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Body of a pending-task handling thread: repeatedly picks one pending task, reserves a
     * worker for it and hands the task to that worker.
     *
     * <p>The loop ends when the thread's interrupt flag is set or when
     * {@code lifecycleLock.awaitStarted(1ms)} returns {@code false}. Each iteration:
     * <ol>
     *   <li>scans {@code pendingTaskIds} under {@code statusLock} for a task to handle, waiting up
     *       to one minute on the lock when nothing can be handled;</li>
     *   <li>outside the lock, fails a task that has no payload, or calls
     *       {@code runTaskOnWorker} for the chosen worker;</li>
     *   <li>always releases the worker's entry in {@code workersWithUnacknowledgedTask} and wakes
     *       up waiters afterwards.</li>
     * </ol>
     */
    private void pendingTasksExecutionLoop() {
        // Thread.interrupted() also clears the interrupt flag; the InterruptedException handler
        // below sets it again so that this condition ends the loop on the next check.
        while (!Thread.interrupted() && this.lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
            try {
                TaskRunnerWorkItem taskItem = null;
                ImmutableWorkerInfo immutableWorker = null;
                Object object = this.statusLock;
                synchronized (object) {
                    Iterator<String> iter = this.pendingTaskIds.iterator();
                    while (iter.hasNext()) {
                        String taskId = iter.next();
                        HttpRemoteTaskRunnerWorkItem ti = (HttpRemoteTaskRunnerWorkItem)this.tasks.get(taskId);
                        // Drop ids whose work item is gone or is no longer in a pending state.
                        if (ti == null || !ti.getState().isPending()) {
                            iter.remove();
                            continue;
                        }
                        // Skip tasks that already have a worker reserved for them.
                        if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) continue;
                        // A work item without payload cannot be run; pick it so it is failed below.
                        if (ti.getTask() == null) {
                            taskItem = ti;
                            break;
                        }
                        immutableWorker = this.findWorkerToRunTask(ti.getTask());
                        // No worker found for this task right now; try the next pending id.
                        if (immutableWorker == null) continue;
                        // Reserve the worker by recording the task as unacknowledged on its host.
                        String prevUnackedTaskId = this.workersWithUnacknowledgedTask.putIfAbsent(immutableWorker.getWorker().getHost(), taskId);
                        if (prevUnackedTaskId != null) {
                            log.makeAlert("Found worker[%s] with unacked task[%s] but still was identified to run task[%s].", new Object[]{immutableWorker.getWorker().getHost(), prevUnackedTaskId, taskId}).emit();
                        }
                        ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
                        taskItem = ti;
                        break;
                    }
                    if (taskItem == null) {
                        // Nothing to handle: wait (at most one minute) for a notifyAll on
                        // statusLock, then restart the outer loop and rescan.
                        this.statusLock.wait(TimeUnit.MINUTES.toMillis(1L));
                        continue;
                    }
                }
                String taskId = taskItem.getTaskId();
                if (((HttpRemoteTaskRunnerWorkItem)taskItem).getTask() == null) {
                    log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", new Object[]{taskId}).emit();
                    this.taskComplete((HttpRemoteTaskRunnerWorkItem)taskItem, null, TaskStatus.failure((String)taskId, (String)"No payload found for this task. See overlord logs and middleManager/indexer logs for more details."));
                    continue;
                }
                // A task with a payload is only picked together with a worker.
                if (immutableWorker == null) {
                    throw new ISE("Unexpected state: null immutableWorker", new Object[0]);
                }
                try {
                    // When runTaskOnWorker returns false and the item is still in
                    // PENDING_WORKER_ASSIGN, put it back to PENDING so it is picked up again.
                    // NOTE(review): presumably a true result means the assignment succeeded — confirm.
                    if (this.runTaskOnWorker((HttpRemoteTaskRunnerWorkItem)taskItem, immutableWorker.getWorker().getHost()) || ((HttpRemoteTaskRunnerWorkItem)taskItem).getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) continue;
                    ((HttpRemoteTaskRunnerWorkItem)taskItem).revertStateFromPendingWorkerAssignToPending();
                }
                catch (InterruptedException ex) {
                    log.info("Got InterruptedException while assigning task[%s].", new Object[]{taskId});
                    throw ex;
                }
                catch (Throwable th) {
                    // Any other failure during assignment fails the task.
                    log.makeAlert(th, "Exception while trying to assign task", new Object[0]).addData("taskId", (Object)taskId).emit();
                    this.taskComplete((HttpRemoteTaskRunnerWorkItem)taskItem, null, TaskStatus.failure((String)taskId, (String)"Failed to assign this task. See overlord logs for more details."));
                }
                finally {
                    // Runs on every exit path of the try above (including the continue):
                    // release the worker reservation and wake up threads waiting on statusLock.
                    Object object2 = this.statusLock;
                    synchronized (object2) {
                        this.workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
                        this.statusLock.notifyAll();
                    }
                }
            }
            catch (InterruptedException ex) {
                log.info("Interrupted, will Exit.", new Object[0]);
                // Re-set the flag so the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            }
            catch (Throwable th) {
                // Keep the loop alive on unexpected errors; just raise an alert.
                log.makeAlert(th, "Unknown Exception while trying to assign tasks.", new Object[0]).emit();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns an immutable copy of {@code pendingTaskIds}, taken while holding {@code statusLock}.
     */
    List<String> getPendingTasksList() {
        synchronized (statusLock) {
            final List<String> pendingIdsCopy = ImmutableList.copyOf(pendingTaskIds);
            return pendingIdsCopy;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forgets a task and, if it was running, asks its worker to kill it.
     *
     * <p>The command is ignored when the lifecycle has not started within one second. Otherwise
     * the task is removed from {@code tasks} under {@code statusLock}; the request to the worker
     * is sent after the lock has been released.
     *
     * @param taskId id of the task to shut down
     * @param reason reason for the shutdown, only used for logging
     */
    @Override
    public void shutdown(String taskId, String reason) {
        if (!lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS)) {
            log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
            return;
        }

        WorkerHolder workerHolderRunningTask = null;
        synchronized (statusLock) {
            log.info("Shutdown [%s] because: [%s]", taskId, reason);
            final HttpRemoteTaskRunnerWorkItem removed = tasks.remove(taskId);
            if (removed == null) {
                log.info("Received shutdown task[%s], but can't find it. Ignored.", taskId);
            } else if (removed.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                workerHolderRunningTask = workers.get(removed.getWorker().getHost());
                if (workerHolderRunningTask == null) {
                    log.info("Can't shutdown! No worker running task[%s]", taskId);
                }
            }
        }

        if (workerHolderRunningTask == null) {
            return;
        }
        log.debug(
            "Got shutdown request for task[%s]. Asking worker[%s] to kill it.",
            taskId,
            workerHolderRunningTask.getWorker().getHost()
        );
        workerHolderRunningTask.shutdownTask(taskId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops this runner: closes the provisioning service (if one exists), shuts down the
     * executors, removes the node discovery listener and stops every worker holder. Failures
     * while stopping an individual worker holder are logged and do not abort the shutdown.
     * {@code lifecycleLock.exitStop()} is always called once stopping was allowed.
     *
     * @throws ISE when {@code lifecycleLock.canStop()} returns {@code false}
     */
    @Override
    @LifecycleStop
    public void stop() {
        if (!lifecycleLock.canStop()) {
            throw new ISE("can't stop.");
        }
        try {
            log.info("Stopping...");

            if (provisioningService != null) {
                provisioningService.close();
            }
            pendingTasksExec.shutdownNow();
            workersSyncExec.shutdownNow();
            cleanupExec.shutdown();

            log.info("Removing listener");
            druidNodeDiscoveryProvider.getForService("workerNodeService")
                                      .removeListener(nodeDiscoveryListener);

            log.info("Stopping worker holders");
            synchronized (workers) {
                for (WorkerHolder holder : workers.values()) {
                    try {
                        holder.stop();
                    }
                    catch (Exception e) {
                        log.error(e, e.getMessage());
                    }
                }
            }
        }
        finally {
            lifecycleLock.exitStop();
        }
        log.info("Stopped.");
    }

    /**
     * Returns every work item whose state is {@code RUNNING}.
     */
    @Override
    public Collection<? extends TaskRunnerWorkItem> getRunningTasks() {
        final List<HttpRemoteTaskRunnerWorkItem> running = new ArrayList<>();
        for (HttpRemoteTaskRunnerWorkItem item : tasks.values()) {
            if (item.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
                running.add(item);
            }
        }
        return running;
    }

    /**
     * Returns every work item whose state reports {@code isPending()}.
     */
    @Override
    public Collection<? extends TaskRunnerWorkItem> getPendingTasks() {
        final List<HttpRemoteTaskRunnerWorkItem> pending = new ArrayList<>();
        for (HttpRemoteTaskRunnerWorkItem item : tasks.values()) {
            if (item.getState().isPending()) {
                pending.add(item);
            }
        }
        return pending;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns an immutable copy of all work items in {@code tasks}, taken while holding
     * {@code statusLock}.
     */
    @Override
    public Collection<? extends TaskRunnerWorkItem> getKnownTasks() {
        synchronized (statusLock) {
            final Collection<HttpRemoteTaskRunnerWorkItem> allItems = tasks.values();
            return ImmutableList.copyOf(allItems);
        }
    }

    /**
     * Returns every work item whose state is {@code COMPLETE}.
     */
    public Collection<? extends TaskRunnerWorkItem> getCompletedTasks() {
        final List<HttpRemoteTaskRunnerWorkItem> completed = new ArrayList<>();
        for (HttpRemoteTaskRunnerWorkItem item : tasks.values()) {
            if (item.getState() == HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
                completed.add(item);
            }
        }
        return completed;
    }

    /**
     * Maps the internal state of a task to its {@code RunnerTaskState}.
     *
     * @return the mapped state, or {@code null} when the task id is unknown
     */
    @Override
    @Nullable
    public RunnerTaskState getRunnerTaskState(String taskId) {
        final HttpRemoteTaskRunnerWorkItem found = tasks.get(taskId);
        return found == null ? null : found.getState().toRunnerTaskState();
    }

    /**
     * Returns the location recorded for a task, or {@code TaskLocation.unknown()} when the task
     * id is unknown.
     */
    @Override
    public TaskLocation getTaskLocation(String taskId) {
        final HttpRemoteTaskRunnerWorkItem found = tasks.get(taskId);
        return found == null ? TaskLocation.unknown() : found.getLocation();
    }

    /**
     * Returns the host of every worker currently present in {@code blackListedWorkers}.
     */
    public List<String> getBlacklistedWorkers() {
        final List<String> hosts = new ArrayList<>();
        for (WorkerHolder holder : blackListedWorkers.values()) {
            hosts.add(holder.getWorker().getHost());
        }
        return hosts;
    }

    /**
     * Returns an immutable list holding the {@code toImmutable()} view of every blacklisted
     * worker holder.
     */
    public Collection<ImmutableWorkerInfo> getBlackListedWorkers() {
        final Collection<ImmutableWorkerInfo> immutableViews =
            Collections2.transform(blackListedWorkers.values(), WorkerHolder::toImmutable);
        return ImmutableList.copyOf(immutableViews);
    }

    /**
     * Returns the live {@code workersWithUnacknowledgedTask} map itself (not a copy).
     */
    Map<String, String> getWorkersWithUnacknowledgedTasks() {
        return workersWithUnacknowledgedTask;
    }

    /**
     * Returns the scaling statistics of the provisioning service.
     *
     * @return the statistics, or {@code Optional.absent()} when no provisioning service exists
     *         (the field is treated as nullable elsewhere in this class, e.g. in {@code stop()})
     *         or when the service reports no statistics
     */
    @Override
    public Optional<ScalingStats> getScalingStats() {
        if (provisioningService == null) {
            // Avoid a NullPointerException when called without a provisioning service in place.
            return Optional.absent();
        }
        return Optional.fromNullable(provisioningService.getStats());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a task status announcement received from a worker.
     *
     * <p>Under {@code statusLock} the announcement is reconciled with the work item stored in
     * {@code tasks} (consulting {@code taskStorage} when the task is not known to this runner).
     * The outcome is captured in two flags that are acted upon after the lock is released:
     * {@code isTaskCompleted} leads to {@code taskComplete(...)} and {@code shouldShutdownTask}
     * leads to asking the announcing worker to kill the task. Finally all threads waiting on
     * {@code statusLock} are notified.
     *
     * @param announcement the announcement sent by the worker
     * @param workerHolder holder of the worker that sent the announcement
     */
    void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder workerHolder) {
        HttpRemoteTaskRunnerWorkItem taskItem;
        String taskId = announcement.getTaskId();
        Worker worker = workerHolder.getWorker();
        log.debug("Worker[%s] wrote [%s] status for task [%s] on [%s]", new Object[]{worker.getHost(), announcement.getTaskStatus().getStatusCode(), taskId, announcement.getTaskLocation()});
        boolean shouldShutdownTask = false;
        boolean isTaskCompleted = false;
        Object object = this.statusLock;
        synchronized (object) {
            // block33/block34 are decompiler-generated labels. Leaving block34 reaches the
            // "unknown task" warning below; leaving block33 skips that warning.
            block33: {
                block34: {
                    taskItem = (HttpRemoteTaskRunnerWorkItem)this.tasks.get(taskId);
                    // Task already tracked by this runner: no storage lookup needed.
                    if (taskItem != null) break block33;
                    Optional<TaskStatus> knownStatusInStorage = this.taskStorage.getStatus(taskId);
                    // Not in storage either: fall out to the "unknown task" warning.
                    if (!knownStatusInStorage.isPresent()) break block34;
                    switch (((TaskStatus)knownStatusInStorage.get()).getStatusCode()) {
                        case RUNNING: {
                            // Storage says the task is running: start tracking it in RUNNING state
                            // on the announcing worker, with unknown location and no Task payload.
                            taskItem = new HttpRemoteTaskRunnerWorkItem(taskId, worker, TaskLocation.unknown(), null, announcement.getTaskType(), HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                            this.tasks.put(taskId, taskItem);
                            break;
                        }
                        case SUCCESS: 
                        case FAILED: {
                            // Storage already holds a final status; taskItem stays null. The
                            // message is logged only when the worker reports a non-complete status.
                            if (!announcement.getTaskStatus().isComplete()) {
                                log.info("Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.", new Object[]{worker.getHost(), taskId});
                                break;
                            }
                            break block33;
                        }
                        default: {
                            log.makeAlert("Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored.", new Object[]{((TaskStatus)knownStatusInStorage.get()).getStatusCode(), taskId, announcement, worker.getHost()}).emit();
                            break;
                        }
                    }
                    break block33;
                }
                log.warn("Worker[%s] reported status[%s] for unknown task[%s]. Ignored.", new Object[]{worker.getHost(), announcement.getStatus(), taskId});
            }
            if (taskItem == null) {
                // No work item to update: a task the worker still reports as not complete is
                // killed on that worker.
                if (!announcement.getTaskStatus().isComplete()) {
                    shouldShutdownTask = true;
                }
            } else {
                // block8 is a decompiler-generated label for this outer switch.
                block8 : switch (announcement.getTaskStatus().getStatusCode()) {
                    case RUNNING: {
                        switch (taskItem.getState()) {
                            case PENDING: 
                            case PENDING_WORKER_ASSIGN: {
                                // First RUNNING report for a pending task: bind it to the worker,
                                // move it to RUNNING and emit the time it spent pending.
                                taskItem.setWorker(worker);
                                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                                log.info("Task[%s] started RUNNING on worker[%s].", new Object[]{taskId, worker.getHost()});
                                ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
                                IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
                                this.emitter.emit(metricBuilder.build("task/pending/time", (Number)new Duration((ReadableInstant)taskItem.getCreatedTime(), (ReadableInstant)DateTimes.nowUtc()).getMillis()));
                                // No break: falls through to the RUNNING case.
                            }
                            case RUNNING: {
                                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                                    // Same worker: only a changed location needs handling.
                                    if (announcement.getTaskLocation().equals((Object)taskItem.getLocation())) break block8;
                                    log.info("Task[%s] location changed on worker[%s]. new location[%s].", new Object[]{taskId, worker.getHost(), announcement.getTaskLocation()});
                                    taskItem.setLocation(announcement.getTaskLocation());
                                    TaskRunnerUtils.notifyLocationChanged(this.listeners, taskId, announcement.getTaskLocation());
                                    break;
                                }
                                // A different worker is recorded for this task: kill the copy on
                                // the announcing worker.
                                log.warn("Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored.", new Object[]{worker.getHost(), taskId, taskItem.getWorker().getHost()});
                                shouldShutdownTask = true;
                                break;
                            }
                            case COMPLETE: {
                                // Task already completed here but the worker still reports RUNNING.
                                log.warn("Worker[%s] reported status for completed task[%s]. Ignored.", new Object[]{worker.getHost(), taskId});
                                shouldShutdownTask = true;
                                break;
                            }
                            default: {
                                log.makeAlert("Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", new Object[]{taskItem.getState(), taskId, announcement, worker.getHost()}).emit();
                                break;
                            }
                        }
                        break;
                    }
                    case SUCCESS: 
                    case FAILED: {
                        switch (taskItem.getState()) {
                            case PENDING: 
                            case PENDING_WORKER_ASSIGN: {
                                // Completion reported for a task still pending here: bind it to the
                                // worker and move it to RUNNING first.
                                taskItem.setWorker(worker);
                                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                                log.info("Task[%s] finished on worker[%s].", new Object[]{taskId, worker.getHost()});
                                // No break: falls through to the RUNNING case.
                            }
                            case RUNNING: {
                                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                                    if (!announcement.getTaskLocation().equals((Object)taskItem.getLocation())) {
                                        log.info("Task[%s] location changed on worker[%s]. new location[%s].", new Object[]{taskId, worker.getHost(), announcement.getTaskLocation()});
                                        taskItem.setLocation(announcement.getTaskLocation());
                                        TaskRunnerUtils.notifyLocationChanged(this.listeners, taskId, announcement.getTaskLocation());
                                    }
                                    // Completion is handled after the lock is released.
                                    isTaskCompleted = true;
                                    break block8;
                                }
                                log.warn("Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.", new Object[]{worker.getHost(), taskId, taskItem.getWorker().getHost()});
                                break block8;
                            }
                            case COMPLETE: {
                                // Already complete: nothing to do.
                                break block8;
                            }
                        }
                        // Reached only when the work item state matched none of the cases above.
                        log.makeAlert("Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", new Object[]{taskItem.getState(), taskId, announcement, worker.getHost()}).emit();
                        break;
                    }
                    default: {
                        log.makeAlert("Worker[%s] reported unrecognized state[%s] for task[%s].", new Object[]{worker.getHost(), announcement.getTaskStatus().getStatusCode(), taskId}).emit();
                    }
                }
            }
        }
        // The follow-up actions run outside the statusLock block above.
        if (isTaskCompleted) {
            this.taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
        }
        if (shouldShutdownTask) {
            log.warn("Killing task[%s] on worker[%s].", new Object[]{taskId, worker.getHost()});
            workerHolder.shutdownTask(taskId);
        }
        // Wake up threads waiting on statusLock.
        object = this.statusLock;
        synchronized (object) {
            this.statusLock.notifyAll();
        }
    }

    /**
     * Sums {@code getCapacity()} of all workers returned by {@link #getWorkers()}, grouped by
     * worker category.
     *
     * @return map from worker category to the summed capacity
     */
    @Override
    public Map<String, Long> getTotalTaskSlotCount() {
        final Map<String, Long> capacityByCategory = new HashMap<>();
        for (ImmutableWorkerInfo info : getWorkers()) {
            final long capacity = info.getWorker().getCapacity();
            capacityByCategory.merge(info.getWorker().getCategory(), capacity, Long::sum);
        }
        return capacityByCategory;
    }

    /**
     * Sums {@code getAvailableCapacity()} of all workers returned by
     * {@code getWorkersEligibleToRunTasks()}, grouped by worker category.
     *
     * @return map from worker category to the summed available capacity
     */
    @Override
    public Map<String, Long> getIdleTaskSlotCount() {
        final Map<String, Long> idleByCategory = new HashMap<>();
        for (ImmutableWorkerInfo info : getWorkersEligibleToRunTasks().values()) {
            final long available = info.getAvailableCapacity();
            idleByCategory.merge(info.getWorker().getCategory(), available, Long::sum);
        }
        return idleByCategory;
    }

    /**
     * Sums {@code getCurrCapacityUsed()} of all workers returned by {@link #getWorkers()},
     * grouped by worker category.
     *
     * @return map from worker category to the summed used capacity
     */
    @Override
    public Map<String, Long> getUsedTaskSlotCount() {
        final Map<String, Long> usedByCategory = new HashMap<>();
        for (ImmutableWorkerInfo info : getWorkers()) {
            final long used = info.getCurrCapacityUsed();
            usedByCategory.merge(info.getWorker().getCategory(), used, Long::sum);
        }
        return usedByCategory;
    }

    /**
     * Sums {@code getCapacity()} of all workers returned by {@link #getLazyWorkers()}, grouped by
     * worker category.
     *
     * @return map from worker category to the summed capacity of lazy workers
     */
    @Override
    public Map<String, Long> getLazyTaskSlotCount() {
        final Map<String, Long> lazyByCategory = new HashMap<>();
        for (Worker lazyWorker : getLazyWorkers()) {
            final long capacity = lazyWorker.getCapacity();
            lazyByCategory.merge(lazyWorker.getCategory(), capacity, Long::sum);
        }
        return lazyByCategory;
    }

    /**
     * Sums {@code getCapacity()} of all workers returned by {@link #getBlackListedWorkers()},
     * grouped by worker category.
     *
     * @return map from worker category to the summed capacity of blacklisted workers
     */
    @Override
    public Map<String, Long> getBlacklistedTaskSlotCount() {
        final Map<String, Long> blacklistedByCategory = new HashMap<>();
        for (ImmutableWorkerInfo info : getBlackListedWorkers()) {
            final long capacity = info.getWorker().getCapacity();
            blacklistedByCategory.merge(info.getWorker().getCategory(), capacity, Long::sum);
        }
        return blacklistedByCategory;
    }

    /**
     * Work item kept by the runner for every task it knows about. In addition to what
     * {@code RemoteTaskRunnerWorkItem} stores, it carries the (possibly absent) {@code Task}
     * payload and the runner-internal {@link State} of the task.
     */
    private static class HttpRemoteTaskRunnerWorkItem
    extends RemoteTaskRunnerWorkItem {
        // May be null; it can be supplied later through setTask().
        private Task task;
        private State state;

        /**
         * @param task     payload of the task, may be {@code null}
         * @param taskType when both {@code task} and {@code taskType} are non-null, it must equal
         *                 {@code task.getType()}
         * @param state    initial state, must not be {@code null}
         */
        HttpRemoteTaskRunnerWorkItem(String taskId, Worker worker, TaskLocation location, @Nullable Task task, String taskType, State state) {
            super(taskId, task == null ? null : task.getType(), worker, location, task == null ? null : task.getDataSource());
            this.state = Preconditions.checkNotNull(state);
            final boolean typeIsConsistent = task == null || taskType == null || taskType.equals(task.getType());
            Preconditions.checkArgument(typeIsConsistent);
            this.task = task;
        }

        public Task getTask() {
            return task;
        }

        /**
         * Attaches the payload. If no task type is recorded yet it is taken from {@code task};
         * otherwise the recorded type must equal the type of {@code task}.
         */
        public void setTask(Task task) {
            this.task = task;
            final String recordedType = getTaskType();
            if (recordedType != null) {
                Preconditions.checkArgument(recordedType.equals(task.getType()));
            } else {
                setTaskType(task.getType());
            }
        }

        @JsonProperty
        public State getState() {
            return state;
        }

        /**
         * Moves the item to {@code COMPLETE} before the result is handed to the superclass.
         */
        @Override
        public void setResult(TaskStatus status) {
            setState(State.COMPLETE);
            super.setResult(status);
        }

        /**
         * Advances the state. Only transitions to a state with a strictly higher index are
         * accepted.
         */
        public void setState(State state) {
            final boolean movesForward = state.index > this.state.index;
            Preconditions.checkArgument(
                movesForward,
                "Invalid state transition from [%s] to [%s]",
                this.state,
                state
            );
            setStateUnconditionally(state);
        }

        /**
         * The single permitted backwards transition: {@code PENDING_WORKER_ASSIGN} to
         * {@code PENDING}.
         */
        public void revertStateFromPendingWorkerAssignToPending() {
            final boolean awaitingAssignment = this.state == State.PENDING_WORKER_ASSIGN;
            Preconditions.checkState(
                awaitingAssignment,
                "Can't move state from [%s] to [%s]",
                this.state,
                State.PENDING
            );
            setStateUnconditionally(State.PENDING);
        }

        private void setStateUnconditionally(State state) {
            if (log.isDebugEnabled()) {
                // The exception is created only to get a stack trace of the caller into the log.
                log.debug(
                    new RuntimeException("Stacktrace..."),
                    "Setting task[%s] work item state from [%s] to [%s].",
                    getTaskId(),
                    this.state,
                    state
                );
            }
            this.state = state;
        }

        /**
         * Runner-internal task states. {@code index} orders the states for transition checks,
         * {@code isPending} marks the two pending states, and {@code runnerTaskState} is the
         * externally visible equivalent.
         */
        enum State {
            PENDING(0, true, RunnerTaskState.PENDING),
            PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
            RUNNING(2, false, RunnerTaskState.RUNNING),
            COMPLETE(3, false, RunnerTaskState.NONE);

            private final int index;
            private final boolean isPending;
            private final RunnerTaskState runnerTaskState;

            State(int index, boolean isPending, RunnerTaskState runnerTaskState) {
                this.index = index;
                this.isPending = isPending;
                this.runnerTaskState = runnerTaskState;
            }

            boolean isPending() {
                return isPending;
            }

            RunnerTaskState toRunnerTaskState() {
                return runnerTaskState;
            }
        }
    }
}

