/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import javax.annotation.Nullable;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.worker.runner.WorkerExecService;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class WorkerManagerThread
implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
    private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
    private final WorkerExecService workerExecService;
    private final WorkerConfig workerConfig;
    private final int workerExecThreads;
    private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap();

    public WorkerManagerThread(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerExecThreads = workerConfig.getExecThreads();
        this.waitSubmitQueue = new DelayQueue();
        this.workerExecService = new WorkerExecService(ThreadUtils.newDaemonFixedThreadExecutor((String)"Worker-Execute-Thread", (int)workerConfig.getExecThreads()), this.taskExecuteThreadMap);
    }

    @Nullable
    public WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
        return this.taskExecuteThreadMap.get(taskInstanceId);
    }

    public int getWaitSubmitQueueSize() {
        return this.waitSubmitQueue.size();
    }

    public int getThreadPoolQueueSize() {
        return this.workerExecService.getThreadPoolQueueSize();
    }

    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
        this.waitSubmitQueue.stream().filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId.intValue()).forEach(this.waitSubmitQueue::remove);
    }

    public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
        if (this.workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
            return this.waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
        }
        if (this.waitSubmitQueue.size() > this.workerExecThreads) {
            this.logger.warn("Wait submit queue is full, will retry submit task later");
            WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
            ThreadUtils.sleep((long)1000L);
            if (this.waitSubmitQueue.size() > this.workerExecThreads) {
                return false;
            }
        }
        return this.waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
    }

    public void start() {
        this.logger.info("Worker manager thread starting");
        Thread thread = new Thread((Runnable)this, this.getClass().getName());
        thread.setDaemon(true);
        thread.start();
        this.logger.info("Worker manager thread started");
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    Thread.sleep(1000L);
                }
                if (this.getThreadPoolQueueSize() <= this.workerExecThreads) {
                    WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = (WorkerDelayTaskExecuteRunnable)this.waitSubmitQueue.take();
                    this.workerExecService.submit(workerDelayTaskExecuteRunnable);
                    continue;
                }
                WorkerServerMetrics.incWorkerOverloadCount();
                this.logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", (Object)this.getWaitSubmitQueueSize(), (Object)this.getThreadPoolQueueSize());
                ThreadUtils.sleep((long)1000L);
            }
            catch (Exception e) {
                this.logger.error("An unexpected interrupt is happened, the exception will be ignored and this thread will continue to run", (Throwable)e);
            }
        }
    }

    public void clearTask() {
        this.waitSubmitQueue.clear();
        this.workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> {
            int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
            try {
                workerTaskExecuteRunnable.cancelTask();
                this.logger.info("Cancel the taskInstance in worker  {}", (Object)taskInstanceId);
            }
            catch (Exception ex) {
                this.logger.error("Cancel the taskInstance error {}", (Object)taskInstanceId, (Object)ex);
            }
            finally {
                TaskExecutionContextCacheManager.removeByTaskInstanceId((Integer)taskInstanceId);
            }
        });
        this.workerExecService.getTaskExecuteThreadMap().clear();
    }
}

