/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.beats;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.beats.Ack;

public class ConnectionHandler
extends ChannelDuplexHandler {
    private static final Logger logger = LogManager.getLogger(ConnectionHandler.class);
    public static final AttributeKey<AtomicBoolean> CHANNEL_SEND_KEEP_ALIVE = AttributeKey.valueOf((String)"channel-send-keep-alive");

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).set((Object)new AtomicBoolean(false));
        if (logger.isTraceEnabled()) {
            logger.trace("{}: channel activated", (Object)ctx.channel().id().asShortText());
        }
        super.channelActive(ctx);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ((AtomicBoolean)ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get()).set(true);
        if (logger.isDebugEnabled()) {
            logger.debug("{}: batches pending: {}", (Object)ctx.channel().id().asShortText(), (Object)((AtomicBoolean)ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get()).get());
        }
        super.channelRead(ctx, msg);
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent)evt;
            if (e.state() == IdleState.WRITER_IDLE) {
                if (this.sendKeepAlive(ctx)) {
                    ChannelFuture f = ctx.writeAndFlush((Object)new Ack(50, 0));
                    if (logger.isTraceEnabled()) {
                        logger.trace("{}: sending keep alive ack to libbeat", (Object)ctx.channel().id().asShortText());
                        f.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                            if (future.isSuccess()) {
                                logger.trace("{}: acking was successful", (Object)ctx.channel().id().asShortText());
                            } else {
                                logger.trace("{}: acking failed", (Object)ctx.channel().id().asShortText());
                            }
                        }));
                    }
                }
            } else if (e.state() == IdleState.ALL_IDLE) {
                logger.debug("{}: reader and writer are idle, closing remote connection", (Object)ctx.channel().id().asShortText());
                ctx.flush();
                ChannelFuture f = ctx.close();
                if (logger.isTraceEnabled()) {
                    f.addListener(future -> {
                        if (future.isSuccess()) {
                            logger.trace("closed ctx successfully");
                        } else {
                            logger.trace("could not close ctx");
                        }
                    });
                }
            }
        }
    }

    public boolean sendKeepAlive(ChannelHandlerContext ctx) {
        return ctx.channel().hasAttr(CHANNEL_SEND_KEEP_ALIVE) && ((AtomicBoolean)ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get()).get();
    }
}

