/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.eventbusclient.transport;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.vertx.eventbusclient.EventBusClientOptions;
import io.vertx.eventbusclient.transport.Transport;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class TcpTransport
extends Transport {
    private ChannelHandlerContext handlerCtx;
    private boolean baseHandshakeComplete = false;
    private boolean tcpHandshakeComplete = false;
    private AtomicBoolean connectedHandlerInvoked = new AtomicBoolean(false);
    private boolean reading;
    private boolean flush;

    public TcpTransport(EventBusClientOptions options) {
        super(options);
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        super.initChannel(channel);
        this.baseHandshakeComplete = false;
        this.tcpHandshakeComplete = false;
        this.connectedHandlerInvoked.set(false);
        channel.pipeline().addLast(new ChannelHandler[]{new ByteToMessageDecoder(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                TcpTransport.this.reading = true;
                super.channelRead(ctx, msg);
            }

            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                super.channelReadComplete(ctx);
                TcpTransport.this.reading = false;
                if (TcpTransport.this.flush) {
                    TcpTransport.this.flush = false;
                    ctx.flush();
                }
            }

            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);
                TcpTransport.this.handlerCtx = ctx;
                TcpTransport.this.tcpHandshakeComplete = true;
                if (TcpTransport.this.baseHandshakeComplete && !TcpTransport.this.connectedHandlerInvoked.getAndSet(true)) {
                    TcpTransport.this.connectedHandler.handle(null);
                }
            }

            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                while (in.readableBytes() >= 4) {
                    int readerIdx = in.readerIndex();
                    int len = in.getInt(readerIdx);
                    if (in.readableBytes() < 4 + len) {
                        return;
                    }
                    String json = in.toString(readerIdx + 4, len, StandardCharsets.UTF_8);
                    in.readerIndex(readerIdx + 4 + len);
                    TcpTransport.this.messageHandler.handle(json);
                }
            }

            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                TcpTransport.this.handlerCtx = null;
                if (TcpTransport.this.connectedHandlerInvoked.get()) {
                    TcpTransport.this.closeHandler.handle(null);
                }
            }
        }});
    }

    @Override
    void handshakeCompleteHandler(Channel channel) {
        this.baseHandshakeComplete = true;
        if (this.tcpHandshakeComplete && !this.connectedHandlerInvoked.getAndSet(true)) {
            this.connectedHandler.handle(null);
        }
    }

    @Override
    public void send(final String message) {
        if (this.handlerCtx.executor().inEventLoop()) {
            ByteBuf buff = this.handlerCtx.alloc().buffer();
            buff.writeInt(0);
            buff.writeCharSequence((CharSequence)message, StandardCharsets.UTF_8);
            buff.setInt(0, buff.readableBytes() - 4);
            if (this.reading) {
                this.flush = true;
                this.addSendErrorHandler(this.handlerCtx, message, this.handlerCtx.write((Object)buff));
            } else {
                this.addSendErrorHandler(this.handlerCtx, message, this.handlerCtx.writeAndFlush((Object)buff));
            }
        } else {
            this.handlerCtx.executor().execute(new Runnable(){

                @Override
                public void run() {
                    TcpTransport.this.send(message);
                }
            });
        }
    }
}

