/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.messages;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.indexer.InvalidWriteTargetException;
import org.graylog2.indexer.MasterNotDiscoveredException;
import org.graylog2.indexer.messages.DocumentNotFoundException;
import org.graylog2.indexer.messages.Indexable;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.indexer.messages.IndexingRequest;
import org.graylog2.indexer.messages.IndexingResults;
import org.graylog2.indexer.messages.IndexingSuccess;
import org.graylog2.indexer.messages.MessageWithIndex;
import org.graylog2.indexer.messages.MessagesAdapter;
import org.graylog2.indexer.messages.RetryWait;
import org.graylog2.indexer.messages.TrafficAccounting;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Facade for reading single messages and for bulk indexing messages through a {@link MessagesAdapter}.
 *
 * <p>Bulk indexing is wrapped in a {@link Retryer} that retries on selected exceptions. Individual
 * items that fail with a retryable {@link IndexingError.Type} are re-submitted until none of them is
 * left. Successfully indexed messages are reported to the {@link ProcessingStatusRecorder} and the
 * {@link TrafficAccounting}; remaining errors are handed to the {@link FailureSubmissionService}.</p>
 */
@Singleton
public class Messages {
    private static final Logger LOG = LoggerFactory.getLogger(Messages.class);

    /** Multiplier handed to {@link RetryWait} for the waits between per-item retries. */
    private static final int retrySecondsMultiplier = 500;

    /** Wait helper used between per-item retries; package-private and static so it is shared by all instances. */
    static final RetryWait retryWait = new RetryWait(retrySecondsMultiplier);

    private final FailureSubmissionService failureSubmissionService;
    private final MessagesAdapter messagesAdapter;
    private final ProcessingStatusRecorder processingStatusRecorder;
    private final TrafficAccounting trafficAccounting;

    @Inject
    public Messages(TrafficAccounting trafficAccounting,
                    MessagesAdapter messagesAdapter,
                    ProcessingStatusRecorder processingStatusRecorder,
                    FailureSubmissionService failureSubmissionService) {
        this.trafficAccounting = trafficAccounting;
        this.messagesAdapter = messagesAdapter;
        this.processingStatusRecorder = processingStatusRecorder;
        this.failureSubmissionService = failureSubmissionService;
    }

