/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.scheduler;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.inject.assistedinject.Assisted;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.graylog.scheduler.DBJobDefinitionService;
import org.graylog.scheduler.DBJobTriggerService;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobDefinitionDto;
import org.graylog.scheduler.JobExecutionContext;
import org.graylog.scheduler.JobExecutionException;
import org.graylog.scheduler.JobScheduleStrategies;
import org.graylog.scheduler.JobTriggerDto;
import org.graylog.scheduler.JobTriggerUpdate;
import org.graylog.scheduler.JobTriggerUpdates;
import org.graylog.scheduler.eventbus.JobCompletedEvent;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetches runnable job triggers from the {@link DBJobTriggerService} and runs the corresponding
 * {@link Job} on the supplied {@link JobWorkerPool}.
 *
 * <p>Each call to {@link #execute()} tries to hand at most one trigger to the worker pool. The outcome
 * of a job run is recorded in the {@code executions.successful}, {@code executions.failed} and
 * {@code executions.time} metrics registered under this class' name.
 */
public class JobExecutionEngine {
    private static final Logger LOG = LoggerFactory.getLogger(JobExecutionEngine.class);

    private final DBJobTriggerService jobTriggerService;
    private final DBJobDefinitionService jobDefinitionService;
    private final JobSchedulerEventBus eventBus;
    private final JobScheduleStrategies scheduleStrategies;
    private final JobTriggerUpdates.Factory jobTriggerUpdatesFactory;
    /** Job factories keyed by the job definition config type. */
    private final Map<String, Job.Factory> jobFactory;
    private final JobWorkerPool workerPool;

    private final Counter executionSuccessful;
    private final Counter executionFailed;
    private final Timer executionTime;

    /** Cleared by {@link #shutdown()}; while false, {@link #execute()} picks up no new triggers. */
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    /** True until the one-time cleanup in {@link #cleanup()} has been started. */
    private final AtomicBoolean shouldCleanup = new AtomicBoolean(true);

    /**
     * Creates a new engine.
     *
     * @param jobTriggerService        service used to lock, release and fail triggers
     * @param jobDefinitionService     service used to look up the job definition of a trigger
     * @param eventBus                 bus that receives a {@link JobCompletedEvent} after each handled trigger
     * @param scheduleStrategies       used to compute the next execution time after an unhandled job error
     * @param jobTriggerUpdatesFactory factory for the trigger-update helper passed to the job
     * @param jobFactory               job factories keyed by job definition config type
     * @param workerPool               pool that runs the jobs
     * @param metricRegistry           registry in which the execution metrics are registered
     */
    @Inject
    public JobExecutionEngine(DBJobTriggerService jobTriggerService,
                              DBJobDefinitionService jobDefinitionService,
                              JobSchedulerEventBus eventBus,
                              JobScheduleStrategies scheduleStrategies,
                              JobTriggerUpdates.Factory jobTriggerUpdatesFactory,
                              Map<String, Job.Factory> jobFactory,
                              @Assisted JobWorkerPool workerPool,
                              MetricRegistry metricRegistry) {
        this.jobTriggerService = jobTriggerService;
        this.jobDefinitionService = jobDefinitionService;
        this.eventBus = eventBus;
        this.scheduleStrategies = scheduleStrategies;
        this.jobTriggerUpdatesFactory = jobTriggerUpdatesFactory;
        this.jobFactory = jobFactory;
        this.workerPool = workerPool;

        this.executionSuccessful = metricRegistry.counter(MetricRegistry.name(getClass(), "executions", "successful"));
        this.executionFailed = metricRegistry.counter(MetricRegistry.name(getClass(), "executions", "failed"));
        this.executionTime = metricRegistry.timer(MetricRegistry.name(getClass(), "executions", "time"));
    }

    /**
     * Stops the engine from picking up new triggers. The same flag instance is handed to every
     * {@link JobExecutionContext}, so already running jobs can observe the change.
     */
    public void shutdown() {
        isRunning.set(false);
    }

    /**
     * Force-releases the triggers reported as owned by {@link DBJobTriggerService#forceReleaseOwnedTriggers()}.
     * Guarded by {@link #shouldCleanup}, so the release is attempted only once per engine instance.
     */
    private void cleanup() {
        if (!shouldCleanup.getAndSet(false)) {
            return;
        }
        final int releasedTriggers = jobTriggerService.forceReleaseOwnedTriggers();
        if (releasedTriggers > 0) {
            LOG.warn("Force-released {} stale job triggers after an unclean job scheduler shutdown", releasedTriggers);
        }
    }

    /**
     * Tries to lock the next runnable trigger and hand it to the worker pool.
     *
     * @return {@code true} if a trigger was submitted to the worker pool; {@code false} if the engine
     *         is shut down, the pool has no free slots, no trigger is runnable, or the pool rejected
     *         the task (in which case the trigger is released again with its unchanged next time)
     */
    public boolean execute() {
        if (shouldCleanup.get()) {
            cleanup();
        }

        if (!isRunning.get() || !workerPool.hasFreeSlots()) {
            return false;
        }

        final Optional<JobTriggerDto> triggerOptional = jobTriggerService.nextRunnableTrigger();
        if (!triggerOptional.isPresent()) {
            return false;
        }

        final JobTriggerDto trigger = triggerOptional.get();
        if (!workerPool.execute(() -> handleTrigger(trigger))) {
            // The pool did not accept the task, so give the trigger back without changing its schedule.
            jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(trigger.nextTime()));
            return false;
        }
        return true;
    }

    /**
     * Resolves the job definition and job instance for the given trigger and runs the job while
     * recording the execution time. Always posts a {@link JobCompletedEvent} when done.
     *
     * <p>An {@link IllegalStateException} (missing job definition, missing job factory, factory returned
     * no job) is treated as permanent and marks the trigger as failed. Any other exception releases the
     * trigger for a retry five seconds later.
     *
     * @param trigger the locked trigger to handle
     */
    private void handleTrigger(JobTriggerDto trigger) {
        LOG.trace("Locked trigger {} (owner={})", trigger.id(), trigger.lock().owner());

        try {
            final JobDefinitionDto jobDefinition = jobDefinitionService.get(trigger.jobDefinitionId())
                    .orElseThrow(() -> new IllegalStateException("Couldn't find job definition " + trigger.jobDefinitionId()));

            // Check the factory lookup before using it. Without this check an unknown job type fails with a
            // NullPointerException, which is handled as a temporary error and retried every few seconds forever.
            final Job.Factory factory = jobFactory.get(jobDefinition.config().type());
            if (factory == null) {
                throw new IllegalStateException("Couldn't find job factory for type " + jobDefinition.config().type());
            }

            final Job job = factory.create(jobDefinition);
            if (job == null) {
                throw new IllegalStateException("Job factory for type " + jobDefinition.config().type() + " returned a null job");
            }

            executionTime.time(() -> executeJob(trigger, jobDefinition, job));
        } catch (IllegalStateException e) {
            // Permanent error: retrying cannot help, so flag the trigger as failed.
            LOG.error("Couldn't handle trigger due to a permanent error {} - trigger won't be retried", trigger.id(), e);
            jobTriggerService.setTriggerError(trigger);
        } catch (Exception e) {
            // Unknown error: release the trigger so it is picked up again shortly.
            final DateTime nextTime = DateTime.now(DateTimeZone.UTC).plusSeconds(5);
            LOG.error("Couldn't handle trigger {} - retrying at {}", trigger.id(), nextTime, e);
            jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(nextTime));
        } finally {
            eventBus.post(JobCompletedEvent.INSTANCE);
        }
    }

    /**
     * Executes the job and releases the trigger with the update produced by the job, updating the
     * success/failure counters. All exceptions are handled inside this method.
     *
     * @param trigger       the locked trigger that caused the execution
     * @param jobDefinition the job definition the trigger belongs to
     * @param job           the job instance to execute
     */
    private void executeJob(JobTriggerDto trigger, JobDefinitionDto jobDefinition, Job job) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Execute job: {}/{}/{} (job-class={} trigger={} config={})",
                        jobDefinition.title(), jobDefinition.id(), jobDefinition.config().type(),
                        job.getClass().getSimpleName(), trigger.id(), jobDefinition.config());
            }

            final JobTriggerUpdate triggerUpdate = job.execute(
                    JobExecutionContext.create(trigger, jobDefinition, jobTriggerUpdatesFactory.create(trigger), isRunning));

            if (triggerUpdate == null) {
                // The failure counter is incremented by the generic catch block below; incrementing it here
                // as well would count this single failure twice.
                throw new IllegalStateException("Job#execute() must not return null - this is a bug in the job class");
            }

            executionSuccessful.inc();
            LOG.trace("Update trigger: trigger={} update={}", trigger.id(), triggerUpdate);
            if (!jobTriggerService.releaseTrigger(trigger, triggerUpdate)) {
                LOG.error("Couldn't release trigger {}", trigger.id());
            }
        } catch (JobExecutionException e) {
            // The job reported the failure itself and supplies the trigger and update to release with.
            LOG.error("Job execution error - trigger={} job={}", trigger.id(), jobDefinition.id(), e);
            executionFailed.inc();
            if (!jobTriggerService.releaseTrigger(e.getTrigger(), e.getUpdate())) {
                LOG.error("Couldn't release trigger {}", trigger.id());
            }
        } catch (Exception e) {
            executionFailed.inc();
            LOG.error("Unhandled job execution error - trigger={} job={}", trigger.id(), jobDefinition.id(), e);
            // The next time may be null when the schedule strategy yields no future execution time.
            final DateTime nextFutureTime = scheduleStrategies.nextFutureTime(trigger).orElse(null);
            if (!jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(nextFutureTime))) {
                LOG.error("Couldn't release trigger {}", trigger.id());
            }
        }
    }

    /**
     * Factory for creating engine instances bound to a specific worker pool.
     */
    public interface Factory {
        /**
         * @param workerPool the pool the new engine runs its jobs on
         * @return a new engine instance
         */
        JobExecutionEngine create(JobWorkerPool workerPool);
    }
}

