/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.Call;
import org.apache.hadoop.hbase.ipc.CellBlockBuilder;
import org.apache.hadoop.hbase.ipc.ConnectionId;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.FailedServers;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcConnection;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.ipc.StoppedRpcClientException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class AbstractRpcClient<T extends RpcConnection>
implements RpcClient {
    /** Class logger. */
    public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
    /**
     * Timer shared by every client in this JVM. Its threads are daemons named
     * {@code RpcClient-timer-pool-%d}; the timer is constructed with a 10 ms tick.
     */
    protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10L, TimeUnit.MILLISECONDS);
    /**
     * Single-threaded scheduler shared by every client in this JVM; the constructor schedules the
     * periodic idle-connection sweep on it.
     */
    private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    /** Set to false by close(); read and written while holding the monitor of {@code connections}. */
    private boolean running = true;
    /** Configuration this client was constructed with. */
    protected final Configuration conf;
    /** Cluster id; the constructor substitutes "default-cluster" when it is given null. */
    protected final String clusterId;
    /** Local bind address as passed to the constructor; may be null. */
    protected final SocketAddress localAddr;
    /** Connection metrics sink; may be null (checked before use in onCallFinished). */
    protected final MetricsConnection metrics;
    /** Obtained from UserProvider.instantiate(conf). */
    protected final UserProvider userProvider;
    /** Built from the configuration in the constructor. */
    protected final CellBlockBuilder cellBlockBuilder;
    /**
     * Value of "hbase.ipc.client.connection.minIdleTimeBeforeClose" (default 120000). Used, in
     * milliseconds, both as the sweep period and as the idle threshold of the sweep.
     */
    protected final int minIdleTimeBeforeClose;
    /** Value of "hbase.ipc.client.connect.max.retries" (default 0). */
    protected final int maxRetries;
    /** Value of "hbase.client.pause" (default 100). */
    protected final long failureSleep;
    /** Value of "hbase.ipc.client.tcpnodelay" (default true). */
    protected final boolean tcpNoDelay;
    /** Value of "hbase.ipc.client.tcpkeepalive" (default true). */
    protected final boolean tcpKeepAlive;
    /** Codec returned by getCodec(); null when no codec class name is configured. */
    protected final Codec codec;
    /** Compressor instantiated from "hbase.client.rpc.compressor"; null when that key is unset or empty. */
    protected final CompressionCodec compressor;
    /** Value of "hbase.ipc.client.fallback-to-simple-auth-allowed" (default false). */
    protected final boolean fallbackAllowed;
    /** Consulted by getConnection() to reject servers currently listed as failed. */
    protected final FailedServers failedServers;
    /** Value of "hbase.ipc.client.socket.timeout.connect" (default 10000; presumably milliseconds - confirm). */
    protected final int connectTO;
    /** Value of "hbase.ipc.client.socket.timeout.read" (default 20000; presumably milliseconds - confirm). */
    protected final int readTO;
    /** Value of "hbase.ipc.client.socket.timeout.write" (default 60000; presumably milliseconds - confirm). */
    protected final int writeTO;
    /**
     * Pooled connections keyed by ConnectionId. Its monitor is the lock used by getConnection(),
     * cleanupIdleConnections(), cancelConnections() and close().
     */
    private final PoolMap<ConnectionId, T> connections;
    /** Source of call ids handed out by nextCallId(). */
    private final AtomicInteger callIdCnt = new AtomicInteger(0);
    /** Handle of the periodic idle-connection sweep; cancelled in close(). */
    private final ScheduledFuture<?> cleanupIdleConnectionTask;
    /**
     * Value of "hbase.client.perserver.requests.threshold" (default Integer.MAX_VALUE); callMethod()
     * fails a call with ServerTooBusyException when the per-address counter exceeds it.
     */
    private int maxConcurrentCallsPerServer;
    /**
     * Per-address counters of calls in flight, shared by every client in this JVM. A missing entry
     * is loaded as a counter starting at 0; entries expire one hour after their last access.
     */
    private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).build(new CacheLoader<InetSocketAddress, AtomicInteger>(){

        @Override
        public AtomicInteger load(InetSocketAddress key) throws Exception {
            return new AtomicInteger(0);
        }
    });

    /**
     * Constructs an RPC client from the given configuration and schedules the periodic sweep of
     * idle connections.
     *
     * @param conf configuration all client settings are read from
     * @param clusterId id of the target cluster; {@code null} is replaced by "default-cluster"
     * @param localAddr local address to bind to; may be {@code null}
     * @param metrics sink for per-call metrics; may be {@code null}
     */
    public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) {
        this.userProvider = UserProvider.instantiate(conf);
        this.localAddr = localAddr;
        this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
        this.clusterId = clusterId != null ? clusterId : "default-cluster";
        this.failureSleep = conf.getLong("hbase.client.pause", 100L);
        this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
        this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
        this.cellBlockBuilder = new CellBlockBuilder(conf);
        this.minIdleTimeBeforeClose = conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000);
        // this.conf must be assigned before getCodec() runs, because getCodec() reads it.
        this.conf = conf;
        this.codec = this.getCodec();
        this.compressor = AbstractRpcClient.getCompressor(conf);
        this.fallbackAllowed = conf.getBoolean("hbase.ipc.client.fallback-to-simple-auth-allowed", false);
        this.failedServers = new FailedServers(conf);
        this.connectTO = conf.getInt("hbase.ipc.client.socket.timeout.connect", 10000);
        this.readTO = conf.getInt("hbase.ipc.client.socket.timeout.read", 20000);
        this.writeTO = conf.getInt("hbase.ipc.client.socket.timeout.write", 60000);
        this.metrics = metrics;
        this.maxConcurrentCallsPerServer = conf.getInt("hbase.client.perserver.requests.threshold", Integer.MAX_VALUE);
        // Diamond instead of the raw type, so the assignment is checked by the compiler.
        this.connections = new PoolMap<>(AbstractRpcClient.getPoolType(conf), AbstractRpcClient.getPoolSize(conf));
        // The idle threshold doubles as the initial delay and the period of the sweep.
        this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(this::cleanupIdleConnections, this.minIdleTimeBeforeClose, this.minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" + this.fallbackAllowed + ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
        }
    }

    /**
     * Removes from the pool, and cleans up, every connection that was last touched more than
     * {@code minIdleTimeBeforeClose} milliseconds ago and is not active. Runs periodically on the
     * idle-connection sweeper.
     */
    private void cleanupIdleConnections() {
        long closeBeforeTime = EnvironmentEdgeManager.currentTime() - this.minIdleTimeBeforeClose;
        synchronized (this.connections) {
            // NOTE(review): removing while iterating relies on PoolMap.values() returning a
            // snapshot rather than a live view - confirm against PoolMap.
            for (T conn : this.connections.values()) {
                boolean idleTooLong = conn.getLastTouched() < closeBeforeTime;
                if (!idleTooLong || conn.isActive()) {
                    continue;
                }
                LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
                this.connections.remove(conn.remoteId(), conn);
                conn.cleanupConnection();
            }
        }
    }

    /**
     * Returns the codec class name configured under "hbase.client.default.rpc.codec", falling back
     * to the canonical name of {@link KeyValueCodec} when that key is not set.
     *
     * @param c configuration to read
     * @return the default codec class name
     */
    public static String getDefaultCodec(Configuration c) {
        String builtInCodec = KeyValueCodec.class.getCanonicalName();
        return c.get("hbase.client.default.rpc.codec", builtInCodec);
    }

    /**
     * Instantiates the codec named by "hbase.client.rpc.codec", using getDefaultCodec(conf) as the
     * fallback class name.
     *
     * @return a new codec instance, or {@code null} when the resolved class name is null or empty
     * @throws RuntimeException if the class cannot be loaded or instantiated
     */
    Codec getCodec() {
        String fallbackName = AbstractRpcClient.getDefaultCodec(this.conf);
        String className = this.conf.get("hbase.client.rpc.codec", fallbackName);
        if (className != null && className.length() > 0) {
            try {
                Class<?> codecClass = Class.forName(className);
                return (Codec) codecClass.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Failed getting codec " + className, e);
            }
        }
        return null;
    }

    /**
     * @return {@code true} when a codec was configured for this client, {@code false} when the
     *         codec is null
     */
    @Override
    public boolean hasCellBlockSupport() {
        return codec != null;
    }

    /**
     * @return the value read from "hbase.ipc.client.tcpnodelay" when this client was constructed
     */
    boolean isTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Instantiates the compression codec named by "hbase.client.rpc.compressor".
     *
     * @param conf configuration to read
     * @return a new compressor instance, or {@code null} when the key is unset or empty
     * @throws RuntimeException if the class cannot be loaded or instantiated
     */
    private static CompressionCodec getCompressor(Configuration conf) {
        String className = conf.get("hbase.client.rpc.compressor", null);
        boolean configured = className != null && !className.isEmpty();
        if (!configured) {
            return null;
        }
        try {
            Class<?> compressorClass = Class.forName(className);
            return (CompressionCodec) compressorClass.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Failed getting compressor " + className, e);
        }
    }

    /**
     * Resolves the connection pool type from "hbase.client.ipc.pool.type", passing
     * {@code RoundRobin} as the default to {@code PoolType.valueOf}.
     *
     * @param config configuration to read
     * @return the pool type to use for the connection pool
     */
    private static PoolMap.PoolType getPoolType(Configuration config) {
        String configuredType = config.get("hbase.client.ipc.pool.type");
        return PoolMap.PoolType.valueOf(configuredType, PoolMap.PoolType.RoundRobin);
    }

    /**
     * Reads the connection pool size from "hbase.client.ipc.pool.size" (default 1).
     *
     * @param config configuration to read
     * @return the configured size, or 1 (with a warning logged) when the configured value is not
     *         positive
     */
    private static int getPoolSize(Configuration config) {
        int configuredSize = config.getInt("hbase.client.ipc.pool.size", 1);
        if (configuredSize > 0) {
            return configuredSize;
        }
        LOG.warn("{} must be positive. Using default value: 1", "hbase.client.ipc.pool.size");
        return 1;
    }

    /**
     * Hands out the next call id. Ids count up from the current counter value and wrap to 0 after
     * {@code Integer.MAX_VALUE}, so a returned id is never negative.
     *
     * @return the id to use for the next call
     */
    private int nextCallId() {
        int id;
        int next;
        do {
            // Read first, then compute the successor, then CAS: the expected value handed to
            // compareAndSet must be the value the successor was derived from.
            id = this.callIdCnt.get();
            next = id < Integer.MAX_VALUE ? id + 1 : 0;
        } while (!this.callIdCnt.compareAndSet(id, next));
        return id;
    }

    /**
     * Issues the call through callMethod() and blocks on a BlockingRpcCallback until the result is
     * available.
     *
     * @return the response message obtained from the blocking callback
     * @throws ServiceException wrapping the IOException thrown while waiting, or wrapping the
     *         failure recorded on {@code hrc} when the controller reports a failed call
     */
    private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, Message param, Message returnType, User ticket, InetSocketAddress isa) throws ServiceException {
        BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
        this.callMethod(md, hrc, param, returnType, ticket, isa, done);
        final Message response;
        try {
            response = done.get();
        } catch (IOException e) {
            throw new ServiceException(e);
        }
        if (!hrc.failed()) {
            return response;
        }
        throw new ServiceException(hrc.getFailed());
    }

    /**
     * Looks up the pooled connection for {@code remoteId}, creating it through
     * {@link #createConnection(ConnectionId)} when the pool asks for a new one, and marks it as
     * touched now.
     *
     * @param remoteId identifies the connection to look up or create
     * @return the connection to send the call on
     * @throws FailedServerException if the target address is in the failed servers list
     * @throws StoppedRpcClientException if this client has already been closed
     * @throws IOException propagated from the pool lookup or connection creation
     */
    private T getConnection(ConnectionId remoteId) throws IOException {
        if (this.failedServers.isFailedServer(remoteId.getAddress())) {
            LOG.debug("Not trying to connect to {} this server is in the failed servers list", remoteId.address);
            throw new FailedServerException("This server is in the failed servers list: " + remoteId.address);
        }
        T conn;
        synchronized (this.connections) {
            // running is checked under the same lock close() takes, so no connection is handed
            // out after close() has flipped the flag.
            if (!this.running) {
                throw new StoppedRpcClientException();
            }
            conn = this.connections.getOrCreate(remoteId, () -> this.createConnection(remoteId));
            conn.setLastTouched(EnvironmentEdgeManager.currentTime());
        }
        return conn;
    }

    /**
     * Creates a new connection for the given id. getConnection() supplies this method to
     * {@code connections.getOrCreate} from inside its block synchronized on {@code connections}.
     *
     * @param var1 id of the connection to create
     * @return the newly created connection
     * @throws IOException if the connection cannot be created
     */
    protected abstract T createConnection(ConnectionId var1) throws IOException;

    /**
     * Completion handler for a call: records its duration, reports metrics when a metrics sink is
     * present, transfers the outcome to the controller and finally runs the user callback.
     *
     * @param call the finished call
     * @param hrc controller that receives either the cells or the failure
     * @param addr remote address, used when wrapping a non-remote error
     * @param callback run with the response on success, with {@code null} on failure
     */
    private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, RpcCallback<Message> callback) {
        long elapsedMs = EnvironmentEdgeManager.currentTime() - call.getStartTime();
        call.callStats.setCallTimeMs(elapsedMs);
        if (this.metrics != null) {
            this.metrics.updateRpc(call.md, call.param, call.callStats);
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
        }
        if (call.error == null) {
            hrc.setDone(call.cells);
            callback.run(call.response);
            return;
        }
        if (call.error instanceof RemoteException) {
            // A remote exception is passed on as-is, with its stack trace refreshed here.
            call.error.fillInStackTrace();
            hrc.setFailed(call.error);
        } else {
            hrc.setFailed(IPCUtil.wrapException(addr, call.error));
        }
        callback.run(null);
    }

    /**
     * Builds a {@link Call} for the request and hands it to the connection for {@code addr}.
     * Any exception raised while acquiring the connection or sending the request is not thrown
     * but set on the call via {@code call.setException}.
     *
     * @param md descriptor of the method to invoke
     * @param hrc controller supplying the cell scanner, call timeout and priority
     * @param param request message
     * @param returnType prototype of the response message
     * @param ticket user the call is made for
     * @param addr address of the remote server
     * @param callback run through onCallFinished() once the call completes
     * @return the call that was created
     */
    Call callMethod(Descriptors.MethodDescriptor md, final HBaseRpcController hrc, Message param, Message returnType, User ticket, final InetSocketAddress addr, final RpcCallback<Message> callback) {
        MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
        cs.setStartTime(EnvironmentEdgeManager.currentTime());
        if (param instanceof ClientProtos.MultiRequest) {
            int numActions = ((ClientProtos.MultiRequest) param).getRegionActionList().stream()
                .mapToInt(ClientProtos.RegionAction::getActionCount)
                .sum();
            cs.setNumActionsPerServer(numActions);
        }
        final AtomicInteger inFlight = concurrentCounterCache.getUnchecked(addr);
        // The completion callback undoes the increment below before delegating to onCallFinished.
        RpcCallback<Call> onFinished = finished -> {
            inFlight.decrementAndGet();
            this.onCallFinished(finished, hrc, addr, callback);
        };
        Call call = new Call(this.nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), onFinished, cs);
        ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
        int count = inFlight.incrementAndGet();
        try {
            if (count > this.maxConcurrentCallsPerServer) {
                throw new ServerTooBusyException(addr, count);
            }
            cs.setConcurrentCallsPerServer(count);
            this.getConnection(remoteId).sendRequest(call, hrc);
        } catch (Exception e) {
            call.setException(IPCUtil.toIOE(e));
        }
        return call;
    }

    /**
     * Builds the socket address for a server from its hostname and port.
     *
     * @param sn server to build the address for
     * @return the resolved socket address
     * @throws UnknownHostException if the address is unresolved
     */
    InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
        InetSocketAddress resolved = new InetSocketAddress(sn.getHostname(), sn.getPort());
        if (!resolved.isUnresolved()) {
            return resolved;
        }
        throw new UnknownHostException("can not resolve " + sn.getServerName());
    }

    /**
     * Removes from the pool, shuts down and cleans up every connection whose remote address has
     * the same port and hostname as {@code sn}.
     *
     * @param sn server whose connections are to be stopped
     */
    @Override
    public void cancelConnections(ServerName sn) {
        synchronized (this.connections) {
            // NOTE(review): removing while iterating relies on PoolMap.values() returning a
            // snapshot rather than a live view - confirm against PoolMap.
            for (T connection : this.connections.values()) {
                ConnectionId remoteId = connection.remoteId();
                boolean sameServer = remoteId.address.getPort() == sn.getPort()
                    && remoteId.address.getHostName().equals(sn.getHostname());
                if (!sameServer) {
                    continue;
                }
                LOG.info("The server on {} is dead - stopping the connection {}", sn, connection.remoteId);
                this.connections.remove(remoteId, connection);
                connection.shutdown();
                connection.cleanupConnection();
            }
        }
    }

    /**
     * Returns an {@link HBaseRpcController} that carries a call timeout. A controller that already
     * is an HBaseRpcController is reused and only receives {@code channelOperationTimeout} when it
     * has no call timeout of its own; any other controller (including {@code null}) is replaced by
     * a new HBaseRpcControllerImpl with that timeout.
     *
     * @param controller controller supplied by the caller; may be {@code null}
     * @param channelOperationTimeout timeout to apply when the controller has none
     * @return the controller to use for the call
     */
    static HBaseRpcController configureHBaseRpcController(RpcController controller, int channelOperationTimeout) {
        // instanceof is false for null, so no separate null check is needed.
        if (controller instanceof HBaseRpcController) {
            HBaseRpcController existing = (HBaseRpcController) controller;
            if (!existing.hasCallTimeout()) {
                existing.setCallTimeout(channelOperationTimeout);
            }
            return existing;
        }
        HBaseRpcController created = new HBaseRpcControllerImpl();
        created.setCallTimeout(channelOperationTimeout);
        return created;
    }

    /**
     * Subclass hook run by close() after shutdown() has been called on every pooled connection and
     * before cleanupConnection() is called on them.
     */
    protected abstract void closeInternal();

    /**
     * Stops this client: marks it as not running, empties the connection pool, cancels the idle
     * connection sweep, then shuts down and cleans up every connection that was pooled. Calling
     * it again after the first call returns without doing anything further.
     */
    @Override
    public void close() {
        LOG.debug("Stopping rpc client");
        List<T> connToClose;
        synchronized (this.connections) {
            if (!this.running) {
                return;
            }
            this.running = false;
            connToClose = this.connections.values();
            this.connections.clear();
        }
        this.cleanupIdleConnectionTask.cancel(true);
        // Every connection is shut down first, then the subclass hook runs, and only afterwards
        // are the connections cleaned up.
        for (T conn : connToClose) {
            conn.shutdown();
        }
        this.closeInternal();
        for (T conn : connToClose) {
            conn.cleanupConnection();
        }
    }

    /**
     * Creates a blocking channel to the given server.
     *
     * @param sn server to connect to
     * @param ticket user the calls are made for
     * @param rpcTimeout call timeout applied to controllers that carry none
     * @return a new blocking channel bound to this client
     * @throws UnknownHostException if the server address is unresolved
     */
    @Override
    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) throws UnknownHostException {
        InetSocketAddress serverAddr = this.createAddr(sn);
        return new BlockingRpcChannelImplementation(this, serverAddr, ticket, rpcTimeout);
    }

    /**
     * Creates a callback-based channel to the given server.
     *
     * @param sn server to connect to
     * @param user user the calls are made for
     * @param rpcTimeout call timeout applied to controllers that carry none
     * @return a new channel bound to this client
     * @throws UnknownHostException if the server address is unresolved
     */
    @Override
    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) throws UnknownHostException {
        InetSocketAddress serverAddr = this.createAddr(sn);
        return new RpcChannelImplementation(this, serverAddr, user, rpcTimeout);
    }

    /**
     * Callback-based {@link RpcChannel} that forwards every call to the owning client's
     * callMethod().
     */
    public static class RpcChannelImplementation
    extends AbstractRpcChannel
    implements RpcChannel {
        protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, User ticket, int rpcTimeout) throws UnknownHostException {
            super(rpcClient, addr, ticket, rpcTimeout);
        }

        /**
         * Forwards the call to the client; the result is delivered through {@code done}.
         *
         * @throws NullPointerException if {@code controller} is null
         */
        @Override
        public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType, RpcCallback<Message> done) {
            RpcController required = Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call");
            HBaseRpcController hrc = this.configureRpcController(required);
            this.rpcClient.callMethod(md, hrc, param, returnType, this.ticket, this.addr, done);
        }
    }

    /**
     * {@link BlockingRpcChannel} that forwards every call to the owning client's
     * callBlockingMethod().
     */
    public static class BlockingRpcChannelImplementation
    extends AbstractRpcChannel
    implements BlockingRpcChannel {
        protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, User ticket, int rpcTimeout) {
            super(rpcClient, addr, ticket, rpcTimeout);
        }

        /**
         * Forwards the call to the client and returns its response.
         *
         * @throws ServiceException propagated from the client's callBlockingMethod()
         */
        @Override
        public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException {
            HBaseRpcController hrc = this.configureRpcController(controller);
            return this.rpcClient.callBlockingMethod(md, hrc, param, returnType, this.ticket, this.addr);
        }
    }

    /**
     * State shared by both channel implementations: the owning client, the remote address, the
     * user the calls are made for and the timeout applied to controllers that carry none.
     */
    private static class AbstractRpcChannel {
        protected final InetSocketAddress addr;
        protected final AbstractRpcClient<?> rpcClient;
        protected final User ticket;
        protected final int rpcTimeout;

        protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr, User ticket, int rpcTimeout) {
            this.addr = addr;
            this.rpcClient = rpcClient;
            this.ticket = ticket;
            this.rpcTimeout = rpcTimeout;
        }

        /**
         * Returns an HBaseRpcController carrying a call timeout, applying this channel's
         * {@code rpcTimeout} when the supplied controller has none or is not an
         * HBaseRpcController.
         *
         * @param controller controller supplied by the caller; may be {@code null}
         * @return the controller to use for the call
         */
        protected HBaseRpcController configureRpcController(RpcController controller) {
            // Same rules as the client-level static helper, so delegate rather than repeat them.
            return AbstractRpcClient.configureHBaseRpcController(controller, this.rpcTimeout);
        }
    }
}

