/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.messageq.localkafka;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.Uninterruptibles;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.buffers.ProcessBuffer;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.messageq.AbstractMessageQueueReader;
import org.graylog2.shared.messageq.MessageQueueReader;
import org.graylog2.shared.metrics.HdrHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class LocalKafkaMessageQueueReader
extends AbstractMessageQueueReader {
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaMessageQueueReader.class);
    private final Journal journal;
    private final ProcessBuffer processBuffer;
    private final Semaphore journalFilled;
    private final MetricRegistry metricRegistry;
    private final Meter readMessages;
    private final MessageQueueReader.Metrics readerMetrics;
    private Histogram requestedReadCount;
    private final Counter readBlocked;
    private Thread executionThread;

    @Inject
    public LocalKafkaMessageQueueReader(Journal journal, ProcessBuffer processBuffer, @Named(value="JournalSignal") Semaphore journalFilled, MetricRegistry metricRegistry, EventBus eventBus, MessageQueueReader.Metrics readerMetrics) {
        super(eventBus);
        this.journal = journal;
        this.processBuffer = processBuffer;
        this.journalFilled = journalFilled;
        this.metricRegistry = metricRegistry;
        this.readBlocked = metricRegistry.counter(MetricRegistry.name(this.getClass(), (String[])new String[]{"readBlocked"}));
        this.readMessages = metricRegistry.meter(MetricRegistry.name(this.getClass(), (String[])new String[]{"readMessages"}));
        this.readerMetrics = readerMetrics;
    }

    @Override
    protected void startUp() throws Exception {
        super.startUp();
        this.executionThread = Thread.currentThread();
    }

    @Override
    protected void shutDown() throws Exception {
        super.shutDown();
    }

    protected void triggerShutdown() {
        this.executionThread.interrupt();
    }

    protected void run() throws Exception {
        try {
            this.requestedReadCount = (Histogram)this.metricRegistry.register(MetricRegistry.name(this.getClass(), (String[])new String[]{"requestedReadCount"}), (Metric)new HdrHistogram(this.processBuffer.getRingBufferSize() + 1, 3));
        }
        catch (IllegalArgumentException e) {
            log.warn("Metric already exists", (Throwable)e);
            throw e;
        }
        while (this.isRunning()) {
            if (!this.shouldBeReading()) {
                Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
                continue;
            }
            long remainingCapacity = this.processBuffer.getRemainingCapacity();
            this.requestedReadCount.update(remainingCapacity);
            List<Journal.JournalReadEntry> encodedRawMessages = this.journal.read(remainingCapacity);
            if (encodedRawMessages.isEmpty()) {
                log.debug("No messages to read from Journal, waiting until the writer adds more messages.");
                try {
                    this.readBlocked.inc();
                    this.journalFilled.acquire();
                }
                catch (InterruptedException ignored) {
                    continue;
                }
                log.debug("Messages have been written to Journal, continuing to read.");
                this.journalFilled.drainPermits();
                continue;
            }
            this.readMessages.mark((long)encodedRawMessages.size());
            this.readerMetrics.readMessages().mark((long)encodedRawMessages.size());
            log.debug("Processing {} messages from journal.", (Object)encodedRawMessages.size());
            for (Journal.JournalReadEntry encodedRawMessage : encodedRawMessages) {
                RawMessage rawMessage = RawMessage.decode(encodedRawMessage.getPayload(), encodedRawMessage.getOffset());
                this.readerMetrics.readBytes().mark((long)encodedRawMessage.getPayload().length);
                if (rawMessage == null) {
                    log.error("Found null raw message!");
                    this.journal.markJournalOffsetCommitted(encodedRawMessage.getOffset());
                    continue;
                }
                this.processBuffer.insertBlocking(rawMessage);
            }
        }
        log.info("Stopping.");
    }
}

