/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedThreadFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.buffers.LoggingExceptionHandler;
import org.graylog2.shared.buffers.processors.DecodingProcessor;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.graylog2.shared.metrics.MetricUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ProcessBuffer
extends Buffer {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBuffer.class);
    private final Meter incomingMessages;
    private final ProcessBufferProcessor[] processors;

    @Inject
    public ProcessBuffer(MetricRegistry metricRegistry, DecodingProcessor.Factory decodingProcessorFactory, ProcessBufferProcessor.Factory bufferProcessorFactory, @Named(value="processbuffer_processors") int processorCount, @Named(value="ring_size") int ringSize, @Named(value="processor_wait_strategy") String waitStrategyName) {
        this.ringBufferSize = ringSize;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"incomingMessages"}));
        Timer parseTime = metricRegistry.timer(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"parseTime"}));
        Timer decodeTime = metricRegistry.timer(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"decodeTime"}));
        MetricUtils.safelyRegister(metricRegistry, "org.graylog2.buffers.process.usage", new Gauge<Long>(){

            public Long getValue() {
                return ProcessBuffer.this.getUsage();
            }
        });
        MetricUtils.safelyRegister(metricRegistry, "org.graylog2.buffers.process.size", MetricUtils.constantGauge(this.ringBufferSize));
        WaitStrategy waitStrategy = this.getWaitStrategy(waitStrategyName, "processor_wait_strategy");
        Disruptor disruptor = new Disruptor(MessageEvent.EVENT_FACTORY, this.ringBufferSize, this.threadFactory(metricRegistry), ProducerType.MULTI, waitStrategy);
        disruptor.setDefaultExceptionHandler((ExceptionHandler)new LoggingExceptionHandler(LOG));
        LOG.info("Initialized ProcessBuffer with ring size <{}> and wait strategy <{}>.", (Object)this.ringBufferSize, (Object)waitStrategy.getClass().getSimpleName());
        this.processors = new ProcessBufferProcessor[processorCount];
        for (int i = 0; i < processorCount; ++i) {
            this.processors[i] = bufferProcessorFactory.create(decodingProcessorFactory.create(decodeTime, parseTime));
        }
        disruptor.handleEventsWithWorkerPool((WorkHandler[])this.processors);
        this.ringBuffer = disruptor.start();
    }

    private ThreadFactory threadFactory(MetricRegistry metricRegistry) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("processbufferprocessor-%d").build();
        return new InstrumentedThreadFactory(threadFactory, metricRegistry, MetricRegistry.name(this.getClass(), (String[])new String[]{"thread-factory"}));
    }

    public void insertBlocking(@Nonnull RawMessage rawMessage) {
        long sequence = this.ringBuffer.next();
        MessageEvent event = (MessageEvent)this.ringBuffer.get(sequence);
        event.setRaw(rawMessage);
        this.ringBuffer.publish(sequence);
        this.afterInsert(1);
    }

    @Override
    protected void afterInsert(int n) {
        this.incomingMessages.mark((long)n);
    }

    public ImmutableMap<String, String> getDump() {
        ImmutableMap.Builder processBufferDump = ImmutableMap.builder();
        int processorsLength = this.processors.length;
        for (int i = 0; i < processorsLength; ++i) {
            ProcessBufferProcessor proc = this.processors[i];
            processBufferDump.put((Object)("ProcessBufferProcessor #" + i), (Object)proc.getCurrentMessage().map(Message::toString).orElse("idle"));
        }
        return processBufferDump.build();
    }
}

