/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.LogUtils;
import io.druid.indexing.overlord.PortFinder;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.indexing.overlord.TaskRunnerUtils;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogStreamer;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;

public class ForkingTaskRunner
implements TaskRunner,
TaskLogStreamer {
    // Class-wide logger; also handed to inner classes through a synthetic accessor.
    private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
    // Not referenced by any code visible in this decompiled file.
    // NOTE(review): presumably consumed by the task callable whose body failed to decompile -- confirm.
    private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
    // Name of the file, under the base task directory, that lists the ids of known tasks.
    private static final String TASK_RESTORE_FILENAME = "restore.json";
    private final ForkingTaskRunnerConfig config;
    private final TaskConfig taskConfig;
    private final Properties props;
    private final TaskLogPusher taskLogPusher;
    private final DruidNode node;
    // Executor that task callables are submitted to; created in the constructor with
    // workerConfig.getCapacity() threads.
    private final ListeningExecutorService exec;
    private final ObjectMapper jsonMapper;
    // Seeded with config.getStartPort() in the constructor.
    private final PortFinder portFinder;
    // Registered listeners, each paired with the executor supplied at registration time.
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList();
    // Task id -> work item. The map object itself is also the monitor used by the
    // synchronized blocks throughout this class.
    private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newConcurrentMap();
    // Set to true by stop(); exposed to inner classes through a synthetic accessor.
    private volatile boolean stopping = false;

    @Inject
    public ForkingTaskRunner(ForkingTaskRunnerConfig config, TaskConfig taskConfig, WorkerConfig workerConfig, Properties props, TaskLogPusher taskLogPusher, ObjectMapper jsonMapper, @Self DruidNode node) {
        this.config = config;
        this.taskConfig = taskConfig;
        this.props = props;
        this.taskLogPusher = taskLogPusher;
        this.jsonMapper = jsonMapper;
        this.node = node;
        this.portFinder = new PortFinder(config.getStartPort());
        this.exec = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)workerConfig.getCapacity(), (String)"forking-task-runner-%d"));
    }

    /**
     * Re-submits the tasks listed in the restore file.
     *
     * <p>For every task id recorded in the restore file, the task definition is read from
     * {@code task.json} in that task's directory. The task is passed to {@link #run(Task)} only when
     * {@code taskConfig.isRestoreTasksOnRestart()} is true and the task reports
     * {@code canRestore()}. A failure on one task is logged and does not prevent the remaining
     * tasks from being attempted.
     *
     * @return the restored tasks paired with their status futures; empty when the restore file is
     *         missing, unreadable, or does not contain a running-task list
     */
    @Override
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        final File restoreFile = this.getRestoreFile();
        if (!restoreFile.exists()) {
            return ImmutableList.of();
        }

        final TaskRestoreInfo taskRestoreInfo;
        try {
            taskRestoreInfo = this.jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
        }
        catch (Exception e) {
            log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
            return ImmutableList.of();
        }

        // A file holding JSON "null", or an object without "runningTasks", deserializes without
        // an exception but leaves nothing to iterate. Treat it like an unreadable file instead of
        // letting a NullPointerException escape.
        if (taskRestoreInfo == null || taskRestoreInfo.getRunningTasks() == null) {
            log.warn("Restore file[%s] did not contain a list of running tasks. Skipping restore.", restoreFile);
            return ImmutableList.of();
        }

        final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = Lists.newArrayList();
        for (final String taskId : taskRestoreInfo.getRunningTasks()) {
            try {
                final File taskFile = new File(this.taskConfig.getTaskDir(taskId), "task.json");
                final Task task = this.jsonMapper.readValue(taskFile, Task.class);

                if (!task.getId().equals(taskId)) {
                    throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
                }

                if (this.taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
                    log.info("Restoring task[%s].", task.getId());
                    retVal.add(Pair.of(task, this.run(task)));
                }
            }
            catch (Exception e) {
                log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
            }
        }

        log.info("Restored %,d tasks.", retVal.size());
        return retVal;
    }

    /**
     * Registers a listener together with the executor supplied for it.
     *
     * <p>While holding the {@code tasks} monitor, the new listener is first notified of the
     * current location of every known task and is then added to the listener list.
     *
     * @throws ISE if a listener with the same listener id is already registered
     */
    @Override
    public void registerListener(TaskRunnerListener listener, Executor executor) {
        for (Pair<TaskRunnerListener, Executor> existing : listeners) {
            if (existing.lhs.getListenerId().equals(listener.getListenerId())) {
                throw new ISE("Listener [%s] already registered", listener.getListenerId());
            }
        }

        final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);

        synchronized (tasks) {
            for (ForkingTaskRunnerWorkItem workItem : tasks.values()) {
                TaskRunnerUtils.notifyLocationChanged(
                        ImmutableList.of(listenerPair),
                        workItem.getTaskId(),
                        workItem.getLocation());
            }
            listeners.add(listenerPair);
            log.info("Registered listener [%s]", listener.getListenerId());
        }
    }

    /**
     * Removes the first registered listener whose id equals {@code listenerId}.
     * Does nothing when no listener matches.
     */
    @Override
    public void unregisterListener(String listenerId) {
        for (Pair<TaskRunnerListener, Executor> registered : listeners) {
            if (registered.lhs.getListenerId().equals(listenerId)) {
                listeners.remove(registered);
                log.info("Unregistered listener [%s]", listenerId);
                return;
            }
        }
    }

    /**
     * Submits {@code task} to the executor unless a work item with the same task id is already
     * tracked, then rewrites the restore file and returns the tracked work item's result.
     *
     * <p>All of this happens while holding the {@code tasks} monitor.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     * The body of the submitted callable could NOT be recovered; as written here it only throws
     * {@link IllegalStateException}. Do not treat this method as a faithful copy of the compiled
     * class.
     *
     * @param task the task to run
     * @return the value of {@code getResult()} on the work item tracked for this task id
     */
    @Override
    public ListenableFuture<TaskStatus> run(final Task task) {
        Map<String, ForkingTaskRunnerWorkItem> map = this.tasks;
        synchronized (map) {
            // Only create and submit a work item the first time this task id is seen.
            if (!this.tasks.containsKey(task.getId())) {
                this.tasks.put(task.getId(), new ForkingTaskRunnerWorkItem(task, this.exec.submit((Callable)new Callable<TaskStatus>(){

                    /*
                     * Exception decompiling
                     */
                    @Override
                    public TaskStatus call() {
                        /*
                         * This method failed to decompile:
                         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
                         * (raised from Op04StructuredStatement.getStartingBlocks, CFR 0.152).
                         * The real logic must be recovered from the original source or class file.
                         */
                        throw new IllegalStateException("Decompilation failed");
                    }
                })));
            }
            // The restore file is rewritten on every call, including when the task was already tracked.
            this.saveRunningTasks();
            return this.tasks.get(task.getId()).getResult();
        }
    }

    /**
     * Stops the runner: flags it as stopping, shuts the executor down, asks every started task to
     * exit, and waits for the executor to terminate.
     *
     * <p>For each work item that has a process holder, the stream returned by
     * {@code Process.getOutputStream()} is closed; if closing fails the process is destroyed.
     * The wait is bounded by {@code taskConfig.getGracefulShutdownTimeout()}. If the executor has
     * not terminated by then, an alert listing the task ids still tracked is emitted and the
     * method returns without waiting further.
     */
    @Override
    @LifecycleStop
    public void stop() {
        stopping = true;
        exec.shutdown();

        synchronized (tasks) {
            for (ForkingTaskRunnerWorkItem workItem : tasks.values()) {
                if (workItem.processHolder != null) {
                    log.info("Closing output stream to task[%s].", workItem.getTask().getId());
                    try {
                        workItem.processHolder.process.getOutputStream().close();
                    }
                    catch (Exception e) {
                        log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", workItem.getTask().getId());
                        workItem.processHolder.process.destroy();
                    }
                }
            }
        }

        final DateTime start = new DateTime();
        final long timeout =
                new Interval(start, (ReadablePeriod) taskConfig.getGracefulShutdownTimeout()).toDurationMillis();

        log.info("Waiting up to %,dms for shutdown.", timeout);
        if (timeout <= 0L) {
            log.warn("Ran out of time, not waiting for executor to finish!");
            return;
        }

        try {
            final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
            final long elapsed = System.currentTimeMillis() - start.getMillis();
            if (terminated) {
                log.info("Finished stopping in %,dms.", elapsed);
            } else {
                final ImmutableSet<String> stillRunning = ImmutableSet.copyOf(tasks.keySet());
                log.makeAlert("Failed to stop forked tasks")
                        .addData("stillRunning", stillRunning)
                        .addData("elapsed", elapsed)
                        .emit();
                log.warn(
                        "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
                        elapsed,
                        Joiner.on("; ").join(stillRunning));
            }
        }
        catch (InterruptedException e) {
            log.warn(e, "Interrupted while waiting for executor to finish.");
            // Preserve the interrupt for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the given task as shut down and, if it has a process holder, destroys its process.
     *
     * <p>The lookup and the {@code shutdown} flag update happen under the {@code tasks} monitor;
     * the process is destroyed after the monitor is released. Unknown task ids are logged and
     * ignored.
     *
     * @param taskid id of the task to shut down
     */
    @Override
    public void shutdown(String taskid) {
        final ForkingTaskRunnerWorkItem workItem;
        synchronized (tasks) {
            workItem = tasks.get(taskid);
            if (workItem == null) {
                log.info("Ignoring request to cancel unknown task: %s", taskid);
                return;
            }
            workItem.shutdown = true;
        }

        if (workItem.processHolder != null) {
            log.info("Killing process for task: %s", taskid);
            workItem.processHolder.process.destroy();
        }
    }

    /**
     * Returns a snapshot, taken under the {@code tasks} monitor, of the work items that
     * already have a process holder.
     */
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        synchronized (tasks) {
            final List<TaskRunnerWorkItem> running = Lists.newArrayList();
            for (ForkingTaskRunnerWorkItem workItem : tasks.values()) {
                if (workItem.processHolder != null) {
                    running.add(workItem);
                }
            }
            return running;
        }
    }

    /**
     * Returns a snapshot, taken under the {@code tasks} monitor, of the work items that
     * do not have a process holder yet.
     */
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        synchronized (tasks) {
            final List<TaskRunnerWorkItem> pending = Lists.newArrayList();
            for (ForkingTaskRunnerWorkItem workItem : tasks.values()) {
                if (workItem.processHolder == null) {
                    pending.add(workItem);
                }
            }
            return pending;
        }
    }

    /**
     * Returns a copy, taken under the {@code tasks} monitor, of every tracked work item
     * regardless of whether it has a process holder.
     */
    public Collection<TaskRunnerWorkItem> getKnownTasks() {
        synchronized (tasks) {
            return new ArrayList<TaskRunnerWorkItem>(tasks.values());
        }
    }

    /**
     * Always returns an absent {@code Optional}; this runner reports no scaling statistics.
     */
    @Override
    public Optional<ScalingStats> getScalingStats() {
        return Optional.<ScalingStats>absent();
    }

    /**
     * Intentionally empty: this method performs no work.
     */
    @Override
    public void start() {
    }

    /**
     * Exposes the log file of a task that has a process holder as a {@code ByteSource}.
     *
     * <p>The process holder is looked up under the {@code tasks} monitor. The returned source
     * opens the holder's log file through {@code LogUtils.streamFile} each time
     * {@code openStream()} is called.
     *
     * @param taskid id of the task whose log is requested
     * @param offset offset passed through unchanged to {@code LogUtils.streamFile}
     * @return the log source, or absent when the task is unknown or has no process holder yet
     */
    public Optional<ByteSource> streamTaskLog(String taskid, final long offset) {
        final ProcessHolder holder;
        synchronized (tasks) {
            final ForkingTaskRunnerWorkItem workItem = tasks.get(taskid);
            if (workItem == null || workItem.processHolder == null) {
                return Optional.absent();
            }
            holder = workItem.processHolder;
        }

        final ByteSource logSource = new ByteSource(){

            public InputStream openStream() throws IOException {
                return LogUtils.streamFile(holder.logFile, offset);
            }
        };
        return Optional.of(logSource);
    }

    /**
     * Writes the ids of all tracked work items to the restore file as a {@code TaskRestoreInfo}.
     *
     * <p>Parent directories are created when missing. Any failure is logged and otherwise
     * ignored, so a failed save never propagates to the caller.
     */
    private void saveRunningTasks() {
        final File restoreFile = getRestoreFile();

        final List<String> taskIds = Lists.newArrayList();
        for (ForkingTaskRunnerWorkItem workItem : tasks.values()) {
            taskIds.add(workItem.getTaskId());
        }

        try {
            Files.createParentDirs(restoreFile);
            jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(taskIds));
        }
        catch (Exception e) {
            log.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
        }
    }

    /**
     * Returns the restore file: {@code restore.json} resolved against
     * {@code taskConfig.getBaseTaskDir()}. The file is not required to exist.
     */
    private File getRestoreFile() {
        return new File(this.taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
    }

    /*
     * Synthetic accessors (marked as such by the decompiler). Each one exposes a private member
     * of ForkingTaskRunner through a static method; none contains any logic of its own.
     * NOTE(review): presumably these are called by the anonymous task callable whose body
     * failed to decompile -- their names must not be changed.
     */
    static /* synthetic */ TaskConfig access$000(ForkingTaskRunner x0) {
        return x0.taskConfig;
    }

    static /* synthetic */ DruidNode access$100(ForkingTaskRunner x0) {
        return x0.node;
    }

    static /* synthetic */ ForkingTaskRunnerConfig access$200(ForkingTaskRunner x0) {
        return x0.config;
    }

    static /* synthetic */ PortFinder access$300(ForkingTaskRunner x0) {
        return x0.portFinder;
    }

    static /* synthetic */ Map access$400(ForkingTaskRunner x0) {
        return x0.tasks;
    }

    static /* synthetic */ EmittingLogger access$600() {
        return log;
    }

    static /* synthetic */ Properties access$800(ForkingTaskRunner x0) {
        return x0.props;
    }

    static /* synthetic */ ObjectMapper access$900(ForkingTaskRunner x0) {
        return x0.jsonMapper;
    }

    static /* synthetic */ CopyOnWriteArrayList access$1200(ForkingTaskRunner x0) {
        return x0.listeners;
    }

    static /* synthetic */ TaskLogPusher access$1400(ForkingTaskRunner x0) {
        return x0.taskLogPusher;
    }

    static /* synthetic */ boolean access$1500(ForkingTaskRunner x0) {
        return x0.stopping;
    }

    // Unlike the accessors above, this one invokes a private method rather than reading a field.
    static /* synthetic */ void access$1600(ForkingTaskRunner x0) {
        x0.saveRunningTasks();
    }

    /**
     * Immutable bundle of a task's {@link Process}, the log file served by
     * {@code streamTaskLog}, and the host/port used to build the task's location.
     */
    private static class ProcessHolder {
        private final Process process;
        private final File logFile;
        private final String host;
        private final int port;

        private ProcessHolder(Process process, File logFile, String host, int port) {
            this.host = host;
            this.port = port;
            this.logFile = logFile;
            this.process = process;
        }

        /**
         * Registers the process's input stream and then its output stream with {@code closer}.
         */
        private void registerWithCloser(Closer closer) {
            closer.register(process.getInputStream());
            closer.register(process.getOutputStream());
        }

        // Synthetic accessor emitted by the compiler; the name must stay as-is.
        static /* synthetic */ void access$1100(ProcessHolder x0, Closer x1) {
            x0.registerWithCloser(x1);
        }
    }

    /**
     * Work item tracked by the runner for one task. In addition to what the
     * {@code TaskRunnerWorkItem} base class holds, it carries the task itself, a shutdown flag,
     * and the process holder once one has been assigned.
     */
    private static class ForkingTaskRunnerWorkItem
    extends TaskRunnerWorkItem {
        private final Task task;
        // Both start at their default values: not shut down, no process holder.
        private volatile boolean shutdown;
        private volatile ProcessHolder processHolder;

        private ForkingTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture) {
            super(task.getId(), statusFuture);
            this.task = task;
        }

        public Task getTask() {
            return task;
        }

        /**
         * Returns the holder's host/port as a location, or the unknown location while no
         * process holder has been assigned.
         */
        @Override
        public TaskLocation getLocation() {
            return processHolder == null
                    ? TaskLocation.unknown()
                    : TaskLocation.create(processHolder.host, processHolder.port);
        }

        // Synthetic accessors emitted by the compiler; names and bodies must stay as-is.
        static /* synthetic */ boolean access$500(ForkingTaskRunnerWorkItem x0) {
            return x0.shutdown;
        }

        static /* synthetic */ ProcessHolder access$702(ForkingTaskRunnerWorkItem x0, ProcessHolder x1) {
            x0.processHolder = x1;
            return x0.processHolder;
        }
    }

    /**
     * JSON payload of the restore file: the list of task ids under the
     * {@code runningTasks} property.
     */
    private static class TaskRestoreInfo {
        @JsonProperty
        private final List<String> runningTasks;

        /**
         * @param taskIds value of the {@code runningTasks} JSON property; stored as given,
         *                without copying
         */
        @JsonCreator
        public TaskRestoreInfo(@JsonProperty("runningTasks") List<String> taskIds) {
            this.runningTasks = taskIds;
        }

        /** Returns the list exactly as it was passed to the constructor. */
        public List<String> getRunningTasks() {
            return runningTasks;
        }
    }
}

