/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

public class WorkerTaskMonitor {
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
    private static final int STOP_WARNING_SECONDS = 10;
    private final ObjectMapper jsonMapper;
    private final PathChildrenCache pathChildrenCache;
    private final CuratorFramework cf;
    private final WorkerCuratorCoordinator workerCuratorCoordinator;
    private final TaskRunner taskRunner;
    private final ExecutorService exec;
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<Notice>();
    private final Map<String, TaskDetails> running = new ConcurrentHashMap<String, TaskDetails>();
    private final CountDownLatch doneStopping = new CountDownLatch(1);
    private final Object lifecycleLock = new Object();
    private volatile boolean started = false;

    @Inject
    public WorkerTaskMonitor(ObjectMapper jsonMapper, CuratorFramework cf, WorkerCuratorCoordinator workerCuratorCoordinator, TaskRunner taskRunner) {
        this.jsonMapper = jsonMapper;
        this.pathChildrenCache = new PathChildrenCache(cf, workerCuratorCoordinator.getTaskPathForWorker(), false, true, Execs.makeThreadFactory((String)"TaskMonitorCache-%s"));
        this.cf = cf;
        this.workerCuratorCoordinator = workerCuratorCoordinator;
        this.taskRunner = taskRunner;
        this.exec = Execs.singleThreaded((String)"WorkerTaskMonitor");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStart
    public void start() throws Exception {
        Object object = this.lifecycleLock;
        synchronized (object) {
            Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"already started");
            Preconditions.checkState((!this.exec.isShutdown() ? 1 : 0) != 0, (Object)"already stopped");
            this.started = true;
            try {
                this.restoreRestorableTasks();
                this.cleanupStaleAnnouncements();
                this.registerRunListener();
                this.registerLocationListener();
                this.pathChildrenCache.start();
                this.exec.submit(new Runnable(){

                    @Override
                    public void run() {
                        WorkerTaskMonitor.this.mainLoop();
                    }
                });
                log.info("Started WorkerTaskMonitor.", new Object[0]);
                this.started = true;
            }
            catch (InterruptedException e) {
                throw e;
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Exception starting WorkerTaskMonitor", new Object[0]).emit();
                throw e;
            }
        }
    }

