/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch.bolt;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.opensearch.BulkItemResponseToFailedFlag;
import com.digitalpebble.stormcrawler.opensearch.OpenSearchConnection;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeletionBolt
extends BaseRichBolt
implements RemovalListener<String, List<Tuple>>,
BulkProcessor.Listener {
    static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String BOLT_TYPE = "indexer";
    private OutputCollector _collector;
    private String indexName;
    private OpenSearchConnection connection;
    private Cache<String, List<Tuple>> waitAck;
    private final ReentrantLock waitAckLock = new ReentrantLock(true);

    public DeletionBolt() {
    }

    public DeletionBolt(String indexName) {
        this.indexName = indexName;
    }

    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
        if (this.indexName == null) {
            this.indexName = ConfUtils.getString(conf, (String)"opensearch.indexer.index.name", (String)"content");
        }
        try {
            this.connection = OpenSearchConnection.getConnection(conf, BOLT_TYPE, this);
        }
        catch (Exception e1) {
            LOG.error("Can't connect to opensearch", (Throwable)e1);
            throw new RuntimeException(e1);
        }
        this.waitAck = Caffeine.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).removalListener((RemovalListener)this).build();
        context.registerMetric("waitAck", () -> this.waitAck.estimatedSize(), 10);
    }

    public void onRemoval(@Nullable String key, @Nullable List<Tuple> value, @NotNull RemovalCause cause) {
        if (!cause.wasEvicted()) {
            return;
        }
        if (value != null) {
            LOG.error("Purged from waitAck {} with {} values", (Object)key, (Object)value.size());
            for (Tuple t : value) {
                this._collector.fail(t);
            }
        } else {
            LOG.error("Purged from waitAck {} with no values", (Object)key);
        }
    }

    public void cleanup() {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Tuple tuple) {
        String url = tuple.getStringByField("url");
        Metadata metadata = (Metadata)tuple.getValueByField("metadata");
        String docID = DigestUtils.sha256Hex((String)url);
        DeleteRequest dr = new DeleteRequest(this.getIndexName(metadata), docID);
        this.connection.addToProcessor((DocWriteRequest<?>)dr);
        this.waitAckLock.lock();
        try {
            LinkedList<Tuple> tt = (LinkedList<Tuple>)this.waitAck.getIfPresent((Object)docID);
            if (tt == null) {
                tt = new LinkedList<Tuple>();
                this.waitAck.put((Object)docID, tt);
            }
            tt.add(tuple);
            LOG.debug("Added to waitAck {} with ID {} total {}", new Object[]{url, docID, tt.size()});
        }
        finally {
            this.waitAckLock.unlock();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

    protected String getIndexName(Metadata m) {
        return this.indexName;
    }

    public void beforeBulk(long executionId, BulkRequest request) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        long estimatedSize;
        Map presentTuples;
        Map idsToBulkItemsWithFailedFlag = Arrays.stream(response.getItems()).map(bir -> {
            String id = bir.getId();
            BulkItemResponse.Failure f = bir.getFailure();
            boolean failed = false;
            if (f != null) {
                if (f.getStatus().equals((Object)RestStatus.CONFLICT)) {
                    LOG.debug("Doc conflict ID {}", (Object)id);
                } else {
                    failed = true;
                }
            }
            return new BulkItemResponseToFailedFlag((BulkItemResponse)bir, failed);
        }).collect(Collectors.groupingBy(idWithFailedFlagTuple -> idWithFailedFlagTuple.id, Collectors.toUnmodifiableList()));
        this.waitAckLock.lock();
        try {
            presentTuples = this.waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet());
            if (!presentTuples.isEmpty()) {
                this.waitAck.invalidateAll(presentTuples.keySet());
            }
            estimatedSize = this.waitAck.estimatedSize();
        }
        finally {
            this.waitAckLock.unlock();
        }
        int ackCount = 0;
        int failureCount = 0;
        for (Map.Entry entry : presentTuples.entrySet()) {
            BulkItemResponseToFailedFlag selected;
            String id = (String)entry.getKey();
            List associatedTuple = (List)entry.getValue();
            List bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id);
            if (bulkItemsWithFailedFlag.size() == 1) {
                selected = (BulkItemResponseToFailedFlag)bulkItemsWithFailedFlag.get(0);
            } else {
                BulkItemResponseToFailedFlag tmp = null;
                int ctFailed = 0;
                for (BulkItemResponseToFailedFlag buwff : bulkItemsWithFailedFlag) {
                    if (tmp == null) {
                        tmp = buwff;
                    }
                    if (buwff.failed) {
                        ++ctFailed;
                        continue;
                    }
                    tmp = buwff;
                }
                if (ctFailed != bulkItemsWithFailedFlag.size()) {
                    LOG.warn("The id {} would result in an ack and a failure. Using only the ack for processing.", (Object)id);
                }
                selected = Objects.requireNonNull(tmp);
            }
            if (associatedTuple != null) {
                LOG.debug("Found {} tuple(s) for ID {}", (Object)associatedTuple.size(), (Object)id);
                for (Tuple t : associatedTuple) {
                    String url = (String)t.getValueByField("url");
                    Metadata metadata = (Metadata)t.getValueByField("metadata");
                    if (!selected.failed) {
                        ++ackCount;
                        this._collector.ack(t);
                        continue;
                    }
                    ++failureCount;
                    BulkItemResponse.Failure failure = selected.getFailure();
                    LOG.error("update ID {}, URL {}, failure: {}", new Object[]{id, url, failure});
                    this._collector.fail(t);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuples for {}", entry.getKey());
        }
        LOG.info("Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", new Object[]{executionId, idsToBulkItemsWithFailedFlag.size(), estimatedSize, ackCount, failureCount});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        Map failedTupleLists;
        LOG.error("Exception with bulk {} - failing the whole lot ", (Object)executionId, (Object)failure);
        Set failedIds = request.requests().stream().map(DocWriteRequest::id).collect(Collectors.toUnmodifiableSet());
        this.waitAckLock.lock();
        try {
            failedTupleLists = this.waitAck.getAllPresent(failedIds);
            if (!failedTupleLists.isEmpty()) {
                this.waitAck.invalidateAll(failedTupleLists.keySet());
            }
        }
        finally {
            this.waitAckLock.unlock();
        }
        for (String id : failedIds) {
            List failedTuples = (List)failedTupleLists.get(id);
            if (failedTuples != null) {
                LOG.debug("Failed {} tuple(s) for ID {}", (Object)failedTuples.size(), (Object)id);
                for (Tuple x : failedTuples) {
                    this._collector.fail(x);
                }
                continue;
            }
            LOG.warn("Could not find unacked tuple for {}", (Object)id);
        }
    }
}

