/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.client.task;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;
import com.netflix.servo.monitor.Stopwatch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates polling, execution and status updates of workflow tasks for a
 * set of registered {@link Worker}s.
 *
 * <p>After {@link #init()} is called, every registered worker is polled on a
 * fixed delay; each polled {@link Task} is acknowledged, handed to the worker
 * on a bounded thread pool, and the outcome is pushed back through the
 * {@link TaskClient} with a limited number of retries.
 *
 * <p>The {@code with*} methods and {@link #registerWorker(Worker)} only take
 * effect when invoked before {@link #init()}: the executors are built and the
 * workers scheduled from the values present at that moment.
 */
public class WorkflowTaskCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowTaskCoordinator.class);

    /** Size of the execution pool: the constructor value plus one per registered worker. */
    private int threadCount;
    private final TaskClient client;
    /** Pool that runs {@link #execute(Worker, Task)}; created in {@link #init()}. */
    private ExecutorService es;
    /** Scheduler that runs {@link #pollForTask(Worker)}; created in {@link #init()}. */
    private ScheduledExecutorService ses;
    /** Optional discovery client; when {@code null} the UP check is skipped. */
    private final EurekaClient ec;
    private final List<Worker> workers = new LinkedList<>();
    /** Initial delay and delay between polls of a worker, in milliseconds. */
    private int pollInterval = 1000;
    /** Pause between failed task-update attempts, in milliseconds. */
    private int sleepWhenRetry = 500;
    /** Number of additional update attempts made after the first one fails. */
    private int updateRetryCount = 3;
    /** Capacity of the queue that buffers tasks waiting for an execution thread. */
    private int workerQueueSize = 100;

    /**
     * Creates a coordinator for the given workers.
     *
     * @param ec          discovery client used to check instance status; may be {@code null}
     * @param client      client used to poll, ack and update tasks
     * @param threadCount base number of execution threads (one more is added per worker)
     * @param taskWorkers workers to register
     */
    public WorkflowTaskCoordinator(EurekaClient ec, TaskClient client, int threadCount, Worker ... taskWorkers) {
        this(ec, client, threadCount, Arrays.asList(taskWorkers));
    }

    /**
     * Creates a coordinator for the given workers.
     *
     * @param ec          discovery client used to check instance status; may be {@code null}
     * @param client      client used to poll, ack and update tasks
     * @param threadCount base number of execution threads (one more is added per worker)
     * @param taskWorkers workers to register
     */
    public WorkflowTaskCoordinator(EurekaClient ec, TaskClient client, int threadCount, Iterable<Worker> taskWorkers) {
        this.ec = ec;
        this.client = client;
        this.threadCount = threadCount;
        for (Worker worker : taskWorkers) {
            this.registerWorker(worker);
        }
    }

    /**
     * Sets the delay between polls.
     *
     * @param pollInterval delay in milliseconds
     * @return this coordinator, for chaining
     */
    public WorkflowTaskCoordinator withPollInterval(int pollInterval) {
        this.pollInterval = pollInterval;
        return this;
    }

    /**
     * Sets the pause between failed task-update attempts.
     *
     * @param sleepWhenRetry pause in milliseconds
     * @return this coordinator, for chaining
     */
    public WorkflowTaskCoordinator withSleepWhenRetry(int sleepWhenRetry) {
        this.sleepWhenRetry = sleepWhenRetry;
        return this;
    }

    /**
     * Sets how many times a failed task update is retried.
     *
     * @param updateRetryCount number of retries after the first attempt
     * @return this coordinator, for chaining
     */
    public WorkflowTaskCoordinator withUpdateRetryCount(int updateRetryCount) {
        this.updateRetryCount = updateRetryCount;
        return this;
    }

    /**
     * Sets the capacity of the queue feeding the execution pool.
     *
     * @param workerQueueSize queue capacity
     * @return this coordinator, for chaining
     */
    public WorkflowTaskCoordinator withWorkerQueueSize(int workerQueueSize) {
        this.workerQueueSize = workerQueueSize;
        return this;
    }

    /**
     * Builds the execution pool and the polling scheduler, then schedules one
     * fixed-delay polling job per registered worker.
     */
    public synchronized void init() {
        final AtomicInteger count = new AtomicInteger(0);
        ThreadFactory namedThreads = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("workflow-worker-" + count.getAndIncrement());
            return thread;
        };
        // Fixed-size pool with a bounded queue: submissions beyond capacity are rejected.
        this.es = new ThreadPoolExecutor(this.threadCount, this.threadCount, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(this.workerQueueSize), namedThreads);
        this.ses = Executors.newScheduledThreadPool(this.workers.size());
        for (Worker worker : this.workers) {
            this.ses.scheduleWithFixedDelay(() -> this.pollForTask(worker), this.pollInterval, this.pollInterval, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers a worker and grows the execution pool size by one thread.
     * Workers registered after {@link #init()} are not scheduled for polling.
     *
     * @param worker worker to register
     */
    public void registerWorker(Worker worker) {
        this.workers.add(worker);
        ++this.threadCount;
    }

    /**
     * Polls the server once for the worker's task type and submits every
     * returned task to the execution pool.
     *
     * <p>The whole body is guarded: an exception escaping a runnable passed to
     * {@code scheduleWithFixedDelay} suppresses all subsequent executions, which
     * would silently stop polling for this worker.
     */
    private void pollForTask(Worker worker) {
        try {
            // Null-safe comparison: an unknown (null) remote status is treated as "not UP".
            if (this.ec != null && InstanceInfo.InstanceStatus.UP != this.ec.getInstanceRemoteStatus()) {
                logger.debug("Instance is NOT UP in discovery - will not poll");
                return;
            }
            if (worker.paused()) {
                WorkflowTaskMetrics.paused(worker.getTaskDefName());
                logger.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
                return;
            }
            String taskType = worker.getTaskDefName();
            logger.debug("Polling {}, count = {} timeout = {} ms", taskType, worker.getPollCount(), worker.getLongPollTimeoutInMS());
            Stopwatch sw = WorkflowTaskMetrics.pollTimer(taskType);
            List<Task> tasks = this.client.poll(taskType, worker.getIdentity(), worker.getPollCount(), worker.getLongPollTimeoutInMS());
            sw.stop();
            logger.debug("Polled {} and receivd {} tasks", taskType, tasks.size());
            for (Task task : tasks) {
                // A rejection aborts the loop; the remaining tasks of this batch are not submitted.
                this.es.submit(() -> this.execute(worker, task));
            }
        }
        catch (RejectedExecutionException qfe) {
            WorkflowTaskMetrics.queueFull(worker.getTaskDefName());
            logger.error("Execution queue is full", qfe);
        }
        catch (Exception e) {
            WorkflowTaskMetrics.pollingException(worker.getTaskDefName(), e);
            logger.error("Error when pollig for task", e);
        }
    }

    /**
     * Acknowledges a polled task, runs it on the worker and reports the
     * outcome back to the server.
     *
     * <p>Nothing is executed or updated when the task's definition name and
     * type disagree, or when the ack fails or throws. If the worker throws,
     * the task is marked FAILED with the exception as the reason.
     */
    private void execute(Worker worker, Task task) {
        String taskType = task.getTaskDefName();
        if (!taskType.equals(task.getTaskType())) {
            logger.error("Queue name '{}' did not match type of task retrieved '{}' for task id '{}'.", taskType, task.getTaskType(), task.getTaskId());
            return;
        }
        try {
            if (!this.client.ack(task.getTaskId(), worker.getIdentity())) {
                WorkflowTaskMetrics.ackFailed(worker.getTaskDefName());
                logger.error("Ack failed for {}, id {}", taskType, task.getTaskId());
                return;
            }
        }
        catch (Exception e) {
            logger.error("ack exception for " + worker.getTaskDefName(), e);
            WorkflowTaskMetrics.ackException(worker.getTaskDefName(), e);
            return;
        }
        Task updatedTask = task;
        Stopwatch sw = WorkflowTaskMetrics.executionTimer(worker.getTaskDefName());
        try {
            logger.debug("Executing task {} on worker {}", task, worker.getClass().getSimpleName());
            Task result = worker.execute(task);
            // Report what the worker returned; fall back to the polled task if it returned nothing.
            if (result != null) {
                updatedTask = result;
            }
        }
        catch (Exception e) {
            WorkflowTaskMetrics.executionException(worker.getTaskDefName(), e);
            logger.error("Unable to execute task {}", task, e);
            // The worker produced no result, so the polled task itself carries the failure.
            // NOTE(review): both status setters are kept exactly as found in the decompiled
            // source - confirm against Task whether the TaskResult.Status call is needed.
            task.setStatus(TaskResult.Status.FAILED);
            task.setStatus(Task.Status.FAILED);
            task.setReasonForIncompletion("Error while executing the task: " + e);
        }
        finally {
            sw.stop();
        }
        logger.debug("Task {} executed by worker {} with status {}", updatedTask.getTaskId(), worker.getClass().getSimpleName(), updatedTask.getStatus());
        this.updateWithRetry(this.updateRetryCount, updatedTask, worker);
    }

    /**
     * Sends the task to the server, retrying on failure.
     *
     * <p>Makes up to {@code count + 1} attempts, pausing {@code sleepWhenRetry}
     * milliseconds between them. When every attempt fails, or the thread is
     * interrupted while waiting, {@link Worker#onErrorUpdate(Task)} is invoked
     * so the worker always learns that the update was not delivered.
     *
     * @param count  number of retries allowed after the first attempt; a
     *               negative value skips the update and notifies the worker
     * @param task   task whose state is sent to the server
     * @param worker worker to notify when the update cannot be delivered
     */
    private void updateWithRetry(int count, Task task, Worker worker) {
        for (int remaining = count; remaining >= 0; remaining--) {
            try {
                this.client.updateTask(task);
                return;
            }
            catch (Throwable t) {
                WorkflowTaskMetrics.updateTaskError(worker.getTaskDefName(), t);
                logger.error("Unable to update {} on count {}", task, remaining, t);
            }
            if (remaining == 0) {
                // No attempt left: sleeping again would only delay the error callback.
                break;
            }
            try {
                Thread.sleep(this.sleepWhenRetry);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the pool and stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        worker.onErrorUpdate(task);
    }
}

