/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.graylog2.shared.buffers.RawMessageEvent;
import org.graylog2.shared.messageq.MessageQueueWriter;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JournallingMessageHandler
implements EventHandler<RawMessageEvent> {
    private static final Logger log = LoggerFactory.getLogger(JournallingMessageHandler.class);
    private final List<RawMessageEvent> batch = Lists.newArrayList();
    private final Counter byteCounter;
    private final MessageQueueWriter messageQueueWriter;
    private final ProcessingStatusRecorder processingStatusRecorder;

    @Inject
    public JournallingMessageHandler(MetricRegistry metrics, MessageQueueWriter messageQueueWriter, ProcessingStatusRecorder processingStatusRecorder) {
        this.messageQueueWriter = messageQueueWriter;
        this.processingStatusRecorder = processingStatusRecorder;
        this.byteCounter = metrics.counter(MetricRegistry.name(JournallingMessageHandler.class, (String[])new String[]{"written_bytes"}));
    }

    public void onEvent(RawMessageEvent event, long sequence, boolean endOfBatch) throws Exception {
        this.batch.add(event);
        if (endOfBatch) {
            log.debug("End of batch, journaling {} messages", (Object)this.batch.size());
            Filter metricsFilter = new Filter();
            List<RawMessageEvent> entries = this.batch.stream().map(metricsFilter).filter(Objects::nonNull).collect(Collectors.toList());
            this.processingStatusRecorder.updateIngestReceiveTime(metricsFilter.getLatestReceiveTime());
            this.messageQueueWriter.write(entries);
            this.batch.stream().filter(Objects::nonNull).forEach(RawMessageEvent::clear);
            this.batch.clear();
        }
    }

    private class Filter
    implements Function<RawMessageEvent, RawMessageEvent> {
        private long bytesWritten = 0L;
        private DateTime latestReceiveTime = new DateTime(0L, DateTimeZone.UTC);

        private Filter() {
        }

        public long getBytesWritten() {
            return this.bytesWritten;
        }

        public DateTime getLatestReceiveTime() {
            return this.latestReceiveTime;
        }

        @Nullable
        public RawMessageEvent apply(RawMessageEvent input) {
            if (log.isTraceEnabled()) {
                log.trace("Journalling message {}", (Object)input.getMessageId());
            }
            if (input.getEncodedRawMessage() == null) {
                log.error("Skipping RawMessageEvent with null encodedRawMessage");
                return null;
            }
            int size = input.getEncodedRawMessage().length;
            this.bytesWritten += (long)size;
            JournallingMessageHandler.this.byteCounter.inc((long)size);
            DateTime messageTimestamp = input.getMessageTimestamp();
            if (messageTimestamp != null) {
                this.latestReceiveTime = this.latestReceiveTime.isBefore((ReadableInstant)messageTimestamp) ? messageTimestamp : this.latestReceiveTime;
            }
            return input;
        }
    }
}

