/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.concurrent.TaskThreadPriority;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;

public class SingleTaskBackgroundRunner
implements TaskRunner,
QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SingleTaskBackgroundRunner.class);

    // Collaborators and configuration assigned once in the constructor.
    private final TaskToolboxFactory toolboxFactory;
    private final TaskConfig taskConfig;
    private final ServiceEmitter emitter;
    private final TaskLocation location;
    private final ServerConfig serverConfig;

    // Registered (listener, executor) pairs; iterated while other threads may add or remove entries.
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();

    // Mutable state written by run(Task) and stop(); volatile because it is read without locking.
    private volatile ListeningExecutorService executorService;
    private volatile SingleTaskBackgroundRunnerWorkItem runningItem;
    private volatile boolean stopping;

    /**
     * Creates a runner bound to the given node.
     *
     * @param toolboxFactory factory used to build a {@link TaskToolbox} per task; must not be null
     * @param taskConfig     task configuration consulted during {@link #stop()}
     * @param emitter        emitter for shutdown metrics; must not be null
     * @param node           this node; its host, plaintext port and TLS port form the task location
     * @param serverConfig   server configuration handed to the query runner wrapper
     * @throws NullPointerException if {@code toolboxFactory} or {@code emitter} is null
     */
    @Inject
    public SingleTaskBackgroundRunner(
        TaskToolboxFactory toolboxFactory,
        TaskConfig taskConfig,
        ServiceEmitter emitter,
        @Self DruidNode node,
        ServerConfig serverConfig
    ) {
        this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
        this.taskConfig = taskConfig;
        this.emitter = Preconditions.checkNotNull(emitter, "emitter");
        this.location = TaskLocation.create(node.getHost(), node.getPlaintextPort(), node.getTlsPort());
        this.serverConfig = serverConfig;
    }

    /**
     * This runner restores nothing.
     *
     * @return always an empty list
     */
    @Override
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        final List<Pair<Task, ListenableFuture<TaskStatus>>> nothingRestored = Collections.emptyList();
        return nothingRestored;
    }

    /**
     * Registers a listener together with the executor paired with it.
     *
     * <p>If a task is already running, the new listener alone is immediately notified of that
     * task's location.
     *
     * @param listener listener to register
     * @param executor executor stored alongside the listener
     * @throws ISE if a listener with the same id is already registered
     */
    @Override
    public void registerListener(TaskRunnerListener listener, Executor executor) {
        for (Pair<TaskRunnerListener, Executor> existing : listeners) {
            if (existing.lhs.getListenerId().equals(listener.getListenerId())) {
                throw new ISE("Listener [%s] already registered", listener.getListenerId());
            }
        }

        final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
        listeners.add(listenerPair);
        log.info("Registered listener [%s]", listener.getListenerId());

        // Bootstrap callback: only the newly added listener is told about the current task.
        final SingleTaskBackgroundRunnerWorkItem current = runningItem;
        if (current != null) {
            TaskRunnerUtils.notifyLocationChanged(
                ImmutableList.of(listenerPair),
                current.getTaskId(),
                current.getLocation()
            );
        }
    }

    /**
     * Removes the first registered listener whose id equals {@code listenerId}.
     * Does nothing when no such listener is registered.
     *
     * @param listenerId id of the listener to remove
     */
    @Override
    public void unregisterListener(String listenerId) {
        for (Pair<TaskRunnerListener, Executor> candidate : listeners) {
            if (candidate.lhs.getListenerId().equals(listenerId)) {
                listeners.remove(candidate);
                log.info("Unregistered listener [%s]", listenerId);
                return;
            }
        }
    }

    /**
     * Builds a single-threaded listening executor for the given task priority.
     *
     * @param priority task priority; appended to the thread name format and translated into a
     *                 thread priority through {@link TaskThreadPriority}
     * @return the decorated executor
     */
    private static ListeningExecutorService buildExecutorService(int priority) {
        final String nameFormat = "task-runner-%d-priority-" + priority;
        final Integer threadPriority = TaskThreadPriority.getThreadPriorityFromTaskPriority(priority);
        return MoreExecutors.listeningDecorator(Execs.singleThreaded(nameFormat, threadPriority));
    }

    /**
     * Lifecycle start hook. Intentionally a no-op: the executor is only created in
     * {@link #run(Task)}.
     */
    @Override
    @LifecycleStart
    public void start() {
        // Nothing to initialize here.
    }

    /**
     * Lifecycle stop hook.
     *
     * <p>Marks the runner as stopping, then proceeds in three steps:
     * <ol>
     *   <li>calls {@code shutdown()} on the executor, if one exists;</li>
     *   <li>if a task is running, asks it to stop gracefully, notifies listeners of the
     *       resulting status and emits the {@code task/interrupt/*} metrics;</li>
     *   <li>calls {@code shutdownNow()} on the executor, if one exists.</li>
     * </ol>
     * A {@link SecurityException} from either executor call is logged and not rethrown.
     */
    @Override
    @LifecycleStop
    public void stop() {
        stopping = true;

        final ListeningExecutorService orderlyTarget = executorService;
        if (orderlyTarget != null) {
            try {
                orderlyTarget.shutdown();
            }
            catch (SecurityException ex) {
                log.error(ex, "I can't control my own threads!");
            }
        }

        final SingleTaskBackgroundRunnerWorkItem item = runningItem;
        if (item != null) {
            stopRunningItemGracefully(item);
        }

        // Re-read the field: this is the forced shutdown after the graceful attempt above.
        final ListeningExecutorService forcedTarget = executorService;
        if (forcedTarget != null) {
            try {
                forcedTarget.shutdownNow();
            }
            catch (SecurityException ex) {
                log.error(ex, "I can't control my own threads!");
            }
        }
    }

    /**
     * Asks the task of {@code item} to stop gracefully, reports the outcome to listeners and
     * emits the interrupt count and elapsed-time metrics.
     *
     * <p>The task's result is awaited (bounded by the configured graceful shutdown timeout) only
     * when task restoration is enabled and the task can be restored. Otherwise listeners receive
     * a synthesized failure status straight away.
     */
    private void stopRunningItemGracefully(SingleTaskBackgroundRunnerWorkItem item) {
        final Task task = item.getTask();
        final long start = System.currentTimeMillis();
        boolean error = false;

        log.info("Starting graceful shutdown of task[%s].", task.getId());
        task.stopGracefully(taskConfig);

        final boolean awaitResult = taskConfig.isRestoreTasksOnRestart() && task.canRestore();
        if (!awaitResult) {
            TaskRunnerUtils.notifyStatusChanged(
                listeners,
                task.getId(),
                TaskStatus.failure(task.getId(), "Canceled as task execution process stopped")
            );
        } else {
            try {
                final long timeoutMillis =
                    new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis();
                final TaskStatus taskStatus = item.getResult().get(timeoutMillis, TimeUnit.MILLISECONDS);
                log.info(
                    "Graceful shutdown of task[%s] finished in %,dms.",
                    task.getId(),
                    System.currentTimeMillis() - start
                );
                TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
            }
            catch (Exception e) {
                log.makeAlert(e, "Graceful task shutdown failed: %s", task.getDataSource())
                   .addData("taskId", task.getId())
                   .addData("dataSource", task.getDataSource())
                   .emit();
                log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId());
                error = true;
                TaskRunnerUtils.notifyStatusChanged(
                    listeners,
                    task.getId(),
                    TaskStatus.failure(
                        task.getId(),
                        "Failed to stop gracefully with exception. See task logs for more details."
                    )
                );
            }
        }

        final long elapsed = System.currentTimeMillis() - start;
        final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent
            .builder()
            .setDimension("task", task.getId())
            .setDimension("dataSource", task.getDataSource())
            .setDimension("graceful", "true")
            .setDimension("error", String.valueOf(error));
        emitter.emit(metricBuilder.build("task/interrupt/count", 1L));
        emitter.emit(metricBuilder.build("task/interrupt/elapsed", elapsed));
    }

    /**
     * Submits {@code task} to a freshly built single-threaded executor and records it as the
     * running item.
     *
     * <p>The executor's priority comes from the task context value
     * {@code backgroundThreadPriority}. A missing value, or one that cannot be parsed as an
     * integer, results in priority 0 (a parse failure is logged).
     *
     * @param task task to run
     * @return future of the task's final status
     * @throws ISE if a task has already been handed to this runner
     */
    @Override
    public ListenableFuture<TaskStatus> run(Task task) {
        final SingleTaskBackgroundRunnerWorkItem alreadyRunning = runningItem;
        if (alreadyRunning != null) {
            throw new ISE("Already running task[%s]", alreadyRunning.getTask().getId());
        }

        final TaskToolbox toolbox = toolboxFactory.build(task);
        final Object rawPriority = task.getContextValue("backgroundThreadPriority");

        int priority = 0;
        if (rawPriority != null) {
            try {
                priority = Numbers.parseInt(rawPriority);
            }
            catch (NumberFormatException e) {
                log.error(e, "Error parsing task priority [%s] for task [%s]", rawPriority, task.getId());
            }
        }

        executorService = buildExecutorService(priority);
        final ListenableFuture<TaskStatus> statusFuture = executorService.submit(
            new SingleTaskBackgroundRunnerCallable(task, location, toolbox)
        );
        runningItem = new SingleTaskBackgroundRunnerWorkItem(task, location, statusFuture);
        return statusFuture;
    }

    /**
     * Cancels the running task's future (interrupting it) when its id equals {@code taskid}.
     * Any other id, or the absence of a running task, results only in the log line.
     *
     * @param taskid id of the task to shut down
     * @param reason reason for the shutdown; only logged
     */
    @Override
    public void shutdown(String taskid, String reason) {
        log.info("Shutdown [%s] because: [%s]", taskid, reason);

        final SingleTaskBackgroundRunnerWorkItem current = runningItem;
        if (current == null) {
            return;
        }
        if (current.getTask().getId().equals(taskid)) {
            current.getResult().cancel(true);
        }
    }

    /**
     * @return a singleton list holding the running work item, or an empty list if no task has
     *         been handed to this runner
     */
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        final SingleTaskBackgroundRunnerWorkItem current = runningItem;
        if (current == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(current);
    }

    /**
     * This runner reports no pending tasks.
     *
     * @return always an empty list
     */
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        final Collection<TaskRunnerWorkItem> nonePending = Collections.emptyList();
        return nonePending;
    }

    /**
     * @return a singleton list holding the running work item, or an empty list if no task has
     *         been handed to this runner
     */
    public Collection<TaskRunnerWorkItem> getKnownTasks() {
        final SingleTaskBackgroundRunnerWorkItem current = runningItem;
        if (current != null) {
            return Collections.singletonList(current);
        }
        return Collections.emptyList();
    }

    /**
     * Returns this runner's own location; the {@code taskId} argument is not consulted.
     *
     * @param taskId ignored
     * @return the location built from the node passed to the constructor
     */
    @Override
    public TaskLocation getTaskLocation(String taskId) {
        final TaskLocation ownLocation = this.location;
        return ownLocation;
    }

    /**
     * This runner provides no scaling statistics.
     *
     * @return always {@link Optional#absent()}
     */
    @Override
    public Optional<ScalingStats> getScalingStats() {
        final Optional<ScalingStats> none = Optional.absent();
        return none;
    }

    /**
     * @return always 1
     */
    @Override
    public long getTotalTaskSlotCount() {
        final long totalSlots = 1L;
        return totalSlots;
    }

    /**
     * @return 0 once a task has been handed to this runner, otherwise 1
     */
    @Override
    public long getIdleTaskSlotCount() {
        if (runningItem != null) {
            return 0L;
        }
        return 1L;
    }

    /**
     * @return 1 once a task has been handed to this runner, otherwise 0
     */
    @Override
    public long getUsedTaskSlotCount() {
        if (runningItem != null) {
            return 1L;
        }
        return 0L;
    }

    /**
     * @return always 0
     */
    @Override
    public long getLazyTaskSlotCount() {
        final long lazySlots = 0L;
        return lazySlots;
    }

    /**
     * @return always 0
     */
    @Override
    public long getBlacklistedTaskSlotCount() {
        final long blacklistedSlots = 0L;
        return blacklistedSlots;
    }

    /**
     * Resolves a query runner for {@code query}; the {@code intervals} argument is not used.
     *
     * @param query     query to resolve a runner for
     * @param intervals ignored
     * @return the runner produced by {@code getQueryRunnerImpl}
     */
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        final QueryRunner<T> runner = getQueryRunnerImpl(query);
        return runner;
    }

    /**
     * Resolves a query runner for {@code query}; the {@code specs} argument is not used.
     *
     * @param query query to resolve a runner for
     * @param specs ignored
     * @return the runner produced by {@code getQueryRunnerImpl}
     */
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        final QueryRunner<T> runner = getQueryRunnerImpl(query);
        return runner;
    }

    /**
     * Resolves the query runner for {@code query}.
     *
     * <p>The running task's own runner is used only when a task is running, the query's
     * analyzed base table data source is present, and its name equals the task's data source.
     * If the task then supplies no runner, or any condition fails, a {@link NoopQueryRunner}
     * is used instead. The chosen runner is always wrapped in a
     * {@link SetAndVerifyContextQueryRunner} built with this runner's server config.
     *
     * @param query query to resolve a runner for
     * @return the wrapped runner; never null
     */
    private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query) {
        QueryRunner<T> taskQueryRunner = null;

        final SingleTaskBackgroundRunnerWorkItem current = runningItem;
        if (current != null) {
            final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
            final Task task = current.getTask();
            if (analysis.getBaseTableDataSource().isPresent()
                && task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) {
                // May still be null when the task does not serve this query.
                taskQueryRunner = task.getQueryRunner(query);
            }
        }

        // QueryRunner is an interface and cannot be instantiated directly, so the fallback
        // must be a concrete implementation.
        final QueryRunner<T> baseRunner = taskQueryRunner != null ? taskQueryRunner : new NoopQueryRunner<T>();
        return new SetAndVerifyContextQueryRunner<T>(serverConfig, baseRunner);
    }

    private class SingleTaskBackgroundRunnerCallable
    implements Callable<TaskStatus> {
        private final Task task;
        private final TaskLocation location;
        private final TaskToolbox toolbox;

        SingleTaskBackgroundRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox) {
            this.task = task;
            this.location = location;
            this.toolbox = toolbox;
        }

        @Override
        public TaskStatus call() {
            TaskStatus status;
            long startTime = System.currentTimeMillis();
            try {
                log.info("Running task: %s", new Object[]{this.task.getId()});
                TaskRunnerUtils.notifyLocationChanged(SingleTaskBackgroundRunner.this.listeners, this.task.getId(), this.location);
                TaskRunnerUtils.notifyStatusChanged(SingleTaskBackgroundRunner.this.listeners, this.task.getId(), TaskStatus.running((String)this.task.getId()));
                status = this.task.run(this.toolbox);
            }
            catch (InterruptedException e) {
                if (SingleTaskBackgroundRunner.this.stopping) {
                    log.debug((Throwable)e, "Interrupted while running task[%s] during graceful shutdown.", new Object[]{this.task});
                } else {
                    log.warn((Throwable)e, "Interrupted while running task[%s]", new Object[]{this.task});
                }
                status = TaskStatus.failure((String)this.task.getId(), (String)e.toString());
            }
            catch (Exception e) {
                log.error((Throwable)e, "Exception while running task[%s]", new Object[]{this.task});
                status = TaskStatus.failure((String)this.task.getId(), (String)e.toString());
            }
            catch (Throwable t) {
                log.error(t, "Uncaught Throwable while running task[%s]", new Object[]{this.task});
                throw t;
            }
            status = status.withDuration(System.currentTimeMillis() - startTime);
            TaskRunnerUtils.notifyStatusChanged(SingleTaskBackgroundRunner.this.listeners, this.task.getId(), status);
            return status;
        }
    }

    /**
     * Work item for the single task run by this runner. Keeps the task and its location next to
     * the result future held by {@link TaskRunnerWorkItem}.
     */
    private static class SingleTaskBackgroundRunnerWorkItem extends TaskRunnerWorkItem {
        private final Task task;
        private final TaskLocation location;

        /**
         * @param task     the task being run; its id is passed on as the work item's task id
         * @param location where the task runs
         * @param result   future of the task's final status
         */
        private SingleTaskBackgroundRunnerWorkItem(
            Task task,
            TaskLocation location,
            ListenableFuture<TaskStatus> result
        ) {
            super(task.getId(), result);
            this.task = task;
            this.location = location;
        }

        /** @return the task this work item wraps */
        public Task getTask() {
            return task;
        }

        /** @return the location given at construction */
        @Override
        public TaskLocation getLocation() {
            return location;
        }

        /** @return the wrapped task's type */
        @Override
        public String getTaskType() {
            return task.getType();
        }

        /** @return the wrapped task's data source */
        @Override
        public String getDataSource() {
            return task.getDataSource();
        }
    }
}

