/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.client.task;

import com.google.common.base.Stopwatch;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.client.worker.PropertyFactory;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.discovery.EurekaClient;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowTaskCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowTaskCoordinator.class);
    private TaskClient taskClient;
    private ExecutorService executorService;
    private ScheduledExecutorService scheduledExecutorService;
    private EurekaClient eurekaClient;
    private List<Worker> workers = new LinkedList<Worker>();
    private int sleepWhenRetry;
    private int updateRetryCount;
    private int workerQueueSize;
    private LinkedBlockingQueue<Runnable> workerQueue;
    private int threadCount;
    private String workerNamePrefix;
    private static final String DOMAIN = "domain";
    private static final String ALL_WORKERS = "all";
    private static final long SHUTDOWN_WAIT_TIME_IN_SEC = 10L;

    public WorkflowTaskCoordinator(EurekaClient eurekaClient, TaskClient taskClient, int threadCount, int sleepWhenRetry, int updateRetryCount, int workerQueueSize, Iterable<Worker> taskWorkers, String workerNamePrefix) {
        this.eurekaClient = eurekaClient;
        this.taskClient = taskClient;
        this.threadCount = threadCount;
        this.sleepWhenRetry = sleepWhenRetry;
        this.updateRetryCount = updateRetryCount;
        this.workerQueueSize = workerQueueSize;
        this.workerNamePrefix = workerNamePrefix;
        taskWorkers.forEach(this.workers::add);
    }

    public synchronized void init() {
        if (this.threadCount == -1) {
            this.threadCount = this.workers.size();
        }
        logger.info("Initialized the worker with {} threads", (Object)this.threadCount);
        this.workerQueue = new LinkedBlockingQueue(this.workerQueueSize);
        AtomicInteger count = new AtomicInteger(0);
        this.executorService = new ThreadPoolExecutor(this.threadCount, this.threadCount, 0L, TimeUnit.MILLISECONDS, this.workerQueue, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(this.workerNamePrefix + count.getAndIncrement());
            return thread;
        });
        this.scheduledExecutorService = Executors.newScheduledThreadPool(this.workers.size());
        this.workers.forEach(worker -> this.scheduledExecutorService.scheduleWithFixedDelay(() -> this.pollForTask((Worker)worker), worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS));
    }

    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.executorService.shutdown();
        this.shutdownExecutorService(this.scheduledExecutorService, 10L);
        this.shutdownExecutorService(this.executorService, 10L);
    }

    private void shutdownExecutorService(ExecutorService executorService, long timeout) {
        try {
            if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
                logger.debug("tasks completed, shutting down");
            } else {
                logger.warn(String.format("forcing shutdown after waiting for %s second", timeout));
                this.scheduledExecutorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            logger.warn("shutdown interrupted, invoking shutdownNow");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void pollForTask(Worker worker) {
        if (this.eurekaClient != null && !this.eurekaClient.getInstanceRemoteStatus().equals((Object)InstanceInfo.InstanceStatus.UP)) {
            logger.debug("Instance is NOT UP in discovery - will not poll");
            return;
        }
        if (worker.paused()) {
            WorkflowTaskMetrics.incrementTaskPausedCount(worker.getTaskDefName());
            logger.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
            return;
        }
        String domain = Optional.ofNullable(PropertyFactory.getString(worker.getTaskDefName(), DOMAIN, null)).orElse(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null));
        logger.debug("Polling {}, domain={}, count = {} timeout = {} ms", new Object[]{worker.getTaskDefName(), domain, worker.getPollCount(), worker.getLongPollTimeoutInMS()});
        List tasks = Collections.emptyList();
        try {
            int realPollCount = Math.min(this.workerQueue.remainingCapacity(), worker.getPollCount());
            if (realPollCount <= 0) {
                logger.warn("All workers are busy, not polling. queue size = {}, max = {}", (Object)this.workerQueue.size(), (Object)this.workerQueueSize);
                return;
            }
            String taskType = worker.getTaskDefName();
            tasks = (List)WorkflowTaskMetrics.getPollTimer(taskType).record(() -> this.taskClient.batchPollTasksInDomain(taskType, domain, worker.getIdentity(), realPollCount, worker.getLongPollTimeoutInMS()));
            WorkflowTaskMetrics.incrementTaskPollCount(taskType, tasks.size());
            logger.debug("Polled {}, domain {}, received {} tasks in worker - {}", new Object[]{worker.getTaskDefName(), domain, tasks.size(), worker.getIdentity()});
        }
        catch (Exception e) {
            WorkflowTaskMetrics.incrementTaskPollErrorCount(worker.getTaskDefName(), e);
            logger.error("Error when polling for tasks", (Throwable)e);
        }
        for (Task task : tasks) {
            try {
                this.executorService.submit(() -> {
                    try {
                        logger.debug("Executing task {}, taskId - {} in worker - {}", new Object[]{task.getTaskDefName(), task.getTaskId(), worker.getIdentity()});
                        this.execute(worker, task);
                    }
                    catch (Throwable t) {
                        task.setStatus(Task.Status.FAILED);
                        TaskResult result = new TaskResult(task);
                        this.handleException(t, result, worker, task);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                WorkflowTaskMetrics.incrementTaskExecutionQueueFullCount(worker.getTaskDefName());
                logger.error("Execution queue is full, returning task: {}", (Object)task.getTaskId(), (Object)e);
                this.returnTask(worker, task);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void execute(Worker worker, Task task) {
        String taskType = task.getTaskDefName();
        try {
            if (!worker.preAck(task)) {
                logger.debug("Worker decided not to ack the task {}, taskId = {}", (Object)taskType, (Object)task.getTaskId());
                return;
            }
            if (!this.taskClient.ack(task.getTaskId(), worker.getIdentity()).booleanValue()) {
                WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName());
                return;
            }
            logger.debug("Ack successful for {}, taskId = {}", (Object)taskType, (Object)task.getTaskId());
        }
        catch (Exception e) {
            logger.error(String.format("ack exception for task %s, taskId = %s in worker - %s", task.getTaskDefName(), task.getTaskId(), worker.getIdentity()), (Throwable)e);
            WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e);
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        TaskResult result = null;
        try {
            logger.debug("Executing task {} in worker {} at {}", new Object[]{task, worker.getClass().getSimpleName(), worker.getIdentity()});
            result = worker.execute(task);
            result.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result.setTaskId(task.getTaskId());
            result.setWorkerId(worker.getIdentity());
        }
        catch (Exception e) {
            logger.error("Unable to execute task {}", (Object)task, (Object)e);
            if (result == null) {
                task.setStatus(Task.Status.FAILED);
                result = new TaskResult(task);
            }
            this.handleException(e, result, worker, task);
        }
        finally {
            stopwatch.stop();
            WorkflowTaskMetrics.getExecutionTimer(worker.getTaskDefName()).record(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
        logger.debug("Task {} executed by worker {} at {} with status {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), task.getStatus()});
        this.updateWithRetry(this.updateRetryCount, task, result, worker);
    }

    public int getThreadCount() {
        return this.threadCount;
    }

    public int getWorkerQueueSize() {
        return this.workerQueueSize;
    }

    public int getSleepWhenRetry() {
        return this.sleepWhenRetry;
    }

    public int getUpdateRetryCount() {
        return this.updateRetryCount;
    }

    public String getWorkerNamePrefix() {
        return this.workerNamePrefix;
    }

    private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
        try {
            String description = String.format("Retry updating task result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity());
            String methodName = "updateWithRetry";
            new RetryUtil().retryOnException(() -> {
                this.taskClient.updateTask(result, task.getTaskType());
                return null;
            }, null, null, count, description, methodName);
        }
        catch (Exception e) {
            worker.onErrorUpdate(task);
            WorkflowTaskMetrics.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            logger.error(String.format("Failed to update result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity()), (Throwable)e);
        }
    }

    private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
        logger.error(String.format("Error while executing task %s", task.toString()), t);
        WorkflowTaskMetrics.incrementTaskExecutionErrorCount(worker.getTaskDefName(), t);
        result.setStatus(TaskResult.Status.FAILED);
        result.setReasonForIncompletion("Error while executing the task: " + t);
        StringWriter stringWriter = new StringWriter();
        t.printStackTrace(new PrintWriter(stringWriter));
        result.log(stringWriter.toString());
        this.updateWithRetry(this.updateRetryCount, task, result, worker);
    }

    private void returnTask(Worker worker, Task task) {
        logger.warn("Returning task {} back to conductor", (Object)task.getTaskId());
        this.updateWithRetry(this.updateRetryCount, task, new TaskResult(task), worker);
    }

    public static class Builder {
        private String workerNamePrefix = "workflow-worker-";
        private int sleepWhenRetry = 500;
        private int updateRetryCount = 3;
        private int workerQueueSize = 100;
        private int threadCount = -1;
        private Iterable<Worker> taskWorkers;
        private EurekaClient eurekaClient;
        private TaskClient taskClient;

        public Builder withWorkerNamePrefix(String workerNamePrefix) {
            this.workerNamePrefix = workerNamePrefix;
            return this;
        }

        public Builder withSleepWhenRetry(int sleepWhenRetry) {
            this.sleepWhenRetry = sleepWhenRetry;
            return this;
        }

        public Builder withUpdateRetryCount(int updateRetryCount) {
            this.updateRetryCount = updateRetryCount;
            return this;
        }

        public Builder withWorkerQueueSize(int workerQueueSize) {
            this.workerQueueSize = workerQueueSize;
            return this;
        }

        public Builder withThreadCount(int threadCount) {
            if (threadCount < 1) {
                throw new IllegalArgumentException("No. of threads cannot be less than 1");
            }
            this.threadCount = threadCount;
            return this;
        }

        public Builder withTaskClient(TaskClient client) {
            this.taskClient = client;
            return this;
        }

        public Builder withEurekaClient(EurekaClient eurekaClient) {
            this.eurekaClient = eurekaClient;
            return this;
        }

        public Builder withWorkers(Iterable<Worker> taskWorkers) {
            this.taskWorkers = taskWorkers;
            return this;
        }

        public Builder withWorkers(Worker ... taskWorkers) {
            this.taskWorkers = Arrays.asList(taskWorkers);
            return this;
        }

        public WorkflowTaskCoordinator build() {
            if (this.taskWorkers == null) {
                throw new IllegalArgumentException("No task workers are specified. use withWorkers() to add one mor more task workers");
            }
            if (this.taskClient == null) {
                throw new IllegalArgumentException("No TaskClient provided. use withTaskClient() to provide one");
            }
            return new WorkflowTaskCoordinator(this.eurekaClient, this.taskClient, this.threadCount, this.sleepWhenRetry, this.updateRetryCount, this.workerQueueSize, this.taskWorkers, this.workerNamePrefix);
        }
    }
}

