/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexingPhaseProgress;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

public class TaskMonitor<T extends Task, SubTaskReportType extends SubTaskReport> {
    private static final Logger log = new Logger(TaskMonitor.class);
    private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded((String)"task-monitor-%d");
    private final ConcurrentMap<String, MonitorEntry> runningTasks = new ConcurrentHashMap<String, MonitorEntry>();
    private final ConcurrentMap<String, TaskHistory<T>> taskHistories = new ConcurrentHashMap<String, TaskHistory<T>>();
    private final ConcurrentHashMap<String, SubTaskReportType> reportsMap = new ConcurrentHashMap();
    private final Object taskCountLock = new Object();
    private final Object startStopLock = new Object();
    private final IndexingServiceClient indexingServiceClient;
    private final int maxRetry;
    private final int estimatedNumSucceededTasks;
    @GuardedBy(value="taskCountLock")
    private int numRunningTasks;
    @GuardedBy(value="taskCountLock")
    private int numSucceededTasks;
    @GuardedBy(value="taskCountLock")
    private int numFailedTasks;
    private int numCanceledTasks;
    @GuardedBy(value="startStopLock")
    private boolean running = false;

    TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks) {
        this.indexingServiceClient = (IndexingServiceClient)Preconditions.checkNotNull((Object)indexingServiceClient, (Object)"indexingServiceClient");
        this.maxRetry = maxRetry;
        this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
        log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", new Object[]{estimatedNumSucceededTasks});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(long taskStatusCheckingPeriod) {
        Object object = this.startStopLock;
        synchronized (object) {
            this.running = true;
            log.info("Starting taskMonitor", new Object[0]);
            this.taskStatusChecker.scheduleAtFixedRate(() -> {
                try {
                    Iterator iterator = this.runningTasks.entrySet().iterator();
                    block7: while (iterator.hasNext()) {
                        Map.Entry entry = iterator.next();
                        String specId = (String)entry.getKey();
                        MonitorEntry monitorEntry = (MonitorEntry)entry.getValue();
                        String taskId = monitorEntry.runningTask.getId();
                        TaskStatusResponse taskStatusResponse = this.indexingServiceClient.getTaskStatus(taskId);
                        TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
                        if (taskStatus == null) continue;
                        switch ((TaskState)Preconditions.checkNotNull((Object)taskStatus.getStatusCode(), (Object)"taskState")) {
                            case SUCCESS: {
                                if (!this.reportsMap.containsKey(taskId)) {
                                    throw new ISE("Missing reports from task[%s]!", new Object[]{taskId});
                                }
                                this.incrementNumSucceededTasks();
                                monitorEntry.setLastStatus(taskStatus);
                                iterator.remove();
                                continue block7;
                            }
                            case FAILED: {
                                this.reportsMap.remove(taskId);
                                this.incrementNumFailedTasks();
                                log.warn("task[%s] failed!", new Object[]{taskId});
                                if (monitorEntry.numTries() < this.maxRetry) {
                                    log.info("We still have more chances[%d/%d] to process the spec[%s].", new Object[]{monitorEntry.numTries(), this.maxRetry, monitorEntry.spec.getId()});
                                    this.retry(specId, monitorEntry, taskStatus);
                                    continue block7;
                                }
                                log.error("spec[%s] failed after [%d] tries", new Object[]{monitorEntry.spec.getId(), monitorEntry.numTries()});
                                monitorEntry.setLastStatus(taskStatus);
                                iterator.remove();
                                continue block7;
                            }
                            case RUNNING: {
                                monitorEntry.updateStatus(taskStatus);
                                continue block7;
                            }
                        }
                        throw new ISE("Unknown taskStatus[%s] for task[%s[", new Object[]{taskStatus.getStatusCode(), taskId});
                    }
                }
                catch (Throwable t) {
                    log.error(t, "Error while monitoring", new Object[0]);
                }
            }, taskStatusCheckingPeriod, taskStatusCheckingPeriod, TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        Object object = this.startStopLock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                this.taskStatusChecker.shutdownNow();
                Object object2 = this.taskCountLock;
                synchronized (object2) {
                    if (this.numRunningTasks > 0) {
                        Iterator iterator = this.runningTasks.values().iterator();
                        while (iterator.hasNext()) {
                            MonitorEntry entry = (MonitorEntry)iterator.next();
                            iterator.remove();
                            String taskId = entry.runningTask.getId();
                            log.info("Request to cancel subtask[%s]", new Object[]{taskId});
                            this.indexingServiceClient.cancelTask(taskId);
                            --this.numRunningTasks;
                            ++this.numCanceledTasks;
                        }
                        if (this.numRunningTasks > 0) {
                            log.warn("Inconsistent state: numRunningTasks[%d] is still not zero after trying to cancel all running tasks.", new Object[]{this.numRunningTasks});
                        }
                    }
                }
                log.info("Stopped taskMonitor", new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ListenableFuture<SubTaskCompleteEvent<T>> submit(SubTaskSpec<T> spec) {
        Object object = this.startStopLock;
        synchronized (object) {
            if (!this.running) {
                return Futures.immediateFailedFuture((Throwable)new ISE("TaskMonitor is not running", new Object[0]));
            }
            T task = this.submitTask(spec, 0);
            log.info("Submitted a new task[%s] for spec[%s]", new Object[]{task.getId(), spec.getId()});
            this.incrementNumRunningTasks();
            SettableFuture taskFuture = SettableFuture.create();
            this.runningTasks.put(spec.getId(), new MonitorEntry(this, spec, (Task)task, this.indexingServiceClient.getTaskStatus(task.getId()).getStatus(), taskFuture));
            return taskFuture;
        }
    }

    public void collectReport(SubTaskReportType report) {
        this.reportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
            if (prevReport != null) {
                log.warn("Received duplicate report for task [%s]", new Object[]{taskId});
                Preconditions.checkState((boolean)prevReport.equals(report), (String)"task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]", (Object[])new Object[]{taskId, prevReport, report});
            }
            return report;
        });
    }

    public Map<String, SubTaskReportType> getReports() {
        return this.reportsMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void retry(String subTaskSpecId, MonitorEntry monitorEntry, TaskStatusPlus lastFailedTaskStatus) {
        Object object = this.startStopLock;
        synchronized (object) {
            if (this.running) {
                SubTaskSpec spec = monitorEntry.spec;
                T task = this.submitTask(spec, monitorEntry.taskHistory.size() + 1);
                log.info("Submitted a new task[%s] for retrying spec[%s]", new Object[]{task.getId(), spec.getId()});
                this.incrementNumRunningTasks();
                this.runningTasks.put(subTaskSpecId, monitorEntry.withNewRunningTask(task, this.indexingServiceClient.getTaskStatus(task.getId()).getStatus(), lastFailedTaskStatus));
            }
        }
    }

    private T submitTask(SubTaskSpec<T> spec, int numAttempts) {
        T task = spec.newSubTask(numAttempts);
        try {
            this.indexingServiceClient.runTask(task.getId(), task);
        }
        catch (Exception e) {
            if (this.isUnknownTypeIdException(e)) {
                log.warn((Throwable)e, "Got an unknown type id error. Retrying with a backward compatible type.", new Object[0]);
                task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
                this.indexingServiceClient.runTask(task.getId(), task);
            }
            throw e;
        }
        return task;
    }

    private boolean isUnknownTypeIdException(Throwable e) {
        if (e instanceof IllegalStateException && e.getMessage() != null && e.getMessage().contains("Could not resolve type id")) {
            return true;
        }
        if (e.getCause() != null) {
            return this.isUnknownTypeIdException(e.getCause());
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void incrementNumRunningTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            ++this.numRunningTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void incrementNumSucceededTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            --this.numRunningTasks;
            ++this.numSucceededTasks;
            log.info("[%d/%d] tasks succeeded", new Object[]{this.numSucceededTasks, this.estimatedNumSucceededTasks});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void incrementNumFailedTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            --this.numRunningTasks;
            ++this.numFailedTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getNumSucceededTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return this.numSucceededTasks;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getNumRunningTasks() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return this.numRunningTasks;
        }
    }

    @VisibleForTesting
    int getNumCanceledTasks() {
        return this.numCanceledTasks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ParallelIndexingPhaseProgress getProgress() {
        Object object = this.taskCountLock;
        synchronized (object) {
            return new ParallelIndexingPhaseProgress(this.numRunningTasks, this.numSucceededTasks, this.numFailedTasks, this.numSucceededTasks + this.numFailedTasks, this.numRunningTasks + this.numSucceededTasks + this.numFailedTasks, this.estimatedNumSucceededTasks);
        }
    }

    Set<String> getRunningTaskIds() {
        return this.runningTasks.values().stream().map(entry -> ((MonitorEntry)entry).runningTask.getId()).collect(Collectors.toSet());
    }

    List<SubTaskSpec<T>> getRunningSubTaskSpecs() {
        return this.runningTasks.values().stream().map(monitorEntry -> ((MonitorEntry)monitorEntry).spec).collect(Collectors.toList());
    }

    @Nullable
    MonitorEntry getRunningTaskMonitorEntry(String subTaskSpecId) {
        return this.runningTasks.values().stream().filter(monitorEntry -> ((MonitorEntry)monitorEntry).spec.getId().equals(subTaskSpecId)).findFirst().orElse(null);
    }

    List<SubTaskSpec<T>> getCompleteSubTaskSpecs() {
        return this.taskHistories.values().stream().map(TaskHistory::getSpec).collect(Collectors.toList());
    }

    @Nullable
    TaskHistory<T> getCompleteSubTaskSpecHistory(String subTaskSpecId) {
        return (TaskHistory)this.taskHistories.get(subTaskSpecId);
    }

    static class SubTaskCompleteEvent<T extends Task> {
        private final SubTaskSpec<T> spec;
        @Nullable
        private final TaskStatusPlus lastStatus;
        @Nullable
        private final Throwable throwable;

        static <T extends Task> SubTaskCompleteEvent<T> success(SubTaskSpec<T> spec, TaskStatusPlus lastStatus) {
            return new SubTaskCompleteEvent<T>(spec, (TaskStatusPlus)Preconditions.checkNotNull((Object)lastStatus, (Object)"lastStatus"), null);
        }

        static <T extends Task> SubTaskCompleteEvent<T> fail(SubTaskSpec<T> spec, Throwable t) {
            return new SubTaskCompleteEvent<T>(spec, null, t);
        }

        private SubTaskCompleteEvent(SubTaskSpec<T> spec, @Nullable TaskStatusPlus lastStatus, @Nullable Throwable throwable) {
            this.spec = (SubTaskSpec)Preconditions.checkNotNull(spec, (Object)"spec");
            this.lastStatus = lastStatus;
            this.throwable = throwable;
        }

        SubTaskSpec<T> getSpec() {
            return this.spec;
        }

        TaskState getLastState() {
            return this.lastStatus == null ? TaskState.FAILED : this.lastStatus.getStatusCode();
        }

        @Nullable
        TaskStatusPlus getLastStatus() {
            return this.lastStatus;
        }

        @Nullable
        Throwable getThrowable() {
            return this.throwable;
        }
    }

    static class MonitorEntry {
        private final SubTaskSpec<T> spec;
        private final T runningTask;
        private final CopyOnWriteArrayList<TaskStatusPlus> taskHistory;
        private final SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture;
        @Nullable
        private volatile TaskStatusPlus runningStatus;
        final /* synthetic */ TaskMonitor this$0;

        private MonitorEntry(SubTaskSpec<T> spec, @Nullable T runningTask, TaskStatusPlus runningStatus, SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture) {
            this(this$0, spec, (Task)runningTask, runningStatus, new CopyOnWriteArrayList(), completeEventFuture);
        }

        private MonitorEntry(SubTaskSpec<T> spec, @Nullable T runningTask, TaskStatusPlus runningStatus, CopyOnWriteArrayList<TaskStatusPlus> taskHistory, SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture) {
            this.this$0 = this$0;
            this.spec = spec;
            this.runningTask = runningTask;
            this.runningStatus = runningStatus;
            this.taskHistory = taskHistory;
            this.completeEventFuture = completeEventFuture;
        }

        MonitorEntry withNewRunningTask(T newTask, @Nullable TaskStatusPlus newStatus, TaskStatusPlus statusOfLastTask) {
            this.taskHistory.add(statusOfLastTask);
            return new MonitorEntry(this.this$0, this.spec, newTask, newStatus, this.taskHistory, this.completeEventFuture);
        }

        int numTries() {
            return this.taskHistory.size() + 1;
        }

        void updateStatus(TaskStatusPlus statusPlus) {
            if (!this.runningTask.getId().equals(statusPlus.getId())) {
                throw new ISE("Task id[%s] of lastStatus is different from the running task[%s]", new Object[]{statusPlus.getId(), this.runningTask.getId()});
            }
            this.runningStatus = statusPlus;
        }

        void setLastStatus(TaskStatusPlus lastStatus) {
            if (!this.runningTask.getId().equals(lastStatus.getId())) {
                throw new ISE("Task id[%s] of lastStatus is different from the running task[%s]", new Object[]{lastStatus.getId(), this.runningTask.getId()});
            }
            this.runningStatus = lastStatus;
            this.taskHistory.add(lastStatus);
            this.this$0.taskHistories.put(this.spec.getId(), new TaskHistory(this.spec, this.taskHistory));
            this.completeEventFuture.set(SubTaskCompleteEvent.success(this.spec, lastStatus));
        }

        SubTaskSpec<T> getSpec() {
            return this.spec;
        }

        @Nullable
        TaskStatusPlus getRunningStatus() {
            return this.runningStatus;
        }

        List<TaskStatusPlus> getTaskHistory() {
            return this.taskHistory;
        }
    }
}

