/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.persistence;

import com.digitalpebble.stormcrawler.opensearch.persistence.AggregationSpout;
import com.digitalpebble.stormcrawler.persistence.EmptyQueueListener;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.joda.time.format.ISODateTimeFormat;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout built on top of {@link AggregationSpout} which additionally registers itself as the
 * {@link EmptyQueueListener} of the URL buffer. Whenever the buffer reports that the queue for a
 * given key has been emptied, a term query restricted to that key is sent asynchronously to
 * OpenSearch in order to refill the queue, using {@code search_after} values remembered from
 * previous results for the same key.
 *
 * <p>The number of documents requested per refill is read from the configuration key
 * {@value #RELOADPARAMNAME} and defaults to {@code maxURLsPerBucket}.
 */
public class HybridSpout
extends AggregationSpout
implements EmptyQueueListener {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSpout.class);
    protected static final String RELOADPARAMNAME = "opensearch.status.max.urls.per.reload";
    /** Prefix marking a partition field which is stored inside the "metadata" object. */
    private static final String METADATA_PREFIX = "metadata.";
    /** Maximum number of documents requested by a per-queue refill query. */
    private int bufferReloadSize = 10;
    /** Last sort values seen per queue key, used as {@code search_after} for the next refill. */
    private Cache<String, Object[]> searchAfterCache;
    /** Listener handling the responses of the per-queue refill queries. */
    private HostResultListener hrl;

    /**
     * Initialises the parent spout, then reads the refill size from the configuration, registers
     * this instance as the buffer's empty-queue listener and creates the search-after cache and
     * the listener used for the refill queries.
     *
     * @param stormConf topology configuration
     * @param context topology context, passed on to the parent
     * @param collector output collector, passed on to the parent
     */
    @Override
    public void open(Map<String, Object> stormConf, TopologyContext context, SpoutOutputCollector collector) {
        super.open(stormConf, context, collector);
        this.bufferReloadSize = ConfUtils.getInt(stormConf, RELOADPARAMNAME, this.maxURLsPerBucket);
        this.buffer.setEmptyQueueListener(this);
        this.searchAfterCache = Caffeine.newBuilder().build();
        this.hrl = new HostResultListener();
    }

    /**
     * Called when the buffer queue for {@code queueName} has been emptied. Unless the key is no
     * longer among the current buckets, or {@code isInQuery} is set, an asynchronous query for
     * documents of that key whose {@code nextFetchDate} is not after the query date is sent.
     *
     * @param queueName key of the queue which has just been emptied
     */
    public void emptyQueue(String queueName) {
        LOG.info("{} Emptied buffer queue for {}", this.logIdprefix, queueName);
        if (!this.currentBuckets.contains(queueName)) {
            // this key is not part of the current buckets: nothing to reload
            return;
        }
        if (this.isInQuery.get()) {
            // the in-query flag is set: do not send an additional query now
            LOG.trace("{} isInquery true for {}", this.logIdprefix, queueName);
            return;
        }
        LOG.info("{} Querying for more docs for {}", this.logIdprefix, queueName);
        if (this.queryDate == null) {
            this.queryDate = new Date();
            this.lastTimeResetToNOW = Instant.now();
        }
        String formattedQueryDate = ISODateTimeFormat.dateTimeNoMillis().print(this.queryDate.getTime());
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate))
                .filter(QueryBuilders.termQuery(this.partitionField, queueName));

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(queryBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(this.bufferReloadSize);
        sourceBuilder.explain(false);
        sourceBuilder.trackTotalHits(false);
        // same ascending sort on every configured field, so that search_after values line up
        for (String bsf : this.bucketSortField) {
            FieldSortBuilder sorter = SortBuilders.fieldSort(bsf).order(SortOrder.ASC);
            sourceBuilder.sort(sorter);
        }

        // resume after the last document seen for this key, if any was recorded;
        // an empty array must never be passed on as search_after
        Object[] searchAfterValues = this.searchAfterCache.getIfPresent(queueName);
        if (searchAfterValues != null && searchAfterValues.length > 0) {
            sourceBuilder.searchAfter(searchAfterValues);
        }

        SearchRequest request = new SearchRequest(this.indexName);
        request.source(sourceBuilder);
        if (this.shardID != -1) {
            // restrict the search to the configured shard, preferring a local copy
            request.preference("_shards:" + this.shardID + "|_local");
        }
        // the request is only rendered to a String if debug logging is enabled
        LOG.debug("{} OpenSearch query {} - {}", this.logIdprefix, queueName, request);
        client.searchAsync(request, RequestOptions.DEFAULT, this.hrl);
    }

    /**
     * Drops every remembered search-after value before handing the response over to the parent
     * implementation.
     *
     * @param response response passed on to {@code super.onResponse}
     */
    @Override
    public void onResponse(SearchResponse response) {
        this.searchAfterCache.invalidateAll();
        super.onResponse(response);
    }

    /**
     * Remembers the sort values for a key so that the next refill query for that key can resume
     * after them. {@code null} or empty sort values are ignored.
     *
     * @param key queue key the sort values belong to
     * @param sortValues sort values to remember, may be {@code null}
     */
    @Override
    protected void sortValuesForKey(String key, Object[] sortValues) {
        if (sortValues != null && sortValues.length > 0) {
            this.searchAfterCache.put(key, sortValues);
        }
    }

    /** Handles the results of the refill query sent for a single queue. */
    class HostResultListener
    implements ActionListener<SearchResponse> {

        /**
         * Adds every hit to the buffer, remembers the sort values of the last hit under the key
         * found in the results and updates the event counters.
         *
         * @param response response of a query sent by {@link HybridSpout#emptyQueue(String)}
         */
        public void onResponse(SearchResponse response) {
            int alreadyprocessed = 0;
            SearchHit[] hits = response.getHits().getHits();
            int numDocs = hits.length;
            Object[] sortValues = null;
            // stays null when no hit yields a usable key
            String key = null;
            for (SearchHit hit : hits) {
                String hitKey = this.partitionKeyOf(hit);
                if (hitKey != null) {
                    key = hitKey;
                }
                sortValues = hit.getSortValues();
                if (!HybridSpout.this.addHitToBuffer(hit)) {
                    ++alreadyprocessed;
                }
            }
            // same guard as sortValuesForKey: the cache rejects null values and an empty
            // array would later be handed to searchAfter
            if (key != null && sortValues != null && sortValues.length > 0) {
                HybridSpout.this.searchAfterCache.put(key, sortValues);
            }
            HybridSpout.this.eventCounter.scope("OpenSearch_queries_host").incrBy(1L);
            HybridSpout.this.eventCounter.scope("OpenSearch_docs_host").incrBy((long) numDocs);
            HybridSpout.this.eventCounter.scope("already_being_processed_host").incrBy((long) alreadyprocessed);
            LOG.info("{} OpenSearch term query returned {} hits  in {} msec with {} already being processed for {}",
                    HybridSpout.this.logIdprefix, numDocs, response.getTook().getMillis(), alreadyprocessed, key);
        }

        /**
         * Logs the failure of a refill query.
         *
         * @param e cause of the failure
         */
        public void onFailure(Exception e) {
            LOG.error("Exception with OpenSearch query", (Throwable) e);
        }

        /**
         * Reads the value of the partition field from the source of a hit. A field name starting
         * with "metadata." is looked up inside the "metadata" object of the source.
         *
         * @param hit hit to read the key from
         * @return the value as a String, or {@code null} when the field (or the metadata object)
         *     is missing, or when it is a list which does not hold exactly one non-null element
         */
        private String partitionKeyOf(SearchHit hit) {
            String field = HybridSpout.this.partitionField;
            Map<?, ?> source = hit.getSourceAsMap();
            if (source != null && field.startsWith(METADATA_PREFIX)) {
                Object metadata = source.get("metadata");
                source = metadata instanceof Map ? (Map<?, ?>) metadata : null;
                field = field.substring(METADATA_PREFIX.length());
            }
            if (source == null) {
                return null;
            }
            Object value = source.get(field);
            if (value instanceof List) {
                List<?> values = (List<?>) value;
                if (values.size() != 1 || values.get(0) == null) {
                    return null;
                }
                return values.get(0).toString();
            }
            return value == null ? null : value.toString();
        }
    }
}

