/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.events.processor.aggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.aggregation.AggregationEventProcessorConfig;
import org.graylog.events.processor.aggregation.AggregationEventProcessorParameters;
import org.graylog.events.processor.aggregation.AggregationKeyResult;
import org.graylog.events.processor.aggregation.AggregationResult;
import org.graylog.events.processor.aggregation.AggregationSearch;
import org.graylog.events.processor.aggregation.AggregationSeries;
import org.graylog.events.processor.aggregation.AggregationSeriesValue;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.Filter;
import org.graylog.plugins.views.search.Query;
import org.graylog.plugins.views.search.QueryResult;
import org.graylog.plugins.views.search.Search;
import org.graylog.plugins.views.search.SearchJob;
import org.graylog.plugins.views.search.SearchType;
import org.graylog.plugins.views.search.db.SearchJobService;
import org.graylog.plugins.views.search.elasticsearch.ElasticsearchQueryString;
import org.graylog.plugins.views.search.engine.QueryEngine;
import org.graylog.plugins.views.search.errors.EmptyParameterError;
import org.graylog.plugins.views.search.errors.QueryError;
import org.graylog.plugins.views.search.errors.SearchError;
import org.graylog.plugins.views.search.filter.OrFilter;
import org.graylog.plugins.views.search.filter.StreamFilter;
import org.graylog.plugins.views.search.rest.PermittedStreams;
import org.graylog.plugins.views.search.searchtypes.pivot.BucketSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.Pivot;
import org.graylog.plugins.views.search.searchtypes.pivot.PivotResult;
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.DateRange;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.DateRangeBucket;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.Values;
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PivotAggregationSearch
implements AggregationSearch {
    private static final Logger LOG = LoggerFactory.getLogger(PivotAggregationSearch.class);
    // ID of the aggregation query; doSearch() uses it to look up that query's result in the search job.
    private static final String QUERY_ID = "query-1";
    // ID of the pivot search type inside the aggregation query.
    private static final String PIVOT_ID = "pivot-1";
    // ID of the second query, which pivots on the "streams" field to find the source streams.
    private static final String STREAMS_QUERY_ID = "streams-query-1";
    // ID of the pivot search type inside the source-streams query.
    private static final String STREAMS_PIVOT_ID = "streams-pivot-1";
    // ID of the count series attached to the source-streams pivot.
    private static final String STREAMS_PIVOT_COUNT_ID = "streams-pivot-count-1";
    // Supplies the query string, query parameters, series, group-by fields, default streams and timing values.
    private final AggregationEventProcessorConfig config;
    // Supplies the time range of this run and, when non-empty, the streams that replace the configured ones.
    private final AggregationEventProcessorParameters parameters;
    // Passed as the user name when the search job is created.
    private final String searchOwner;
    private final SearchJobService searchJobService;
    private final QueryEngine queryEngine;
    // Source of the search timeout (eventsSearchTimeout, used as milliseconds).
    private final EventsConfigurationProvider configurationProvider;
    // Attached to every EventProcessorException and named in log messages.
    private final EventDefinition eventDefinition;
    // Used to load the requested streams so that non-existing ones can be dropped.
    private final MoreSearch moreSearch;
    // Used to add streams to queries that do not have any.
    private final PermittedStreams permittedStreams;

    /**
     * Creates a search for one event definition run.
     *
     * <p>The four {@code @Assisted} arguments are supplied by the caller of the assisted-inject
     * factory; the remaining arguments are resolved by the injector.
     *
     * @param config            aggregation configuration (query, series, group-by, streams, timing)
     * @param parameters        run parameters (time range, optional streams)
     * @param searchOwner       user name the search job is created for
     * @param eventDefinition   event definition attached to raised exceptions and log messages
     * @param searchJobService  creates the search job
     * @param queryEngine       executes the search job
     * @param configProvider    provides the events search timeout
     * @param moreSearch        loads streams by ID
     * @param permittedStreams  loads the streams added to queries that have none
     */
    @Inject
    public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config,
                                  @Assisted AggregationEventProcessorParameters parameters,
                                  @Assisted String searchOwner,
                                  @Assisted EventDefinition eventDefinition,
                                  SearchJobService searchJobService,
                                  QueryEngine queryEngine,
                                  EventsConfigurationProvider configProvider,
                                  MoreSearch moreSearch,
                                  PermittedStreams permittedStreams) {
        // Caller-supplied (assisted) values.
        this.config = config;
        this.parameters = parameters;
        this.searchOwner = searchOwner;
        this.eventDefinition = eventDefinition;

        // Injected collaborators.
        this.searchJobService = searchJobService;
        this.queryEngine = queryEngine;
        this.configurationProvider = configProvider;
        this.moreSearch = moreSearch;
        this.permittedStreams = permittedStreams;
    }

    /**
     * Builds the identifier under which a series is requested from, and later located in, the pivot.
     *
     * @param series the aggregation series
     * @return {@code metric/<lower-cased function>/<field or "<no-field>">/<series id>}
     */
    private String metricName(AggregationSeries series) {
        final String functionName = series.function().toString().toLowerCase(Locale.ROOT);
        final String fieldName = series.field().orElse("<no-field>");
        return String.format(Locale.ROOT, "metric/%s/%s/%s", functionName, fieldName, series.id());
    }

    /**
     * Executes the search job and converts the pivot results into an {@link AggregationResult}.
     *
     * <p>The job contains two queries: the aggregation query ({@code QUERY_ID}) and the
     * source-streams query ({@code STREAMS_QUERY_ID}). If either reports errors, one error set is
     * evaluated: the aggregation query's errors when it has any, otherwise the streams query's.
     * Every error in that set is logged ({@link EmptyParameterError} at debug level, all others at
     * error level).
     *
     * <ul>
     *   <li>If the evaluated set consists solely of {@link EmptyParameterError}s, an empty result
     *       is returned.</li>
     *   <li>Otherwise an {@link EventProcessorException} is thrown: with the error's description if
     *       there is exactly one error, or with a generic message if there are several.</li>
     * </ul>
     *
     * @return the aggregation result, or {@link AggregationResult#empty()} when the search only
     *         reported empty-parameter errors
     * @throws EventProcessorException if executing the search job fails, the search reports an error
     *                                 other than an empty parameter, or a result row has an invalid key
     */
    @Override
    public AggregationResult doSearch() throws EventProcessorException {
        final SearchJob searchJob = getSearchJob(this.parameters, this.searchOwner, this.config.searchWithinMs(), this.config.executeEveryMs());
        final QueryResult queryResult = searchJob.results().get(QUERY_ID);
        final QueryResult streamQueryResult = searchJob.results().get(STREAMS_QUERY_ID);

        final Set<SearchError> aggregationErrors = MoreObjects.firstNonNull(queryResult.errors(), Collections.emptySet());
        final Set<SearchError> streamErrors = MoreObjects.firstNonNull(streamQueryResult.errors(), Collections.emptySet());

        if (!aggregationErrors.isEmpty() || !streamErrors.isEmpty()) {
            // Errors of the aggregation query take precedence over those of the streams query.
            final Set<SearchError> errors = aggregationErrors.isEmpty() ? streamErrors : aggregationErrors;

            for (final SearchError error : errors) {
                if (error instanceof QueryError) {
                    final QueryError queryError = (QueryError) error;
                    final String backtrace = queryError.backtrace() != null ? queryError.backtrace() : "";
                    if (error instanceof EmptyParameterError) {
                        LOG.debug("Aggregation search query <{}> with empty Parameter: {}\n{}", queryError.queryId(), queryError.description(), backtrace);
                    } else {
                        LOG.error("Aggregation search query <{}> returned an error: {}\n{}", queryError.queryId(), queryError.description(), backtrace);
                    }
                } else {
                    LOG.error("Aggregation search returned an error: {}", error);
                }
            }

            // Only when EVERY error is an empty-parameter error is an empty result appropriate.
            // (The previous "count of other errors <= 1" check silently swallowed a single genuine
            // search error and made the single-error throw below unreachable.)
            if (errors.stream().allMatch(e -> e instanceof EmptyParameterError)) {
                return AggregationResult.empty();
            }
            if (errors.size() > 1) {
                throw new EventProcessorException("Pivot search failed with multiple errors.", false, this.eventDefinition);
            }
            throw new EventProcessorException(errors.iterator().next().description(), false, this.eventDefinition);
        }

        final PivotResult pivotResult = (PivotResult) queryResult.searchTypes().get(PIVOT_ID);
        final PivotResult streamsResult = (PivotResult) streamQueryResult.searchTypes().get(STREAMS_PIVOT_ID);

        return AggregationResult.builder()
                .keyResults(extractValues(pivotResult))
                .effectiveTimerange(pivotResult.effectiveTimerange())
                .totalAggregatedMessages(pivotResult.total())
                .sourceStreams(extractSourceStreams(streamsResult))
                .build();
    }

    /**
     * Collects the first key element of every row whose source is {@code "leaf"}.
     *
     * @param pivotResult result of the source-streams pivot
     * @return the collected keys, without duplicates
     */
    private ImmutableSet<String> extractSourceStreams(PivotResult pivotResult) {
        final ImmutableSet.Builder<String> streamIds = ImmutableSet.builder();
        for (final PivotResult.Row row : pivotResult.rows()) {
            if ("leaf".equals(row.source())) {
                streamIds.add(row.key().get(0));
            }
        }
        return streamIds.build();
    }

    /**
     * Converts the leaf rows of a pivot result into aggregation key results.
     *
     * <p>For each row with source {@code "leaf"}, the first key element is parsed as the timestamp
     * and the remaining key elements form the group key. Each value with source {@code "row-leaf"}
     * is matched against the configured series by comparing its first key element with the series'
     * metric name; a {@code null} value is treated as {@code NaN}.
     *
     * @param pivotResult the aggregation pivot result
     * @return one key result per leaf row, in row order
     * @throws EventProcessorException if a leaf row has no key or an empty first key element
     * @throws IllegalStateException   if a matched value is not a {@link Number}
     */
    @VisibleForTesting
    ImmutableList<AggregationKeyResult> extractValues(PivotResult pivotResult) throws EventProcessorException {
        final ImmutableList.Builder<AggregationKeyResult> keyResults = ImmutableList.builder();

        for (final PivotResult.Row row : pivotResult.rows()) {
            if (!"leaf".equals(row.source())) {
                continue;
            }

            final ImmutableList<String> rowKey = row.key();
            if (rowKey.isEmpty() || Strings.isNullOrEmpty(rowKey.get(0))) {
                throw new EventProcessorException("Invalid row key! Expected at least the date range timestamp value: " + rowKey.toString(), true, this.eventDefinition);
            }

            // First element is the date-range timestamp; everything after it is the group key.
            final String timeKey = rowKey.get(0);
            final ImmutableList<String> groupKey;
            if (rowKey.size() > 1) {
                groupKey = rowKey.subList(1, rowKey.size());
            } else {
                groupKey = ImmutableList.of();
            }

            final ImmutableList.Builder<AggregationSeriesValue> seriesValues = ImmutableList.builder();
            for (final PivotResult.Value value : row.values()) {
                if (!"row-leaf".equals(value.source())) {
                    continue;
                }
                for (final AggregationSeries series : this.config.series()) {
                    final boolean belongsToSeries = !value.key().isEmpty() && value.key().get(0).equals(metricName(series));
                    if (!belongsToSeries) {
                        continue;
                    }

                    final Object rawValue = MoreObjects.firstNonNull(value.value(), Double.NaN);
                    if (!(rawValue instanceof Number)) {
                        throw new IllegalStateException("Got unexpected non-number value for " + series.toString() + " " + row.toString() + " " + value.toString());
                    }

                    seriesValues.add(AggregationSeriesValue.builder()
                            .key(groupKey)
                            .value(((Number) rawValue).doubleValue())
                            .series(series)
                            .build());
                }
            }

            final DateTime resultTimestamp = DateTime.parse(timeKey).withZone(DateTimeZone.UTC);
            keyResults.add(AggregationKeyResult.builder()
                    .key(groupKey)
                    .timestamp(resultTimestamp)
                    .seriesValues(seriesValues.build())
                    .build());
        }

        return keyResults.build();
    }

    /**
     * Builds the search (aggregation query plus source-streams query), executes it and waits for
     * the result for at most the configured events search timeout.
     *
     * @param parameters     run parameters used to build both queries
     * @param username       user the search job is created for
     * @param searchWithinMs passed on to the aggregation query
     * @param executeEveryMs passed on to the aggregation query
     * @return the executed search job
     * @throws EventProcessorException if waiting for the result fails, times out, or raises any other exception
     */
    private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, String username, long searchWithinMs, long executeEveryMs) throws EventProcessorException {
        final Query aggregationQuery = getAggregationQuery(parameters, searchWithinMs, executeEveryMs);
        final Query sourceStreamsQuery = getSourceStreamsQuery(parameters);

        Search search = Search.builder()
                .queries(ImmutableSet.of(aggregationQuery, sourceStreamsQuery))
                .parameters(this.config.queryParameters())
                .build();
        // Queries without streams get the streams loaded through permittedStreams (no stream is filtered out).
        search = search.addStreamsToQueriesWithoutStreams(() -> this.permittedStreams.load(streamId -> true));

        final SearchJob searchJob = this.queryEngine.execute(
                this.searchJobService.create(search, username),
                Collections.emptySet());

        try {
            Uninterruptibles.getUninterruptibly(
                    searchJob.getResultFuture(),
                    this.configurationProvider.get().eventsSearchTimeout(),
                    TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new EventProcessorException("Error executing search job: " + e.getMessage(), false, this.eventDefinition, e);
        } catch (TimeoutException e) {
            throw new EventProcessorException("Timeout while executing search job.", false, this.eventDefinition, e);
        } catch (Exception e) {
            throw new EventProcessorException("Unhandled exception in search job.", false, this.eventDefinition, e);
        }

        return searchJob;
    }

    /**
     * Builds the query that counts messages per value of the {@code streams} field, using the
     * configured query string and the run's time range.
     *
     * <p>A stream filter is added only when {@code getStreams} returns at least one stream.
     *
     * @param parameters run parameters providing the time range and streams
     * @return the source-streams query
     */
    private Query getSourceStreamsQuery(AggregationEventProcessorParameters parameters) {
        final Values streamsBucket = Values.builder()
                .limit(Integer.MAX_VALUE)
                .field("streams")
                .build();
        final Count countSeries = Count.builder()
                .id(STREAMS_PIVOT_COUNT_ID)
                .build();
        final Pivot pivot = Pivot.builder()
                .id(STREAMS_PIVOT_ID)
                .rollup(true)
                .rowGroups(ImmutableList.<BucketSpec>of(streamsBucket))
                .series(ImmutableList.<SeriesSpec>of(countSeries))
                .build();

        final Set<SearchType> searchTypes = Collections.singleton(pivot);
        final Query.Builder queryBuilder = Query.builder()
                .id(STREAMS_QUERY_ID)
                .searchTypes(searchTypes)
                .query(ElasticsearchQueryString.of(this.config.query()))
                .timerange(parameters.timerange());

        final Set<String> streams = getStreams(parameters);
        if (!streams.isEmpty()) {
            queryBuilder.filter(filteringForStreamIds(streams));
        }
        return queryBuilder.build();
    }

    /**
     * Builds the aggregation query: a pivot whose row groups are the date-range buckets followed by
     * one values bucket per configured group-by field, and whose series are the configured
     * aggregation series (set only when there is at least one).
     *
     * <p>A stream filter is added only when {@code getStreams} returns at least one stream.
     *
     * @param parameters     run parameters providing the time range and streams
     * @param searchWithinMs passed to {@code buildDateRangeBuckets}
     * @param executeEveryMs passed to {@code buildDateRangeBuckets}
     * @return the aggregation query
     */
    private Query getAggregationQuery(AggregationEventProcessorParameters parameters, long searchWithinMs, long executeEveryMs) {
        final Pivot.Builder pivotBuilder = Pivot.builder().id(PIVOT_ID).rollup(true);

        // Each series is requested under its metric name so it can be matched again in the result.
        final ImmutableList.Builder<SeriesSpec> seriesSpecs = ImmutableList.builder();
        for (final AggregationSeries entry : this.config.series()) {
            seriesSpecs.add(entry.function().toSeriesSpec(metricName(entry), entry.field().orElse(null)));
        }
        final ImmutableList<SeriesSpec> series = seriesSpecs.build();
        if (!series.isEmpty()) {
            pivotBuilder.series(series);
        }

        // The date-range bucket always comes first; the result's row keys rely on that order.
        final List<BucketSpec> rowGroups = new ArrayList<>();
        rowGroups.add(buildDateRangeBuckets(parameters.timerange(), searchWithinMs, executeEveryMs));
        for (final String field : this.config.groupBy()) {
            rowGroups.add(Values.builder().limit(Integer.MAX_VALUE).field(field).build());
        }
        pivotBuilder.rowGroups(rowGroups);

        final Set<SearchType> searchTypes = Collections.singleton(pivotBuilder.build());
        final Query.Builder queryBuilder = Query.builder()
                .id(QUERY_ID)
                .searchTypes(searchTypes)
                .query(ElasticsearchQueryString.of(this.config.query()))
                .timerange(parameters.timerange());

        final Set<String> streams = getStreams(parameters);
        if (!streams.isEmpty()) {
            queryBuilder.filter(filteringForStreamIds(streams));
        }
        return queryBuilder.build();
    }

    /**
     * Builds an OR filter containing one stream filter per given stream ID.
     *
     * @param streamIds IDs of the streams to filter for
     * @return the combined filter
     */
    private Filter filteringForStreamIds(Set<String> streamIds) {
        final Set<Filter> streamFilters = streamIds.stream()
                .<Filter>map(StreamFilter::ofId)
                .collect(Collectors.toSet());
        final OrFilter.Builder orFilter = (OrFilter.Builder) OrFilter.builder().filters(streamFilters);
        return orFilter.build();
    }

    /**
     * Determines the streams to search in.
     *
     * <p>The streams from {@code parameters} are used when present, otherwise the configured ones.
     * The IDs are loaded through {@code moreSearch}; IDs for which no stream was loaded are logged
     * with a warning and left out of the result.
     *
     * @param parameters run parameters that may override the configured streams
     * @return IDs of the requested streams that could be loaded (possibly empty)
     */
    private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
        final ImmutableSet<String> requestedIds;
        if (parameters.streams().isEmpty()) {
            requestedIds = this.config.streams();
        } else {
            requestedIds = parameters.streams();
        }

        final Set<String> existingStreams = this.moreSearch.loadStreams(requestedIds).stream()
                .map(stream -> stream.getId())
                .collect(Collectors.toSet());
        final Set<String> missingStreams = requestedIds.stream()
                .filter(id -> !existingStreams.contains(id))
                .collect(Collectors.toSet());

        if (!missingStreams.isEmpty()) {
            LOG.warn("Removing non-existing streams <{}> from event definition <{}>/<{}>", missingStreams, this.eventDefinition.id(), this.eventDefinition.title());
        }
        return existingStreams;
    }

    /**
     * Builds the date-range bucket on the {@code timestamp} field for the aggregation pivot.
     *
     * <p>Starting at {@code timeRange.getFrom()}, ranges of {@code searchWithinMs} length are
     * emitted, each starting {@code executeEveryMs} after the previous one. At least one range is
     * always emitted; generation stops once a range's end is no longer before
     * {@code timeRange.getTo()}. Both durations are truncated to whole seconds.
     *
     * @param timeRange      overall time range to cover
     * @param searchWithinMs length of each range, in milliseconds
     * @param executeEveryMs distance between the starts of consecutive ranges, in milliseconds
     * @return the bucket containing the generated ranges
     */
    @VisibleForTesting
    static DateRangeBucket buildDateRangeBuckets(TimeRange timeRange, long searchWithinMs, long executeEveryMs) {
        // Divide BEFORE narrowing to int. A cast binds tighter than '/', so "(int) ms / 1000" would
        // truncate the millisecond value first and wrap for anything above Integer.MAX_VALUE ms
        // (~24.8 days), possibly yielding a negative step and a loop that never terminates.
        final int windowSeconds = (int) (searchWithinMs / 1000L);
        final int stepSeconds = (int) (executeEveryMs / 1000L);

        final ImmutableList.Builder<DateRange> ranges = ImmutableList.builder();
        DateTime from = timeRange.getFrom();
        DateTime to;
        do {
            to = from.plusSeconds(windowSeconds);
            ranges.add(DateRange.builder().from(from).to(to).build());
            from = from.plusSeconds(stepSeconds);
        } while (to.isBefore(timeRange.getTo()));

        return DateRangeBucket.builder().field("timestamp").ranges(ranges.build()).build();
    }
}

