/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.events.processor.aggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.graylog.events.conditions.BooleanNumberConditionsVisitor;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventOriginContext;
import org.graylog.events.event.EventReplayInfo;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessor;
import org.graylog.events.processor.EventProcessorDependencyCheck;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog.events.processor.EventProcessorPreconditionException;
import org.graylog.events.processor.EventStreamService;
import org.graylog.events.processor.aggregation.AggregationEventProcessorConfig;
import org.graylog.events.processor.aggregation.AggregationEventProcessorParameters;
import org.graylog.events.processor.aggregation.AggregationKeyResult;
import org.graylog.events.processor.aggregation.AggregationResult;
import org.graylog.events.processor.aggregation.AggregationSearch;
import org.graylog.events.processor.aggregation.AggregationSeriesValue;
import org.graylog.events.processor.aggregation.EventQuerySearchTypeSupplier;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.Parameter;
import org.graylog.plugins.views.search.SearchType;
import org.graylog.plugins.views.search.elasticsearch.ElasticsearchQueryString;
import org.graylog.plugins.views.search.errors.ParameterExpansionError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog.plugins.views.search.rest.PermittedStreams;
import org.graylog.plugins.views.search.searchtypes.pivot.HasField;
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.series.HasOptionalField;
import org.graylog2.indexer.ElasticsearchException;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.MessageSummary;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregationEventProcessor
implements EventProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationEventProcessor.class);
    private final EventDefinition eventDefinition;
    private final AggregationEventProcessorConfig config;
    private final AggregationSearch.Factory aggregationSearchFactory;
    private final EventProcessorDependencyCheck dependencyCheck;
    private final DBEventProcessorStateService stateService;
    private final MoreSearch moreSearch;
    private final EventStreamService eventStreamService;
    private final Messages messages;
    private final NotificationService notificationService;
    private final PermittedStreams permittedStreams;
    private final Set<EventQuerySearchTypeSupplier> eventQueryModifiers;
    private final MessageFactory messageFactory;

    @Inject
    public AggregationEventProcessor(@Assisted EventDefinition eventDefinition, AggregationSearch.Factory aggregationSearchFactory, EventProcessorDependencyCheck dependencyCheck, DBEventProcessorStateService stateService, MoreSearch moreSearch, EventStreamService eventStreamService, Messages messages, NotificationService notificationService, PermittedStreams permittedStreams, Set<EventQuerySearchTypeSupplier> eventQueryModifiers, MessageFactory messageFactory) {
        this.eventDefinition = eventDefinition;
        this.config = (AggregationEventProcessorConfig)eventDefinition.config();
        this.aggregationSearchFactory = aggregationSearchFactory;
        this.dependencyCheck = dependencyCheck;
        this.stateService = stateService;
        this.moreSearch = moreSearch;
        this.eventStreamService = eventStreamService;
        this.messages = messages;
        this.notificationService = notificationService;
        this.permittedStreams = permittedStreams;
        this.eventQueryModifiers = eventQueryModifiers;
        this.messageFactory = messageFactory;
    }

    @Override
    public void createEvents(EventFactory eventFactory, EventProcessorParameters processorParameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        AggregationEventProcessorParameters parameters = (AggregationEventProcessorParameters)processorParameters;
        if (!this.dependencyCheck.hasMessagesIndexedUpTo(parameters.timerange())) {
            String msg = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> for timerange <%s to %s> because required messages haven't been indexed, yet.", this.eventDefinition.title(), this.eventDefinition.id(), parameters.timerange().getFrom(), parameters.timerange().getTo());
            throw new EventProcessorPreconditionException(msg, this.eventDefinition);
        }
        LOG.debug("Creating events for config={} parameters={}", (Object)this.config, (Object)parameters);
        try {
            if (this.config.series().isEmpty()) {
                this.filterSearch(eventFactory, parameters, eventsConsumer);
            } else {
                this.aggregatedSearch(eventFactory, parameters, eventsConsumer);
            }
        }
        catch (SearchException e) {
            if (e.error() instanceof ParameterExpansionError) {
                String msg = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s>  because parameters failed to expand: %s", this.eventDefinition.title(), this.eventDefinition.id(), e.error().description());
                LOG.error(msg);
                throw new EventProcessorPreconditionException(msg, this.eventDefinition, e);
            }
        }
        catch (ElasticsearchException e) {
            String msg = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> because of search error: %s", this.eventDefinition.title(), this.eventDefinition.id(), e.getMessage());
            LOG.error(msg);
            throw new EventProcessorPreconditionException(msg, this.eventDefinition, e);
        }
        this.stateService.setState(this.eventDefinition.id(), parameters.timerange().getFrom(), parameters.timerange().getTo());
    }

    @Override
    public void sourceMessagesForEvent(Event event, Consumer<List<MessageSummary>> messageConsumer, long limit) throws EventProcessorException {
        if (this.config.series().isEmpty()) {
            if (limit <= 0L) {
                return;
            }
            EventOriginContext.ESEventOriginContext esContext = EventOriginContext.parseESContext(event.getOriginContext()).orElseThrow(() -> new EventProcessorException("Failed to parse origin context", false, this.eventDefinition));
            try {
                ResultMessage message = this.messages.get(esContext.messageId(), esContext.indexName());
                messageConsumer.accept(Lists.newArrayList((Object[])new MessageSummary[]{new MessageSummary(message.getIndex(), message.getMessage())}));
            }
            catch (IOException e) {
                throw new EventProcessorException("Failed to query origin context message", false, this.eventDefinition, e);
            }
        } else {
            AtomicLong msgCount = new AtomicLong(0L);
            MoreSearch.ScrollCallback callback = (messages, continueScrolling) -> {
                ArrayList summaries = Lists.newArrayList();
                for (ResultMessage resultMessage : messages) {
                    if (msgCount.incrementAndGet() > limit) {
                        continueScrolling.set(false);
                        break;
                    }
                    Message msg = resultMessage.getMessage();
                    summaries.add(new MessageSummary(resultMessage.getIndex(), msg));
                }
                messageConsumer.accept(summaries);
            };
            ElasticsearchQueryString scrollQueryString = ElasticsearchQueryString.of(this.config.query());
            scrollQueryString = scrollQueryString.concatenate(this.groupByQueryString(event));
            LOG.debug("scrollQueryString: {}", (Object)scrollQueryString);
            AbsoluteRange timeRange = AbsoluteRange.create(event.getTimerangeStart(), event.getTimerangeEnd());
            this.moreSearch.scrollQuery(scrollQueryString.queryString(), (Set<String>)this.config.streams(), this.config.filters(), (Set<Parameter>)this.config.queryParameters(), timeRange, Math.min(500, Ints.saturatedCast((long)limit)), callback);
        }
    }

    private ElasticsearchQueryString groupByQueryString(Event event) {
        ElasticsearchQueryString result = ElasticsearchQueryString.empty();
        if (!this.config.groupBy().isEmpty()) {
            for (String key : event.getGroupByFields().keySet()) {
                String value = event.getGroupByFields().get(key);
                String query = key + ":\"" + MoreSearch.luceneEscape(value) + "\"";
                result = result.concatenate(ElasticsearchQueryString.of(query));
            }
        }
        return result;
    }

    private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
        if (parameters.streams().isEmpty()) {
            HashSet<String> configStreams = new HashSet<String>((Collection<String>)this.config.streams());
            if (!this.config.streamCategories().isEmpty()) {
                configStreams.addAll((Collection<String>)this.permittedStreams.loadWithCategories((Collection<String>)this.config.streamCategories(), streamId -> true));
            }
            return configStreams;
        }
        return parameters.streams();
    }

    private void filterSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        Set<String> streams = this.getStreams(parameters);
        if (streams.isEmpty()) {
            streams = new HashSet<String>((Collection<String>)this.permittedStreams.loadAllMessageStreams(streamId -> true));
        }
        AtomicInteger messageCount = new AtomicInteger(0);
        MoreSearch.ScrollCallback callback = (messages, continueScrolling) -> {
            ImmutableList.Builder eventsWithContext = ImmutableList.builder();
            for (ResultMessage resultMessage : messages) {
                Message msg = resultMessage.getMessage();
                Event event = eventFactory.createEvent(this.eventDefinition, msg.getTimestamp(), this.eventDefinition.title());
                event.setOriginContext(EventOriginContext.elasticsearchMessage(resultMessage.getIndex(), msg.getId()));
                this.eventStreamService.buildEventSourceStreams(this.getStreams(parameters), (Set<String>)ImmutableSet.copyOf(msg.getStreamIds())).forEach(event::addSourceStream);
                event.setReplayInfo(EventReplayInfo.builder().timerangeStart(parameters.timerange().getFrom()).timerangeEnd(parameters.timerange().getTo()).query(this.config.query()).streams((Set<String>)event.getSourceStreams()).filters(this.config.filters()).build());
                eventsWithContext.add((Object)EventWithContext.create(event, msg));
                if (this.config.eventLimit() == 0 || messageCount.incrementAndGet() < this.config.eventLimit()) continue;
                eventsConsumer.accept((List<EventWithContext>)eventsWithContext.build());
                throw new EventLimitReachedException();
            }
            eventsConsumer.accept((List<EventWithContext>)eventsWithContext.build());
        };
        try {
            this.moreSearch.scrollQuery(this.config.query(), streams, this.config.filters(), (Set<Parameter>)this.config.queryParameters(), parameters.timerange(), parameters.batchSize(), callback);
        }
        catch (EventLimitReachedException e) {
            this.notificationService.publishIfFirst(this.notificationService.buildNow().addType(Notification.Type.EVENT_LIMIT_REACHED).addKey(this.eventDefinition.id()).addDetail("event_definition_title", this.eventDefinition.title()).addDetail("event_definition_id", this.eventDefinition.id()).addDetail("event_limit", this.config.eventLimit()).addSeverity(Notification.Severity.NORMAL));
            LOG.debug("Event limit reached at {} for '{}/{}' event definition.", new Object[]{this.config.eventLimit(), this.eventDefinition.title(), this.eventDefinition.id()});
        }
    }

    private void aggregatedSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        List<SearchType> additionalSearchTypes;
        AggregationSearch.User owner = new AggregationSearch.User("event-processor-aggregation-v1-" + this.eventDefinition.id(), DateTimeZone.UTC);
        AggregationSearch search = this.aggregationSearchFactory.create(this.config, parameters, owner, this.eventDefinition, additionalSearchTypes = this.eventQueryModifiers.stream().flatMap(e -> e.additionalSearchTypes(this.eventDefinition).stream()).toList());
        AggregationResult result = search.doSearch();
        if (result.keyResults().isEmpty()) {
            LOG.debug("Aggregated search returned empty result set.");
            return;
        }
        LOG.debug("Got {} (total-aggregated-messages={}) results.", (Object)result.keyResults().size(), (Object)result.totalAggregatedMessages());
        eventsConsumer.accept((List<EventWithContext>)this.eventsFromAggregationResult(eventFactory, parameters, result));
    }

    private boolean satisfiesConditions(AggregationKeyResult keyResult) {
        ImmutableMap numberReferences = (ImmutableMap)keyResult.seriesValues().stream().map(seriesValue -> Maps.immutableEntry((Object)seriesValue.series().id(), (Object)seriesValue.value())).collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
        if (this.config.conditions().isPresent()) {
            return this.config.conditions().get().expression().map(expr -> (Boolean)expr.accept(new BooleanNumberConditionsVisitor((Map<String, Double>)numberReferences))).orElse(true);
        }
        return true;
    }

    @VisibleForTesting
    ImmutableList<EventWithContext> eventsFromAggregationResult(EventFactory eventFactory, AggregationEventProcessorParameters parameters, AggregationResult result) throws EventProcessorException {
        ImmutableList.Builder eventsWithContext = ImmutableList.builder();
        Set<String> sourceStreams = this.eventStreamService.buildEventSourceStreams(this.getStreams(parameters), result.sourceStreams());
        for (AggregationKeyResult keyResult : result.keyResults()) {
            if (!this.satisfiesConditions(keyResult)) {
                LOG.debug("Skipping result <{}> because the conditions <{}> don't match", (Object)keyResult, this.config.conditions());
                continue;
            }
            String keyString = String.join((CharSequence)"|", keyResult.key());
            String eventMessage = this.createEventMessageString(keyString, keyResult);
            DateTime eventTime = keyResult.timestamp().orElse(result.effectiveTimerange().to());
            Event event = eventFactory.createEvent(this.eventDefinition, eventTime, eventMessage);
            event.setTimerangeStart(keyResult.timestamp().map(t -> t.minus(this.config.searchWithinMs())).orElse(parameters.timerange().getFrom()));
            event.setTimerangeEnd(keyResult.timestamp().orElse(parameters.timerange().getTo()));
            event.setReplayInfo(EventReplayInfo.builder().timerangeStart(event.getTimerangeStart()).timerangeEnd(event.getTimerangeEnd()).query(this.config.query()).streams(sourceStreams).filters(this.config.filters()).build());
            sourceStreams.forEach(event::addSourceStream);
            HashMap<String, Object> fields = new HashMap<String, Object>();
            for (int i = 0; i < this.config.groupBy().size(); ++i) {
                try {
                    fields.put(this.config.groupBy().get(i), keyResult.key().get(i));
                    continue;
                }
                catch (IndexOutOfBoundsException e2) {
                    throw new EventProcessorException("Couldn't create events for: " + this.eventDefinition.title() + " (possibly due to non-existing grouping fields)", false, this.eventDefinition.id(), this.eventDefinition, e2);
                }
            }
            event.setGroupByFields(fields.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
            for (AggregationSeriesValue seriesValue : keyResult.seriesValues()) {
                String function = seriesValue.series().type().toLowerCase(Locale.ROOT);
                Optional<String> field = this.fieldFromSeries(seriesValue.series());
                String fieldName = field.map(f -> String.format(Locale.ROOT, "aggregation_value_%s_%s", function, f)).orElseGet(() -> String.format(Locale.ROOT, "aggregation_value_%s", function));
                fields.put(fieldName, seriesValue.value());
            }
            fields.put("aggregation_key", keyString);
            Message message = this.messageFactory.createMessage(eventMessage, "", result.effectiveTimerange().to());
            message.addFields(fields);
            Map<String, Object> eventModifierState = this.eventQueryModifiers.stream().flatMap(modifier -> modifier.eventModifierData(result.additionalResults()).entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            LOG.debug("Creating event {}/{} - {} {} ({})", new Object[]{this.eventDefinition.title(), this.eventDefinition.id(), keyResult.key(), this.seriesString(keyResult), fields});
            eventsWithContext.add((Object)EventWithContext.builder().event(event).messageContext(message).eventModifierState(eventModifierState).build());
        }
        return eventsWithContext.build();
    }

    private Optional<String> fieldFromSeries(SeriesSpec series) {
        if (series instanceof HasField) {
            HasField hasField = (HasField)((Object)series);
            return Optional.ofNullable(hasField.field());
        }
        if (series instanceof HasOptionalField) {
            HasOptionalField hasOptionalField = (HasOptionalField)((Object)series);
            return hasOptionalField.field();
        }
        return Optional.empty();
    }

    private String createEventMessageString(String keyString, AggregationKeyResult keyResult) {
        StringBuilder builder = new StringBuilder(this.eventDefinition.title()).append(": ");
        if (!keyResult.key().isEmpty()) {
            builder.append(keyString).append(" - ");
        }
        builder.append(this.seriesString(keyResult));
        return builder.toString().trim();
    }

    private String seriesString(AggregationKeyResult keyResult) {
        return keyResult.seriesValues().stream().map(this::formatSeriesValue).collect(Collectors.joining(" "));
    }

    private String formatSeriesValue(AggregationSeriesValue seriesValue) {
        return String.format(Locale.ROOT, "%s=%s", seriesValue.series().literal(), seriesValue.value());
    }

    private static class EventLimitReachedException
    extends RuntimeException {
        private EventLimitReachedException() {
        }
    }

    public static interface Factory
    extends EventProcessor.Factory<AggregationEventProcessor> {
        @Override
        public AggregationEventProcessor create(EventDefinition var1);
    }
}

