/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.EventHandler;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor event handler that turns the {@link RawMessage} carried by a {@link MessageEvent}
 * into one or more decoded {@link Message}s.
 *
 * <p>For every event the handler looks up the codec named by the raw message, decodes the payload,
 * enriches each resulting message with source/remote-address metadata, records timing and
 * throughput metrics, and stores the result back on the event.
 */
public class DecodingProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(DecodingProcessor.class);

    /** Timer wrapped around the whole {@link #onEvent} call. */
    private final Timer decodeTime;
    /** Accumulates {@link Message#getSize()} of every successfully post-processed message. */
    private final Counter decodedTrafficCounter;
    /** Codec factories keyed by codec name, as returned by {@link RawMessage#getCodecName()}. */
    private final Map<String, Codec.Factory<? extends Codec>> codecFactory;
    private final ServerStatus serverStatus;
    private final MetricRegistry metricRegistry;
    private final Journal journal;
    private final MessageQueueAcknowledger acknowledger;
    /** Timer wrapped around the codec's decode call only. */
    private final Timer parseTime;

    /**
     * Creates a processor; the two timers are supplied per instance through {@link Factory}.
     *
     * @param codecFactory   codec factories keyed by codec name
     * @param serverStatus   passed through to {@code Message.recordTiming}
     * @param metricRegistry registry used for per-codec meters/timers and the decoded traffic counter
     * @param journal        stored on the instance; not used by any method in this class
     * @param acknowledger   used to acknowledge raw messages that do not yield a decoded message
     * @param decodeTime     timer for the complete {@link #onEvent} handling
     * @param parseTime      timer for the codec decode call
     */
    @AssistedInject
    public DecodingProcessor(Map<String, Codec.Factory<? extends Codec>> codecFactory, ServerStatus serverStatus, MetricRegistry metricRegistry, Journal journal, MessageQueueAcknowledger acknowledger, @Assisted(value="decodeTime") Timer decodeTime, @Assisted(value="parseTime") Timer parseTime) {
        this.codecFactory = codecFactory;
        this.serverStatus = serverStatus;
        this.metricRegistry = metricRegistry;
        this.journal = journal;
        this.acknowledger = acknowledger;
        this.parseTime = parseTime;
        this.decodeTime = decodeTime;
        this.decodedTrafficCounter = metricRegistry.counter("org.graylog2.traffic.decoded");
    }

    /**
     * Decodes the raw message of {@code event} and stores the decoded message(s) on the event.
     *
     * <p>Any exception thrown while decoding is logged (with its root cause), the raw message is
     * acknowledged and the event's messages are cleared; the exception is not propagated.
     * Afterwards the "decode" timing is recorded on the decoded message(s); if the event carries
     * neither a single message nor a message collection, the raw message is acknowledged.
     * The raw message is always cleared from the event before returning.
     *
     * @param event      the event whose raw message should be decoded
     * @param sequence   unused
     * @param endOfBatch unused
     */
    @Override
    public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
        final Timer.Context context = this.decodeTime.time();
        try {
            this.processMessage(event);
        } catch (Exception e) {
            final RawMessage rawMessage = event.getRaw();
            LOG.error("Error processing message " + rawMessage, ExceptionUtils.getRootCause(e));
            // Acknowledge the failed message, then drop whatever was decoded so far.
            this.acknowledger.acknowledge(rawMessage.getMessageQueueId());
            event.clearMessages();
        } finally {
            if (event.getMessage() != null) {
                event.getMessage().recordTiming(this.serverStatus, "decode", context.stop());
            } else if (event.getMessages() != null) {
                // NOTE(review): stop() is invoked once per message; each call presumably updates
                // the decodeTime timer again - confirm this per-message accounting is intended.
                for (Message message : event.getMessages()) {
                    message.recordTiming(this.serverStatus, "decode", context.stop());
                }
            } else {
                // Nothing was decoded for this event.
                // NOTE(review): if clearMessages() resets both message fields, the failure path
                // above ends up here as well and acknowledges the same id twice - confirm that
                // acknowledge() tolerates this.
                this.acknowledger.acknowledge(event.getRaw().getMessageQueueId());
            }
            event.clearRaw();
        }
    }

    /**
     * Decodes the event's raw message with the codec it names and sets the post-processed
     * result on the event.
     *
     * <p>If no codec factory is registered for the codec name, a warning is logged and the event
     * is left untouched. A {@link RuntimeException} from the codec is logged, counted on the
     * codec's "failures" meter and rethrown.
     *
     * @param event the event to process
     */
    private void processMessage(MessageEvent event) throws ExecutionException {
        final RawMessage raw = event.getRaw();

        // The last source node is taken as the input on the current node; there may be none.
        String inputIdOnCurrentNode;
        try {
            inputIdOnCurrentNode = Iterables.getLast(raw.getSourceNodes()).inputId;
        } catch (NoSuchElementException e) {
            inputIdOnCurrentNode = null;
        }

        final Codec.Factory<? extends Codec> factory = this.codecFactory.get(raw.getCodecName());
        if (factory == null) {
            LOG.warn("Couldn't find factory for codec <{}>, skipping message {} on input <{}>.", raw.getCodecName(), raw, inputIdOnCurrentNode);
            return;
        }

        final Codec codec = factory.create(raw.getCodecConfig());
        final String baseMetricName = MetricRegistry.name(codec.getClass(), inputIdOnCurrentNode);

        Message message = null;
        Collection<Message> messages = null;

        // Declared outside the try so the elapsed time is still in scope after the finally block;
        // deliberately not named "decodeTime" to avoid shadowing the Timer field of that name.
        final long parseNanos;
        final Timer.Context parseTimeCtx = this.parseTime.time();
        try {
            if (codec instanceof MultiMessageCodec) {
                messages = ((MultiMessageCodec) codec).decodeMessages(raw);
            } else {
                message = codec.decode(raw);
            }
        } catch (RuntimeException e) {
            LOG.error("Unable to decode raw message {} on input <{}>.", raw, inputIdOnCurrentNode);
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "failures")).mark();
            throw e;
        } finally {
            parseNanos = parseTimeCtx.stop();
        }

        if (message != null) {
            event.setMessage(this.postProcessMessage(raw, codec, inputIdOnCurrentNode, baseMetricName, message, parseNanos));
        } else if (messages != null && !messages.isEmpty()) {
            final ArrayList<Message> processedMessages = Lists.newArrayListWithCapacity(messages.size());
            for (Message msg : messages) {
                final Message processedMessage = this.postProcessMessage(raw, codec, inputIdOnCurrentNode, baseMetricName, msg, parseNanos);
                if (processedMessage != null) {
                    processedMessages.add(processedMessage);
                }
            }
            event.setMessages(processedMessages);
        }
    }

    /**
     * Validates a decoded message and enriches it with metadata taken from the raw message.
     *
     * @param raw                  the raw message {@code message} was decoded from
     * @param codec                the codec that produced {@code message}; its configuration may
     *                             carry an {@code override_source} setting
     * @param inputIdOnCurrentNode id of the input on this node, or {@code null} if unknown
     * @param baseMetricName       prefix for the per-codec metric names
     * @param message              the decoded message, may be {@code null}
     * @param decodeTime           time spent in the codec, in nanoseconds
     * @return the enriched message, or {@code null} if {@code message} was {@code null}
     *         (counted as "failures") or incomplete (counted as "incomplete")
     */
    @Nullable
    private Message postProcessMessage(RawMessage raw, Codec codec, String inputIdOnCurrentNode, String baseMetricName, Message message, long decodeTime) {
        if (message == null) {
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "failures")).mark();
            return null;
        }
        if (!message.isComplete()) {
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "incomplete")).mark();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping incomplete message {} on input <{}>. Parsed fields: [{}]", raw, inputIdOnCurrentNode, message.getFields());
            }
            return null;
        }

        message.setMessageQueueId(raw.getMessageQueueId());
        // Only fall back to the raw message's sequence number if the message has none (0) yet.
        if (message.getSequenceNr() == 0) {
            message.setSequenceNr(raw.getSequenceNr());
        }

        message.recordTiming(this.serverStatus, "parse", decodeTime);
        this.metricRegistry.timer(MetricRegistry.name(baseMetricName, "parseTime")).update(decodeTime, TimeUnit.NANOSECONDS);

        // Copy the origin (input id / node id) of every source node onto the message.
        for (RawMessage.SourceNode node : raw.getSourceNodes()) {
            switch (node.type) {
                case SERVER: {
                    if (message.getField("gl2_source_input") != null) {
                        LOG.debug("Multiple server nodes ({} {}) set for message id {}", message.getField("gl2_source_input"), node.nodeId, message.getId());
                    }
                    message.addField("gl2_source_input", node.inputId);
                    message.addField("gl2_source_node", node.nodeId);
                    break;
                }
                case RADIO: {
                    if (message.getField("gl2_source_radio_input") != null) {
                        LOG.debug("Multiple radio nodes ({} {}) set for message id {}", message.getField("gl2_source_radio_input"), node.nodeId, message.getId());
                    }
                    message.addField("gl2_source_radio_input", node.inputId);
                    message.addField("gl2_source_radio", node.nodeId);
                    break;
                }
                default:
                    break;
            }
        }

        if (inputIdOnCurrentNode != null) {
            try {
                message.setSourceInputId(inputIdOnCurrentNode);
            } catch (RuntimeException e) {
                // Best effort: the message is still processed without a source input id.
                LOG.warn("Unable to find input with id " + inputIdOnCurrentNode + ", not setting input id in this message.", e);
            }
        }

        final ResolvableInetSocketAddress remoteAddress = raw.getRemoteAddress();
        if (remoteAddress != null) {
            final String addrString = InetAddresses.toAddrString((InetAddress) remoteAddress.getAddress());
            message.addField("gl2_remote_ip", addrString);
            if (remoteAddress.getPort() > 0) {
                message.addField("gl2_remote_port", remoteAddress.getPort());
            }
            if (remoteAddress.isReverseLookedUp()) {
                message.addField("gl2_remote_hostname", remoteAddress.getHostName());
            }
            // Use the remote IP as source only if the codec did not provide one.
            if (Strings.isNullOrEmpty((String) message.getSource())) {
                message.setSource(addrString);
            }
        }

        // An explicitly configured source override wins over whatever has been set so far.
        if (codec.getConfiguration() != null && codec.getConfiguration().stringIsSet("override_source")) {
            message.setSource(codec.getConfiguration().getString("override_source"));
        }
        if (Strings.isNullOrEmpty((String) message.getSource())) {
            message.setSource("unknown");
        }

        if (message.getReceiveTime() == null) {
            message.setReceiveTime(raw.getTimestamp());
        }

        this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "processedMessages")).mark();
        this.decodedTrafficCounter.inc(message.getSize());
        return message;
    }

    /** Assisted-injection factory supplying the two per-instance timers. */
    public interface Factory {
        /**
         * @param decodeTime timer for the complete event handling
         * @param parseTime  timer for the codec decode call
         * @return a new processor using the given timers
         */
        DecodingProcessor create(@Assisted(value="decodeTime") Timer decodeTime, @Assisted(value="parseTime") Timer parseTime);
    }
}

