/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.opensearch.persistence.AbstractSpout;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.joda.time.format.ISODateTimeFormat;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.bucket.SingleBucketAggregation;
import org.opensearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.MinAggregationBuilder;
import org.opensearch.search.aggregations.metrics.TopHits;
import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout which queries the status index with a terms aggregation on the partition field and a
 * top-hits sub-aggregation, then adds the URLs found in the returned buckets to the buffer.
 *
 * <p>The search is sent asynchronously; this class acts as its own {@link ActionListener} and
 * handles the outcome in {@link #onResponse(SearchResponse)} / {@link #onFailure(Exception)}.
 *
 * <p>Configuration keys read by this class:
 *
 * <ul>
 *   <li>{@code opensearch.status.sample} - when true, the terms aggregation is wrapped in a
 *       diversified sampler aggregation named {@code sample}.
 *   <li>{@code opensearch.status.recentDate.increase} - number of minutes added to the
 *       nextFetchDate of the first hit of the last bucket to obtain a candidate query date; a
 *       negative value (the default) disables the mechanism.
 *   <li>{@code opensearch.status.recentDate.min.gap} - when positive, the candidate date only
 *       replaces the current query date if it lies more than this many minutes away from it.
 * </ul>
 */
public class AggregationSpout extends AbstractSpout implements ActionListener<SearchResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationSpout.class);
    private static final String StatusSampleParamName = "opensearch.status.sample";
    private static final String MostRecentDateIncreaseParamName = "opensearch.status.recentDate.increase";
    private static final String MostRecentDateMinGapParamName = "opensearch.status.recentDate.min.gap";

    /** Whether the terms aggregation is nested inside a diversified sampler. */
    private boolean sample = false;

    /** Minutes added to the most recent date found; negative disables the adjustment. */
    private int recentDateIncrease = -1;

    /** Minimal gap in minutes required before the query date is replaced; ignored if <= 0. */
    private int recentDateMinGap = -1;

    /** Keys of the buckets returned by the latest response. */
    protected Set<String> currentBuckets;

    /**
     * Reads the spout-specific configuration, delegates to the parent implementation and
     * initialises the set of current bucket keys.
     *
     * @param stormConf topology configuration
     * @param context topology context, passed on to the parent
     * @param collector output collector, passed on to the parent
     */
    @Override
    public void open(Map<String, Object> stormConf, TopologyContext context, SpoutOutputCollector collector) {
        sample = ConfUtils.getBoolean(stormConf, StatusSampleParamName, sample);
        recentDateIncrease = ConfUtils.getInt(stormConf, MostRecentDateIncreaseParamName, recentDateIncrease);
        recentDateMinGap = ConfUtils.getInt(stormConf, MostRecentDateMinGapParamName, recentDateMinGap);
        super.open(stormConf, context, collector);
        currentBuckets = new HashSet<>();
    }

    /**
     * Builds the aggregation query for documents whose {@code nextFetchDate} is not later than
     * the current query date and sends it asynchronously, with this instance as the listener.
     */
    @Override
    protected void populateBuffer() {
        if (queryDate == null) {
            queryDate = new Date();
            lastTimeResetToNOW = Instant.now();
        }

        String formattedQueryDate = ISODateTimeFormat.dateTimeNoMillis().print(queryDate.getTime());
        LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix, formattedQueryDate);

        BoolQueryBuilder queryBuilder =
                QueryBuilders.boolQuery()
                        .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate));
        if (filterQueries != null) {
            for (String filterQuery : filterQueries) {
                queryBuilder.filter(QueryBuilders.queryStringQuery(filterQuery));
            }
        }

        SearchRequest request = new SearchRequest(indexName);

        // Only the aggregations are of interest, hence no top-level hits are requested.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(queryBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(0);
        sourceBuilder.explain(false);
        sourceBuilder.trackTotalHits(false);
        if (queryTimeout != -1) {
            sourceBuilder.timeout(new TimeValue(queryTimeout, TimeUnit.SECONDS));
        }

        TermsAggregationBuilder aggregations =
                AggregationBuilders.terms("partition").field(partitionField).size(maxBucketNum);

        TopHitsAggregationBuilder tophits =
                AggregationBuilders.topHits("docs").size(maxURLsPerBucket).explain(false);
        // Sort the documents within each bucket.
        for (String bsf : bucketSortField) {
            FieldSortBuilder sorter = SortBuilders.fieldSort(bsf).order(SortOrder.ASC);
            tophits.sort(sorter);
        }
        aggregations.subAggregation(tophits);

        // Order the buckets themselves by the minimum value of the total sort field.
        if (StringUtils.isNotBlank(totalSortField)) {
            MinAggregationBuilder minBuilder =
                    AggregationBuilders.min("top_hit").field(totalSortField);
            aggregations.subAggregation(minBuilder);
            aggregations.order(BucketOrder.aggregation("top_hit", true));
        }

        if (sample) {
            DiversifiedAggregationBuilder sab = new DiversifiedAggregationBuilder("sample");
            sab.field(partitionField).maxDocsPerValue(maxURLsPerBucket);
            sab.shardSize(maxURLsPerBucket * maxBucketNum);
            sab.subAggregation(aggregations);
            sourceBuilder.aggregation(sab);
        } else {
            sourceBuilder.aggregation(aggregations);
        }

        request.source(sourceBuilder);

        // Restrict the query to a specific shard when one has been assigned.
        if (shardID != -1) {
            request.preference("_shards:" + shardID + "|_local");
        }

        LOG.debug("{} OpenSearch query {}", logIdprefix, request);
        LOG.trace("{} isInquery set to true", logIdprefix);
        isInQuery.set(true);
        client.searchAsync(request, RequestOptions.DEFAULT, this);
    }

    /**
     * Logs the failure of the asynchronous query and marks the query as received.
     *
     * @param arg0 the exception reported for the query
     */
    @Override
    public void onFailure(Exception arg0) {
        LOG.error("{} Exception with OpenSearch query", logIdprefix, arg0);
        markQueryReceivedNow();
    }

    /**
     * Processes the aggregation buckets of a response: adds the URLs which are neither being
     * processed nor already buffered to the buffer, updates the metrics, possibly adjusts or
     * resets the query date and finally marks the query as received.
     *
     * @param response the search response returned for the query sent by {@link
     *     #populateBuffer()}
     * @throws RuntimeException if the {@code nextFetchDate} of the hit used as most recent date
     *     cannot be parsed
     */
    @Override
    public void onResponse(SearchResponse response) {
        long timeTaken = System.currentTimeMillis() - getTimeLastQuerySent();

        Aggregations aggregs = response.getAggregations();
        if (aggregs == null) {
            markQueryReceivedNow();
            return;
        }

        // With sampling, the partition aggregation is nested under the "sample" aggregation.
        SingleBucketAggregation sampleAgg = aggregs.get("sample");
        if (sampleAgg != null) {
            aggregs = sampleAgg.getAggregations();
        }

        Terms agg = aggregs.get("partition");

        int numhits = 0;
        int numBuckets = 0;
        int alreadyprocessed = 0;
        Instant mostRecentDateFound = null;

        currentBuckets.clear();

        // An explicit iterator is needed to detect the last bucket.
        Iterator<? extends Terms.Bucket> bucketIterator = agg.getBuckets().iterator();
        while (bucketIterator.hasNext()) {
            Terms.Bucket entry = bucketIterator.next();
            String key = (String) entry.getKey();
            currentBuckets.add(key);

            long docCount = entry.getDocCount();
            int hitsForThisBucket = 0;
            SearchHit lastHit = null;

            TopHits topHits = entry.getAggregations().get("docs");
            for (SearchHit hit : topHits.getHits().getHits()) {
                LOG.debug("{} -> id [{}], _source [{}]", logIdprefix, hit.getId(), hit.getSourceAsString());
                hitsForThisBucket++;
                lastHit = hit;

                Map<String, Object> keyValues = hit.getSourceAsMap();
                String url = (String) keyValues.get("url");

                // Only the first document of the last bucket provides the most recent date.
                if (hitsForThisBucket == 1 && !bucketIterator.hasNext()) {
                    String strDate = (String) keyValues.get("nextFetchDate");
                    try {
                        mostRecentDateFound = Instant.parse(strDate);
                    } catch (Exception e) {
                        // Keep the cause so that the parsing problem can be diagnosed.
                        throw new RuntimeException("can't parse date :" + strDate, e);
                    }
                }

                if (beingProcessed.containsKey(url)) {
                    LOG.debug("{} -> already processed: {}", logIdprefix, url);
                    alreadyprocessed++;
                    continue;
                }

                Metadata metadata = fromKeyValues(keyValues);
                if (!buffer.add(url, metadata)) {
                    LOG.debug("{} -> already in buffer: {}", logIdprefix, url);
                    alreadyprocessed++;
                    continue;
                }
                LOG.debug("{} -> added to buffer : {}", logIdprefix, url);
            }

            if (lastHit != null) {
                sortValuesForKey(key, lastHit.getSortValues());
            }
            if (hitsForThisBucket > 0) {
                numBuckets++;
            }
            numhits += hitsForThisBucket;

            LOG.debug("{} key [{}], hits[{}], doc_count [{}]", logIdprefix, key, hitsForThisBucket, docCount);
        }

        // Avoid reporting NaN / Infinity when the query returned no document at all.
        float msecPerDoc = numhits > 0 ? (float) timeTaken / (float) numhits : 0f;
        LOG.info(
                "{} OpenSearch query returned {} hits from {} buckets in {} msec with {} already being processed. Took {} msec per doc on average.",
                logIdprefix,
                numhits,
                numBuckets,
                timeTaken,
                alreadyprocessed,
                msecPerDoc);

        queryTimes.addMeasurement(timeTaken);
        eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
        eventCounter.scope("ES_queries").incrBy(1L);
        eventCounter.scope("ES_docs").incrBy(numhits);

        if (mostRecentDateFound != null && recentDateIncrease >= 0) {
            adjustQueryDate(mostRecentDateFound);
        }

        // Forget the query date once it has been in use for longer than the configured time.
        if (resetFetchDateAfterNSecs != -1) {
            Instant changeNeededOn = lastTimeResetToNOW.plusSeconds(resetFetchDateAfterNSecs);
            if (Instant.now().isAfter(changeNeededOn)) {
                LOG.info("{} queryDate set to null based on resetFetchDateAfterNSecs {}", logIdprefix, resetFetchDateAfterNSecs);
                queryDate = null;
            }
        }

        // Nothing was returned: a fresh query date is generated by the next populateBuffer().
        if (numBuckets == 0) {
            queryDate = null;
        }

        markQueryReceivedNow();
    }

    /**
     * Derives a candidate query date from the most recent nextFetchDate found (plus the
     * configured increase in minutes) and uses it as the new query date, unless a minimal gap
     * is configured and the candidate lies within that gap around the current query date.
     *
     * @param mostRecentDateFound nextFetchDate of the first hit of the last bucket
     */
    private void adjustQueryDate(Instant mostRecentDateFound) {
        Calendar potentialNewDate = Calendar.getInstance();
        // Calendar expects milliseconds since the epoch, not seconds.
        potentialNewDate.setTimeInMillis(mostRecentDateFound.toEpochMilli());
        potentialNewDate.add(Calendar.MINUTE, recentDateIncrease);

        // A non-null oldDate means that the query date must be replaced.
        Date oldDate = null;
        if (recentDateMinGap > 0) {
            Calendar low = Calendar.getInstance();
            low.setTime(queryDate);
            low.add(Calendar.MINUTE, -recentDateMinGap);

            Calendar high = Calendar.getInstance();
            high.setTime(queryDate);
            high.add(Calendar.MINUTE, recentDateMinGap);

            if (high.before(potentialNewDate) || low.after(potentialNewDate)) {
                oldDate = queryDate;
            }
        } else {
            oldDate = queryDate;
        }

        if (oldDate != null) {
            queryDate = potentialNewDate.getTime();
            LOG.info("{} queryDate changed from {} to {} based on mostRecentDateFound {}", logIdprefix, oldDate, queryDate, mostRecentDateFound);
        } else {
            LOG.info("{} queryDate kept at {} based on mostRecentDateFound {}", logIdprefix, queryDate, mostRecentDateFound);
        }
    }

    /**
     * Hook invoked once per bucket containing at least one hit, with the sort values of the
     * last hit of that bucket. Does nothing in this class.
     *
     * @param key key of the bucket
     * @param sortValues sort values of the last hit of the bucket
     */
    protected void sortValuesForKey(String key, Object[] sortValues) {
    }
}

