/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.beats;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.wavefront.common.Utils;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.beats.Ack;
import org.logstash.beats.Batch;
import org.logstash.beats.BatchIdentity;
import org.logstash.beats.ConnectionHandler;
import org.logstash.beats.IMessageListener;
import org.logstash.beats.Message;

@ChannelHandler.Sharable
public class BeatsHandler
extends SimpleChannelInboundHandler<Batch> {
    private static final Logger logger = LogManager.getLogger(BeatsHandler.class);
    private final IMessageListener messageListener;
    private final Supplier<Counter> duplicateBatchesIgnored = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("logsharvesting", "", "filebeat-duplicate-batches")));
    private final Cache<String, BatchIdentity> batchDedupeCache = Caffeine.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).build();

    public BeatsHandler(IMessageListener listener) {
        this.messageListener = listener;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace(this.format(ctx, "Channel Active"));
        }
        super.channelActive(ctx);
        this.messageListener.onNewConnection(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (logger.isTraceEnabled()) {
            logger.trace(this.format(ctx, "Channel Inactive"));
        }
        this.messageListener.onConnectionClose(ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug(this.format(ctx, "Received a new payload"));
        }
        try {
            boolean isFirstMessage = true;
            for (Message message : batch) {
                if (isFirstMessage) {
                    isFirstMessage = false;
                    String key = BatchIdentity.keyFrom(message);
                    BatchIdentity value = BatchIdentity.valueFrom(message);
                    if (key != null && value != null) {
                        BatchIdentity cached = (BatchIdentity)this.batchDedupeCache.getIfPresent((Object)key);
                        if (value.equals(cached)) {
                            this.duplicateBatchesIgnored.get().inc();
                            if (logger.isDebugEnabled()) {
                                logger.debug(this.format(ctx, "Duplicate filebeat batch received, ignoring"));
                            }
                            this.writeAck(ctx, message.getBatch().getProtocol(), message.getBatch().getHighestSequence());
                            break;
                        }
                        this.batchDedupeCache.put((Object)key, (Object)value);
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(this.format(ctx, "Sending a new message for the listener, sequence: " + message.getSequence()));
                }
                this.messageListener.onNewMessage(ctx, message);
                if (!this.needAck(message)) continue;
                this.ack(ctx, message);
            }
        }
        finally {
            ((AtomicBoolean)ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).set(false);
            if (logger.isDebugEnabled()) {
                logger.debug("{}: batches pending: {}", (Object)ctx.channel().id().asShortText(), (Object)((AtomicBoolean)ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).get());
            }
            batch.release();
            ctx.flush();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        try {
            String causeMessage;
            if (!(cause instanceof SSLHandshakeException)) {
                this.messageListener.onException(ctx, cause);
            }
            String string = causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage();
            if (logger.isDebugEnabled()) {
                logger.debug(this.format(ctx, "Handling exception: " + causeMessage), cause);
            }
            logger.info(this.format(ctx, "Handling exception: " + causeMessage));
        }
        finally {
            super.exceptionCaught(ctx, cause);
            ctx.flush();
            ctx.close();
        }
    }

    private boolean needAck(Message message) {
        return message.getSequence() == message.getBatch().getHighestSequence();
    }

    private void ack(ChannelHandlerContext ctx, Message message) {
        if (logger.isTraceEnabled()) {
            logger.trace(this.format(ctx, "Acking message number " + message.getSequence()));
        }
        this.writeAck(ctx, message.getBatch().getProtocol(), message.getSequence());
        this.writeAck(ctx, message.getBatch().getProtocol(), 0);
    }

    private void writeAck(ChannelHandlerContext ctx, byte protocol, int sequence) {
        ctx.writeAndFlush((Object)new Ack(protocol, sequence)).addListener((GenericFutureListener)((ChannelFutureListener)channelFuture -> {
            if (channelFuture.isSuccess() && logger.isTraceEnabled() && sequence > 0) {
                logger.trace(this.format(ctx, "Ack complete for message number " + sequence));
            }
        }));
    }

    private String format(ChannelHandlerContext ctx, String message) {
        InetSocketAddress local = (InetSocketAddress)ctx.channel().localAddress();
        InetSocketAddress remote = (InetSocketAddress)ctx.channel().remoteAddress();
        String localhost = local != null ? local.getAddress().getHostAddress() + ":" + local.getPort() : "undefined";
        String remotehost = remote != null ? remote.getAddress().getHostAddress() + ":" + remote.getPort() : "undefined";
        return "[local: " + localhost + ", remote: " + remotehost + "] " + message;
    }
}

