/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.core.model.Resource;
import io.seata.core.model.ResourceManager;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.rpc.netty.AbstractRpcRemotingClient;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.netty.NettyPoolKey;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public final class RmRpcClient
extends AbstractRpcRemotingClient {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(RmRpcClient.class);
    /** Supplies the managed resources whose ids are merged by getMergedResourceKeys(). */
    private ResourceManager resourceManager;
    /** Application id passed to the RegisterRMRequest instances built by this client. */
    private String applicationId;
    /** Transaction service group used for load balancing, reconnecting and RegisterRMRequest creation. */
    private String transactionServiceGroup;
    /** Lazily created singleton; volatile because getInstance() reads it outside the class lock first. */
    private static volatile RmRpcClient instance;
    /** Server address -> lock object; created in connect() and synchronized on in connect()/releaseChannel(). */
    private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<String, Object>();
    /** Server address -> pool key used to borrow, return and invalidate channels in nettyClientKeyPool. */
    private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<String, NettyPoolKey>();
    /** Server address -> channel; populated in onRegisterMsgSuccess() and cleared in destroyChannel(). */
    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
    /** Optional explicit resource ids; when non-null doConnect() registers these instead of the merged resource keys. */
    private String customerKeys;
    /** Ensures the body of init() runs at most once. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // NOTE(review): the four numeric constants below look like the sizing/timing values for the
    // executors created in getInstance() and init() -- confirm against their usages.
    private static final int MAX_MERGE_SEND_THREAD = 1;
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    private static final int MAX_QUEUE_SIZE = 20000;
    private static final int SCHEDULE_INTERVAL_MILLS = 5;
    /** Thread-name prefix passed to getThreadPrefix() for the merged-send thread started in init(). */
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
    /** Client configuration; source of the pool settings returned by getNettyPoolConfig(). */
    private final NettyClientConfig rmClientConfig;

    /**
     * Creates a client from the given configuration only.
     * Private: instances are obtained through getInstance().
     *
     * @param nettyClientConfig configuration forwarded to the superclass and kept for pool settings
     */
    private RmRpcClient(NettyClientConfig nettyClientConfig) {
        super(nettyClientConfig);
        this.rmClientConfig = nettyClientConfig;
    }

    /**
     * Creates a client with an explicit event executor group and message executor.
     * Private: instances are obtained through getInstance().
     *
     * @param nettyClientConfig  configuration forwarded to the superclass and kept for pool settings
     * @param eventExecutorGroup forwarded to the superclass (getInstance() passes null)
     * @param messageExecutor    thread pool forwarded to the superclass
     */
    private RmRpcClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor);
        this.rmClientConfig = nettyClientConfig;
    }

    /**
     * Sets the resource manager whose managed resources are read when building register requests.
     *
     * @param resourceManager the resource manager to use
     */
    public void setResourceManager(ResourceManager resourceManager) {
        this.resourceManager = resourceManager;
    }

    /**
     * Returns the shared client after stamping it with the given identity.
     * Note that the values are written onto the singleton on every call.
     *
     * @param applicationId           application id to store on the client
     * @param transactionServiceGroup transaction service group to store on the client
     * @return the shared RmRpcClient instance
     */
    public static RmRpcClient getInstance(String applicationId, String transactionServiceGroup) {
        RmRpcClient client = getInstance();
        client.setApplicationId(applicationId);
        client.setTransactionServiceGroup(transactionServiceGroup);
        return client;
    }

    /**
     * Returns the shared RmRpcClient, creating it on first use.
     *
     * <p>The instance is checked once without locking and again while holding the class
     * lock, so at most one client is ever constructed. The client is built with a default
     * NettyClientConfig and a fixed-size message executor whose size is the configured
     * number of client worker threads; when its bounded queue is full, the submitting
     * thread runs the task itself (CallerRunsPolicy).
     *
     * @return the shared RmRpcClient instance
     */
    public static RmRpcClient getInstance() {
        if (null == instance) {
            synchronized (RmRpcClient.class) {
                if (null == instance) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    int workerThreads = nettyClientConfig.getClientWorkerThreads();
                    ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        workerThreads,
                        workerThreads,
                        KEEP_ALIVE_TIME,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(), workerThreads),
                        new ThreadPoolExecutor.CallerRunsPolicy());
                    instance = new RmRpcClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

    /**
     * Initializes the client once: runs the superclass initialization, schedules the
     * periodic reconnect task and starts the merged-send worker thread.
     * Calls after the first one are no-ops.
     */
    @Override
    public void init() {
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        super.init();
        // Despite its name, SCHEDULE_INTERVAL_MILLS is applied here in SECONDS.
        this.timerExecutor.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    RmRpcClient.this.reconnect(RmRpcClient.this.transactionServiceGroup);
                } catch (Exception e) {
                    // A periodic task that throws has all of its later executions suppressed
                    // by a ScheduledExecutorService, so one failure must not escape.
                    LOGGER.error("reconnect failed: " + e.getMessage(), e);
                }
            }
        }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        ThreadPoolExecutor mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory(this.getThreadPrefix(MERGE_THREAD_PREFIX), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new AbstractRpcRemotingClient.MergedSendRunnable());
    }

    /**
     * Sets the application id used when building RegisterRMRequest messages.
     *
     * @param applicationId the application id
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * Sets the transaction service group used for load balancing, reconnecting and registration.
     *
     * @param transactionServiceGroup the transaction service group
     */
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    /**
     * Builds the keyed-object-pool configuration from the client configuration.
     *
     * @return a new pool Config carrying the sizing, wait and test settings of rmClientConfig
     */
    @Override
    protected GenericKeyedObjectPool.Config getNettyPoolConfig() {
        final NettyClientConfig source = this.rmClientConfig;
        final GenericKeyedObjectPool.Config config = new GenericKeyedObjectPool.Config();
        // Sizing and wait limits.
        config.maxActive = source.getMaxPoolActive();
        config.minIdle = source.getMinPoolIdle();
        config.maxWait = source.getMaxAcquireConnMills();
        // Validation and ordering behaviour.
        config.testOnBorrow = source.isPoolTestBorrow();
        config.testOnReturn = source.isPoolTestReturn();
        config.lifo = source.isPoolLifo();
        return config;
    }

    /**
     * Identifies this client's role for pool keys.
     *
     * @return always NettyPoolKey.TransactionRole.RMROLE
     */
    @Override
    protected NettyPoolKey.TransactionRole getTransactionRole() {
        return NettyPoolKey.TransactionRole.RMROLE;
    }

    /**
     * Sends a message to a load-balanced server of the transaction service group and
     * waits for the response.
     *
     * @param msg     the message to send
     * @param timeout timeout forwarded to the asynchronous send
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     */
    @Override
    public Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException {
        final String targetAddress = this.loadBalance(this.transactionServiceGroup);
        return super.sendAsyncRequestWithResponse(targetAddress, this.connect(targetAddress), msg, timeout);
    }

    /**
     * Sends a message to the given server and waits for the response.
     *
     * @param serverAddress address of the target server
     * @param msg           the message to send
     * @param timeout       timeout forwarded to the asynchronous send
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     */
    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout) throws TimeoutException {
        final Channel serverChannel = this.connect(serverAddress);
        return super.sendAsyncRequestWithResponse(serverAddress, serverChannel, msg, timeout);
    }

    /**
     * Sends a message using the configured RPC request timeout.
     *
     * @param msg the message to send
     * @return the response object
     * @throws TimeoutException if the underlying send times out
     */
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        final long defaultTimeout = NettyClientConfig.getRpcRequestTimeout();
        return this.sendMsgWithResponse(msg, defaultTimeout);
    }

    /**
     * Reacts to Netty idle-state events on a server channel.
     *
     * <p>On a reader-idle event the channel is invalidated in the pool and then released;
     * on a writer-idle event a heartbeat PING is sent. Events are matched by identity
     * against the shared IdleStateEvent constants, so other IdleStateEvent instances are
     * ignored (NOTE(review): this skips the FIRST_* idle events -- confirm that is intended).
     *
     * @param ctx the channel handler context the event was fired on
     * @param evt the user event; anything that is not an IdleStateEvent is ignored
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        if (idleStateEvent == IdleStateEvent.READER_IDLE_STATE_EVENT) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("RmRpcClient channel" + ctx.channel() + " idle.");
            }
            try {
                String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                this.nettyClientKeyPool.invalidateObject(this.poolKeyMap.get(serverAddress), ctx.channel());
            } catch (Exception exx) {
                // Keep the stack trace; the message alone is often null or uninformative.
                LOGGER.error(exx.getMessage(), exx);
            } finally {
                this.releaseChannel(ctx.channel(), this.getAddressFromContext(ctx));
            }
        }
        if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
            try {
                this.sendRequest(ctx.channel(), HeartbeatMessage.PING);
            } catch (Throwable throwable) {
                // The previous call used an empty format string, so this text was never rendered.
                LOGGER.error("send request error", throwable);
            }
        }
    }

    /**
     * Releases a channel for the given server address.
     *
     * <p>While holding the per-address lock: if the channel is the one currently registered
     * for the address it is destroyed (removed from the registry and returned to the pool);
     * otherwise it is simply returned to the pool. Failures are logged and not propagated.
     *
     * @param channel       the channel to release; ignored when null
     * @param serverAddress the server address the channel belongs to; ignored when null
     */
    @Override
    protected void releaseChannel(Channel channel, String serverAddress) {
        if (null == channel || null == serverAddress) {
            return;
        }
        Object connectLock = this.channelLocks.get(serverAddress);
        if (null == connectLock) {
            // Locks are only created by connect(); without one, synchronizing would throw a
            // NullPointerException that used to surface as a bare "null" error log.
            LOGGER.warn("no connect lock found for " + serverAddress + ", skip releasing channel:" + channel);
            return;
        }
        try {
            synchronized (connectLock) {
                Channel registered = this.channels.get(serverAddress);
                if (null != registered && registered.compareTo(channel) == 0) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("return to pool, rm channel:" + channel);
                    }
                    this.destroyChannel(serverAddress, channel);
                } else {
                    this.nettyClientKeyPool.returnObject(this.poolKeyMap.get(serverAddress), channel);
                }
            }
        } catch (Exception exx) {
            LOGGER.error(exx.getMessage(), exx);
        }
    }

    /**
     * Returns a channel to the given server, reusing the registered one when it is alive.
     *
     * <p>If no usable channel is registered, a per-address lock is created on demand and
     * the connection is established while holding it.
     *
     * @param serverAddress address of the server to connect to
     * @return a channel to the server
     */
    @Override
    protected Channel connect(String serverAddress) {
        Channel registered = this.channels.get(serverAddress);
        if (registered != null) {
            Channel alive = this.getExistAliveChannel(registered, serverAddress);
            if (alive != null) {
                return alive;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        this.channelLocks.putIfAbsent(serverAddress, new Object());
        synchronized (this.channelLocks.get(serverAddress)) {
            return this.doConnect(serverAddress);
        }
    }

    /**
     * Borrows a channel to the server from the pool, preparing the register request first.
     *
     * <p>Returns the registered channel directly when it is active. Otherwise the resource
     * ids (customerKeys if set, else the merged managed resource keys) are written into the
     * RegisterRMRequest held by the address's pool key -- creating the key on first use --
     * and a channel is borrowed from the pool with that key.
     *
     * @param serverAddress address of the server to connect to
     * @return the active registered channel, or a channel borrowed from the pool
     * @throws FrameworkException if preparing the request or borrowing the channel fails
     */
    private Channel doConnect(String serverAddress) {
        Channel channelToServer = this.channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        try {
            String resourceIds = this.customerKeys == null ? this.getMergedResourceKeys(this.resourceManager) : this.customerKeys;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :" + resourceIds);
            }
            NettyPoolKey poolKey = this.poolKeyMap.get(serverAddress);
            if (null == poolKey) {
                RegisterRMRequest message = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
                message.setResourceIds(resourceIds);
                this.poolKeyMap.putIfAbsent(serverAddress, new NettyPoolKey(this.getTransactionRole(), serverAddress, message));
                // Re-read so that the key that actually won the putIfAbsent is the one used.
                poolKey = this.poolKeyMap.get(serverAddress);
            } else {
                RegisterRMRequest message = (RegisterRMRequest) poolKey.getMessage();
                message.setResourceIds(resourceIds);
            }
            return (Channel) this.nettyClientKeyPool.borrowObject(poolKey);
        } catch (Exception exx) {
            // The error code was previously used as a placeholder-free format string, which
            // dropped the description; build the message explicitly and keep the stack trace.
            LOGGER.error(FrameworkErrorCode.RegistRM.getErrCode() + " register RM failed.", exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
    }

    /**
     * Waits for the channel registered for a server address to become active.
     *
     * <p>Returns immediately if the given channel is active. Otherwise it polls the channel
     * registry, sleeping the configured check interval between attempts, for at most the
     * configured number of retries. If the registry entry disappears, null is returned. If
     * the channel is still inactive once waiting ends, it is released and null is returned.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and waiting
     * stops early, so the channel is released at that point.
     *
     * @param rmChannel     the channel currently registered for the address; must not be null
     * @param serverAddress the server address the channel belongs to
     * @return an active channel, or null when none is available
     */
    private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {
        if (rmChannel.isActive()) {
            return rmChannel;
        }
        for (int i = 0; i < NettyClientConfig.getMaxCheckAliveRetry(); ++i) {
            try {
                Thread.sleep(NettyClientConfig.getCheckAliveInternal());
            } catch (InterruptedException exx) {
                LOGGER.error(exx.getMessage());
                // Restore the flag for callers and stop waiting: further sleeps would
                // throw immediately anyway.
                Thread.currentThread().interrupt();
                break;
            }
            rmChannel = this.channels.get(serverAddress);
            if (null == rmChannel || rmChannel.isActive()) {
                return rmChannel;
            }
        }
        LOGGER.warn("channel " + rmChannel + " is not active after long wait, close it.");
        this.releaseChannel(rmChannel, serverAddress);
        return null;
    }

    /**
     * Handles an exception raised on a server channel: logs it, releases the channel and
     * then delegates to the superclass handler.
     *
     * @param ctx   the channel handler context the exception was raised on
     * @param cause the exception
     * @throws Exception if the superclass handler throws
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The error code was previously used as a placeholder-free format string, which
        // dropped the address and description; build the message explicitly instead.
        LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode() + " "
            + NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);
        this.releaseChannel(ctx.channel(), this.getAddressFromChannel(ctx.channel()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("remove exception rm channel:" + ctx.channel());
        }
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Sends a RegisterRMRequest carrying the given resource ids over the channel, without
     * waiting for a response.
     *
     * <p>If the send fails because the channel is not writable and a server address is
     * known, the channel is released; other failures are only logged.
     *
     * @param serverAddress address of the server the channel belongs to; may be null
     * @param channel       the channel to send the request on
     * @param dbKey         resource ids to place into the request
     */
    private void sendRegisterMessage(String serverAddress, Channel channel, String dbKey) {
        RegisterRMRequest message = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
        message.setResourceIds(dbKey);
        try {
            super.sendAsyncRequestWithoutResponse(null, channel, message);
        } catch (FrameworkException e) {
            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                this.releaseChannel(channel, serverAddress);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove channel:" + channel);
                }
            } else {
                // The previous call used an empty format string, so this text was never rendered.
                LOGGER.error("register failed", e);
            }
        } catch (TimeoutException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Announces a resource to every server that currently has a registered channel.
     *
     * <p>Iterates the channel registry while holding its monitor and sends a register
     * message carrying the resource id on each channel.
     *
     * @param resourceGroupId resource group id (not used by this implementation)
     * @param resourceId      id of the resource to register
     */
    public void registerResource(String resourceGroupId, String resourceId) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register to RM resourceId:" + resourceId);
        }
        synchronized (this.channels) {
            for (Map.Entry<String, Channel> registered : this.channels.entrySet()) {
                String serverAddress = registered.getKey();
                Channel rmChannel = registered.getValue();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("register resource, resourceId:" + resourceId);
                }
                this.sendRegisterMessage(serverAddress, rmChannel, resourceId);
            }
        }
    }

    /**
     * Sends a response message to the given server over a (possibly newly established) channel.
     *
     * @param msgId         id of the message being answered
     * @param serverAddress address of the server to respond to
     * @param msg           the response payload
     */
    @Override
    public void sendResponse(long msgId, String serverAddress, Object msg) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("RmRpcClient sendResponse " + msg);
        }
        final Channel serverChannel = this.connect(serverAddress);
        super.sendResponse(msgId, serverChannel, msg);
    }

    /**
     * Returns the explicitly configured resource ids, if any.
     *
     * @return the customer keys, or null when none were set
     */
    public String getCustomerKeys() {
        return this.customerKeys;
    }

    /**
     * Sets explicit resource ids to register instead of the merged managed resource keys.
     *
     * @param customerKeys the resource ids to register, or null to use the merged keys
     */
    public void setCustomerKeys(String customerKeys) {
        this.customerKeys = customerKeys;
    }

    /**
     * Callback invoked after a server accepted the RM registration.
     *
     * <p>When no customer keys are configured, the channel is recorded for the server
     * address; if the resource ids that were sent differ from the currently merged resource
     * keys, a follow-up register message with the current keys is sent.
     *
     * @param serverAddress  address of the server that accepted the registration
     * @param channel        the registered channel
     * @param response       the server response (a RegisterRMResponse)
     * @param requestMessage the request that was sent (a RegisterRMRequest)
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register RM success. server version:" + ((RegisterRMResponse)response).getVersion() + ",channel:" + channel);
        }
        if (this.customerKeys != null) {
            return;
        }
        synchronized (this.channels) {
            this.channels.put(serverAddress, channel);
        }
        String currentKeys = this.getMergedResourceKeys(this.resourceManager);
        RegisterRMRequest sentRequest = (RegisterRMRequest) requestMessage;
        String sentKeys = sentRequest.getResourceIds();
        boolean outdated = sentKeys != null && !sentKeys.equals(currentKeys);
        if (outdated) {
            this.sendRegisterMessage(serverAddress, channel, currentKeys);
        }
    }

    /**
     * Joins the ids of all resources managed by the given resource manager with commas.
     *
     * @param resourceManager the resource manager to read managed resources from
     * @return the comma-separated resource ids, or null when there are no managed resources
     */
    public String getMergedResourceKeys(ResourceManager resourceManager) {
        Set<String> ids = resourceManager.getManagedResources().keySet();
        if (ids.isEmpty()) {
            return null;
        }
        StringBuilder merged = new StringBuilder();
        String separator = "";
        for (String id : ids) {
            merged.append(separator).append(id);
            separator = ",";
        }
        return merged.toString();
    }

    /**
     * Callback invoked after a server rejected the RM registration; always fails.
     *
     * @param serverAddress  address of the server that rejected the registration
     * @param channel        the channel the registration was attempted on
     * @param response       the server response; its version is logged when it is a RegisterRMResponse
     * @param requestMessage the request that was sent
     * @throws FrameworkException always
     */
    @Override
    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage) {
        if (response instanceof RegisterRMResponse) {
            if (LOGGER.isInfoEnabled()) {
                RegisterRMResponse rejected = (RegisterRMResponse) response;
                LOGGER.info("register RM failed. server version:" + rejected.getVersion());
            }
        }
        throw new FrameworkException("register RM failed.");
    }

    /**
     * Unregisters a channel for a server address and returns it to the pool.
     *
     * <p>The registry entry is removed only if it still maps the address to this exact
     * channel. Failures are logged and not propagated.
     *
     * @param serverAddress address of the server the channel belongs to
     * @param channel       the channel to destroy; ignored when null
     */
    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        if (null == channel) {
            return;
        }
        try {
            // Atomic conditional removal: a separate get-then-remove could drop a newer
            // channel registered for the same address in between the two calls.
            this.channels.remove(serverAddress, channel);
            this.nettyClientKeyPool.returnObject(this.poolKeyMap.get(serverAddress), channel);
        } catch (Exception exx) {
            LOGGER.error("return channel to rmPool error:" + exx.getMessage(), exx);
        }
    }
}

