/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.outputs;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.outputs.IndexSetAwareMessageOutputBuffer;
import org.graylog2.outputs.filter.FilteredMessage;
import org.graylog2.outputs.filter.OutputFilter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.outputs.FilteredMessageOutput;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.system.shutdown.GracefulShutdownHook;
import org.graylog2.system.shutdown.GracefulShutdownService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class BatchedMessageFilterOutput
implements MessageOutput,
GracefulShutdownHook {
    private static final Logger LOG = LoggerFactory.getLogger(BatchedMessageFilterOutput.class);
    private final Map<String, FilteredMessageOutput> outputs;
    private final OutputFilter outputFilter;
    private final Duration outputFlushInterval;
    private final Duration shutdownTimeout;
    private final ScheduledExecutorService daemonScheduler;
    private final AtomicInteger activeFlushThreads = new AtomicInteger(0);
    private final Histogram batchSize;
    private final Meter bufferFlushes;
    private final Meter bufferFlushFailures;
    private final Meter bufferFlushesRequested;
    private final Cluster cluster;
    private final MessageQueueAcknowledger acknowledger;
    private final Meter outputWriteFailures;
    private final Timer processTime;
    private final GracefulShutdownService gracefulShutdownService;
    private final IndexSetAwareMessageOutputBuffer buffer;
    private ScheduledFuture<?> flushTask;

    @Inject
    public BatchedMessageFilterOutput(Map<String, FilteredMessageOutput> outputs, OutputFilter outputFilter, MetricRegistry metricRegistry, Cluster cluster, MessageQueueAcknowledger acknowledger, IndexSetAwareMessageOutputBuffer indexSetAwareMessageOutputBuffer, GracefulShutdownService gracefulShutdownService, @Named(value="output_flush_interval") int outputFlushInterval, @Named(value="shutdown_timeout") int shutdownTimeoutMs, @Named(value="daemonScheduler") ScheduledExecutorService daemonScheduler) {
        if (outputs.isEmpty()) {
            throw new IllegalStateException("No registered outputs found!");
        }
        this.outputs = outputs;
        this.outputFilter = outputFilter;
        this.cluster = cluster;
        this.acknowledger = acknowledger;
        this.outputFlushInterval = Duration.ofSeconds(outputFlushInterval);
        this.shutdownTimeout = Duration.ofMillis(shutdownTimeoutMs);
        this.daemonScheduler = daemonScheduler;
        this.buffer = indexSetAwareMessageOutputBuffer;
        this.batchSize = metricRegistry.histogram(MetricRegistry.name(this.getClass(), (String[])new String[]{"batchSize"}));
        this.bufferFlushes = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"bufferFlushes"}));
        this.bufferFlushFailures = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"bufferFlushFailures"}));
        this.bufferFlushesRequested = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"bufferFlushesRequested"}));
        this.processTime = metricRegistry.timer(MetricRegistry.name(this.getClass(), (String[])new String[]{"processTime"}));
        this.outputWriteFailures = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"outputWriteFailures"}));
        this.gracefulShutdownService = gracefulShutdownService;
    }

    @Override
    public void initialize() throws Exception {
        LOG.debug("Starting buffer flush task to run every {} milliseconds", (Object)this.outputFlushInterval.toMillis());
        this.flushTask = this.daemonScheduler.scheduleAtFixedRate(() -> {
            try {
                if (this.buffer.shouldFlush(this.outputFlushInterval)) {
                    this.forceFlush();
                }
            }
            catch (Exception e) {
                LOG.error("Caught exception while trying to flush outputs", (Throwable)e);
            }
        }, this.outputFlushInterval.toMillis(), this.outputFlushInterval.toMillis(), TimeUnit.MILLISECONDS);
        this.gracefulShutdownService.register(this);
    }

    @VisibleForTesting
    void forceFlush() {
        LOG.debug("Force-flushing the buffer");
        this.bufferFlushesRequested.mark();
        this.buffer.flush(this::flush);
    }

    private void flush(List<FilteredMessage> filteredMessages) {
        if (filteredMessages.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping buffer flush with empty buffer");
            }
            return;
        }
        this.batchSize.update(filteredMessages.size());
        this.activeFlushThreads.incrementAndGet();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Starting flushing {} messages, flush threads active {}", (Object)filteredMessages.size(), (Object)this.activeFlushThreads.get());
        }
        try (Timer.Context ignored = this.processTime.time();){
            for (FilteredMessageOutput output : this.outputs.values()) {
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Writing {} message(s) to output <{}>", (Object)filteredMessages.size(), (Object)output);
                    }
                    output.writeFiltered(filteredMessages);
                }
                catch (Exception e) {
                    LOG.error("Couldn't write {} message(s) to output <{}>", new Object[]{filteredMessages.size(), output.getClass(), e});
                    this.outputWriteFailures.mark();
                }
            }
            this.bufferFlushes.mark();
            this.acknowledger.acknowledge(filteredMessages.stream().map(FilteredMessage::message).toList());
        }
        catch (Exception e) {
            LOG.error("Error while flushing messages", (Throwable)e);
            this.bufferFlushFailures.mark();
        }
        this.activeFlushThreads.decrementAndGet();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Flushing {} messages completed", (Object)filteredMessages.size());
        }
    }

    @Override
    public boolean isRunning() {
        return true;
    }

    @Override
    public void write(List<Message> messages) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Writing {} messages", (Object)messages.size());
        }
        for (Message message : messages) {
            this.write(message);
        }
    }

    @Override
    public void write(Message message) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Running output filter on message: {}", (Object)message);
        }
        FilteredMessage filteredMessage = this.outputFilter.apply(message);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Appending filtered message <{}> to buffer", (Object)filteredMessage);
        }
        this.buffer.appendAndFlush(filteredMessage, this::flush);
    }

    @VisibleForTesting
    void cancelFlushTask() {
        if (this.flushTask != null) {
            LOG.debug("Cancelling flush task");
            this.flushTask.cancel(false);
        }
    }

    @Override
    public void stop() {
        LOG.debug("Stopping output filter");
        this.doGracefulShutdown();
        try {
            this.gracefulShutdownService.unregister(this);
        }
        catch (IllegalStateException e) {
            LOG.debug("Couldn't unregister from graceful shutdown service: {}", (Object)e.getMessage());
        }
    }

    @Override
    public void doGracefulShutdown() {
        this.cancelFlushTask();
        if (this.cluster.isConnected() && this.cluster.isDeflectorHealthy()) {
            ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("batched-message-filter-output-shutdown-flush").build());
            try {
                LOG.debug("Flushing the current buffer for shutdown");
                executorService.submit(this::forceFlush).get(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
            }
            catch (ExecutionException e) {
                LOG.warn("Flushing current message batch to outputs while stopping failed: {}.", (Object)e.getMessage());
            }
            catch (TimeoutException e) {
                LOG.warn("Timed out flushing current batch to outputs while stopping.");
            }
            finally {
                executorService.shutdownNow();
            }
        }
    }
}

