/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.events.processor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.events.processor.AutoValue_EventProcessorExecutionJob_Config;
import org.graylog.events.processor.AutoValue_EventProcessorExecutionJob_Data;
import org.graylog.events.processor.EventProcessorEngine;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParametersWithTimerange;
import org.graylog.events.processor.EventProcessorPreconditionException;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobDefinitionConfig;
import org.graylog.scheduler.JobDefinitionDto;
import org.graylog.scheduler.JobExecutionContext;
import org.graylog.scheduler.JobExecutionException;
import org.graylog.scheduler.JobScheduleStrategies;
import org.graylog.scheduler.JobTriggerData;
import org.graylog.scheduler.JobTriggerUpdate;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessorExecutionJob
implements Job {
    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorExecutionJob.class);
    public static final String TYPE_NAME = "event-processor-execution-v1";
    private static final long RETRY_INTERVAL = 5000L;
    private final JobScheduleStrategies scheduleStrategies;
    private final JobSchedulerClock clock;
    private final EventProcessorEngine eventProcessorEngine;
    private final Config config;
    private final EventsConfigurationProvider configurationProvider;

    @Inject
    public EventProcessorExecutionJob(JobScheduleStrategies scheduleStrategies, JobSchedulerClock clock, EventProcessorEngine eventProcessorEngine, EventsConfigurationProvider configurationProvider, @Assisted JobDefinitionDto jobDefinition) {
        this.scheduleStrategies = scheduleStrategies;
        this.clock = clock;
        this.eventProcessorEngine = eventProcessorEngine;
        this.configurationProvider = configurationProvider;
        this.config = (Config)jobDefinition.config();
    }

    @Override
    public JobTriggerUpdate execute(JobExecutionContext ctx) throws JobExecutionException {
        EventProcessorParametersWithTimerange parameters;
        Optional<Data> data = ctx.trigger().data().map(d -> (Data)d);
        if (data.isPresent()) {
            LOG.trace("Using timerange from job trigger data: from={} to={} (trigger={})", new Object[]{data.get().timerangeFrom(), data.get().timerangeTo(), ctx.trigger().id()});
            parameters = this.config.parameters().withTimerange(data.get().timerangeFrom(), data.get().timerangeTo());
        } else {
            parameters = this.config.parameters();
        }
        DateTime from = parameters.timerange().getFrom();
        DateTime to = parameters.timerange().getTo();
        if (!to.isAfter((ReadableInstant)from)) {
            JobTriggerUpdate triggerUpdate = JobTriggerUpdate.withError(ctx.trigger());
            throw new JobExecutionException("Invalid time range - \"to\" timestamp <" + to.toString() + "> is not after \"from\" timestamp <" + from.toString() + ">", ctx.trigger(), triggerUpdate);
        }
        DateTime now = this.clock.nowUTC();
        if (now.isBefore((ReadableInstant)to)) {
            LOG.error("The end of the timerange to process is in the future, re-scheduling job trigger <{}> to run at <{}>", (Object)ctx.trigger().id(), (Object)to);
            return JobTriggerUpdate.withNextTime(to);
        }
        try {
            this.eventProcessorEngine.execute(this.config.eventDefinitionId(), parameters);
            DateTime nextTo = this.config.isCron() ? this.scheduleStrategies.nextTime(ctx.trigger(), to).orElse(to.plus(this.config.processingHopSize())) : to.plus(this.config.processingHopSize());
            DateTime nextFrom = nextTo.minus(this.config.processingWindowSize());
            long catchUpSize = this.configurationProvider.get().eventCatchupWindow();
            if (!this.config.isCron() && catchUpSize > 0L && catchUpSize > this.config.processingWindowSize() && to.plus(catchUpSize).isBefore((ReadableInstant)now) && this.config.processingHopSize() <= this.config.processingWindowSize()) {
                long chunkCount = catchUpSize / this.config.processingWindowSize();
                nextTo = to.plus(this.config.processingWindowSize() * chunkCount);
                LOG.info("Event processor <{}> is catching up on old data. Combining {} search windows with catchUpWindowSize={}ms: from={} to={}", new Object[]{this.config.eventDefinitionId(), chunkCount, catchUpSize, nextFrom, nextTo});
            }
            LOG.trace("Set new timerange of eventproc <{}> in job trigger data: from={} to={} (hopSize={}ms windowSize={}ms)", new Object[]{this.config.eventDefinitionId(), nextFrom, nextTo, this.config.processingHopSize(), this.config.processingWindowSize()});
            Data newData = data.map(Data::toBuilder).orElse(Data.builder()).timerangeFrom(nextFrom).timerangeTo(nextTo).build();
            Optional<DateTime> nextTime = this.scheduleStrategies.nextTime(ctx.trigger());
            if (nextTime.isPresent()) {
                if (nextTo.isBefore((ReadableInstant)now)) {
                    LOG.trace("Set nextTime to <{}> to catch up faster - calculated nextTime was <{}>", (Object)now, (Object)nextTime.get());
                    return JobTriggerUpdate.withNextTimeAndData(now, newData);
                }
                if (nextTo.isBefore((ReadableInstant)nextTime.get())) {
                    LOG.trace("Set nextTime to <{}> because it's closer to the timerange time - calculated nextTime was <{}>", (Object)nextTo, (Object)nextTime.get());
                    return JobTriggerUpdate.withNextTimeAndData(nextTo, newData);
                }
                LOG.trace("Set nextTime to <{}>", (Object)nextTime.get());
                return JobTriggerUpdate.withNextTimeAndData(nextTime.get(), newData);
            }
            LOG.trace("No nextTime for trigger <{}>", (Object)ctx.trigger().id());
            return JobTriggerUpdate.withoutNextTime();
        }
        catch (EventProcessorPreconditionException e) {
            if (e.getEventDefinition().isPresent()) {
                LOG.debug("Event processor <{}/{}> couldn't be executed because of a failed precondition (retry in {} ms)", new Object[]{e.getEventDefinition().get().title(), e.getEventDefinitionId(), 5000L});
            } else {
                LOG.debug("Event processor <{}> couldn't be executed because of a failed precondition (retry in {} ms)", (Object)e.getEventDefinitionId(), (Object)5000L);
            }
            return ctx.jobTriggerUpdates().retryIn(5000L, TimeUnit.MILLISECONDS);
        }
        catch (EventProcessorException e) {
            if (e.getEventDefinition().isPresent()) {
                LOG.error("Event processor <{}/{}> failed to execute: {} (retry in {} ms)", new Object[]{e.getEventDefinition().get().config().type(), e.getEventDefinitionId(), e.getMessage(), 5000L, e});
            } else {
                LOG.error("Event processor <{}> failed to execute: {} (retry in {} ms)", new Object[]{e.getEventDefinitionId(), e.getMessage(), 5000L, e});
            }
            if (e.isPermanent()) {
                LOG.error("Caught a permanent error, trigger <{}> will go into ERROR state - it will not be executed anymore and needs manual intervention! (event-definition-id: {} job-definition={}/{})", new Object[]{ctx.trigger().id(), e.getEventDefinitionId(), ctx.definition().id(), ctx.definition().title()});
                return JobTriggerUpdate.withError(ctx.trigger());
            }
            return ctx.jobTriggerUpdates().retryIn(5000L, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            LOG.error("Event processor <{}> failed to execute: parameters={} (retry in {} ms)", new Object[]{this.config.eventDefinitionId(), parameters, 5000L, e});
            return ctx.jobTriggerUpdates().retryIn(5000L, TimeUnit.MILLISECONDS);
        }
    }

    @JsonTypeName(value="event-processor-execution-v1")
    @JsonDeserialize(builder=Builder.class)
    @AutoValue
    public static abstract class Config
    implements JobDefinitionConfig {
        public static final String FIELD_EVENT_DEFINITION_ID = "event_definition_id";
        private static final String FIELD_PARAMETERS = "parameters";
        private static final String FIELD_PROCESSING_WINDOW_SIZE = "processing_window_size";
        private static final String FIELD_PROCESSING_HOP_SIZE = "processing_hop_size";
        private static final String FIELD_IS_CRON = "is_cron";

        @JsonProperty(value="event_definition_id")
        public abstract String eventDefinitionId();

        @JsonProperty(value="parameters")
        public abstract EventProcessorParametersWithTimerange parameters();

        @JsonProperty(value="processing_window_size")
        public abstract long processingWindowSize();

        @JsonProperty(value="processing_hop_size")
        public abstract long processingHopSize();

        @JsonProperty(value="is_cron")
        public abstract boolean isCron();

        public static Builder builder() {
            return Builder.create();
        }

        public abstract Builder toBuilder();

        public boolean hasEqualSchedule(Config other) {
            return this.processingWindowSize() == other.processingWindowSize() && this.processingHopSize() == other.processingHopSize();
        }

        @AutoValue.Builder
        public static abstract class Builder
        implements JobDefinitionConfig.Builder<Builder> {
            @JsonCreator
            public static Builder create() {
                return new AutoValue_EventProcessorExecutionJob_Config.Builder().type(EventProcessorExecutionJob.TYPE_NAME).isCron(false);
            }

            @JsonProperty(value="event_definition_id")
            public abstract Builder eventDefinitionId(String var1);

            @JsonProperty(value="parameters")
            public abstract Builder parameters(EventProcessorParametersWithTimerange var1);

            @JsonProperty(value="processing_window_size")
            public abstract Builder processingWindowSize(long var1);

            @JsonProperty(value="processing_hop_size")
            public abstract Builder processingHopSize(long var1);

            @JsonProperty(value="is_cron")
            public abstract Builder isCron(boolean var1);

            abstract Config autoBuild();

            public Config build() {
                this.type(EventProcessorExecutionJob.TYPE_NAME);
                return this.autoBuild();
            }
        }
    }

    @JsonTypeName(value="event-processor-execution-v1")
    @JsonDeserialize(builder=Builder.class)
    @AutoValue
    public static abstract class Data
    implements JobTriggerData {
        private static final String FIELD_TIMERANGE_FROM = "timerange_from";
        private static final String FIELD_TIMERANGE_TO = "timerange_to";

        @JsonProperty(value="timerange_from")
        public abstract DateTime timerangeFrom();

        @JsonProperty(value="timerange_to")
        public abstract DateTime timerangeTo();

        public static Data create(DateTime from, DateTime to) {
            Objects.requireNonNull(from, "from cannot be null");
            Objects.requireNonNull(to, "to cannot be null");
            Preconditions.checkArgument((boolean)from.isBefore((ReadableInstant)to), (Object)"from must be before to");
            return Data.builder().timerangeFrom(from).timerangeTo(to).build();
        }

        public static Builder builder() {
            return Builder.create();
        }

        public abstract Builder toBuilder();

        @AutoValue.Builder
        public static abstract class Builder
        implements JobTriggerData.Builder<Builder> {
            @JsonCreator
            public static Builder create() {
                return new AutoValue_EventProcessorExecutionJob_Data.Builder().type(EventProcessorExecutionJob.TYPE_NAME);
            }

            @JsonProperty(value="timerange_from")
            public abstract Builder timerangeFrom(DateTime var1);

            @JsonProperty(value="timerange_to")
            public abstract Builder timerangeTo(DateTime var1);

            abstract Data autoBuild();

            public Data build() {
                this.type(EventProcessorExecutionJob.TYPE_NAME);
                return this.autoBuild();
            }
        }
    }

    public static interface Factory
    extends Job.Factory<EventProcessorExecutionJob> {
        @Override
        public EventProcessorExecutionJob create(JobDefinitionDto var1);
    }
}

