/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.scheduler;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.inject.assistedinject.Assisted;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import one.util.streamex.EntryStream;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.graylog.scheduler.DBJobDefinitionService;
import org.graylog.scheduler.DBJobTriggerService;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobDefinitionDto;
import org.graylog.scheduler.JobExecutionContext;
import org.graylog.scheduler.JobExecutionException;
import org.graylog.scheduler.JobScheduleStrategies;
import org.graylog.scheduler.JobSchedulerConfig;
import org.graylog.scheduler.JobTriggerDto;
import org.graylog.scheduler.JobTriggerUpdate;
import org.graylog.scheduler.JobTriggerUpdates;
import org.graylog.scheduler.eventbus.JobCompletedEvent;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.graylog.tracing.GraylogSemanticAttributes;
import org.graylog2.cluster.lock.AlreadyLockedException;
import org.graylog2.cluster.lock.RefreshingLockService;
import org.graylog2.shared.metrics.MetricUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobExecutionEngine {
    private static final long DEFAULT_BACKOFF = 5000L;
    private static final Logger LOG = LoggerFactory.getLogger(JobExecutionEngine.class);
    private final DBJobTriggerService jobTriggerService;
    private final DBJobDefinitionService jobDefinitionService;
    private final JobSchedulerEventBus eventBus;
    private final JobScheduleStrategies scheduleStrategies;
    private final JobTriggerUpdates.Factory jobTriggerUpdatesFactory;
    private final Map<String, Job.Factory> jobFactory;
    private final JobWorkerPool workerPool;
    private final RefreshingLockService.Factory refreshingLockServiceFactory;
    private final Map<String, Integer> concurrencyLimits;
    private final long backoffMillis;
    private final Counter executionSuccessful;
    private final Counter executionFailed;
    private final Meter executionDenied;
    private final Meter executionRescheduled;
    private final Timer executionTime;
    private final LoadingCache<String, Long> gaugeCache;
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final AtomicBoolean shouldCleanup = new AtomicBoolean(true);

    @Inject
    public JobExecutionEngine(DBJobTriggerService jobTriggerService, DBJobDefinitionService jobDefinitionService, JobSchedulerEventBus eventBus, JobScheduleStrategies scheduleStrategies, JobTriggerUpdates.Factory jobTriggerUpdatesFactory, RefreshingLockService.Factory refreshingLockServiceFactory, Map<String, Job.Factory> jobFactory, @Assisted JobWorkerPool workerPool, JobSchedulerConfig schedulerConfig, MetricRegistry metricRegistry) {
        this(jobTriggerService, jobDefinitionService, eventBus, scheduleStrategies, jobTriggerUpdatesFactory, refreshingLockServiceFactory, jobFactory, workerPool, schedulerConfig, metricRegistry, 5000L);
    }

    @VisibleForTesting
    public JobExecutionEngine(final DBJobTriggerService jobTriggerService, DBJobDefinitionService jobDefinitionService, JobSchedulerEventBus eventBus, JobScheduleStrategies scheduleStrategies, JobTriggerUpdates.Factory jobTriggerUpdatesFactory, RefreshingLockService.Factory refreshingLockServiceFactory, final Map<String, Job.Factory> jobFactory, JobWorkerPool workerPool, JobSchedulerConfig schedulerConfig, MetricRegistry metricRegistry, long backoffMillis) {
        this.jobTriggerService = jobTriggerService;
        this.jobDefinitionService = jobDefinitionService;
        this.eventBus = eventBus;
        this.scheduleStrategies = scheduleStrategies;
        this.jobTriggerUpdatesFactory = jobTriggerUpdatesFactory;
        this.jobFactory = jobFactory;
        this.workerPool = workerPool;
        this.refreshingLockServiceFactory = refreshingLockServiceFactory;
        this.concurrencyLimits = schedulerConfig.concurrencyLimits();
        this.backoffMillis = backoffMillis;
        this.executionSuccessful = metricRegistry.counter(MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "successful"}));
        this.executionFailed = metricRegistry.counter(MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "failed"}));
        this.executionDenied = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "denied"}));
        this.executionRescheduled = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "rescheduled"}));
        this.executionTime = metricRegistry.timer(MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "time"}));
        this.gaugeCache = Caffeine.newBuilder().expireAfterWrite(5L, TimeUnit.SECONDS).build((CacheLoader)new CacheLoader<String, Long>(){

            public @Nullable Long load(String key) {
                throw new UnsupportedOperationException("Always use #loadAll");
            }

            public Map<String, Long> loadAll(Set<? extends String> keys) {
                return EntryStream.of(jobFactory.entrySet().stream()).mapValues((Function)Functions.constant((Object)0L)).append(jobTriggerService.numberOfOverdueTriggers()).toMap((defaultValue, dbValue) -> dbValue);
            }
        });
        jobFactory.keySet().forEach(jobType -> MetricUtils.safelyRegister(metricRegistry, MetricRegistry.name(this.getClass(), (String[])new String[]{"executions", "overdue", "type", jobType}), () -> (Long)this.gaugeCache.getAll(jobFactory.keySet()).get(jobType)));
    }

    public void shutdown() {
        this.isRunning.set(false);
    }

    private void cleanup() {
        int releasedTriggers;
        if (this.shouldCleanup.getAndSet(false) && (releasedTriggers = this.jobTriggerService.forceReleaseOwnedTriggers()) > 0) {
            LOG.warn("Force-released {} stale job triggers after an unclean job scheduler shutdown", (Object)releasedTriggers);
        }
    }

    public boolean execute() {
        Optional<JobTriggerDto> triggerOptional;
        if (this.shouldCleanup.get()) {
            this.cleanup();
        }
        if (this.isRunning.get() && this.workerPool.hasFreeSlots() && (triggerOptional = this.jobTriggerService.nextRunnableTrigger()).isPresent()) {
            JobTriggerDto trigger = triggerOptional.get();
            if (!this.workerPool.execute(() -> this.handleTriggerWithConcurrencyLimit(trigger))) {
                this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(trigger.nextTime()));
                this.executionDenied.mark();
                return false;
            }
            return true;
        }
        this.executionDenied.mark();
        return false;
    }

    public void updateLockedJobs() {
        if (this.workerPool.anySlotsUsed()) {
            this.jobTriggerService.updateLockedJobTriggers();
        }
    }

    private void handleTriggerWithConcurrencyLimit(JobTriggerDto trigger) {
        int maxTypeConcurrency = this.concurrencyLimits.getOrDefault(trigger.jobDefinitionType(), 0);
        if (maxTypeConcurrency > 0) {
            try (RefreshingLockService refreshingLockService = this.refreshingLockServiceFactory.create();){
                try {
                    refreshingLockService.acquireAndKeepLock(trigger.jobDefinitionType(), maxTypeConcurrency);
                    this.handleTrigger(trigger);
                }
                catch (AlreadyLockedException e) {
                    DateTime nextTime = DateTime.now((DateTimeZone)DateTimeZone.UTC).plus((ReadableDuration)this.slidingBackoff(trigger));
                    this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withConcurrencyReschedule(nextTime));
                    this.executionDenied.mark();
                    this.executionRescheduled.mark();
                }
            }
        } else {
            this.handleTrigger(trigger);
        }
    }

    private Duration slidingBackoff(JobTriggerDto trigger) {
        long slidingBackoffMillis = trigger.concurrencyRescheduleCount() < 1 ? this.backoffMillis : this.backoffMillis / (long)Math.min(trigger.concurrencyRescheduleCount(), 5);
        return Duration.millis((long)slidingBackoffMillis);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleTrigger(JobTriggerDto trigger) {
        LOG.trace("Locked trigger {} (owner={})", (Object)trigger.id(), (Object)trigger.lock().owner());
        try {
            JobDefinitionDto jobDefinition = (JobDefinitionDto)this.jobDefinitionService.get(trigger.jobDefinitionId()).orElseThrow(() -> new IllegalStateException("Couldn't find job definition " + trigger.jobDefinitionId()));
            Object job = this.jobFactory.get(jobDefinition.config().type()).create(jobDefinition);
            if (job == null) {
                throw new IllegalStateException("Couldn't find job factory for type " + jobDefinition.config().type());
            }
            this.executionTime.time(() -> this.executeJob(trigger, jobDefinition, (Job)job));
        }
        catch (IllegalStateException e) {
            LOG.error("Couldn't handle trigger due to a permanent error {} - trigger won't be retried", (Object)trigger.id(), (Object)e);
            this.jobTriggerService.setTriggerError(trigger);
        }
        catch (Exception e) {
            DateTime nextTime = DateTime.now((DateTimeZone)DateTimeZone.UTC).plusSeconds(5);
            LOG.error("Couldn't handle trigger {} - retrying at {}", new Object[]{trigger.id(), nextTime, e});
            this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(nextTime));
        }
        finally {
            this.eventBus.post(JobCompletedEvent.INSTANCE);
        }
    }

    @WithSpan
    private void executeJob(JobTriggerDto trigger, JobDefinitionDto jobDefinition, Job job) {
        Span.current().setAttribute(GraylogSemanticAttributes.SCHEDULER_JOB_CLASS, (Object)job.getClass().getSimpleName()).setAttribute(GraylogSemanticAttributes.SCHEDULER_JOB_DEFINITION_TYPE, (Object)jobDefinition.config().type()).setAttribute(GraylogSemanticAttributes.SCHEDULER_JOB_DEFINITION_TITLE, (Object)jobDefinition.title()).setAttribute(GraylogSemanticAttributes.SCHEDULER_JOB_DEFINITION_ID, (Object)String.valueOf(jobDefinition.id()));
        try {
            JobTriggerUpdate triggerUpdate;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Execute job: {}/{}/{} (job-class={} trigger={} config={})", new Object[]{jobDefinition.title(), jobDefinition.id(), jobDefinition.config().type(), job.getClass().getSimpleName(), trigger.id(), jobDefinition.config()});
            }
            if ((triggerUpdate = job.execute(JobExecutionContext.create(trigger, jobDefinition, this.jobTriggerUpdatesFactory.create(trigger), this.isRunning, this.jobTriggerService))) == null) {
                this.executionFailed.inc();
                throw new IllegalStateException("Job#execute() must not return null - this is a bug in the job class");
            }
            this.executionSuccessful.inc();
            LOG.trace("Update trigger: trigger={} update={}", (Object)trigger.id(), (Object)triggerUpdate);
            this.jobTriggerService.releaseTrigger(trigger, triggerUpdate);
        }
        catch (JobExecutionException e) {
            LOG.error("Job execution error - trigger={} job={}", new Object[]{trigger.id(), jobDefinition.id(), e});
            this.executionFailed.inc();
            this.jobTriggerService.releaseTrigger(e.getTrigger(), e.getUpdate());
        }
        catch (Exception e) {
            this.executionFailed.inc();
            LOG.error("Unhandled job execution error - trigger={} job={}", new Object[]{trigger.id(), jobDefinition.id(), e});
            DateTime nextFutureTime = this.scheduleStrategies.nextFutureTime(trigger).orElse(null);
            this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(nextFutureTime));
        }
    }

    public static interface Factory {
        public JobExecutionEngine create(JobWorkerPool var1);
    }
}

