/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.opensearch.BulkItemResponseToFailedFlag;
import com.digitalpebble.stormcrawler.opensearch.IndexCreation;
import com.digitalpebble.stormcrawler.opensearch.OpenSearchConnection;
import com.digitalpebble.stormcrawler.persistence.AbstractStatusUpdaterBolt;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.URLPartitioner;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusUpdaterBolt
extends AbstractStatusUpdaterBolt
implements RemovalListener<String, List<Tuple>>,
BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class);
    private String OSBoltType = "status";
    private static final String OSStatusIndexNameParamName = "opensearch.%s.index.name";
    private static final String OSStatusRoutingParamName = "opensearch.%s.routing";
    private static final String OSStatusRoutingFieldParamName = "opensearch.%s.routing.fieldname";
    private boolean routingFieldNameInMetadata = false;
    private String indexName;
    private URLPartitioner partitioner;
    private boolean doRouting;
    private String fieldNameForRoutingKey = null;
    private OpenSearchConnection connection;
    private Cache<String, List<Tuple>> waitAck;
    private final ReentrantLock waitAckLock = new ReentrantLock(true);
    private MultiCountMetric eventCounter;

    public StatusUpdaterBolt() {
    }

    public StatusUpdaterBolt(String boltType) {
        this.OSBoltType = boltType;
    }

    public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
        this.indexName = ConfUtils.getString(stormConf, (String)String.format(OSStatusIndexNameParamName, this.OSBoltType), (String)"status");
        this.doRouting = ConfUtils.getBoolean(stormConf, (String)String.format(OSStatusRoutingParamName, this.OSBoltType), (boolean)false);
        this.partitioner = new URLPartitioner();
        this.partitioner.configure(stormConf);
        this.fieldNameForRoutingKey = ConfUtils.getString(stormConf, (String)String.format(OSStatusRoutingFieldParamName, this.OSBoltType));
        if (StringUtils.isNotBlank((String)this.fieldNameForRoutingKey)) {
            if (this.fieldNameForRoutingKey.startsWith("metadata.")) {
                this.routingFieldNameInMetadata = true;
                this.fieldNameForRoutingKey = this.fieldNameForRoutingKey.substring("metadata.".length());
            }
            this.fieldNameForRoutingKey = this.fieldNameForRoutingKey.replaceAll("\\.", "%2E");
        }
        this.waitAck = Caffeine.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).removalListener((RemovalListener)this).build();
        context.registerMetric("waitAck", () -> this.waitAck.estimatedSize(), 10);
        try {
            this.connection = OpenSearchConnection.getConnection(stormConf, this.OSBoltType, this);
        }
        catch (Exception e1) {
            LOG.error("Can't connect to ElasticSearch", (Throwable)e1);
            throw new RuntimeException(e1);
        }
        this.eventCounter = (MultiCountMetric)context.registerMetric("counters", (IMetric)new MultiCountMetric(), 30);
        try {
            IndexCreation.checkOrCreateIndex(this.connection.getClient(), this.indexName, this.OSBoltType, LOG);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void cleanup() {
        if (this.connection == null) {
            return;
        }
        this.connection.close();
        this.connection = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void store(String url, Status status, Metadata metadata, Optional<Date> nextFetch, Tuple tuple) throws Exception {
        boolean isAlreadySentAndDiscovered;
        String documentID = this.getDocumentID(metadata, url);
        this.waitAckLock.lock();
        try {
            List alreadySent = (List)this.waitAck.getIfPresent((Object)documentID);
            isAlreadySentAndDiscovered = status.equals((Object)Status.DISCOVERED) && alreadySent != null;
        }
        finally {
            this.waitAckLock.unlock();
        }
        if (isAlreadySentAndDiscovered) {
            LOG.debug("Already being sent to OpenSearch {} with status {} and ID {}", new Object[]{url, status, documentID});
            this.eventCounter.scope("acked").incrBy(1L);
            super.ack(tuple, url);
            return;
        }
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
        builder.field("url", url);
        builder.field("status", (Object)status);
        builder.startObject("metadata");
        for (String mdKey : metadata.keySet()) {
            String[] values = metadata.getValues(mdKey);
            mdKey = mdKey.replaceAll("\\.", "%2E");
            builder.array(mdKey, values);
        }
        String partitionKey = this.partitioner.getPartition(url, metadata);
        if (partitionKey == null) {
            partitionKey = "_DEFAULT_";
        }
        if (StringUtils.isNotBlank((String)this.fieldNameForRoutingKey) && this.routingFieldNameInMetadata) {
            builder.field(this.fieldNameForRoutingKey, partitionKey);
        }
        builder.endObject();
        if (StringUtils.isNotBlank((String)this.fieldNameForRoutingKey) && !this.routingFieldNameInMetadata) {
            builder.field(this.fieldNameForRoutingKey, partitionKey);
        }
        if (nextFetch.isPresent()) {
            builder.timeField("nextFetchDate", (Object)nextFetch.get());
        }
        builder.endObject();
        IndexRequest request = new IndexRequest(this.getIndexName(metadata));
        boolean create = status.equals((Object)Status.DISCOVERED);
        request.source(builder).id(documentID).create(create);
        if (this.doRouting) {
            request.routing(partitionKey);
        }
        this.waitAckLock.lock();
        try {
            List tt = (List)this.waitAck.get((Object)documentID, k -> new LinkedList());
            tt.add(tuple);
            LOG.debug("Added to waitAck {} with ID {} total {}", new Object[]{url, documentID, tt.size()});
        }
        finally {
            this.waitAckLock.unlock();
        }
        LOG.debug("Sending to ES buffer {} with ID {}", (Object)url, (Object)documentID);
        this.connection.addToProcessor((DocWriteRequest<?>)request);
    }

    public void onRemoval(@Nullable String key, @Nullable List<Tuple> value, @NotNull RemovalCause cause) {
        if (!cause.wasEvicted()) {
            return;
        }
        LOG.error("Purged from waitAck {} with {} values", (Object)key, (Object)value.size());
        for (Tuple t : value) {
            this.eventCounter.scope("failed").incrBy(1L);
            this._collector.fail(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        long estimatedSize;
        Map presentTuples;
        LOG.debug("afterBulk [{}] with {} responses", (Object)executionId, (Object)request.numberOfActions());
        this.eventCounter.scope("bulks_received").incrBy(1L);
        this.eventCounter.scope("bulk_msec").incrBy(response.getTook().getMillis());
        Map idsToBulkItemsWithFailedFlag = Arrays.stream(response.getItems()).map(bir -> {
            String id = bir.getId();
            BulkItemResponse.Failure f = bir.getFailure();
            boolean failed = false;
            if (f != null) {
                if (f.getStatus().equals((Object)RestStatus.CONFLICT)) {
                    this.eventCounter.scope("doc_conflicts").incrBy(1L);
                    LOG.debug("Doc conflict ID {}", (Object)id);
                } else {
                    LOG.error("Update ID {}, failure: {}", (Object)id, (Object)f);
                    failed = true;
                }
            }
            return new BulkItemResponseToFailedFlag((BulkItemResponse)bir, failed);
        }).collect(Collectors.groupingBy(idWithFailedFlagTuple -> idWithFailedFlagTuple.id, Collectors.toUnmodifiableList()));
        HashSet debugInfo = null;
        this.waitAckLock.lock();
        try {
            presentTuples = this.waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet());
            if (!presentTuples.isEmpty()) {
                this.waitAck.invalidateAll(presentTuples.keySet());
            }
            estimatedSize = this.waitAck.estimatedSize();
            if (LOG.isDebugEnabled() && estimatedSize > 0L) {
                debugInfo = new HashSet(this.waitAck.asMap().keySet());
            }
        }
        finally {
            this.waitAckLock.unlock();
        }
        int ackCount = 0;
        int failureCount = 0;
        for (Map.Entry entry : presentTuples.entrySet()) {
            BulkItemResponseToFailedFlag selected;
            String id = (String)entry.getKey();
            List associatedTuple = (List)entry.getValue();
            List bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id);
            if (bulkItemsWithFailedFlag.size() == 1) {
                selected = (BulkItemResponseToFailedFlag)bulkItemsWithFailedFlag.get(0);
            } else {
                BulkItemResponseToFailedFlag tmp = null;
                int ctFailed = 0;
                for (BulkItemResponseToFailedFlag buwff : bulkItemsWithFailedFlag) {
                    if (tmp == null) {
                        tmp = buwff;
                    }
                    if (buwff.failed) {
                        ++ctFailed;
                        continue;
                    }
                    tmp = buwff;
                }
                if (ctFailed != bulkItemsWithFailedFlag.size()) {
                    LOG.warn("The id {} would result in an ack and a failure. Using only the ack for processing.", (Object)id);
                }
                selected = Objects.requireNonNull(tmp);
            }
            if (associatedTuple != null) {
                LOG.debug("Acked {} tuple(s) for ID {}", (Object)associatedTuple.size(), (Object)id);
                for (Tuple tuple : associatedTuple) {
                    if (!selected.failed) {
                        String url = tuple.getStringByField("url");
                        ++ackCount;
                        LOG.debug("Acked {} with ID {}", (Object)url, (Object)id);
                        this.eventCounter.scope("acked").incrBy(1L);
                        super.ack(tuple, url);
                        continue;
                    }
                    ++failureCount;
                    this.eventCounter.scope("failed").incrBy(1L);
                    this._collector.fail(tuple);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuple for {}", (Object)id);
        }
        LOG.info("Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", new Object[]{executionId, idsToBulkItemsWithFailedFlag.size(), estimatedSize, ackCount, failureCount});
        if (debugInfo != null) {
            for (String kinaw : debugInfo) {
                LOG.debug("Still in wait ack after bulk response [{}] => {}", (Object)executionId, (Object)kinaw);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, Throwable throwable) {
        Map failedTupleLists;
        this.eventCounter.scope("bulks_received").incrBy(1L);
        LOG.error("Exception with bulk {} - failing the whole lot ", (Object)executionId, (Object)throwable);
        Set failedIds = request.requests().stream().map(DocWriteRequest::id).collect(Collectors.toUnmodifiableSet());
        this.waitAckLock.lock();
        try {
            failedTupleLists = this.waitAck.getAllPresent(failedIds);
            if (!failedTupleLists.isEmpty()) {
                this.waitAck.invalidateAll(failedTupleLists.keySet());
            }
        }
        finally {
            this.waitAckLock.unlock();
        }
        for (String id : failedIds) {
            List failedTuples = (List)failedTupleLists.get(id);
            if (failedTuples != null) {
                LOG.debug("Failed {} tuple(s) for ID {}", (Object)failedTuples.size(), (Object)id);
                for (Tuple x : failedTuples) {
                    this.eventCounter.scope("failed").incrBy(1L);
                    this._collector.fail(x);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuple for {}", (Object)id);
        }
    }

    public void beforeBulk(long executionId, BulkRequest request) {
        LOG.debug("beforeBulk {} with {} actions", (Object)executionId, (Object)request.numberOfActions());
        this.eventCounter.scope("bulks_received").incrBy(1L);
    }

    protected String getIndexName(Metadata m) {
        return this.indexName;
    }
}

