/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.events.processor.aggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.logging.log4j.util.Strings;
import org.graylog.events.conditions.BooleanNumberConditionsVisitor;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventOriginContext;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessor;
import org.graylog.events.processor.EventProcessorDependencyCheck;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog.events.processor.EventProcessorPreconditionException;
import org.graylog.events.processor.aggregation.AggregationEventProcessorConfig;
import org.graylog.events.processor.aggregation.AggregationEventProcessorParameters;
import org.graylog.events.processor.aggregation.AggregationKeyResult;
import org.graylog.events.processor.aggregation.AggregationResult;
import org.graylog.events.processor.aggregation.AggregationSearch;
import org.graylog.events.processor.aggregation.AggregationSeries;
import org.graylog.events.processor.aggregation.AggregationSeriesValue;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.Parameter;
import org.graylog.plugins.views.search.errors.ParameterExpansionError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageSummary;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event processor that turns search results into events.
 *
 * <p>When the configuration has no aggregation series, every message matched by the configured
 * query becomes one event ("filter search"). Otherwise an aggregation search is executed and one
 * event is created for every key result that satisfies the configured conditions.
 */
public class AggregationEventProcessor implements EventProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationEventProcessor.class);

    /** Upper bound for the scroll batch size used when looking up the source messages of an event. */
    private static final int MAX_SOURCE_MESSAGES_BATCH_SIZE = 500;

    private final EventDefinition eventDefinition;
    private final AggregationEventProcessorConfig config;
    private final AggregationSearch.Factory aggregationSearchFactory;
    private final EventProcessorDependencyCheck dependencyCheck;
    private final DBEventProcessorStateService stateService;
    private final MoreSearch moreSearch;
    private final StreamService streamService;
    private final Messages messages;

    /**
     * Creates a processor for the given event definition.
     *
     * @param eventDefinition the definition to process; its {@code config()} is cast to
     *                        {@link AggregationEventProcessorConfig}
     */
    @Inject
    public AggregationEventProcessor(@Assisted EventDefinition eventDefinition, AggregationSearch.Factory aggregationSearchFactory, EventProcessorDependencyCheck dependencyCheck, DBEventProcessorStateService stateService, MoreSearch moreSearch, StreamService streamService, Messages messages) {
        this.eventDefinition = eventDefinition;
        this.config = (AggregationEventProcessorConfig) eventDefinition.config();
        this.aggregationSearchFactory = aggregationSearchFactory;
        this.dependencyCheck = dependencyCheck;
        this.stateService = stateService;
        this.moreSearch = moreSearch;
        this.streamService = streamService;
        this.messages = messages;
    }

    /**
     * Runs the configured search for the time range in {@code processorParameters} and hands the
     * resulting events to {@code eventsConsumer}. Afterwards the processed time range is recorded
     * in the processor state service.
     *
     * @param eventFactory        factory used to create the events
     * @param processorParameters must be an {@link AggregationEventProcessorParameters} instance
     * @param eventsConsumer      receives the created events
     * @throws EventProcessorPreconditionException if the dependency check reports that messages
     *                                             have not been indexed up to the end of the time
     *                                             range, or if query parameters failed to expand
     * @throws EventProcessorException             if one of the searches fails
     */
    @Override
    public void createEvents(EventFactory eventFactory, EventProcessorParameters processorParameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        final AggregationEventProcessorParameters parameters = (AggregationEventProcessorParameters) processorParameters;

        if (!this.dependencyCheck.hasMessagesIndexedUpTo(parameters.timerange().getTo())) {
            final String msg = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> for timerange <%s to %s> because required messages haven't been indexed, yet.", this.eventDefinition.title(), this.eventDefinition.id(), parameters.timerange().getFrom(), parameters.timerange().getTo());
            throw new EventProcessorPreconditionException(msg, this.eventDefinition);
        }

        LOG.debug("Creating events for config={} parameters={}", this.config, parameters);

        try {
            if (this.config.series().isEmpty()) {
                this.filterSearch(eventFactory, parameters, eventsConsumer);
            } else {
                this.aggregatedSearch(eventFactory, parameters, eventsConsumer);
            }
        } catch (SearchException e) {
            if (e.error() instanceof ParameterExpansionError) {
                final String msg = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s>  because parameters failed to expand: %s", this.eventDefinition.title(), this.eventDefinition.id(), e.error().description());
                LOG.error(msg);
                throw new EventProcessorPreconditionException(msg, this.eventDefinition, e);
            }
            // Other search errors do not abort processing (the state below is still updated),
            // but they must not vanish without a trace.
            LOG.error("Search for aggregation <{}/{}> failed", this.eventDefinition.title(), this.eventDefinition.id(), e);
        }

        this.stateService.setState(this.eventDefinition.id(), parameters.timerange().getFrom(), parameters.timerange().getTo());
    }

    /**
     * Looks up the messages an event was created from and passes them to {@code messageConsumer}.
     *
     * <p>For filter searches the single origin message referenced by the event's origin context is
     * fetched. For aggregations the configured query is re-run over the event's time range and
     * messages are delivered batch-wise until {@code limit} messages have been seen.
     *
     * @param event           the event to find source messages for
     * @param messageConsumer receives the message summaries (possibly in several batches)
     * @param limit           maximum number of messages to deliver; nothing is looked up when
     *                        this is zero or negative
     * @throws EventProcessorException if the origin context cannot be parsed or the lookup fails
     */
    @Override
    public void sourceMessagesForEvent(Event event, Consumer<List<MessageSummary>> messageConsumer, long limit) throws EventProcessorException {
        // Applies to both branches: a non-positive limit would otherwise turn into a
        // non-positive scroll batch size for the aggregation case.
        if (limit <= 0L) {
            return;
        }

        if (this.config.series().isEmpty()) {
            final EventOriginContext.ESEventOriginContext esContext = EventOriginContext.parseESContext(event.getOriginContext())
                    .orElseThrow(() -> new EventProcessorException("Failed to parse origin context", false, this.eventDefinition));
            try {
                final ResultMessage message = this.messages.get(esContext.messageId(), esContext.indexName());
                messageConsumer.accept(Lists.newArrayList(new MessageSummary(message.getIndex(), message.getMessage())));
            } catch (IOException e) {
                throw new EventProcessorException("Failed to query origin context message", false, this.eventDefinition, e);
            }
            return;
        }

        // Counts messages across all scroll batches so the limit is enforced globally.
        final AtomicLong msgCount = new AtomicLong(0L);
        final MoreSearch.ScrollCallback callback = (resultMessages, continueScrolling) -> {
            final List<MessageSummary> summaries = new ArrayList<>();
            for (ResultMessage resultMessage : resultMessages) {
                if (msgCount.incrementAndGet() > limit) {
                    continueScrolling.set(false);
                    break;
                }
                summaries.add(new MessageSummary(resultMessage.getIndex(), resultMessage.getMessage()));
            }
            messageConsumer.accept(summaries);
        };
        final AbsoluteRange timeRange = AbsoluteRange.create(event.getTimerangeStart(), event.getTimerangeEnd());
        final int batchSize = Math.min(MAX_SOURCE_MESSAGES_BATCH_SIZE, Ints.saturatedCast(limit));
        this.moreSearch.scrollQuery(this.config.query(), this.config.streams(), (Set<Parameter>) this.config.queryParameters(), timeRange, batchSize, callback);
    }

    /**
     * Returns the streams to search in: the streams from the parameters if any are given,
     * otherwise the streams from the processor configuration.
     */
    private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
        return parameters.streams().isEmpty() ? this.config.streams() : parameters.streams();
    }

    /**
     * Scrolls through all messages matching the configured query and creates one event per
     * message. Each batch of events is passed to {@code eventsConsumer} as it arrives.
     */
    private void filterSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        final Set<String> streams = this.getStreams(parameters);
        final MoreSearch.ScrollCallback callback = (resultMessages, continueScrolling) -> {
            final ImmutableList.Builder<EventWithContext> eventsWithContext = ImmutableList.builder();
            for (ResultMessage resultMessage : resultMessages) {
                final Message msg = resultMessage.getMessage();
                final Event event = eventFactory.createEvent(this.eventDefinition, msg.getTimestamp(), this.eventDefinition.title());
                event.setOriginContext(EventOriginContext.elasticsearchMessage(resultMessage.getIndex(), msg.getId()));
                // Only streams that were part of the search are recorded as source streams.
                msg.getStreamIds().stream().filter(streams::contains).forEach(event::addSourceStream);
                eventsWithContext.add(EventWithContext.create(event, msg));
            }
            eventsConsumer.accept(eventsWithContext.build());
        };
        this.moreSearch.scrollQuery(this.config.query(), streams, (Set<Parameter>) this.config.queryParameters(), parameters.timerange(), parameters.batchSize(), callback);
    }

    /**
     * Executes the aggregation search and passes the events created from its result to
     * {@code eventsConsumer}. The consumer is not called when the search has no key results.
     */
    private void aggregatedSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters, EventConsumer<List<EventWithContext>> eventsConsumer) throws EventProcessorException {
        final String owner = "event-processor-aggregation-v1-" + this.eventDefinition.id();
        final AggregationSearch search = this.aggregationSearchFactory.create(this.config, parameters, owner, this.eventDefinition);
        final AggregationResult result = search.doSearch();

        if (result.keyResults().isEmpty()) {
            LOG.debug("Aggregated search returned empty result set.");
            return;
        }

        LOG.debug("Got {} (total-aggregated-messages={}) results.", result.keyResults().size(), result.totalAggregatedMessages());
        eventsConsumer.accept(this.eventsFromAggregationResult(eventFactory, parameters, result));
    }

    /**
     * Evaluates the configured conditions against the series values of a key result.
     *
     * @return {@code true} if no conditions or no condition expression is configured, otherwise
     *         the result of evaluating the expression
     */
    private boolean satisfiesConditions(AggregationKeyResult keyResult) {
        if (!this.config.conditions().isPresent()) {
            return true;
        }
        // Series id -> series value; this is what the condition expression refers to.
        final ImmutableMap<String, Double> numberReferences = keyResult.seriesValues().stream()
                .collect(ImmutableMap.toImmutableMap(seriesValue -> seriesValue.series().id(), seriesValue -> seriesValue.value()));
        return this.config.conditions().get().expression()
                .map(expr -> (Boolean) expr.accept(new BooleanNumberConditionsVisitor(numberReferences)))
                .orElse(true);
    }

    /**
     * Creates one event (with an accompanying synthetic message carrying the aggregation fields)
     * for every key result that satisfies the configured conditions.
     *
     * @param eventFactory factory used to create the events
     * @param parameters   parameters of the current run; provide the event time range and streams
     * @param result       the aggregation search result
     * @return the created events, in the order of the key results
     */
    @VisibleForTesting
    ImmutableList<EventWithContext> eventsFromAggregationResult(EventFactory eventFactory, AggregationEventProcessorParameters parameters, AggregationResult result) {
        final ImmutableList.Builder<EventWithContext> eventsWithContext = ImmutableList.builder();
        final Set<String> sourceStreams = this.resolveSourceStreams(parameters, result);

        for (AggregationKeyResult keyResult : result.keyResults()) {
            if (!this.satisfiesConditions(keyResult)) {
                LOG.debug("Skipping result <{}> because the conditions <{}> don't match", keyResult, this.config.conditions());
                continue;
            }

            final String keyString = Strings.join(keyResult.key(), '|');
            final String eventMessage = this.createEventMessageString(keyString, keyResult);

            final Event event = eventFactory.createEvent(this.eventDefinition, result.effectiveTimerange().to(), eventMessage);
            event.setTimerangeStart(parameters.timerange().getFrom());
            event.setTimerangeEnd(parameters.timerange().getTo());
            sourceStreams.forEach(event::addSourceStream);

            final Map<String, Object> fields = this.buildEventFields(keyString, keyResult);
            final Message message = new Message(eventMessage, "", result.effectiveTimerange().to());
            message.addFields(fields);

            LOG.debug("Creating event {}/{} - {} {} ({})", this.eventDefinition.title(), this.eventDefinition.id(), keyResult.key(), this.seriesString(keyResult), fields);
            eventsWithContext.add(EventWithContext.create(event, message));
        }

        return eventsWithContext.build();
    }

    /**
     * Determines the source streams to attach to aggregation events: the intersection of the
     * searched streams and the streams reported by the result, falling back to whichever of the
     * two is non-empty, or to all streams except the default event streams when both are empty.
     */
    private Set<String> resolveSourceStreams(AggregationEventProcessorParameters parameters, AggregationResult result) {
        final Set<String> searchStreams = this.getStreams(parameters);
        final Set<String> resultStreams = result.sourceStreams();

        if (searchStreams.isEmpty() && resultStreams.isEmpty()) {
            return this.streamService.loadAll().stream()
                    .map(Persisted::getId)
                    .filter(streamId -> !StreamImpl.DEFAULT_EVENT_STREAM_IDS.contains(streamId))
                    .collect(Collectors.toSet());
        }
        if (searchStreams.isEmpty()) {
            return resultStreams;
        }
        if (resultStreams.isEmpty()) {
            return searchStreams;
        }
        return Sets.intersection(searchStreams, resultStreams);
    }

    /**
     * Builds the fields of the synthetic event message: one field per group-by field, one
     * {@code aggregation_value_<function>[_<field>]} field per series and the
     * {@code aggregation_key} field.
     */
    private Map<String, Object> buildEventFields(String keyString, AggregationKeyResult keyResult) {
        final Map<String, Object> fields = new HashMap<>();

        // NOTE(review): assumes the key has at least as many elements as there are group-by
        // fields; a shorter key would throw IndexOutOfBoundsException here - confirm upstream.
        final List<String> groupBy = this.config.groupBy();
        for (int i = 0; i < groupBy.size(); ++i) {
            fields.put(groupBy.get(i), keyResult.key().get(i));
        }

        for (AggregationSeriesValue seriesValue : keyResult.seriesValues()) {
            final String function = seriesValue.series().function().toString().toLowerCase(Locale.ROOT);
            final Optional<String> field = seriesValue.series().field();
            final String fieldName = field.isPresent()
                    ? String.format(Locale.ROOT, "aggregation_value_%s_%s", function, field.get())
                    : String.format(Locale.ROOT, "aggregation_value_%s", function);
            fields.put(fieldName, seriesValue.value());
        }

        // Added last so it wins over a group-by or series field of the same name.
        fields.put("aggregation_key", keyString);
        return fields;
    }

    /**
     * Builds the event message: {@code "<title>: [<key> - ]<series> <series> ..."}, trimmed.
     * The key part is omitted when the key result has an empty key.
     */
    private String createEventMessageString(String keyString, AggregationKeyResult keyResult) {
        final StringBuilder builder = new StringBuilder(this.eventDefinition.title()).append(": ");
        if (!keyResult.key().isEmpty()) {
            builder.append(keyString).append(" - ");
        }
        builder.append(this.seriesString(keyResult));
        return builder.toString().trim();
    }

    /**
     * Renders the series values of a key result as space separated
     * {@code <function>(<field>)=<value>} entries, with the function name in lower case.
     */
    private String seriesString(AggregationKeyResult keyResult) {
        return keyResult.seriesValues().stream()
                .map(seriesValue -> String.format(Locale.ROOT, "%s(%s)=%s",
                        seriesValue.series().function().toString().toLowerCase(Locale.ROOT),
                        seriesValue.series().field().orElse(""),
                        seriesValue.value()))
                .collect(Collectors.joining(" "));
    }

    /** Factory for {@link AggregationEventProcessor} instances bound to an event definition. */
    public static interface Factory
    extends EventProcessor.Factory<AggregationEventProcessor> {
        @Override
        public AggregationEventProcessor create(EventDefinition eventDefinition);
    }
}

