/*
 * Decompiled with CFR 0.152.
 */
package org.fisco.bcos.channel.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeKey;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.channel.client.BcosResponseCallback;
import org.fisco.bcos.channel.client.Service;
import org.fisco.bcos.channel.dto.BcosMessage;
import org.fisco.bcos.channel.dto.BcosResponse;
import org.fisco.bcos.channel.dto.ChannelMessage2;
import org.fisco.bcos.channel.dto.TopicVerifyMessage;
import org.fisco.bcos.channel.handler.ChannelConnections;
import org.fisco.bcos.channel.handler.ChannelHandlerContextHelper;
import org.fisco.bcos.channel.handler.Message;
import org.fisco.bcos.channel.protocol.ChannelHandshake;
import org.fisco.bcos.channel.protocol.ChannelMessageError;
import org.fisco.bcos.channel.protocol.ChannelMessageType;
import org.fisco.bcos.channel.protocol.ChannelPrococolExceiption;
import org.fisco.bcos.channel.protocol.ChannelProtocol;
import org.fisco.bcos.channel.protocol.EnumChannelProtocolVersion;
import org.fisco.bcos.channel.protocol.EnumSocketChannelAttributeKey;
import org.fisco.bcos.fisco.EnumNodeVersion;
import org.fisco.bcos.web3j.protocol.ObjectMapperFactory;
import org.fisco.bcos.web3j.protocol.channel.ChannelEthereumService;
import org.fisco.bcos.web3j.protocol.core.Request;
import org.fisco.bcos.web3j.protocol.core.Response;
import org.fisco.bcos.web3j.protocol.core.methods.response.BlockNumber;
import org.fisco.bcos.web3j.protocol.core.methods.response.NodeVersion;
import org.fisco.bcos.web3j.protocol.exceptions.MessageDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ChannelConnections.Callback} implementation that drives the per-connection start-up
 * sequence and dispatches inbound frames to the owning {@link Service}.
 *
 * <p>On connect the callback queries the node version. Depending on the answer it either performs
 * the channel protocol handshake or falls back to {@link EnumChannelProtocolVersion#VERSION_1};
 * in both cases it then sends the subscribed topics and queries the current block number.
 */
public class ConnectionCallback implements ChannelConnections.Callback {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionCallback.class);

    private Integer connectTimeoutMS = 10000;
    private Integer queryNodeVersionTimeoutMS = 5000;
    private Service channelService;
    private Set<String> topics;

    /**
     * Creates a callback bound to the given topic set.
     *
     * @param topics topics announced to the node; the set is stored by reference and is mutated
     *     by {@link #sendUpdateTopicMessage(ChannelHandlerContext)}
     */
    public ConnectionCallback(Set<String> topics) {
        this.topics = topics;
    }

    /** @return the timeout, in milliseconds, applied to the node version query */
    public Integer getQueryNodeVersionTimeoutMS() {
        return this.queryNodeVersionTimeoutMS;
    }

    /** @param queryNodeVersionTimeoutMS timeout, in milliseconds, for the node version query */
    public void setQueryNodeVersionTimeoutMS(Integer queryNodeVersionTimeoutMS) {
        this.queryNodeVersionTimeoutMS = queryNodeVersionTimeoutMS;
    }

    /** @return the configured connect timeout in milliseconds */
    public Integer getConnectTimeoutMS() {
        return this.connectTimeoutMS;
    }

    /** @param connectTimeoutMS connect timeout in milliseconds */
    public void setConnectTimeoutMS(Integer connectTimeoutMS) {
        this.connectTimeoutMS = connectTimeoutMS;
    }

    /** @return the service that receives decoded messages and owns the seq-to-callback map */
    public Service getChannelService() {
        return this.channelService;
    }

    /** @param channelService the service that receives decoded messages */
    public void setChannelService(Service channelService) {
        this.channelService = channelService;
    }

    /**
     * Replaces the topic set announced to the node.
     *
     * @param topics the new topic set, stored by reference
     */
    public void setTopics(Set<String> topics) {
        // A plain field assignment cannot throw, so no exception handling is needed here.
        this.topics = topics;
    }

    /**
     * Starts the start-up sequence for a freshly connected channel by resetting the service's
     * block number to one and querying the node version. If the request cannot be serialized the
     * channel is closed.
     *
     * @param ctx the context of the connected channel
     */
    @Override
    public void onConnect(ChannelHandlerContext ctx) {
        String host = ChannelHandlerContextHelper.getPeerHost(ctx);
        logger.info(" connect {} success, ctx: {}", host, System.identityHashCode(ctx));
        try {
            this.channelService.setNumber(BigInteger.ONE);
            this.queryNodeVersion(ctx);
        } catch (JsonProcessingException e) {
            // The trailing throwable keeps the stack trace in the log.
            logger.error(" query node version exception, ctx: {}, message: {} ", ctx, e.getMessage(), e);
            closeAfterFlush(ctx);
        }
    }

    /**
     * Sends the channel protocol handshake and registers a callback that stores the negotiated
     * protocol on the channel, then continues with the topic update and block number query.
     *
     * @param ctx the context of the channel to negotiate on
     * @throws ChannelPrococolExceiption declared for callers; not thrown synchronously here
     * @throws IOException if the handshake payload cannot be serialized
     */
    private void queryChannelProtocolVersion(final ChannelHandlerContext ctx) throws ChannelPrococolExceiption, IOException {
        final String host = ChannelHandlerContextHelper.getPeerHost(ctx);
        String seq = randomSeq();
        byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(new ChannelHandshake());
        // Jackson emits UTF-8 bytes; decode explicitly instead of using the platform charset.
        logger.debug(" channel protocol handshake, host: {}, seq: {}, content: {}", host, seq, new String(payload, StandardCharsets.UTF_8));

        BcosMessage bcosMessage = new BcosMessage();
        bcosMessage.setType((short)ChannelMessageType.CLIENT_HANDSHAKE.getType());
        bcosMessage.setSeq(seq);
        bcosMessage.setResult(0);
        bcosMessage.setData(payload);
        ByteBuf byteBuf = encode(ctx, bcosMessage);

        // Register the callback BEFORE flushing so a fast response cannot arrive unmatched.
        this.channelService.getSeq2Callback().put(seq, new BcosResponseCallback() {

            @Override
            public void onResponse(BcosResponse response) {
                try {
                    if (response.getErrorCode() != 0) {
                        logger.error(" channel protocol handshake request failed, code: {}, message: {}", response.getErrorCode(), response.getErrorMessage());
                        throw new ChannelPrococolExceiption(" channel protocol handshake request failed, code: " + response.getErrorCode() + ", message: " + response.getErrorMessage());
                    }
                    ChannelProtocol channelProtocol = ObjectMapperFactory.getObjectMapper().readValue(response.getContent(), ChannelProtocol.class);
                    EnumChannelProtocolVersion enumChannelProtocolVersion = EnumChannelProtocolVersion.toEnum(channelProtocol.getProtocol());
                    channelProtocol.setEnumProtocol(enumChannelProtocolVersion);
                    logger.info(" channel protocol handshake success, set socket channel protocol, host: {}, channel protocol: {}", host, channelProtocol);
                    ctx.channel().attr(AttributeKey.valueOf((String)EnumSocketChannelAttributeKey.CHANNEL_PROTOCOL_KEY.getKey())).set((Object)channelProtocol);
                    ConnectionCallback.this.sendUpdateTopicMessage(ctx);
                    ConnectionCallback.this.queryBlockNumber(ctx);
                } catch (Exception e) {
                    logger.error(" channel protocol handshake failed, exception: {}", e.getMessage(), e);
                    closeAfterFlush(ctx);
                }
            }
        });
        ctx.writeAndFlush(byteBuf);
    }

    /**
     * Sends a {@code getClientVersion} RPC and registers a callback, guarded by a timeout of
     * {@link #getQueryNodeVersionTimeoutMS()} milliseconds, that decides how to continue:
     *
     * <ul>
     *   <li>timeout error code: assume protocol {@code VERSION_1} and continue with the topic
     *       update and block number query;
     *   <li>any other non-zero error code: close the channel;
     *   <li>node supports the handshake: run {@link #queryChannelProtocolVersion};
     *   <li>otherwise: set {@code VERSION_1} and continue with topic update and block number.
     * </ul>
     *
     * @param ctx the context of the channel to query
     * @throws JsonProcessingException if the request cannot be serialized
     */
    private void queryNodeVersion(final ChannelHandlerContext ctx) throws JsonProcessingException {
        String host = ChannelHandlerContextHelper.getPeerHost(ctx);
        String seq = randomSeq();
        Request<Object, NodeVersion> request = new Request<Object, NodeVersion>("getClientVersion", Arrays.<Object>asList(), null, NodeVersion.class);
        byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(request);
        logger.info(" query node version host: {}, seq: {}, content: {}", host, seq, new String(payload, StandardCharsets.UTF_8));

        final BcosMessage bcosMessage = new BcosMessage();
        bcosMessage.setType((short)ChannelMessageType.CHANNEL_RPC_REQUEST.getType());
        bcosMessage.setSeq(seq);
        bcosMessage.setResult(0);
        bcosMessage.setData(payload);
        ByteBuf byteBuf = encode(ctx, bcosMessage);

        final BcosResponseCallback callback = new BcosResponseCallback() {

            @Override
            public void onResponse(BcosResponse response) {
                try {
                    if (response.getErrorCode().intValue() == ChannelMessageError.MESSAGE_TIMEOUT.getError()) {
                        // No answer in time: treat the peer as a node without handshake support.
                        ChannelHandlerContextHelper.setProtocolVersion(ctx, EnumChannelProtocolVersion.VERSION_1, "below-2.1.0-timeout");
                        logger.info(" query node version timeout, content: {}", response.getContent());
                        ConnectionCallback.this.sendUpdateTopicMessage(ctx);
                        ConnectionCallback.this.queryBlockNumber(ctx);
                        return;
                    }
                    if (response.getErrorCode() != 0) {
                        logger.error(" fisco node version response, code: {}, message: {}", response.getErrorCode(), response.getErrorMessage());
                        throw new ChannelPrococolExceiption(" query node version failed, code: " + response.getErrorCode() + ", message: " + response.getErrorMessage());
                    }
                    NodeVersion nodeVersion = ObjectMapperFactory.getObjectMapper().readValue(response.getContent(), NodeVersion.class);
                    logger.info(" node: {}, content: {}", nodeVersion.getResult(), response.getContent());
                    String supportedVersion = ((NodeVersion.Version)nodeVersion.getResult()).getSupportedVersion();
                    if (EnumNodeVersion.channelProtocolHandleShakeSupport(supportedVersion)) {
                        logger.info(" support channel handshake node: {}, content: {}", nodeVersion.getResult(), response.getContent());
                        ConnectionCallback.this.queryChannelProtocolVersion(ctx);
                    } else {
                        ChannelHandlerContextHelper.setProtocolVersion(ctx, EnumChannelProtocolVersion.VERSION_1, supportedVersion);
                        logger.info(" not support channel handshake set default ,node: {}, content: {}", nodeVersion.getResult(), response.getContent());
                        ConnectionCallback.this.sendUpdateTopicMessage(ctx);
                        ConnectionCallback.this.queryBlockNumber(ctx);
                    }
                } catch (Exception e) {
                    logger.error(" query node version failed, message: {}", e.getMessage(), e);
                    closeAfterFlush(ctx);
                }
            }
        };
        callback.setTimeout(this.channelService.getTimeoutHandler().newTimeout(new TimerTask() {

            @Override
            public void run(Timeout timeout) throws Exception {
                callback.onTimeout();
                logger.error("queryNodeVersion timeout, seq: {}", bcosMessage.getSeq());
            }
        }, this.queryNodeVersionTimeoutMS.longValue(), TimeUnit.MILLISECONDS));

        // Register the callback (with its timeout armed) BEFORE flushing the request.
        this.channelService.getSeq2Callback().put(seq, callback);
        ctx.writeAndFlush(byteBuf);
    }

    /**
     * Sends the current topic set to the node as an {@code AMOP_CLIENT_TOPICS} message. The
     * block-notify topic for the service's group id is added to the shared topic set first.
     *
     * @param ctx the context of the channel to send on
     * @throws JsonProcessingException if the topic list cannot be serialized
     */
    public void sendUpdateTopicMessage(ChannelHandlerContext ctx) throws JsonProcessingException {
        Message message = new Message();
        message.setResult(0);
        message.setType((short)ChannelMessageType.AMOP_CLIENT_TOPICS.getType());
        message.setSeq(randomSeq());
        this.topics.add("_block_notify_" + this.channelService.getGroupId());
        message.setData(ObjectMapperFactory.getObjectMapper().writeValueAsBytes((Object)this.topics.toArray()));
        String content = new String(message.getData(), StandardCharsets.UTF_8);
        ctx.writeAndFlush(encode(ctx, message));
        logger.info(" send update topic message request, seq: {}, content: {}", message.getSeq(), content);
    }

    /**
     * Sends a {@code getBlockNumber} RPC for the service's group and registers a callback that
     * records the answer in the service's node-to-block-number map, keyed by the peer's host
     * address concatenated with its port.
     *
     * @param ctx the context of the channel to query
     * @throws JsonProcessingException if the request cannot be serialized
     */
    private void queryBlockNumber(final ChannelHandlerContext ctx) throws JsonProcessingException {
        final String host = ChannelHandlerContextHelper.getPeerHost(ctx);
        String seq = this.channelService.newSeq();
        BcosMessage bcosMessage = new BcosMessage();
        bcosMessage.setType((short)ChannelMessageType.CHANNEL_RPC_REQUEST.getType());
        bcosMessage.setSeq(seq);
        ChannelEthereumService channelEthereumService = new ChannelEthereumService();
        channelEthereumService.setChannelService(this.channelService);
        Request<Integer, BlockNumber> request = new Request<Integer, BlockNumber>("getBlockNumber", Arrays.asList(this.channelService.getGroupId()), channelEthereumService, BlockNumber.class);
        bcosMessage.setData(ObjectMapperFactory.getObjectMapper().writeValueAsBytes(request));
        ByteBuf byteBuf = encode(ctx, bcosMessage);
        logger.info(" query block number host: {}, seq: {}, content: {}", host, seq, new String(bcosMessage.getData(), StandardCharsets.UTF_8));

        // Register the callback BEFORE flushing so a fast response cannot arrive unmatched.
        this.channelService.getSeq2Callback().put(seq, new BcosResponseCallback() {

            @Override
            public void onResponse(BcosResponse response) {
                try {
                    BlockNumber blockNumber = ObjectMapperFactory.getObjectMapper().readValue(response.getContent(), BlockNumber.class);
                    SocketChannel socketChannel = (SocketChannel)ctx.channel();
                    InetSocketAddress socketAddress = socketChannel.remoteAddress();
                    ConnectionCallback.this.channelService.getNodeToBlockNumberMap().put(socketAddress.getAddress().getHostAddress() + socketAddress.getPort(), blockNumber.getBlockNumber());
                    logger.info(" query blocknumer, host:{}, blockNumber: {} ", host, blockNumber.getBlockNumber());
                } catch (Exception e) {
                    logger.error(" query blocknumer failed, host: {}, message: {} ", host, e.getMessage(), e);
                    throw new MessageDecodingException(response.getContent());
                }
            }
        });
        ctx.writeAndFlush(byteBuf);
    }

    /**
     * Notifies the event log filter manager that the channel went away.
     *
     * @param ctx the context of the disconnected channel
     */
    @Override
    public void onDisconnect(ChannelHandlerContext ctx) {
        String host = ChannelHandlerContextHelper.getPeerHost(ctx);
        this.channelService.getEventLogFilterManager().updateEventLogFilterStatus(ctx);
        logger.debug(" disconnect, host: {}, ctx: {}", host, System.identityHashCode(ctx));
    }

    /**
     * Decodes the header of an inbound frame and dispatches the frame to the matching
     * {@link Service} handler based on its message type. Unknown types are logged and dropped.
     * The buffer is always released before this method returns.
     *
     * @param ctx the context of the channel the frame arrived on
     * @param message the raw inbound frame; released by this method
     * @throws RuntimeException if the header cannot be decoded
     */
    @Override
    public void onMessage(ChannelHandlerContext ctx, ByteBuf message) {
        try {
            Message msg = new Message();
            try {
                msg.readHeader(message);
            } catch (Exception e) {
                String host = ((SocketChannel)ctx.channel()).remoteAddress().getAddress().getHostAddress();
                Integer port = ((SocketChannel)ctx.channel()).remoteAddress().getPort();
                logger.error(" Maybe p2p port is used to channel connection, please check the configuration, peer {}:{}", host, port);
                // Wrap the exception itself; its cause may be null and would hide the failure.
                throw new RuntimeException(e);
            }
            logger.trace("onMessage, seq:{}, type: {}, result: {}", msg.getSeq(), msg.getType(), msg.getResult());

            short type = msg.getType().shortValue();
            if (type == ChannelMessageType.AMOP_REQUEST.getType()
                    || type == ChannelMessageType.AMOP_RESPONSE.getType()
                    || type == ChannelMessageType.AMOP_MULBROADCAST.getType()) {
                ChannelMessage2 channelMessage = new ChannelMessage2(msg);
                channelMessage.readExtra(message);
                this.channelService.onReceiveChannelMessage2(ctx, channelMessage);
            } else if (type == ChannelMessageType.CHANNEL_RPC_REQUEST.getType()
                    || type == ChannelMessageType.CLIENT_HANDSHAKE.getType()) {
                // RPC responses and handshake responses share the same handler.
                BcosMessage fiscoMessage = new BcosMessage(msg);
                fiscoMessage.readExtra(message);
                this.channelService.onReceiveEthereumMessage(ctx, fiscoMessage);
            } else if (type == ChannelMessageType.CLIENT_HEARTBEAT.getType()) {
                msg.readExtra(message);
                this.channelService.onReceiveHeartbeat(ctx, msg);
            } else if (type == ChannelMessageType.CLIENT_REGISTER_EVENT_LOG.getType()) {
                ChannelMessage2 channelMessage = new ChannelMessage2(msg);
                channelMessage.readExtra(message);
                this.channelService.onReceiveRegisterEventResponse(ctx, channelMessage);
            } else if (type == ChannelMessageType.TRANSACTION_NOTIFY.getType()) {
                BcosMessage fiscoMessage = new BcosMessage(msg);
                fiscoMessage.readExtra(message);
                this.channelService.onReceiveTransactionMessage(ctx, fiscoMessage);
            } else if (type == ChannelMessageType.BLOCK_NOTIFY.getType()) {
                ChannelMessage2 channelMessage = new ChannelMessage2(msg);
                channelMessage.readExtra(message);
                this.channelService.onReceiveBlockNotify(ctx, channelMessage);
            } else if (type == ChannelMessageType.EVENT_LOG_PUSH.getType()) {
                BcosMessage bcosMessage = new BcosMessage(msg);
                bcosMessage.readExtra(message);
                this.channelService.onReceiveEventLogPush(ctx, bcosMessage);
            } else if (type == ChannelMessageType.REQUEST_TOPICCERT.getType()) {
                logger.info("get generate rand value request data");
                TopicVerifyMessage channelMessage = new TopicVerifyMessage(msg);
                channelMessage.readExtra(message);
                try {
                    this.channelService.checkTopicVerify(ctx, channelMessage);
                } catch (IOException e) {
                    // Best effort: keep the connection, but do not lose the failure details.
                    logger.error("on receive channel failed", e);
                }
            } else {
                logger.error("unknown message type:{}", msg.getType());
            }
        } finally {
            message.release();
        }
    }

    /**
     * Delegates heartbeat sending to the service.
     *
     * @param ctx the context of the channel to send the heartbeat on
     */
    @Override
    public void sendHeartbeat(ChannelHandlerContext ctx) {
        this.channelService.sendHeartbeatMessage(ctx);
    }

    /** Generates a random sequence id: a UUID with the dashes removed. */
    private static String randomSeq() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Writes an empty message and closes the channel once that write completes. */
    private static void closeAfterFlush(ChannelHandlerContext ctx) {
        ctx.writeAndFlush("").addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Encodes header and extra section of {@code message} into a newly allocated buffer,
     * releasing the buffer again if encoding fails so it is not leaked.
     */
    private static ByteBuf encode(ChannelHandlerContext ctx, Message message) {
        ByteBuf out = ctx.alloc().buffer();
        try {
            message.writeHeader(out);
            message.writeExtra(out);
            return out;
        } catch (RuntimeException e) {
            out.release();
            throw e;
        }
    }
}