    /**
     * Creates a fresh builder for the retryer that guards a whole bulk request.
     *
     * <p>A call is retried when the thrown exception has an {@link IOException} in its cause chain, or is
     * an {@link InvalidWriteTargetException} or a {@link MasterNotDiscoveredException}. Waits between
     * attempts grow exponentially, capped at {@link RetryWait#MAX_WAIT_TIME}. Every attempt is logged.</p>
     *
     * @return a new builder, so that callers can register additional listeners before building
     */
    private RetryerBuilder<IndexingResults> createBulkRequestRetryerBuilder() {
        // The explicit type witness is required: without it the chained calls are typed as
        // RetryerBuilder<Object>, which is not assignable to the declared return type.
        return RetryerBuilder.<IndexingResults>newBuilder()
                .retryIfException(t -> ExceptionUtils.hasCauseOf(t, IOException.class)
                        || t instanceof InvalidWriteTargetException
                        || t instanceof MasterNotDiscoveredException)
                .withWaitStrategy(WaitStrategies.exponentialWait(
                        RetryWait.MAX_WAIT_TIME.getQuantity(), RetryWait.MAX_WAIT_TIME.getUnit()))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        if (attempt.hasException()) {
                            LOG.warn("Caught exception during bulk indexing: {}, retrying (attempt #{}).",
                                    attempt.getExceptionCause(), attempt.getAttemptNumber());
                        } else if (attempt.getAttemptNumber() > 1L) {
                            LOG.info("Bulk indexing finally successful (attempt #{}).", attempt.getAttemptNumber());
                        }
                    }
                });
    }

    /**
     * Fetches a single message by delegating to the {@link MessagesAdapter}.
     *
     * @param messageId id of the message to load
     * @param index     name of the index to read from
     * @return the message as returned by the adapter
     * @throws DocumentNotFoundException propagated from the adapter
     * @throws IOException               propagated from the adapter
     */
    public ResultMessage get(String messageId, String index) throws DocumentNotFoundException, IOException {
        return messagesAdapter.get(messageId, index);
    }

    /**
     * Runs the given analyzer on a string by delegating to the {@link MessagesAdapter}.
     *
     * @param toAnalyze the text to analyze
     * @param index     name of the index passed to the adapter
     * @param analyzer  name of the analyzer passed to the adapter
     * @return the list of strings returned by the adapter
     * @throws IOException propagated from the adapter
     */
    public List<String> analyze(String toAnalyze, String index, String analyzer) throws IOException {
        return messagesAdapter.analyze(toAnalyze, index, analyzer);
    }

    /**
     * Bulk indexes the given messages as non-system traffic without a listener.
     *
     * @see #bulkIndex(List, boolean, IndexingListener)
     */
    public IndexingResults bulkIndex(List<MessageWithIndex> messageList) {
        return bulkIndex(messageList, false, null);
    }

    /**
     * Bulk indexes the given messages as non-system traffic.
     *
     * @see #bulkIndex(List, boolean, IndexingListener)
     */
    public IndexingResults bulkIndex(List<MessageWithIndex> messageList, @Nullable IndexingListener indexingListener) {
        return bulkIndex(messageList, false, indexingListener);
    }

    /**
     * Bulk indexes the given messages without a listener.
     *
     * @see #bulkIndex(List, boolean, IndexingListener)
     */
    public IndexingResults bulkIndex(List<MessageWithIndex> messageList, boolean isSystemTraffic) {
        return bulkIndex(messageList, isSystemTraffic, null);
    }

    /**
     * Converts the given messages to {@link IndexingRequest}s and bulk indexes them.
     *
     * @param messageList      messages paired with their target index set
     * @param isSystemTraffic  whether the indexed size is accounted as system traffic instead of output traffic
     * @param indexingListener optional listener notified about retries and success; may be {@code null}
     * @return the indexing results; {@link IndexingResults#empty()} if {@code messageList} is empty
     */
    public IndexingResults bulkIndex(List<MessageWithIndex> messageList,
                                     boolean isSystemTraffic,
                                     @Nullable IndexingListener indexingListener) {
        if (messageList.isEmpty()) {
            return IndexingResults.empty();
        }

        final List<IndexingRequest> indexingRequestList = messageList.stream()
                .map(entry -> IndexingRequest.create(entry.indexSet(), entry.message()))
                .collect(Collectors.toList());

        return bulkIndexRequests(indexingRequestList, isSystemTraffic, indexingListener);
    }

    /**
     * Bulk indexes the given requests without a listener.
     *
     * @see #bulkIndexRequests(List, boolean, IndexingListener)
     */
    public IndexingResults bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic) {
        return bulkIndexRequests(indexingRequestList, isSystemTraffic, null);
    }

    /**
     * Bulk indexes the given requests, retries items that failed with a retryable error, records
     * timestamps and traffic for all successes and submits the remaining errors as failures.
     *
     * @param indexingRequestList requests to index
     * @param isSystemTraffic     whether the indexed size is accounted as system traffic instead of output traffic
     * @param indexingListener    optional listener notified about retries and success; may be {@code null}
     * @return successes of the first request and of all retries, plus all non-retryable errors
     * @throws RuntimeException if the bulk request retryer gives up or the request fails with a non-retried exception
     */
    public IndexingResults bulkIndexRequests(List<IndexingRequest> indexingRequestList,
                                             boolean isSystemTraffic,
                                             @Nullable IndexingListener indexingListener) {
        final IndexingResults indexingResults =
                runBulkRequest(indexingRequestList, indexingRequestList.size(), indexingListener);
        final IndexingResults retryBlockResults =
                retryQualifyingIndividualItems(indexingRequestList, indexingResults.errors(), indexingListener);

        // The errors of the first request are already represented in retryBlockResults (either retried
        // or carried over as non-retryable), so only its successes are merged in here.
        final IndexingResults finalResults = retryBlockResults.mergeWith(indexingResults.successes(), List.of());

        recordTimestamp(finalResults.successes());
        accountTotalMessageSizes(finalResults.successes(), isSystemTraffic);

        if (!finalResults.errors().isEmpty()) {
            failureSubmissionService.submitIndexingErrors(finalResults.errors());
        }

        return finalResults;
    }

    /**
     * Re-submits all requests whose error is retryable (see {@link #isRetryable(IndexingError)}) until
     * no retryable error is left. There is no upper bound on the number of attempts; the thread waits
     * via {@link #retryWait} before each one.
     *
     * @param messages         the requests of the original bulk request
     * @param allFailedItems   the errors of the original bulk request
     * @param indexingListener optional listener passed on to each retry request; may be {@code null}
     * @return successes collected from the retries and all non-retryable errors (original and from retries)
     */
    private IndexingResults retryQualifyingIndividualItems(List<IndexingRequest> messages,
                                                           List<IndexingError> allFailedItems,
                                                           @Nullable IndexingListener indexingListener) {
        Set<IndexingError> retryableErrors = retryableErrorsFrom(allFailedItems);
        final Set<IndexingError> allFailures = new HashSet<>(allFailedItems);
        // Copy the view returned by Sets.difference() so that it can be extended inside the loop.
        final Set<IndexingError> otherFailures = new HashSet<>(Sets.difference(allFailures, retryableErrors));
        List<IndexingRequest> blockedMessages = messagesForResultItems(messages, retryableErrors);

        if (!retryableErrors.isEmpty()) {
            // NOTE(review): the text only mentions blocked indices, but DataTooLarge errors are retried as well.
            LOG.warn("Retrying {} messages, because their indices are blocked with status [read-only / allow delete]",
                    retryableErrors.size());
        }

        long attempt = 1L;
        final IndexingResults.Builder builder = IndexingResults.Builder.create();

        while (!retryableErrors.isEmpty()) {
            retryWait.waitBeforeRetrying(attempt++);

            // Only the still-blocked messages are sent, so that is the count to report on failure.
            final IndexingResults indexingResults =
                    runBulkRequest(blockedMessages, blockedMessages.size(), indexingListener);
            builder.addSuccesses(indexingResults.successes());

            final List<IndexingError> failedItems = indexingResults.errors();
            retryableErrors = retryableErrorsFrom(failedItems);
            blockedMessages = messagesForResultItems(blockedMessages, retryableErrors);

            final Set<IndexingError> retryFailures = new HashSet<>(failedItems);
            otherFailures.addAll(Sets.difference(retryFailures, retryableErrors));

            if (retryableErrors.isEmpty()) {
                LOG.info("Retries were successful after {} attempts. Ingestion will continue now.", attempt);
            }
        }

        builder.addErrors(otherFailures.stream().toList());
        return builder.build();
    }

    /**
     * Selects the requests whose message id matches the message id of one of the given errors.
     *
     * @param chunk       the requests to filter
     * @param indexBlocks the errors whose messages should be selected
     * @return the matching requests, in the order of {@code chunk}
     */
    private List<IndexingRequest> messagesForResultItems(List<IndexingRequest> chunk, Set<IndexingError> indexBlocks) {
        final Set<String> blockedMessageIds = indexBlocks.stream()
                .map(item -> item.message().getId())
                .collect(Collectors.toSet());

        return chunk.stream()
                .filter(entry -> blockedMessageIds.contains(entry.message().getId()))
                .collect(Collectors.toList());
    }

    /** Returns the subset of the given errors for which {@link #isRetryable(IndexingError)} holds. */
    private Set<IndexingError> retryableErrorsFrom(List<IndexingError> allFailedItems) {
        return allFailedItems.stream()
                .filter(this::isRetryable)
                .collect(Collectors.toSet());
    }

    /** An error is retryable if its type is {@code IndexBlocked} or {@code DataTooLarge}. */
    private boolean isRetryable(IndexingError indexingError) {
        final IndexingError.Type errorType = indexingError.error().type();
        // Enum constants are compared by identity, which is also safe for a null type.
        return errorType == IndexingError.Type.IndexBlocked || errorType == IndexingError.Type.DataTooLarge;
    }

    /**
     * Sends one bulk request through a newly built retryer.
     *
     * @param indexingRequestList the requests to send
     * @param count               number of messages, only used in the error log messages
     * @param indexingListener    optional listener registered as additional retry listener; may be {@code null}
     * @return the results returned by the adapter
     * @throws RuntimeException wrapping the {@link RetryException} or {@link ExecutionException} thrown by the retryer
     */
    private IndexingResults runBulkRequest(List<IndexingRequest> indexingRequestList,
                                           int count,
                                           @Nullable IndexingListener indexingListener) {
        final Retryer<IndexingResults> bulkRequestRetryer = indexingListener == null
                ? createBulkRequestRetryerBuilder().build()
                : createBulkRequestRetryerBuilder().withRetryListener(retryListenerFor(indexingListener)).build();

        try {
            return bulkRequestRetryer.call(() -> messagesAdapter.bulkIndex(indexingRequestList));
        } catch (RetryException e) {
            LOG.error("Could not bulk index {} messages. Giving up after {} attempts.",
                    count, e.getNumberOfFailedAttempts());
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The trailing throwable is logged with its stack trace, not substituted into the message.
            LOG.error("Couldn't bulk index {} messages.", count, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Adapts an {@link IndexingListener} to a {@link RetryListener}: attempts that ended with an
     * exception are reported via {@code onRetry}, all others via {@code onSuccess}.
     */
    private RetryListener retryListenerFor(final IndexingListener indexingListener) {
        return new RetryListener() {
            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.hasException()) {
                    indexingListener.onRetry(attempt.getAttemptNumber());
                } else {
                    indexingListener.onSuccess(attempt.getDelaySinceFirstAttempt());
                }
            }
        };
    }

    /**
     * Sums up the sizes of all successfully indexed messages and adds the total to either the system
     * traffic or the output traffic counter.
     */
    private void accountTotalMessageSizes(List<IndexingSuccess> requests, boolean isSystemTraffic) {
        final long totalSizeOfIndexedMessages = requests.stream()
                .map(IndexingSuccess::message)
                .mapToLong(Indexable::getSize)
                .sum();

        if (isSystemTraffic) {
            trafficAccounting.addSystemTraffic(totalSizeOfIndexedMessages);
        } else {
            trafficAccounting.addOutputTraffic(totalSizeOfIndexedMessages);
        }
    }

    /** Reports the receive time of every successfully indexed message to the processing status recorder. */
    private void recordTimestamp(List<IndexingSuccess> messageList) {
        for (final IndexingSuccess entry : messageList) {
            final Indexable message = entry.message();
            processingStatusRecorder.updatePostIndexingReceiveTime(message.getReceiveTime());
        }
    }

    /** Callback that is informed about the outcome of every attempt of a bulk request. */
    public interface IndexingListener {
        /**
         * Called after an attempt that ended with an exception.
         *
         * @param attemptNumber the number of the failed attempt, as reported by the retryer
         */
        void onRetry(long attemptNumber);

        /**
         * Called after an attempt that ended without an exception.
         *
         * @param delaySinceFirstAttempt the delay since the first attempt, as reported by the retryer
         */
        void onSuccess(long delaySinceFirstAttempt);
    }
}

