/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.plugin.inputs.transports;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Callables;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.MetricSets;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.PacketInformationDumper;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.graylog2.plugin.journal.RawMessage;
import org.jboss.netty.bootstrap.Bootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NettyTransport
implements Transport {
    public static final String CK_BIND_ADDRESS = "bind_address";
    public static final String CK_PORT = "port";
    public static final String CK_RECV_BUFFER_SIZE = "recv_buffer_size";
    private static final Logger log = LoggerFactory.getLogger(NettyTransport.class);
    protected final MetricRegistry localRegistry;
    private final InetSocketAddress socketAddress;
    protected final ThroughputCounter throughputCounter;
    private final long recvBufferSize;
    @Nullable
    private CodecAggregator aggregator;
    private Bootstrap bootstrap;
    private Channel acceptChannel;

    public NettyTransport(Configuration configuration, ThroughputCounter throughputCounter, LocalMetricRegistry localRegistry) {
        this.throughputCounter = throughputCounter;
        this.socketAddress = configuration.stringIsSet(CK_BIND_ADDRESS) && configuration.intIsSet(CK_PORT) ? new InetSocketAddress(configuration.getString(CK_BIND_ADDRESS), configuration.getInt(CK_PORT)) : null;
        this.recvBufferSize = configuration.intIsSet(CK_RECV_BUFFER_SIZE) ? (long)configuration.getInt(CK_RECV_BUFFER_SIZE) : MessageInput.getDefaultRecvBufferSize();
        this.localRegistry = localRegistry;
        localRegistry.registerAll(MetricSets.of(throughputCounter.gauges()));
    }

    private ChannelPipelineFactory getPipelineFactory(final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlerList) {
        return new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline p = Channels.pipeline();
                for (Map.Entry entry : handlerList.entrySet()) {
                    p.addLast((String)entry.getKey(), (ChannelHandler)((Callable)entry.getValue()).call());
                }
                return p;
            }
        };
    }

    @Override
    public void setMessageAggregator(@Nullable CodecAggregator aggregator) {
        this.aggregator = aggregator;
    }

    @Override
    public void launch(MessageInput input) throws MisfireException {
        LinkedHashMap<String, Callable<? extends ChannelHandler>> handlerList = this.getBaseChannelHandlers(input);
        LinkedHashMap<String, Callable<? extends ChannelHandler>> finalHandlers = this.getFinalChannelHandlers(input);
        handlerList.putAll(finalHandlers);
        try {
            int receiveBufferSize;
            this.bootstrap = this.getBootstrap();
            this.bootstrap.setPipelineFactory(this.getPipelineFactory(handlerList));
            if (this.bootstrap instanceof ConnectionlessBootstrap) {
                this.acceptChannel = ((ConnectionlessBootstrap)this.bootstrap).bind((SocketAddress)this.socketAddress);
                DefaultDatagramChannelConfig channelConfig = (DefaultDatagramChannelConfig)this.acceptChannel.getConfig();
                receiveBufferSize = channelConfig.getReceiveBufferSize();
            } else if (this.bootstrap instanceof ServerBootstrap) {
                this.acceptChannel = ((ServerBootstrap)this.bootstrap).bind((SocketAddress)this.socketAddress);
                ServerSocketChannelConfig channelConfig = (ServerSocketChannelConfig)this.acceptChannel.getConfig();
                receiveBufferSize = channelConfig.getReceiveBufferSize();
            } else {
                log.error("Unknown Netty bootstrap class returned: {}. Cannot safely bind.", (Object)this.bootstrap);
                throw new IllegalStateException("Unknown netty bootstrap class returned: " + this.bootstrap + ". Cannot safely bind.");
            }
            if ((long)receiveBufferSize != this.getRecvBufferSize()) {
                log.warn("receiveBufferSize (SO_RCVBUF) for input {} should be {} but is {}.", new Object[]{input, this.getRecvBufferSize(), receiveBufferSize});
            }
        }
        catch (Exception e) {
            throw new MisfireException(e);
        }
    }

    @Override
    public void stop() {
        if (this.acceptChannel != null && this.acceptChannel.isOpen()) {
            this.acceptChannel.close();
        }
        if (this.bootstrap != null) {
            this.bootstrap.shutdown();
        }
    }

    protected abstract Bootstrap getBootstrap();

    protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getBaseChannelHandlers(final MessageInput input) {
        LinkedHashMap handlerList = Maps.newLinkedHashMap();
        handlerList.put("exception-logger", new Callable<ChannelHandler>(){

            @Override
            public ChannelHandler call() throws Exception {
                return new SimpleChannelUpstreamHandler(){

                    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
                        log.error("Error on Input [" + input.getName() + "/" + input.getId() + "] (channel " + e.getChannel().toString() + ")", e.getCause());
                        super.exceptionCaught(ctx, e);
                    }
                };
            }
        });
        handlerList.put("packet-meta-dumper", new Callable<ChannelHandler>(){

            @Override
            public ChannelHandler call() throws Exception {
                return new PacketInformationDumper(input);
            }
        });
        handlerList.put("traffic-counter", Callables.returning((Object)((Object)this.throughputCounter)));
        return handlerList;
    }

    protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(final MessageInput input) {
        LinkedHashMap handlerList = Maps.newLinkedHashMap();
        if (this.aggregator != null) {
            log.debug("Adding codec aggregator {} to channel pipeline", (Object)this.aggregator);
            handlerList.put("codec-aggregator", new Callable<ChannelHandler>(){

                @Override
                public ChannelHandler call() throws Exception {
                    return new MessageAggregationHandler(NettyTransport.this.aggregator);
                }
            });
        }
        handlerList.put("rawmessage-handler", new Callable<ChannelHandler>(){

            @Override
            public ChannelHandler call() throws Exception {
                return new RawMessageHandler(input);
            }
        });
        return handlerList;
    }

    protected long getRecvBufferSize() {
        return this.recvBufferSize;
    }

    public SocketAddress getLocalAddress() {
        if (this.acceptChannel == null || !this.acceptChannel.isBound()) {
            return null;
        }
        return this.acceptChannel.getLocalAddress();
    }

    @Override
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }

    public static class Config
    implements Transport.Config {
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest r = new ConfigurationRequest();
            r.addField(ConfigurationRequest.Templates.bindAddress(NettyTransport.CK_BIND_ADDRESS));
            r.addField(ConfigurationRequest.Templates.portNumber(NettyTransport.CK_PORT, 5555));
            r.addField(ConfigurationRequest.Templates.recvBufferSize(NettyTransport.CK_RECV_BUFFER_SIZE, 0x100000));
            return r;
        }
    }

    private class RawMessageHandler
    extends SimpleChannelHandler {
        private final MessageInput input;

        public RawMessageHandler(MessageInput input) {
            this.input = input;
        }

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Object msg = e.getMessage();
            if (!(msg instanceof ChannelBuffer)) {
                log.error("Invalid message type received from transport pipeline. Should be ChannelBuffer but was {}. Discarding message.", msg.getClass());
                return;
            }
            ChannelBuffer buffer = (ChannelBuffer)msg;
            byte[] payload = new byte[buffer.readableBytes()];
            buffer.toByteBuffer().get(payload, buffer.readerIndex(), buffer.readableBytes());
            RawMessage raw = new RawMessage(payload, (InetSocketAddress)e.getRemoteAddress());
            this.input.processRawMessage(raw);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            log.debug("Could not handle message, closing connection: {}", (Object)e);
            if (ctx.getChannel() != null && !(ctx.getChannel() instanceof DatagramChannel)) {
                ctx.getChannel().close();
            }
        }
    }

    private class MessageAggregationHandler
    extends SimpleChannelHandler {
        private final CodecAggregator aggregator;
        private final Timer aggregationTimer;
        private final Meter invalidChunksMeter;

        public MessageAggregationHandler(CodecAggregator aggregator) {
            this.aggregator = aggregator;
            this.aggregationTimer = NettyTransport.this.localRegistry.timer("aggregationTime");
            this.invalidChunksMeter = NettyTransport.this.localRegistry.meter("invalidMessages");
        }

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Object message = e.getMessage();
            if (message instanceof ChannelBuffer) {
                CodecAggregator.Result result;
                ChannelBuffer buf = (ChannelBuffer)message;
                try (Timer.Context ignored = this.aggregationTimer.time();){
                    result = this.aggregator.addChunk(buf);
                }
                ChannelBuffer completeMessage = result.getMessage();
                if (completeMessage != null) {
                    log.debug("Message aggregation completion, forwarding {}", (Object)completeMessage);
                    Channels.fireMessageReceived((ChannelHandlerContext)ctx, (Object)completeMessage);
                } else if (result.isValid()) {
                    log.debug("More chunks necessary to complete this message");
                } else {
                    this.invalidChunksMeter.mark();
                    log.debug("Message chunk was not valid and discarded.");
                }
            } else {
                log.debug("Could not handle netty message {}, sending further upstream.", (Object)e);
                Channels.fireMessageReceived((ChannelHandlerContext)ctx, (Object)message);
            }
        }
    }
}

