/*
 * Decompiled with CFR 0.152.
 */
package io.takari.bpm.event;

import io.takari.bpm.api.NoEventFoundException;
import io.takari.bpm.event.Event;
import io.takari.bpm.event.EventDispatcher;
import io.takari.bpm.event.EventPersistenceManager;
import io.takari.bpm.event.ExpiredEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Moves expired events from an {@link EventPersistenceManager} to an
 * {@link EventDispatcher} using background threads.
 *
 * <p>One acquisition thread repeatedly asks the persistence manager for the
 * next batch of expired events and puts them on a bounded in-memory queue.
 * A configurable number of executor threads take entries off that queue,
 * load the corresponding {@link Event} and hand it to the dispatcher.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized and idempotent.
 * The configuration setters are not synchronized and the backing fields are
 * not volatile, so configure the scheduler before calling {@link #start()}.
 */
public final class EventScheduler {
    private static final Logger log = LoggerFactory.getLogger(EventScheduler.class);

    private final EventPersistenceManager eventManager;
    /** Bounded hand-off queue between the acquisition thread and the executor threads. */
    private final BlockingQueue<ExpiredEvent> acquiredEventQueue;
    private final List<Thread> eventExecutorThreads = new ArrayList<Thread>();
    private final EventDispatcher dispatcher;
    private Thread eventAcquisitionThread;
    /** Loop termination flag; volatile because it is read by the worker threads. */
    private volatile boolean stopped = true;
    private int eventExecutorsCount = 10;
    private int maxEventsPerAcquisition = 10;
    private long acquisitionDelay = TimeUnit.SECONDS.toMillis(5L);
    private long acquisitionErrorDelay = TimeUnit.SECONDS.toMillis(5L);
    private long executionErrorDelay = TimeUnit.SECONDS.toMillis(5L);

    /**
     * Creates a stopped scheduler.
     *
     * @param eventManager source of expired events and of the events themselves
     * @param maxAcquiredEventQueueSize capacity of the internal hand-off queue
     * @param dispatcher receiver of every event that was successfully loaded
     */
    public EventScheduler(EventPersistenceManager eventManager, int maxAcquiredEventQueueSize, EventDispatcher dispatcher) {
        this.eventManager = eventManager;
        this.acquiredEventQueue = new LinkedBlockingQueue<ExpiredEvent>(maxAcquiredEventQueueSize);
        this.dispatcher = dispatcher;
    }

    /**
     * Sets how many executor threads {@link #start()} creates (default 10).
     *
     * @param eventExecutorsCount number of executor threads
     */
    public void setEventExecutorsCount(int eventExecutorsCount) {
        this.eventExecutorsCount = eventExecutorsCount;
    }

    /**
     * Sets the batch size requested from the persistence manager on each
     * acquisition (default 10).
     *
     * @param maxEventsPerAcquisition maximum number of events per acquisition
     */
    public void setMaxEventsPerAcquisition(int maxEventsPerAcquisition) {
        this.maxEventsPerAcquisition = maxEventsPerAcquisition;
    }

    /**
     * Sets the pause after an acquisition that returned fewer events than the
     * configured batch size (default 5 seconds).
     *
     * @param acquisitionDelay delay in milliseconds
     */
    public void setAcquisitionDelay(long acquisitionDelay) {
        this.acquisitionDelay = acquisitionDelay;
    }

    /**
     * Sets the pause after a failed acquisition (default 5 seconds).
     *
     * @param acquisitionErrorDelay delay in milliseconds
     */
    public void setAcquisitionErrorDelay(long acquisitionErrorDelay) {
        this.acquisitionErrorDelay = acquisitionErrorDelay;
    }

    /**
     * Sets the pause an executor thread takes after a failed execution
     * (default 5 seconds).
     *
     * @param executionErrorDelay delay in milliseconds
     */
    public void setExecutionErrorDelay(long executionErrorDelay) {
        this.executionErrorDelay = executionErrorDelay;
    }

    /**
     * Starts the executor threads and the acquisition thread. Does nothing if
     * the scheduler is already running.
     */
    public synchronized void start() {
        if (!this.stopped) {
            return;
        }
        this.stopped = false;

        for (int i = 0; i < this.eventExecutorsCount; i++) {
            Thread executor = new Thread("eventExecutionThread") {
                @Override
                public void run() {
                    EventScheduler.this.eventExecutionLoop();
                }
            };
            executor.start();
            this.eventExecutorThreads.add(executor);
        }

        this.eventAcquisitionThread = new Thread("eventAcquisitionThread") {
            @Override
            public void run() {
                EventScheduler.this.eventAcquisitionLoop();
            }
        };
        this.eventAcquisitionThread.start();
        log.info("start -> done");
    }

    /**
     * Signals all worker threads to terminate by setting the stop flag and
     * interrupting them. Does not wait for the threads to finish. Does nothing
     * if the scheduler is already stopped.
     */
    public synchronized void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;

        // The acquisition thread is created last in start(); if start() failed
        // part-way it may be absent, and the executors must still be interrupted.
        if (this.eventAcquisitionThread != null) {
            this.eventAcquisitionThread.interrupt();
            this.eventAcquisitionThread = null;
        }
        for (Thread executor : this.eventExecutorThreads) {
            executor.interrupt();
        }
        this.eventExecutorThreads.clear();
        log.info("stop -> done");
    }

    /**
     * Body of the acquisition thread: fetches batches of expired events and
     * enqueues them until the thread is interrupted or the scheduler is stopped.
     */
    private void eventAcquisitionLoop() {
        while (!Thread.currentThread().isInterrupted() && !this.stopped) {
            try {
                List<ExpiredEvent> acquiredEvents = this.eventManager.findNextExpiredEvent(this.maxEventsPerAcquisition);
                for (ExpiredEvent expired : acquiredEvents) {
                    // Blocks while the queue is full, throttling acquisition
                    // to the pace of the executor threads.
                    this.acquiredEventQueue.put(expired);
                }
                // A full batch suggests more events are waiting, so poll again
                // immediately; otherwise back off before the next poll.
                if (acquiredEvents.size() < this.maxEventsPerAcquisition) {
                    this.sleep(this.acquisitionDelay);
                }
            } catch (InterruptedException interrupted) {
                log.debug("eventAcquisitionLoop -> interrupted");
                // Restore the flag so the loop condition observes the interrupt.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error("eventAcquisitionLoop -> error, retry in {} ms", this.acquisitionErrorDelay, e);
                this.sleep(this.acquisitionErrorDelay);
            }
        }
        log.debug("eventAcquisitionLoop -> done");
    }

    /**
     * Body of each executor thread: takes acquired entries from the queue,
     * loads the event and dispatches it until the thread is interrupted or the
     * scheduler is stopped.
     */
    private void eventExecutionLoop() {
        while (!Thread.currentThread().isInterrupted() && !this.stopped) {
            try {
                ExpiredEvent expired = this.acquiredEventQueue.take();
                Event event = this.eventManager.get(expired.geId());
                // Entries whose event can no longer be loaded are skipped.
                if (event != null) {
                    this.dispatcher.dispatch(event);
                }
            } catch (NoEventFoundException e) {
                log.warn("eventExecutionLoop -> no event found: {}", e.getMessage());
            } catch (InterruptedException interrupted) {
                log.info("eventExecutionLoop -> interrupted");
                // Restore the flag so the loop condition observes the interrupt.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error("eventExecutionLoop -> error, retry in {} ms", this.executionErrorDelay, e);
                this.sleep(this.executionErrorDelay);
            }
        }
        log.info("eventExecutionLoop -> done");
    }

    /**
     * Sleeps for the given time; if interrupted, returns early with the
     * thread's interrupt flag set again.
     *
     * @param t time to sleep in milliseconds
     */
    private void sleep(long t) {
        try {
            Thread.sleep(t);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