    private void mainLoop() {
        block10: {
            block8: while (true) {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Notice notice = this.notices.take();
                        try {
                            notice.handle();
                            continue block8;
                        }
                        catch (InterruptedException e) {
                            throw e;
                        }
                        catch (Exception e) {
                            log.makeAlert((Throwable)e, "Failed to handle notice", new Object[0]).addData("noticeClass", (Object)notice.getClass().getSimpleName()).addData("noticeTaskId", (Object)notice.getTaskId()).emit();
                        }
                    }
                    break block10;
                }
                catch (InterruptedException e) {
                    log.info("WorkerTaskMonitor interrupted, exiting.", new Object[0]);
                    break block10;
                }
            }
            finally {
                this.doneStopping.countDown();
            }
        }
    }

    private void restoreRestorableTasks() {
        List<Pair<Task, ListenableFuture<TaskStatus>>> restored = this.taskRunner.restore();
        for (Pair<Task, ListenableFuture<TaskStatus>> pair : restored) {
            this.addRunningTask((Task)pair.lhs, (ListenableFuture<TaskStatus>)((ListenableFuture)pair.rhs));
        }
    }

    private void cleanupStaleAnnouncements() throws Exception {
        for (TaskAnnouncement announcement : this.workerCuratorCoordinator.getAnnouncements()) {
            if (this.running.containsKey(announcement.getTaskStatus().getId()) || !announcement.getTaskStatus().isRunnable()) continue;
            log.info("Cleaning up stale announcement for task [%s].", new Object[]{announcement.getTaskStatus().getId()});
            this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(announcement.getTaskStatus().getId(), announcement.getTaskResource(), TaskStatus.failure(announcement.getTaskStatus().getId()), TaskLocation.unknown()));
        }
    }

    private void registerRunListener() {
        this.pathChildrenCache.getListenable().addListener((Object)new PathChildrenCacheListener(){

            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    Task task = (Task)WorkerTaskMonitor.this.jsonMapper.readValue((byte[])WorkerTaskMonitor.this.cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), Task.class);
                    WorkerTaskMonitor.this.notices.add(new RunNotice(task));
                }
            }
        });
    }

    private void registerLocationListener() {
        this.taskRunner.registerListener(new TaskRunnerListener(){

            @Override
            public String getListenerId() {
                return "WorkerTaskMonitor";
            }

            @Override
            public void locationChanged(String taskId, TaskLocation newLocation) {
                WorkerTaskMonitor.this.notices.add(new LocationNotice(taskId, newLocation));
            }

            @Override
            public void statusChanged(String taskId, TaskStatus status) {
            }
        }, (Executor)MoreExecutors.sameThreadExecutor());
    }

    private void addRunningTask(final Task task, ListenableFuture<TaskStatus> future) {
        this.running.put(task.getId(), new TaskDetails(task));
        Futures.addCallback(future, (FutureCallback)new FutureCallback<TaskStatus>(){

            public void onSuccess(TaskStatus result) {
                WorkerTaskMonitor.this.notices.add(new StatusNotice(task, result));
            }

            public void onFailure(Throwable t) {
                WorkerTaskMonitor.this.notices.add(new StatusNotice(task, TaskStatus.failure(task.getId())));
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStop
    public void stop() throws InterruptedException {
        Object object = this.lifecycleLock;
        synchronized (object) {
            Preconditions.checkState((boolean)this.started, (Object)"not started");
            try {
                this.started = false;
                this.taskRunner.unregisterListener("WorkerTaskMonitor");
                this.exec.shutdownNow();
                this.pathChildrenCache.close();
                this.taskRunner.stop();
                if (!this.doneStopping.await(10L, TimeUnit.SECONDS)) {
                    log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", new Object[]{10});
                    this.doneStopping.await();
                }
                log.info("Stopped WorkerTaskMonitor.", new Object[0]);
            }
            catch (InterruptedException e) {
                throw e;
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Exception stopping WorkerTaskMonitor", new Object[0]).emit();
            }
        }
    }

    private class LocationNotice
    implements Notice {
        private final String taskId;
        private final TaskLocation location;

        public LocationNotice(String taskId, TaskLocation location) {
            this.taskId = taskId;
            this.location = location;
        }

        @Override
        public String getTaskId() {
            return this.taskId;
        }

        @Override
        public void handle() throws InterruptedException {
            TaskDetails details = (TaskDetails)WorkerTaskMonitor.this.running.get(this.taskId);
            if (details == null) {
                log.warn("Got location notice for task [%s] that isn't running...", new Object[]{this.taskId});
                return;
            }
            if (!Objects.equals(details.location, this.location)) {
                details.location = this.location;
                try {
                    log.info("Updating task [%s] announcement with location [%s]", new Object[]{this.taskId, this.location});
                    WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(details.task, details.status, details.location));
                }
                catch (InterruptedException e) {
                    throw e;
                }
                catch (Exception e) {
                    log.makeAlert((Throwable)e, "Failed to update task announcement", new Object[0]).addData("task", (Object)this.taskId).emit();
                }
            }
        }
    }

    private class StatusNotice
    implements Notice {
        private final Task task;
        private final TaskStatus status;

        public StatusNotice(Task task, TaskStatus status) {
            this.task = task;
            this.status = status;
        }

        @Override
        public String getTaskId() {
            return this.task.getId();
        }

        @Override
        public void handle() throws Exception {
            TaskDetails details = (TaskDetails)WorkerTaskMonitor.this.running.get(this.task.getId());
            if (details == null) {
                log.warn("Got status notice for task [%s] that isn't running...", new Object[]{this.task.getId()});
                return;
            }
            if (!this.status.isComplete()) {
                log.warn("WTF?! Got status notice for task [%s] that isn't complete (status = [%s])...", new Object[]{this.task.getId(), this.status.getStatusCode()});
                return;
            }
            details.status = this.status.withDuration(System.currentTimeMillis() - details.startTime);
            try {
                WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(details.task, details.status, details.location));
                log.info("Job's finished. Completed [%s] with status [%s]", new Object[]{this.task.getId(), this.status.getStatusCode()});
            }
            catch (InterruptedException e) {
                throw e;
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Failed to update task announcement", new Object[0]).addData("task", (Object)this.task.getId()).emit();
            }
            finally {
                WorkerTaskMonitor.this.running.remove(this.task.getId());
            }
        }
    }

    private class RunNotice
    implements Notice {
        private final Task task;

        public RunNotice(Task task) {
            this.task = task;
        }

        @Override
        public String getTaskId() {
            return this.task.getId();
        }

        @Override
        public void handle() throws Exception {
            if (WorkerTaskMonitor.this.running.containsKey(this.task.getId())) {
                log.warn("Got run notice for task [%s] that I am already running...", new Object[]{this.task.getId()});
                WorkerTaskMonitor.this.workerCuratorCoordinator.removeTaskRunZnode(this.task.getId());
                return;
            }
            log.info("Submitting runnable for task[%s]", new Object[]{this.task.getId()});
            WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(this.task, TaskStatus.running(this.task.getId()), TaskLocation.unknown()));
            log.info("Affirmative. Running task [%s]", new Object[]{this.task.getId()});
            WorkerTaskMonitor.this.workerCuratorCoordinator.removeTaskRunZnode(this.task.getId());
            ListenableFuture<TaskStatus> future = WorkerTaskMonitor.this.taskRunner.run(this.task);
            WorkerTaskMonitor.this.addRunningTask(this.task, (ListenableFuture<TaskStatus>)future);
        }
    }

    private static interface Notice {
        public String getTaskId();

        public void handle() throws Exception;
    }

    private static class TaskDetails {
        private final Task task;
        private final long startTime;
        private TaskStatus status;
        private TaskLocation location;

        public TaskDetails(Task task) {
            this.task = task;
            this.startTime = System.currentTimeMillis();
            this.status = TaskStatus.running(task.getId());
            this.location = TaskLocation.unknown();
        }
    }
}

