/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.failure;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.graylog.failure.Failure;
import org.graylog.failure.FailureBatch;
import org.graylog.failure.FailureHandler;
import org.graylog.failure.FailureSubmissionQueue;
import org.graylog2.Configuration;
import org.graylog2.plugin.Message;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class FailureHandlingService
extends AbstractExecutionThreadService {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final List<FailureHandler> fallbackFailureHandlerAsList;
    private final Set<FailureHandler> failureHandlers;
    private final FailureSubmissionQueue failureSubmissionQueue;
    private final Configuration configuration;
    private final MessageQueueAcknowledger acknowledger;
    private Thread executionThread;

    @Inject
    public FailureHandlingService(@Named(value="fallbackFailureHandler") FailureHandler fallbackFailureHandler, Set<FailureHandler> failureHandlers, FailureSubmissionQueue failureSubmissionQueue, Configuration configuration, MessageQueueAcknowledger acknowledger) {
        this.fallbackFailureHandlerAsList = Lists.newArrayList((Object[])new FailureHandler[]{fallbackFailureHandler});
        this.failureHandlers = failureHandlers;
        this.failureSubmissionQueue = failureSubmissionQueue;
        this.configuration = configuration;
        this.acknowledger = acknowledger;
    }

    protected void startUp() throws Exception {
        this.executionThread = Thread.currentThread();
        this.logger.debug("Starting up the service.");
    }

    protected void shutDown() throws Exception {
        long shutdownAwaitInsMs = this.configuration.getFailureHandlingShutdownAwait().toMilliseconds();
        int remainingBatchCount = 0;
        FailureBatch remainingFailureBatch = this.failureSubmissionQueue.consumeBlockingWithTimeout(shutdownAwaitInsMs);
        while (remainingFailureBatch != null) {
            this.handle(remainingFailureBatch);
            ++remainingBatchCount;
            remainingFailureBatch = this.failureSubmissionQueue.consumeBlockingWithTimeout(shutdownAwaitInsMs);
        }
        this.logger.info("Shutting down the service. Processed {} remaining failure batches.", (Object)remainingBatchCount);
        this.failureSubmissionQueue.logStats("FailureHandlerService#shutDown");
    }

    protected void triggerShutdown() {
        this.logger.debug("Requested to shut down.");
        this.executionThread.interrupt();
        this.failureSubmissionQueue.logStats("FailureHandlerService#triggerShutdown");
    }

    protected void run() throws Exception {
        if (this.isRunning()) {
            this.logger.debug("The service is up and running!");
        }
        while (this.isRunning()) {
            try {
                this.handle(this.failureSubmissionQueue.consumeBlocking());
            }
            catch (InterruptedException ignored) {
                this.logger.info("The service's thread has been interrupted. The queue currently contains {} failure batches.", (Object)this.failureSubmissionQueue.queueSize());
            }
            catch (Exception e) {
                this.logger.error("Error occurred while handling failures!", (Throwable)e);
            }
        }
        this.logger.debug("The service has been interrupted.");
    }

    private void handle(FailureBatch failureBatch) {
        this.suitableHandlers(failureBatch).forEach(handler -> {
            try {
                handler.handle(failureBatch);
            }
            catch (Exception e) {
                this.logger.error("Error occurred while handling failures by {}", (Object)handler.getClass().getName());
            }
        });
        List<Message> requiresAcknowledgement = failureBatch.getFailures().stream().filter(Failure::requiresAcknowledgement).map(Failure::failedMessage).filter(Message.class::isInstance).map(Message.class::cast).collect(Collectors.toList());
        if (!requiresAcknowledgement.isEmpty()) {
            this.acknowledger.acknowledge(requiresAcknowledgement);
        }
    }

    private List<FailureHandler> suitableHandlers(FailureBatch failureBatch) {
        List<FailureHandler> suitableHandlers = this.suitableHandlers(this.failureHandlers, failureBatch).filter(FailureHandler::isEnabled).collect(Collectors.toList());
        if (suitableHandlers.isEmpty()) {
            return this.suitableHandlers(this.fallbackFailureHandlerAsList, failureBatch).collect(Collectors.toList());
        }
        return suitableHandlers;
    }

    private Stream<FailureHandler> suitableHandlers(Collection<FailureHandler> handlers, FailureBatch failureBatch) {
        return handlers.stream().filter(h -> h.supports(failureBatch));
    }
}

