/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskEventService {
    private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
    private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<TaskEvent>();
    private Thread taskEventThread;
    private Thread taskEventHandlerThread;
    @Autowired
    private TaskExecuteThreadPool taskExecuteThreadPool;

    @PostConstruct
    public void start() {
        this.taskEventThread = new TaskEventThread();
        this.taskEventThread.setName("TaskEventThread");
        this.taskEventThread.start();
        this.taskEventHandlerThread = new TaskEventHandlerThread();
        this.taskEventHandlerThread.setName("TaskEventHandlerThread");
        this.taskEventHandlerThread.start();
    }

    @PreDestroy
    public void stop() {
        try {
            this.taskEventThread.interrupt();
            this.taskEventHandlerThread.interrupt();
            if (!this.eventQueue.isEmpty()) {
                ArrayList remainEvents = new ArrayList(this.eventQueue.size());
                this.eventQueue.drainTo(remainEvents);
                for (TaskEvent taskEvent : remainEvents) {
                    this.taskExecuteThreadPool.submitTaskEvent(taskEvent);
                }
                this.taskExecuteThreadPool.eventHandler();
            }
        }
        catch (Exception e) {
            this.logger.error("stop error:", (Throwable)e);
        }
    }

    public void addEvent(TaskEvent taskEvent) {
        this.taskExecuteThreadPool.submitTaskEvent(taskEvent);
    }

    class TaskEventHandlerThread
    extends Thread {
        TaskEventHandlerThread() {
        }

        @Override
        public void run() {
            TaskEventService.this.logger.info("event handler thread started");
            while (Stopper.isRunning()) {
                try {
                    TaskEventService.this.taskExecuteThreadPool.eventHandler();
                    TimeUnit.MILLISECONDS.sleep(1000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    TaskEventService.this.logger.error("event handler thread error", (Throwable)e);
                }
            }
        }
    }

    class TaskEventThread
    extends Thread {
        TaskEventThread() {
        }

        @Override
        public void run() {
            while (Stopper.isRunning()) {
                try {
                    TaskEvent taskEvent = (TaskEvent)TaskEventService.this.eventQueue.take();
                    TaskEventService.this.taskExecuteThreadPool.submitTaskEvent(taskEvent);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    TaskEventService.this.logger.error("persist task error", (Throwable)e);
                }
            }
            TaskEventService.this.logger.info("StateEventResponseWorker stopped");
        }
    }
}

