/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.bolt;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.indexing.AbstractIndexerBolt;
import com.digitalpebble.stormcrawler.opensearch.BulkItemResponseToFailedFlag;
import com.digitalpebble.stormcrawler.opensearch.IndexCreation;
import com.digitalpebble.stormcrawler.opensearch.OpenSearchConnection;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.PerSecondReducer;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IndexerBolt
extends AbstractIndexerBolt
implements RemovalListener<String, List<Tuple>>,
BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class);
    private static final String OSBoltType = "indexer";
    static final String OSIndexNameParamName = "opensearch.indexer.index.name";
    private static final String OSCreateParamName = "opensearch.indexer.create";
    private static final String OSIndexPipelineParamName = "opensearch.indexer.pipeline";
    private OutputCollector _collector;
    private String indexName;
    private String pipeline;
    private boolean create = false;
    private MultiCountMetric eventCounter;
    private OpenSearchConnection connection;
    private MultiReducedMetric perSecMetrics;
    private Cache<String, List<Tuple>> waitAck;
    private final ReentrantLock waitAckLock = new ReentrantLock(true);

    public IndexerBolt() {
    }

    public IndexerBolt(String indexName) {
        this.indexName = indexName;
    }

    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        super.prepare(conf, context, collector);
        this._collector = collector;
        if (this.indexName == null) {
            this.indexName = ConfUtils.getString(conf, (String)OSIndexNameParamName, (String)"content");
        }
        this.create = ConfUtils.getBoolean(conf, (String)OSCreateParamName, (boolean)false);
        this.pipeline = ConfUtils.getString(conf, (String)OSIndexPipelineParamName);
        try {
            this.connection = OpenSearchConnection.getConnection(conf, OSBoltType, this);
        }
        catch (Exception e1) {
            LOG.error("Can't connect to opensearch", (Throwable)e1);
            throw new RuntimeException(e1);
        }
        this.eventCounter = (MultiCountMetric)context.registerMetric("OpensearchIndexer", (IMetric)new MultiCountMetric(), 10);
        this.perSecMetrics = (MultiReducedMetric)context.registerMetric("Indexer_average_persec", (IMetric)new MultiReducedMetric((IReducer)new PerSecondReducer()), 10);
        this.waitAck = Caffeine.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).removalListener((RemovalListener)this).build();
        context.registerMetric("waitAck", () -> this.waitAck.estimatedSize(), 10);
        try {
            IndexCreation.checkOrCreateIndex(this.connection.getClient(), this.indexName, OSBoltType, LOG);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void onRemoval(@Nullable String key, @Nullable List<Tuple> value, @NotNull RemovalCause cause) {
        if (!cause.wasEvicted()) {
            return;
        }
        if (value != null) {
            LOG.error("Purged from waitAck {} with {} values", (Object)key, (Object)value.size());
            for (Tuple t : value) {
                this._collector.fail(t);
            }
        } else {
            LOG.error("Purged from waitAck {} with no values", (Object)key);
        }
    }

    public void cleanup() {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Tuple tuple) {
        String url = tuple.getStringByField("url");
        String normalisedurl = this.valueForURL(tuple);
        LOG.info("Indexing {} as {}", (Object)url, (Object)normalisedurl);
        Metadata metadata = (Metadata)tuple.getValueByField("metadata");
        if (!this.filterDocument(metadata)) {
            LOG.info("Filtered {}", (Object)url);
            this.eventCounter.scope("Filtered").incrBy(1L);
            this._collector.emit("status", tuple, (List)new Values(new Object[]{url, metadata, Status.FETCHED}));
            this._collector.ack(tuple);
            return;
        }
        String docID = this.getDocumentID(metadata, normalisedurl);
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
            if (StringUtils.isNotBlank((String)this.fieldNameForText())) {
                String text = this.trimText(tuple.getStringByField("text"));
                if (!this.ignoreEmptyFields() || StringUtils.isNotBlank((String)text)) {
                    builder.field(this.fieldNameForText(), this.trimText(text));
                }
            }
            if (StringUtils.isNotBlank((String)this.fieldNameForURL())) {
                builder.field(this.fieldNameForURL(), normalisedurl);
            }
            Map keyVals = this.filterMetadata(metadata);
            for (Map.Entry entry : keyVals.entrySet()) {
                if (((String[])entry.getValue()).length == 1) {
                    String value = ((String[])entry.getValue())[0];
                    if (this.ignoreEmptyFields() && !StringUtils.isNotBlank((String)value)) continue;
                    builder.field((String)entry.getKey(), value);
                    continue;
                }
                if (((String[])entry.getValue()).length <= 1) continue;
                builder.array((String)entry.getKey(), (String[])entry.getValue());
            }
            builder.endObject();
            IndexRequest indexRequest = new IndexRequest(this.getIndexName(metadata)).source(builder).id(docID).create(this.create);
            if (this.pipeline != null) {
                indexRequest.setPipeline(this.pipeline);
            }
            this.connection.addToProcessor((DocWriteRequest<?>)indexRequest);
            this.eventCounter.scope("Indexed").incrBy(1L);
            this.perSecMetrics.scope("Indexed").update((Object)1);
            this.waitAckLock.lock();
            try {
                LinkedList<Tuple> tt = (LinkedList<Tuple>)this.waitAck.getIfPresent((Object)docID);
                if (tt == null) {
                    tt = new LinkedList<Tuple>();
                    this.waitAck.put((Object)docID, tt);
                }
                tt.add(tuple);
                LOG.debug("Added to waitAck {} with ID {} total {}", new Object[]{url, docID, tt.size()});
            }
            finally {
                this.waitAckLock.unlock();
            }
        }
        catch (IOException e) {
            LOG.error("Error building document for OpenSearch", (Throwable)e);
            this._collector.fail(tuple);
            this.waitAckLock.lock();
            try {
                this.waitAck.invalidate((Object)docID);
            }
            finally {
                this.waitAckLock.unlock();
            }
        }
    }

    protected String getIndexName(Metadata m) {
        return this.indexName;
    }

    public void beforeBulk(long executionId, BulkRequest request) {
        this.eventCounter.scope("bulks_sent").incrBy(1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        long estimatedSize;
        Map presentTuples;
        this.eventCounter.scope("bulks_received").incrBy(1L);
        this.eventCounter.scope("bulk_msec").incrBy(response.getTook().getMillis());
        Map idsToBulkItemsWithFailedFlag = Arrays.stream(response.getItems()).map(bir -> {
            String id = bir.getId();
            BulkItemResponse.Failure f = bir.getFailure();
            boolean failed = false;
            if (f != null) {
                if (f.getStatus().equals((Object)RestStatus.CONFLICT)) {
                    this.eventCounter.scope("doc_conflicts").incrBy(1L);
                    LOG.debug("Doc conflict ID {}", (Object)id);
                } else {
                    failed = true;
                }
            }
            return new BulkItemResponseToFailedFlag((BulkItemResponse)bir, failed);
        }).collect(Collectors.groupingBy(idWithFailedFlagTuple -> idWithFailedFlagTuple.id, Collectors.toUnmodifiableList()));
        HashSet debugInfo = null;
        this.waitAckLock.lock();
        try {
            presentTuples = this.waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet());
            if (!presentTuples.isEmpty()) {
                this.waitAck.invalidateAll(presentTuples.keySet());
            }
            estimatedSize = this.waitAck.estimatedSize();
            if (LOG.isDebugEnabled() && estimatedSize > 0L) {
                debugInfo = new HashSet(this.waitAck.asMap().keySet());
            }
        }
        finally {
            this.waitAckLock.unlock();
        }
        int ackCount = 0;
        int failureCount = 0;
        for (Map.Entry entry : presentTuples.entrySet()) {
            BulkItemResponseToFailedFlag selected;
            String id = (String)entry.getKey();
            List associatedTuple = (List)entry.getValue();
            List bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id);
            if (bulkItemsWithFailedFlag.size() == 1) {
                selected = (BulkItemResponseToFailedFlag)bulkItemsWithFailedFlag.get(0);
            } else {
                BulkItemResponseToFailedFlag tmp = null;
                int ctFailed = 0;
                for (BulkItemResponseToFailedFlag buwff : bulkItemsWithFailedFlag) {
                    if (tmp == null) {
                        tmp = buwff;
                    }
                    if (buwff.failed) {
                        ++ctFailed;
                        continue;
                    }
                    tmp = buwff;
                }
                if (ctFailed != bulkItemsWithFailedFlag.size()) {
                    LOG.warn("The id {} would result in an ack and a failure. Using only the ack for processing.", (Object)id);
                }
                selected = Objects.requireNonNull(tmp);
            }
            if (associatedTuple != null) {
                LOG.debug("Found {} tuple(s) for ID {}", (Object)associatedTuple.size(), (Object)id);
                for (Tuple t : associatedTuple) {
                    String url = (String)t.getValueByField("url");
                    Metadata metadata = (Metadata)t.getValueByField("metadata");
                    if (!selected.failed) {
                        ++ackCount;
                        this._collector.emit("status", t, (List)new Values(new Object[]{url, metadata, Status.FETCHED}));
                        this._collector.ack(t);
                        continue;
                    }
                    ++failureCount;
                    BulkItemResponse.Failure failure = selected.getFailure();
                    LOG.error("update ID {}, URL {}, failure: {}", new Object[]{id, url, failure});
                    if (selected.getFailure().getStatus().equals((Object)RestStatus.BAD_REQUEST)) {
                        metadata.setValue("error.source", "OpenSearch indexing");
                        metadata.setValue("error.message", "invalid content");
                        this._collector.emit("status", t, (List)new Values(new Object[]{url, metadata, Status.ERROR}));
                        this._collector.ack(t);
                        LOG.debug("Acked {} with ID {}", (Object)url, (Object)id);
                        continue;
                    }
                    LOG.error("update ID {}, URL {}, failure: {}", new Object[]{id, url, failure});
                    if (failure.getStatus().equals((Object)RestStatus.BAD_REQUEST)) {
                        metadata.setValue("error.source", "OpenSearch indexing");
                        metadata.setValue("error.message", "invalid content");
                        this._collector.emit("status", t, (List)new Values(new Object[]{url, metadata, Status.ERROR}));
                        this._collector.ack(t);
                        continue;
                    }
                    this._collector.fail(t);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuples for {}", entry.getKey());
        }
        LOG.info("Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", new Object[]{executionId, idsToBulkItemsWithFailedFlag.size(), estimatedSize, ackCount, failureCount});
        if (debugInfo != null) {
            for (String kinaw : debugInfo) {
                LOG.debug("Still in wait ack after bulk response [{}] => {}", (Object)executionId, (Object)kinaw);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        Map failedTupleLists;
        this.eventCounter.scope("bulks_received").incrBy(1L);
        LOG.error("Exception with bulk {} - failing the whole lot ", (Object)executionId, (Object)failure);
        Set failedIds = request.requests().stream().map(DocWriteRequest::id).collect(Collectors.toUnmodifiableSet());
        this.waitAckLock.lock();
        try {
            failedTupleLists = this.waitAck.getAllPresent(failedIds);
            if (!failedTupleLists.isEmpty()) {
                this.waitAck.invalidateAll(failedTupleLists.keySet());
            }
        }
        finally {
            this.waitAckLock.unlock();
        }
        for (String id : failedIds) {
            List failedTuples = (List)failedTupleLists.get(id);
            if (failedTuples != null) {
                LOG.debug("Failed {} tuple(s) for ID {}", (Object)failedTuples.size(), (Object)id);
                for (Tuple x : failedTuples) {
                    this.eventCounter.scope("failed").incrBy(1L);
                    this._collector.fail(x);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuple for {}", (Object)id);
        }
    }
}

