/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterTMRequest;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.ChannelManager;
import io.seata.core.rpc.DefaultServerMessageListenerImpl;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.ServerMessageListener;
import io.seata.core.rpc.ServerMessageSender;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.AbstractRpcRemotingServer;
import io.seata.core.rpc.netty.NettyServerConfig;
import io.seata.core.rpc.netty.RegisterCheckAuthHandler;
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based RPC server endpoint.
 *
 * <p>The instance registers itself as the channel handler of the underlying
 * remoting server (see {@link #init()}) and forwards incoming messages to a
 * {@link ServerMessageListener}. It also implements {@link ServerMessageSender}
 * so that listeners can send responses and requests back to clients.
 */
@ChannelHandler.Sharable
public class RpcServer extends AbstractRpcRemotingServer implements ServerMessageSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);

    /** Listener that receives registration, heartbeat and transaction messages. */
    protected ServerMessageListener serverMessageListener;

    /** Handler handed to the default listener created in {@link #init()}. */
    private TransactionMessageHandler transactionMessageHandler;

    /** Optional handler passed to the listener on RM/TM registration; may be {@code null}. */
    private RegisterCheckAuthHandler checkAuthHandler;

    /**
     * Sets the transaction message handler without a registration auth handler.
     *
     * @param transactionMessageHandler the transaction message handler
     */
    public void setHandler(TransactionMessageHandler transactionMessageHandler) {
        this.setHandler(transactionMessageHandler, null);
    }

    /**
     * Sets the transaction message handler and the registration auth handler.
     *
     * @param transactionMessageHandler the transaction message handler
     * @param checkAuthHandler          the registration auth handler, may be {@code null}
     */
    public void setHandler(TransactionMessageHandler transactionMessageHandler, RegisterCheckAuthHandler checkAuthHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
        this.checkAuthHandler = checkAuthHandler;
    }

    /**
     * Creates a server with a fresh {@link NettyServerConfig} and no initial channel handlers.
     *
     * @param messageExecutor the executor passed to the remoting superclass
     */
    public RpcServer(ThreadPoolExecutor messageExecutor) {
        super(new NettyServerConfig(), messageExecutor, new ChannelHandler[0]);
    }

    /**
     * @return the currently installed server message listener
     */
    public ServerMessageListener getServerMessageListener() {
        return this.serverMessageListener;
    }

    /**
     * @param serverMessageListener the listener to install
     */
    public void setServerMessageListener(ServerMessageListener serverMessageListener) {
        this.serverMessageListener = serverMessageListener;
    }

    /**
     * Logs {@code info} at debug level when debug logging is enabled.
     *
     * @param info the message to log
     */
    public void debugLog(String info) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(info);
        }
    }

    /**
     * Initializes the superclass, installs this instance as the channel handler,
     * creates and installs a {@link DefaultServerMessageListenerImpl} backed by the
     * configured transaction message handler, and finally starts the server.
     */
    @Override
    public void init() {
        super.init();
        this.setChannelHandlers(new ChannelHandler[]{this});
        DefaultServerMessageListenerImpl defaultListener =
            new DefaultServerMessageListenerImpl(this.transactionMessageHandler);
        defaultListener.init();
        defaultListener.setServerMessageSender(this);
        this.setServerMessageListener(defaultListener);
        super.start();
    }

    /**
     * Disconnects and then closes the given channel handler context.
     *
     * @param ctx the context to close
     */
    private void closeChannelHandlerContext(ChannelHandlerContext ctx) {
        LOGGER.info("closeChannelHandlerContext channel:{}", ctx.channel());
        ctx.disconnect();
        ctx.close();
    }

    /**
     * Reacts to idle-state events: when the channel has been read-idle, the
     * disconnect handling runs and the context is closed. Any other event is ignored.
     *
     * @param ctx the channel handler context
     * @param evt the user event
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        LOGGER.debug("idle:{}", evt);
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        if (idleStateEvent.state() != IdleState.READER_IDLE) {
            return;
        }
        LOGGER.info("channel:{} read idle.", ctx.channel());
        this.handleDisconnect(ctx);
        try {
            this.closeChannelHandlerContext(ctx);
        } catch (Exception e) {
            // Closing is best effort, but keep the stack trace for diagnosis.
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Destroys the superclass resources and shuts the server down.
     */
    @Override
    public void destroy() {
        super.destroy();
        super.shutdown();
        LOGGER.info("destroyed rpcServer");
    }

    /**
     * Sends a response for {@code request}. Heartbeat messages are written to the
     * given channel directly; for every other message the target channel is looked
     * up through {@link ChannelManager#getSameClientChannel(Channel)}.
     *
     * @param request the request being answered
     * @param channel the channel the request arrived on
     * @param msg     the response body
     * @throws RuntimeException if no target channel could be determined
     */
    @Override
    public void sendResponse(RpcMessage request, Channel channel, Object msg) {
        Channel clientChannel = msg instanceof HeartbeatMessage
            ? channel
            : ChannelManager.getSameClientChannel(channel);
        if (clientChannel == null) {
            // Report the channel the request came in on; the resolved one is null here.
            throw new RuntimeException("channel is error. channel:" + channel);
        }
        super.sendResponse(request, clientChannel, msg);
    }

    /**
     * Sends a request to the client channel resolved from the resource id and
     * client id, and returns the response.
     *
     * @param resourceId the resource id used for the channel lookup
     * @param clientId   the client id used for the channel lookup
     * @param message    the request body
     * @param timeout    the timeout passed to the underlying send
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     * @throws RuntimeException if no channel is found for the given ids
     */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message, long timeout) throws TimeoutException {
        Channel clientChannel = ChannelManager.getChannel(resourceId, clientId);
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
        }
        return this.sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /**
     * Sends a request on the given channel using the configured RPC request timeout.
     *
     * @param clientChannel the target channel
     * @param message       the request body
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message) throws TimeoutException {
        return this.sendSyncRequest(clientChannel, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /**
     * Sends a request on the given channel and returns the response.
     *
     * @param clientChannel the target channel
     * @param message       the request body
     * @param timeout       the timeout passed to the underlying send
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     * @throws RuntimeException if {@code clientChannel} is {@code null}
     */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message, long timeout) throws TimeoutException {
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected");
        }
        return this.sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /**
     * Sends a request to the client channel resolved from the resource id and
     * client id, using the configured RPC request timeout.
     *
     * @param resourceId the resource id used for the channel lookup
     * @param clientId   the client id used for the channel lookup
     * @param message    the request body
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message) throws TimeoutException {
        return this.sendSyncRequest(resourceId, clientId, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /**
     * Sends a request on the given channel without waiting for a response.
     *
     * @param channel the target channel
     * @param message the request body
     * @return whatever the underlying send-without-response call returns
     * @throws IOException      declared by the interface
     * @throws TimeoutException declared by the interface
     */
    @Override
    public Object sendASyncRequest(Channel channel, Object message) throws IOException, TimeoutException {
        return this.sendAsyncRequestWithoutResponse(channel, message);
    }

    /**
     * Routes a request to the listener. RM registration requests are always
     * accepted; any other message is only handled when the channel is registered,
     * otherwise the connection is closed.
     *
     * @param request the incoming message
     * @param ctx     the channel handler context the message arrived on
     */
    @Override
    public void dispatch(RpcMessage request, ChannelHandlerContext ctx) {
        Object body = request.getBody();
        if (body instanceof RegisterRMRequest) {
            this.serverMessageListener.onRegRmMessage(request, ctx, this, this.checkAuthHandler);
            return;
        }
        if (ChannelManager.isRegistered(ctx.channel())) {
            this.serverMessageListener.onTrxMessage(request, ctx, this);
            return;
        }
        try {
            this.closeChannelHandlerContext(ctx);
        } catch (Exception exx) {
            // Closing is best effort, but keep the stack trace for diagnosis.
            LOGGER.error(exx.getMessage(), exx);
        }
        LOGGER.info("close a unhandled connection! [{}]", ctx.channel());
    }

    /**
     * Runs the disconnect handling and forwards the event to the superclass.
     * When the message executor has already been shut down the event is dropped:
     * neither the disconnect handling nor the superclass callback is invoked.
     *
     * @param ctx the channel handler context that became inactive
     * @throws Exception if the superclass callback fails
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.debug("inactive:{}", ctx);
        if (this.messageExecutor.isShutdown()) {
            return;
        }
        this.handleDisconnect(ctx);
        super.channelInactive(ctx);
    }

    /**
     * Releases the RPC context associated with the channel, if there is one with
     * a client role, and logs the outcome.
     *
     * @param ctx the channel handler context being disconnected
     */
    private void handleDisconnect(ChannelHandlerContext ctx) {
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        LOGGER.info("{} to server channel inactive.", ipAndPort);
        if (rpcContext == null || rpcContext.getClientRole() == null) {
            LOGGER.info("remove unused channel:{}", ctx.channel());
            return;
        }
        rpcContext.release();
        LOGGER.info("remove channel:{}context:{}", ctx.channel(), rpcContext);
    }

    /**
     * Intercepts TM registration requests and heartbeat pings and hands them to
     * the listener directly; every other message is passed to the superclass.
     *
     * @param ctx the channel handler context
     * @param msg the received message
     * @throws Exception if the superclass read handling fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) msg;
            Object body = rpcMessage.getBody();
            // Parameterized so the body is only stringified when debug is enabled.
            LOGGER.debug("read:{}", body);
            if (body instanceof RegisterTMRequest) {
                this.serverMessageListener.onRegTmMessage(rpcMessage, ctx, this, this.checkAuthHandler);
                return;
            }
            if (body == HeartbeatMessage.PING) {
                this.serverMessageListener.onCheckMessage(rpcMessage, ctx, this);
                return;
            }
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Logs the failure, releases the RPC context of the channel through
     * {@link ChannelManager#releaseRpcContext(Channel)} and forwards the
     * exception to the superclass.
     *
     * @param ctx   the channel handler context
     * @param cause the exception that was raised
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.info("channel exx:{},channel:{}", cause.getMessage(), ctx.channel());
        ChannelManager.releaseRpcContext(ctx.channel());
        super.exceptionCaught(ctx, cause);
    }
}

