/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.opensearch.IndexCreation;
import com.digitalpebble.stormcrawler.opensearch.OpenSearchConnection;
import com.digitalpebble.stormcrawler.persistence.AbstractQueryingSpout;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSpout
extends AbstractQueryingSpout {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSpout.class);
    protected static final String OSBoltType = "status";
    protected static final String OSStatusIndexNameParamName = "opensearch.status.index.name";
    protected static final String OSStatusBucketFieldParamName = "opensearch.status.bucket.field";
    protected static final String OSStatusMaxBucketParamName = "opensearch.status.max.buckets";
    protected static final String OSStatusMaxURLsParamName = "opensearch.status.max.urls.per.bucket";
    protected static final String OSStatusBucketSortFieldParamName = "opensearch.status.bucket.sort.field";
    protected static final String OSStatusGlobalSortFieldParamName = "opensearch.status.global.sort.field";
    protected static final String OSStatusFilterParamName = "opensearch.status.filterQuery";
    protected static final String OSStatusQueryTimeoutParamName = "opensearch.status.query.timeout";
    protected List<String> filterQueries = null;
    protected String indexName;
    protected static RestHighLevelClient client;
    protected int shardID = -1;
    protected String logIdprefix = "";
    protected String partitionField;
    protected int maxURLsPerBucket = 10;
    protected int maxBucketNum = 10;
    protected List<String> bucketSortField = new ArrayList<String>();
    protected String totalSortField = "";
    protected Date queryDate;
    protected int queryTimeout = -1;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(Map<String, Object> stormConf, TopologyContext context, SpoutOutputCollector collector) {
        super.open(stormConf, context, collector);
        this.indexName = ConfUtils.getString(stormConf, (String)OSStatusIndexNameParamName, (String)OSBoltType);
        Class<AbstractSpout> clazz = AbstractSpout.class;
        synchronized (AbstractSpout.class) {
            try {
                if (client == null) {
                    client = OpenSearchConnection.getClient(stormConf, OSBoltType);
                }
            }
            catch (Exception e1) {
                LOG.error("Can't connect to ElasticSearch", (Throwable)e1);
                throw new RuntimeException(e1);
            }
            try {
                IndexCreation.checkOrCreateIndex(client, this.indexName, OSBoltType, LOG);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
            if (totalTasks > 1) {
                this.logIdprefix = "[" + context.getThisComponentId() + " #" + context.getThisTaskIndex() + "] ";
                this.shardID = context.getThisTaskIndex();
                LOG.info("{} assigned shard ID {}", (Object)this.logIdprefix, (Object)this.shardID);
            }
            this.partitionField = ConfUtils.getString(stormConf, (String)OSStatusBucketFieldParamName, (String)"key");
            this.bucketSortField = ConfUtils.loadListFromConf((String)OSStatusBucketSortFieldParamName, stormConf);
            this.totalSortField = ConfUtils.getString(stormConf, (String)OSStatusGlobalSortFieldParamName);
            this.maxURLsPerBucket = ConfUtils.getInt(stormConf, (String)OSStatusMaxURLsParamName, (int)1);
            this.maxBucketNum = ConfUtils.getInt(stormConf, (String)OSStatusMaxBucketParamName, (int)10);
            this.queryTimeout = ConfUtils.getInt(stormConf, (String)OSStatusQueryTimeoutParamName, (int)-1);
            this.filterQueries = ConfUtils.loadListFromConf((String)OSStatusFilterParamName, stormConf);
            return;
        }
    }

    protected abstract void populateBuffer();

    protected final boolean addHitToBuffer(SearchHit hit) {
        Map keyValues = hit.getSourceAsMap();
        String url = (String)keyValues.get("url");
        if (this.beingProcessed.containsKey((Object)url)) {
            return false;
        }
        return this.buffer.add(url, this.fromKeyValues(keyValues));
    }

    protected final Metadata fromKeyValues(Map<String, Object> keyValues) {
        Map mdAsMap = (Map)keyValues.get("metadata");
        Metadata metadata = new Metadata();
        if (mdAsMap != null) {
            for (Map.Entry mdEntry : mdAsMap.entrySet()) {
                String key = (String)mdEntry.getKey();
                key = key.replaceAll("%2E", "\\.");
                Object mdValObj = mdEntry.getValue();
                if (mdValObj instanceof String) {
                    metadata.addValue(key, (String)mdValObj);
                    continue;
                }
                metadata.addValues(key, (Collection)((List)mdValObj));
            }
        }
        return metadata;
    }

    public void ack(Object msgId) {
        LOG.debug("{}  Ack for {}", (Object)this.logIdprefix, msgId);
        super.ack(msgId);
    }

    public void fail(Object msgId) {
        LOG.info("{}  Fail for {}", (Object)this.logIdprefix, msgId);
        super.fail(msgId);
    }

    public void close() {
        if (client != null) {
            try {
                client.close();
            }
            catch (IOException e) {
                LOG.error("Exception caught when closing client", (Throwable)e);
            }
        }
    }
}

