/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.Executor;
import com.github.kagkarlsson.scheduler.ExecutorUtils;
import com.github.kagkarlsson.scheduler.FetchCandidates;
import com.github.kagkarlsson.scheduler.LockAndFetchCandidates;
import com.github.kagkarlsson.scheduler.PollStrategy;
import com.github.kagkarlsson.scheduler.PollingStrategyConfig;
import com.github.kagkarlsson.scheduler.RunAndLogErrors;
import com.github.kagkarlsson.scheduler.RunUntilShutdown;
import com.github.kagkarlsson.scheduler.ScheduledExecution;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerBuilder;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerClientEventListener;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.TriggerCheckForDueExecutions;
import com.github.kagkarlsson.scheduler.Waiter;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs due executions and performs periodic housekeeping for them.
 *
 * <p>On {@link #start()} the scheduler runs the registered {@link OnStartup} hooks, submits the
 * polling loop for due executions to a dedicated single-thread executor, and schedules two
 * housekeeping jobs on a separate scheduled pool: dead-execution detection and heartbeat updates.
 * Every {@link SchedulerClient} operation is forwarded unchanged to an internal client delegate.
 */
public class Scheduler implements SchedulerClient {
    public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5;
    public static final String THREAD_PREFIX = "db-scheduler";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    /** Receives every {@link SchedulerClient} call made on this scheduler. */
    private final SchedulerClient delegate;
    final Clock clock;
    final TaskRepository schedulerTaskRepository;
    final TaskResolver taskResolver;
    protected final PollStrategy executeDueStrategy;
    protected final Executor executor;
    private final ScheduledExecutorService housekeeperExecutor;
    int threadpoolSize;
    private final Waiter executeDueWaiter;
    private final Duration deleteUnresolvedAfter;
    private final Duration shutdownMaxWait;
    protected final List<OnStartup> onStartup;
    /** Interval between dead-execution checks: twice the heartbeat interval. */
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    final StatsRegistry statsRegistry;
    private final ExecutorService dueExecutor;
    /** Interval between heartbeat updates: the heartbeat interval itself. */
    private final Waiter heartbeatWaiter;
    final SchedulerState.SettableSchedulerState schedulerState = new SchedulerState.SettableSchedulerState();
    final ConfigurableLogger failureLogger;

    /**
     * Wires up the scheduler. Nothing is started until {@link #start()} is called.
     *
     * @param clock time source shared with the waiters, the executor and the polling strategy
     * @param schedulerTaskRepository repository used for polling, heartbeats, dead-execution
     *     detection and on-startup hooks
     * @param clientTaskRepository repository backing the {@link SchedulerClient} delegate
     * @param taskResolver resolves task names to task implementations
     * @param threadpoolSize stored on the scheduler and passed to the polling strategy
     * @param executorService service wrapped by the {@link Executor} that runs executions
     * @param schedulerName not read by this constructor
     * @param executeDueWaiter waiter used between polls for due executions
     * @param heartbeatInterval base interval; heartbeats run at this interval and dead-execution
     *     detection at twice this interval
     * @param enableImmediateExecution when {@code true}, a {@link TriggerCheckForDueExecutions}
     *     listener is attached to the client delegate; otherwise a no-op listener is used
     * @param statsRegistry receives scheduler statistics events
     * @param pollingStrategyConfig selects and configures the polling strategy
     * @param deleteUnresolvedAfter how long a task name may stay unresolved before its executions
     *     are removed
     * @param shutdownMaxWait passed to {@link Executor#stop} on shutdown
     * @param logLevel level for the failure logger
     * @param logStackTrace whether the failure logger is created with stack-trace logging
     * @param onStartup hooks run at the beginning of {@link #start()}
     * @throws IllegalArgumentException if the polling-strategy type is neither
     *     {@code LOCK_AND_FETCH} nor {@code FETCH}
     */
    protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, PollingStrategyConfig pollingStrategyConfig, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup) {
        this.clock = clock;
        this.schedulerTaskRepository = schedulerTaskRepository;
        this.taskResolver = taskResolver;
        this.threadpoolSize = threadpoolSize;
        this.executor = new Executor(executorService, clock);
        this.executeDueWaiter = executeDueWaiter;
        this.deleteUnresolvedAfter = deleteUnresolvedAfter;
        this.shutdownMaxWait = shutdownMaxWait;
        this.onStartup = onStartup;
        this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2L), clock);
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatWaiter = new Waiter(heartbeatInterval, clock);
        this.statsRegistry = statsRegistry;
        this.dueExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-execute-due-"));
        this.housekeeperExecutor = Executors.newScheduledThreadPool(3, ExecutorUtils.defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-housekeeper-"));
        SchedulerClientEventListener earlyExecutionListener = enableImmediateExecution
                ? new TriggerCheckForDueExecutions(this.schedulerState, clock, executeDueWaiter)
                : SchedulerClientEventListener.NOOP;
        this.delegate = new SchedulerClient.StandardSchedulerClient(clientTaskRepository, earlyExecutionListener);
        this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
        if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
            // Fail at construction time if the repository cannot serve this strategy.
            schedulerTaskRepository.checkSupportsLockAndFetch();
            this.executeDueStrategy = new LockAndFetchCandidates(this.executor, schedulerTaskRepository, this, threadpoolSize, statsRegistry, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions);
        } else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) {
            this.executeDueStrategy = new FetchCandidates(this.executor, schedulerTaskRepository, this, threadpoolSize, statsRegistry, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions);
        } else {
            throw new IllegalArgumentException("Unknown polling-strategy type: " + pollingStrategyConfig.type);
        }
        LOG.info("Using polling-strategy: {}", pollingStrategyConfig.describe());
    }

    /**
     * Runs the on-startup hooks, then starts the due-execution polling loop and the two
     * housekeeping jobs (both with zero initial delay), and finally marks the scheduler as started.
     */
    public void start() {
        LOG.info("Starting scheduler.");
        this.executeOnStartup();
        this.dueExecutor.submit(new RunUntilShutdown(this.executeDueStrategy, this.executeDueWaiter, this.schedulerState, this.statsRegistry));
        this.housekeeperExecutor.scheduleWithFixedDelay(
                new RunAndLogErrors(this::detectDeadExecutions, this.statsRegistry),
                0L, this.detectDeadWaiter.getWaitDuration().toMillis(), TimeUnit.MILLISECONDS);
        this.housekeeperExecutor.scheduleWithFixedDelay(
                new RunAndLogErrors(this::updateHeartbeats, this.statsRegistry),
                0L, this.heartbeatWaiter.getWaitDuration().toMillis(), TimeUnit.MILLISECONDS);
        this.schedulerState.setStarted();
    }

    /** Runs the configured polling strategy once on the calling thread. */
    protected void executeDue() {
        this.executeDueStrategy.run();
    }

    /**
     * Invokes every registered {@link OnStartup} hook with a client backed by the scheduler's own
     * task repository. A hook that throws an {@link Exception} is logged and counted as an
     * unexpected error; the remaining hooks still run.
     */
    protected void executeOnStartup() {
        SchedulerClient.StandardSchedulerClient onStartupClient = new SchedulerClient.StandardSchedulerClient(this.schedulerTaskRepository);
        this.onStartup.forEach(os -> {
            try {
                os.onStartup(onStartupClient, this.clock);
            } catch (Exception e) {
                LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", e);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        });
    }

    /**
     * Stops the scheduler, giving the utility executors 1 second before interrupting them and
     * 5 seconds afterwards.
     */
    public void stop() {
        this.stop(Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    /**
     * Stops the due-executor, the housekeeper executor and finally the execution {@link Executor}.
     * A second call while shutdown is already in progress only logs a warning.
     *
     * @param utilExecutorsWaitBeforeInterrupt wait passed to the graceful shutdown helper before
     *     interruption
     * @param utilExecutorsWaitAfterInterrupt wait for termination after interruption
     */
    void stop(Duration utilExecutorsWaitBeforeInterrupt, Duration utilExecutorsWaitAfterInterrupt) {
        if (this.schedulerState.isShuttingDown()) {
            LOG.warn("Multiple calls to 'stop()'. Scheduler is already stopping.");
            return;
        }
        this.schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");

        boolean dueExecutorTerminated;
        if (this.executeDueWaiter.isWaiting()) {
            // The polling loop is parked in its waiter: interrupt it right away instead of
            // going through the pre-interrupt wait.
            this.dueExecutor.shutdownNow();
            dueExecutorTerminated = ExecutorUtils.awaitTermination(this.dueExecutor, utilExecutorsWaitAfterInterrupt);
        } else {
            dueExecutorTerminated = ExecutorUtils.shutdownAndAwaitTermination(this.dueExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt);
        }
        if (!dueExecutorTerminated) {
            LOG.warn("Failed to shutdown due-executor properly.");
        }

        if (!ExecutorUtils.shutdownAndAwaitTermination(this.housekeeperExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown housekeeper-executor properly.");
        }
        this.executor.stop(this.shutdownMaxWait);
    }

    /** Returns the live state object of this scheduler. */
    public SchedulerState getSchedulerState() {
        return this.schedulerState;
    }

    /** Forwards to the client delegate. */
    @Override
    public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
        this.delegate.schedule(taskInstance, executionTime);
    }

    /** Forwards to the client delegate. */
    @Override
    public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) {
        this.delegate.reschedule(taskInstanceId, newExecutionTime);
    }

    /** Forwards to the client delegate. */
    @Override
    public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
        this.delegate.reschedule(taskInstanceId, newExecutionTime, newData);
    }

    /** Forwards to the client delegate. */
    @Override
    public void cancel(TaskInstanceId taskInstanceId) {
        this.delegate.cancel(taskInstanceId);
    }

    /** Forwards to the client delegate. */
    @Override
    public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(consumer);
    }

    /** Forwards to the client delegate. */
    @Override
    public void fetchScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(filter, consumer);
    }

    /** Forwards to the client delegate. */
    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
    }

    /** Forwards to the client delegate. */
    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer);
    }

    /** Forwards to the client delegate. */
    @Override
    public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
        return this.delegate.getScheduledExecution(taskInstanceId);
    }

    /**
     * Returns the executions the scheduler's repository reports as failing longer than the given
     * duration.
     */
    public List<Execution> getFailingExecutions(Duration failingAtLeastFor) {
        return this.schedulerTaskRepository.getExecutionsFailingLongerThan(failingAtLeastFor);
    }

    /** Wakes the due-execution waiter, or makes it skip its next wait. */
    public void triggerCheckForDueExecutions() {
        this.executeDueWaiter.wakeOrSkipNextWait();
    }

    /** Returns what the execution {@link Executor} reports as currently executing. */
    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return this.executor.getCurrentlyExecuting();
    }

    /**
     * Housekeeping pass: first removes executions whose task has been unresolved for too long,
     * then hands every dead execution to its task's dead-execution handler, and finally records
     * that the pass ran.
     */
    protected void detectDeadExecutions() {
        this.deleteExecutionsWithUnresolvedTasks();
        this.handleDeadExecutions();
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_DETECT_DEAD);
    }

    /** Removes all executions for task names unresolved longer than {@code deleteUnresolvedAfter}. */
    private void deleteExecutionsWithUnresolvedTasks() {
        LOG.debug("Deleting executions with unresolved tasks.");
        this.taskResolver.getUnresolvedTaskNames(this.deleteUnresolvedAfter).forEach(taskName -> {
            LOG.warn("Deleting all executions for task with name '{}'. They have been unresolved for more than {}", taskName, this.deleteUnresolvedAfter);
            int removed = this.schedulerTaskRepository.removeExecutions(taskName);
            LOG.info("Removed {} executions", removed);
            this.taskResolver.clearUnresolved(taskName);
        });
    }

    /** Fetches executions older than the dead-age limit and processes each one independently. */
    private void handleDeadExecutions() {
        LOG.debug("Checking for dead executions.");
        Instant oldAgeLimit = this.clock.now().minus(this.getMaxAgeBeforeConsideredDead());
        List<Execution> oldExecutions = this.schedulerTaskRepository.getDeadExecutions(oldAgeLimit);
        if (oldExecutions.isEmpty()) {
            LOG.trace("No dead executions found.");
            return;
        }
        oldExecutions.forEach(this::handleDeadExecution);
    }

    /**
     * Delegates one dead execution to the dead-execution handler of its task. Any failure is
     * logged and counted, never propagated, so one bad execution cannot abort the whole pass.
     */
    private void handleDeadExecution(Execution execution) {
        LOG.info("Found dead execution. Delegating handling to task. Execution: {}", execution);
        try {
            Optional<Task> task = this.taskResolver.resolve(execution.taskInstance.getTaskName());
            if (task.isPresent()) {
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.DEAD_EXECUTION);
                task.get().getDeadExecutionHandler().deadExecution(execution, new ExecutionOperations(this.schedulerTaskRepository, execution));
            } else {
                LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", execution.taskInstance.getTaskName());
            }
        } catch (Throwable e) {
            // The trailing Throwable is logged by SLF4J as the exception, not as a placeholder value.
            LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, e);
            this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
        }
    }

    /**
     * Housekeeping pass: writes a heartbeat, stamped with a single "now" taken once per pass, for
     * every execution currently being processed. A failure for one execution is logged and
     * counted and does not stop the others. Does nothing (and records no stats event) when no
     * executions are in progress.
     */
    void updateHeartbeats() {
        List<CurrentlyExecuting> currentlyProcessing = this.executor.getCurrentlyExecuting();
        if (currentlyProcessing.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }
        LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size());
        Instant now = this.clock.now();
        currentlyProcessing.stream().map(CurrentlyExecuting::getExecution).forEach(execution -> {
            // Parameterized so the execution is only rendered when TRACE is enabled.
            LOG.trace("Updating heartbeat for execution: {}", execution);
            try {
                this.schedulerTaskRepository.updateHeartbeat(execution, now);
            } catch (Throwable e) {
                LOG.error("Failed while updating heartbeat for execution {}. Will try again later.", execution, e);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        });
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS);
    }

    /** Returns the age limit used for dead-execution detection: four heartbeat intervals. */
    Duration getMaxAgeBeforeConsideredDead() {
        return this.heartbeatInterval.multipliedBy(4L);
    }

    /** Creates a builder for the given data source and known tasks (varargs convenience). */
    public static SchedulerBuilder create(DataSource dataSource, Task<?> ... knownTasks) {
        return Scheduler.create(dataSource, Arrays.asList(knownTasks));
    }

    /** Creates a builder for the given data source and known tasks. */
    public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> knownTasks) {
        return new SchedulerBuilder(dataSource, knownTasks);
    }
}

