/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.client.task;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.task.TaskLogger;
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.client.worker.PropertyFactory;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;
import com.netflix.servo.monitor.Stopwatch;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowTaskCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowTaskCoordinator.class);
    private TaskClient client;
    private ExecutorService es;
    private ScheduledExecutorService ses;
    private EurekaClient ec;
    private List<Worker> workers = new LinkedList<Worker>();
    private int sleepWhenRetry;
    private int updateRetryCount;
    private int workerQueueSize;
    private int threadCount;
    private static final String DOMAIN = "domain";
    private static final String ALL_WORKERS = "all";

    public WorkflowTaskCoordinator(EurekaClient ec, TaskClient client, int threadCount, int sleepWhenRetry, int updateRetryCount, int workerQueueSize, Iterable<Worker> taskWorkers) {
        this.ec = ec;
        this.client = client;
        this.threadCount = threadCount;
        this.sleepWhenRetry = sleepWhenRetry;
        this.updateRetryCount = updateRetryCount;
        this.workerQueueSize = workerQueueSize;
        for (Worker worker : taskWorkers) {
            this.workers.add(worker);
        }
        TaskLogger.client = client;
    }

    public synchronized void init() {
        if (this.threadCount == -1) {
            this.threadCount = this.workers.size();
        }
        logger.info("Initialized the worker with {} threads", (Object)this.threadCount);
        final AtomicInteger count = new AtomicInteger(0);
        this.es = new ThreadPoolExecutor(this.threadCount, this.threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(this.workerQueueSize), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("workflow-worker-" + count.getAndIncrement());
                return t;
            }
        });
        this.ses = Executors.newScheduledThreadPool(this.workers.size());
        this.workers.forEach(worker -> this.ses.scheduleWithFixedDelay(() -> this.pollForTask((Worker)worker), worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS));
    }

    private void pollForTask(Worker worker) {
        if (this.ec != null && !this.ec.getInstanceRemoteStatus().equals((Object)InstanceInfo.InstanceStatus.UP)) {
            logger.debug("Instance is NOT UP in discovery - will not poll");
            return;
        }
        if (worker.paused()) {
            WorkflowTaskMetrics.paused(worker.getTaskDefName());
            logger.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
            return;
        }
        String domain = PropertyFactory.getString(worker.getTaskDefName(), DOMAIN, null);
        if (domain == null) {
            domain = PropertyFactory.getString(ALL_WORKERS, DOMAIN, null);
        }
        logger.debug("Polling {}, domain={}, count = {} timeout = {} ms", new Object[]{worker.getTaskDefName(), domain, worker.getPollCount(), worker.getLongPollTimeoutInMS()});
        try {
            String taskType = worker.getTaskDefName();
            Stopwatch sw = WorkflowTaskMetrics.pollTimer(worker.getTaskDefName());
            List<Task> tasks = this.client.poll(taskType, domain, worker.getIdentity(), worker.getPollCount(), worker.getLongPollTimeoutInMS());
            sw.stop();
            logger.debug("Polled {}, for domain {} and receivd {} tasks", new Object[]{worker.getTaskDefName(), domain, tasks.size()});
            for (Task task : tasks) {
                this.es.submit(() -> {
                    TaskLogger.push(task);
                    try {
                        this.execute(worker, task);
                    }
                    catch (Throwable t) {
                        task.setStatus(Task.Status.FAILED);
                        TaskResult result = new TaskResult(task);
                        this.handleException(t, result, worker, true, task);
                    }
                    finally {
                        TaskLogger.remove(task);
                    }
                });
            }
        }
        catch (RejectedExecutionException qfe) {
            WorkflowTaskMetrics.queueFull(worker.getTaskDefName());
            logger.error("Execution queue is full", (Throwable)qfe);
        }
        catch (Exception e) {
            WorkflowTaskMetrics.pollingException(worker.getTaskDefName(), e);
            logger.error("Error when polling for task " + e.getMessage(), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void execute(Worker worker, Task task) {
        String taskType = task.getTaskDefName();
        try {
            if (!worker.preAck(task)) {
                logger.debug("Worker {} decided not to ack the task {}", (Object)taskType, (Object)task.getTaskId());
                return;
            }
            if (!this.client.ack(task.getTaskId(), worker.getIdentity()).booleanValue()) {
                WorkflowTaskMetrics.ackFailed(worker.getTaskDefName());
                logger.error("Ack failed for {}, id {}", (Object)taskType, (Object)task.getTaskId());
                return;
            }
        }
        catch (Exception e) {
            logger.error("ack exception for " + worker.getTaskDefName(), (Throwable)e);
            WorkflowTaskMetrics.ackException(worker.getTaskDefName(), e);
            return;
        }
        Stopwatch sw = WorkflowTaskMetrics.executionTimer(worker.getTaskDefName());
        TaskResult result = null;
        try {
            logger.debug("Executing task {} on worker {}", (Object)task, (Object)worker.getClass().getSimpleName());
            result = worker.execute(task);
            result.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result.setTaskId(task.getTaskId());
        }
        catch (Exception e) {
            logger.error("Unable to execute task {}", (Object)task, (Object)e);
            if (result == null) {
                task.setStatus(Task.Status.FAILED);
                result = new TaskResult(task);
            }
            this.handleException(e, result, worker, false, task);
        }
        finally {
            sw.stop();
        }
        logger.debug("Task {} executed by worker {} with status {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), task.getStatus()});
        this.updateWithRetry(this.updateRetryCount, task, result, worker);
    }

    public int getThreadCount() {
        return this.threadCount;
    }

    public int getWorkerQueueSize() {
        return this.workerQueueSize;
    }

    public int getSleepWhenRetry() {
        return this.sleepWhenRetry;
    }

    public int getUpdateRetryCount() {
        return this.updateRetryCount;
    }

    private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
        if (count < 0) {
            worker.onErrorUpdate(task);
            return;
        }
        try {
            this.client.updateTask(result);
            return;
        }
        catch (Exception t) {
            WorkflowTaskMetrics.updateTaskError(worker.getTaskDefName(), t);
            logger.error("Unable to update {} on count {}", new Object[]{result, count, t});
            try {
                Thread.sleep(this.sleepWhenRetry);
                this.updateWithRetry(--count, task, result, worker);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
    }

    private void handleException(Throwable t, TaskResult result, Worker worker, boolean updateTask, Task task) {
        WorkflowTaskMetrics.executionException(worker.getTaskDefName(), t);
        result.setStatus(TaskResult.Status.FAILED);
        result.setReasonForIncompletion("Error while executing the task: " + t);
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        TaskLogger.log(sw.toString());
        this.updateWithRetry(this.updateRetryCount, task, result, worker);
    }

    public static class Builder {
        private int sleepWhenRetry = 500;
        private int updateRetryCount = 3;
        private int workerQueueSize = 100;
        private int threadCount = -1;
        private Iterable<Worker> taskWorkers;
        private EurekaClient ec;
        private TaskClient client;

        public Builder withSleepWhenRetry(int sleepWhenRetry) {
            this.sleepWhenRetry = sleepWhenRetry;
            return this;
        }

        public Builder withUpdateRetryCount(int updateRetryCount) {
            this.updateRetryCount = updateRetryCount;
            return this;
        }

        public Builder withWorkerQueueSize(int workerQueueSize) {
            this.workerQueueSize = workerQueueSize;
            return this;
        }

        public Builder withThreadCount(int threadCount) {
            if (threadCount < 1) {
                throw new IllegalArgumentException("No. of threads cannot be less than 1");
            }
            this.threadCount = threadCount;
            return this;
        }

        public Builder withTaskClient(TaskClient client) {
            this.client = client;
            return this;
        }

        public Builder withEurekaClient(EurekaClient ec) {
            this.ec = ec;
            return this;
        }

        public Builder withWorkers(Iterable<Worker> taskWorkers) {
            this.taskWorkers = taskWorkers;
            return this;
        }

        public Builder withWorkers(Worker ... taskWorkers) {
            this.taskWorkers = Arrays.asList(taskWorkers);
            return this;
        }

        public WorkflowTaskCoordinator build() {
            if (this.taskWorkers == null) {
                throw new IllegalArgumentException("No task workers are specified.  use withWorkers() to add one mor more task workers");
            }
            if (this.client == null) {
                throw new IllegalArgumentException("No TaskClient provided.  use withTaskClient() to provide one");
            }
            return new WorkflowTaskCoordinator(this.ec, this.client, this.threadCount, this.sleepWhenRetry, this.updateRetryCount, this.workerQueueSize, this.taskWorkers);
        }
    }
}

