/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.ipc;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hudi.org.apache.hadoop.hbase.CellScanner;
import org.apache.hudi.org.apache.hadoop.hbase.Server;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.BufferChain;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcSchedulerContext;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.SimpleRpcServerResponder;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.SimpleServerCall;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection;
import org.apache.hudi.org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hudi.org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hudi.org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hudi.org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.org.apache.hadoop.hbase.util.Threads;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

@InterfaceAudience.LimitedPrivate(value={"Configuration"})
public class SimpleRpcServer
extends RpcServer {
    /** Local port of the accept channel; assigned while the {@link Listener} is being constructed. */
    protected int port;
    /** Local socket address of the accept channel; assigned while the {@link Listener} is being constructed. */
    protected InetSocketAddress address;
    /** Number of reader tasks, read from {@code hbase.ipc.server.read.threadpool.size} (default 10). */
    private int readThreads;
    /** Value supplied through {@link #setSocketSendBufSize(int)}; not read anywhere in this class. */
    protected int socketSendBufferSize = 0;
    /** Read from {@code hbase.ipc.client.call.purge.timeout} (default 120000); not read anywhere in this class. */
    protected final long purgeTimeout;
    /** Registry of open client connections and owner of the idle-connection scan timer. */
    private ConnectionManager connectionManager;
    /** Thread that accepts new sockets and hands them to the reader tasks. */
    private Listener listener = null;
    /** Responder started in {@link #start()} and interrupted in {@link #stop()}. */
    protected SimpleRpcServerResponder responder = null;

    /**
     * Builds the server and binds its listening socket.
     *
     * <p>The statement order below is significant: the {@link Listener} constructor binds the
     * socket and sets {@link #port}/{@link #address}, and the {@link ConnectionManager}
     * constructor reads {@link #port} (for its timer name) and {@link #readThreads}.
     *
     * @param server passed through to the superclass constructor
     * @param name passed to the superclass and used as the initial listener thread name
     * @param services passed through to the superclass constructor
     * @param bindAddress passed through to the superclass constructor
     * @param conf configuration the reader thread count and purge timeout are read from
     * @param scheduler passed through to the superclass constructor
     * @param reservoirEnabled passed through to the superclass constructor
     * @throws IOException if the listener cannot open or bind its channel or selectors
     */
    public SimpleRpcServer(Server server, String name, List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
        super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
        this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
        this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout", 120000L);
        // Creating the listener binds the port and already submits the reader tasks to their pool.
        this.listener = new Listener(name);
        this.port = this.listener.getAddress().getPort();
        this.responder = new SimpleRpcServerResponder(this);
        // Must come after the listener: the manager's timer name embeds the bound port.
        this.connectionManager = new ConnectionManager();
        this.initReconfigurable(conf);
        this.scheduler.init(new RpcSchedulerContext(this));
    }

    /**
     * Creates the connection object that wraps a freshly accepted channel.
     *
     * @param channel the accepted client channel
     * @param time timestamp handed to the connection constructor
     * @return a new connection bound to this server
     */
    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
        SimpleServerRpcConnection created = new SimpleServerRpcConnection(this, channel, time);
        return created;
    }

    /**
     * Removes the connection from the connection manager and closes it if it was still tracked.
     *
     * @param connection the connection to close
     */
    protected void closeConnection(SimpleServerRpcConnection connection) {
        connectionManager.close(connection);
    }

    /**
     * Records the requested socket send buffer size in {@link #socketSendBufferSize}.
     *
     * @param size the value to store
     */
    @Override
    public void setSocketSendBufSize(int size) {
        socketSendBufferSize = size;
    }

    /**
     * Starts the server's components; a second call after a successful start is a no-op.
     *
     * <p>In order: creates (and, if non-null, installs and starts) the secret manager, creates
     * the authorization manager, then starts the responder, the listener and the scheduler.
     */
    @Override
    public synchronized void start() {
        if (!started) {
            authTokenSecretMgr = createSecretManager();
            if (authTokenSecretMgr != null) {
                // Install and start the secret manager while holding its own monitor.
                synchronized (authTokenSecretMgr) {
                    setSecretManager(authTokenSecretMgr);
                    authTokenSecretMgr.start();
                }
            }
            authManager = new ServiceAuthorizationManager();
            HBasePolicyProvider.init(conf, authManager);
            responder.start();
            listener.start();
            scheduler.start();
            started = true;
        }
    }

    /**
     * Stops the server: clears the running flag, stops the secret manager if one exists,
     * shuts down the listener, responder and scheduler, and wakes any thread blocked in
     * {@link #join()}.
     */
    @Override
    public synchronized void stop() {
        LOG.info("Stopping server on " + port);
        running = false;
        if (authTokenSecretMgr != null) {
            authTokenSecretMgr.stop();
            authTokenSecretMgr = null;
        }
        // Interrupt first, then let the listener close its socket and reader pool.
        listener.interrupt();
        listener.doStop();
        responder.interrupt();
        scheduler.stop();
        // Release threads waiting in join().
        notifyAll();
    }

    /**
     * Blocks the caller until the running flag is cleared (see {@link #stop()}).
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized void join() throws InterruptedException {
        // Loop guards against spurious wake-ups.
        while (running) {
            wait();
        }
    }

    /**
     * Returns the address the listener is bound to.
     *
     * @return the listener's address, or {@code null} when there is no listener
     */
    @Override
    public synchronized InetSocketAddress getListenerAddress() {
        return listener == null ? null : listener.getAddress();
    }

    /**
     * Convenience overload: delegates to the eight-argument {@code call} using the current
     * wall-clock time as {@code startTime} and {@code 0} as {@code timeout}.
     *
     * @throws IOException propagated from the delegated call
     */
    @Override
    public Pair<Message, CellScanner> call(BlockingService service2, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException {
        long now = System.currentTimeMillis();
        return call(service2, md, param, cellScanner, receiveTime, status, now, 0);
    }

    /**
     * Wraps the request in a placeholder {@link SimpleServerCall} (id {@code -1}, no header,
     * no connection) and dispatches it through {@code call(fakeCall, status)}.
     *
     * <p>{@code startTime} is accepted for interface compatibility but is not used here.
     *
     * @throws IOException propagated from the delegated call
     */
    @Override
    public Pair<Message, CellScanner> call(BlockingService service2, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) throws IOException {
        SimpleServerCall placeholder = new SimpleServerCall(
            -1, service2, md, null, param, cellScanner, null, -1L, null,
            receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
        return call(placeholder, status);
    }

    /**
     * Writes the buffer chain to the channel, passing {@code 65536} as the second argument of
     * {@code BufferChain.write}, and reports any positive byte count to the metrics.
     *
     * @param channel destination channel
     * @param bufferChain data to write
     * @return the value returned by {@code bufferChain.write}
     * @throws IOException if the write fails
     */
    protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) throws IOException {
        final long written = bufferChain.write(channel, 65536);
        if (written > 0L) {
            metrics.sentBytes(written);
        }
        return written;
    }

    /**
     * Binds {@code socket} to {@code address}, translating low-level failures into exceptions
     * whose messages name the offending address.
     *
     * @param socket the server socket to bind
     * @param address the address to bind to
     * @param backlog the listen backlog passed to {@link ServerSocket#bind(java.net.SocketAddress, int)}
     * @throws BindException if the address cannot be bound; the message includes the address
     *     and the original exception is attached as the cause
     * @throws UnknownHostException if the address is unresolved; the original
     *     {@link SocketException} is attached as the cause
     * @throws IOException for any other socket failure, rethrown unchanged
     */
    public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException {
        try {
            socket.bind(address, backlog);
        }
        catch (BindException e) {
            BindException bindException = new BindException("Problem binding to " + address + " : " + e.getMessage());
            bindException.initCause(e);
            throw bindException;
        }
        catch (SocketException e) {
            // ServerSocket.bind reports an unresolved InetSocketAddress with exactly this message.
            if ("Unresolved address".equals(e.getMessage())) {
                UnknownHostException unknownHost = new UnknownHostException("Invalid hostname for server: " + address.getHostName());
                // UnknownHostException has no cause-taking constructor; keep the original stack trace.
                unknownHost.initCause(e);
                throw unknownHost;
            }
            throw e;
        }
    }

    /**
     * @return the number of connections currently tracked by the connection manager
     */
    @Override
    public int getNumOpenConnections() {
        int open = connectionManager.size();
        return open;
    }

    /**
     * Tracks the open client connections of the enclosing server and periodically closes
     * idle ones using a daemon {@link Timer}.
     */
    private class ConnectionManager {
        /** Count of tracked connections; {@link #size()} reads this counter rather than the set. */
        private final AtomicInteger count = new AtomicInteger();
        /** Concurrent set of tracked connections (backed by a {@link ConcurrentHashMap}). */
        private final Set<SimpleServerRpcConnection> connections;
        /** Daemon timer that runs the idle scan. */
        private final Timer idleScanTimer;
        /** A non-forced scan stops as soon as the connection count is below this value. */
        private final int idleScanThreshold;
        /** Delay in milliseconds between idle scans. */
        private final int idleScanInterval;
        /** Twice the configured {@code hbase.ipc.client.connection.maxidletime}, in milliseconds. */
        private final int maxIdleTime;
        /** Maximum number of connections a non-forced scan closes before it stops. */
        private final int maxIdleToClose;

        /**
         * Reads the idle-scan settings from the server configuration and sizes the connection set.
         */
        ConnectionManager() {
            Configuration conf = SimpleRpcServer.this.conf;
            this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + SimpleRpcServer.this.port, true);
            this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
            this.idleScanInterval = conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
            this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
            this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
            int handlerCount = conf.getInt("hbase.regionserver.handler.count", 30);
            int maxConnectionQueueSize = handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
            // Explicit type arguments instead of a raw ConcurrentHashMap: same runtime object,
            // but no unchecked conversion.
            this.connections = Collections.newSetFromMap(
                new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
                    maxConnectionQueueSize, 0.75f, SimpleRpcServer.this.readThreads + 2));
        }

        /** Adds the connection to the set, bumping the counter only if it was not already present. */
        private boolean add(SimpleServerRpcConnection connection) {
            boolean added = this.connections.add(connection);
            if (added) {
                this.count.getAndIncrement();
            }
            return added;
        }

        /** Removes the connection from the set, decrementing the counter only if it was present. */
        private boolean remove(SimpleServerRpcConnection connection) {
            boolean removed = this.connections.remove(connection);
            if (removed) {
                this.count.getAndDecrement();
            }
            return removed;
        }

        /** @return the current value of the connection counter */
        int size() {
            return this.count.get();
        }

        /** @return a snapshot array of the tracked connections */
        SimpleServerRpcConnection[] toArray() {
            return this.connections.toArray(new SimpleServerRpcConnection[0]);
        }

        /**
         * Creates a connection for the channel via {@code getConnection} and starts tracking it.
         *
         * @param channel the accepted client channel
         * @return the newly created connection
         */
        SimpleServerRpcConnection register(SocketChannel channel) {
            SimpleServerRpcConnection connection = SimpleRpcServer.this.getConnection(channel, System.currentTimeMillis());
            this.add(connection);
            if (RpcServer.LOG.isTraceEnabled()) {
                RpcServer.LOG.trace("Connection from " + connection + "; connections=" + this.size() + ", queued calls size (bytes)=" + SimpleRpcServer.this.callQueueSizeInBytes.sum() + ", general queued calls=" + SimpleRpcServer.this.scheduler.getGeneralQueueLength() + ", priority queued calls=" + SimpleRpcServer.this.scheduler.getPriorityQueueLength() + ", meta priority queued calls=" + SimpleRpcServer.this.scheduler.getMetaPriorityQueueLength());
            }
            return connection;
        }

        /**
         * Stops tracking the connection and, if it was tracked, closes it.
         *
         * @param connection the connection to close
         * @return {@code true} if the connection was tracked (and therefore closed by this call)
         */
        boolean close(SimpleServerRpcConnection connection) {
            boolean exists = this.remove(connection);
            if (exists) {
                if (RpcServer.LOG.isTraceEnabled()) {
                    RpcServer.LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection + ". Number of active connections: " + this.size());
                }
                connection.close();
            }
            return exists;
        }

        /**
         * Closes connections that are idle and whose last contact is older than
         * {@link #maxIdleTime} milliseconds.
         *
         * <p>Synchronized so that scans from the timer task and from the listener's
         * out-of-memory handling do not overlap.
         *
         * @param scanAll when {@code true}, every connection is examined and there is no cap on
         *     how many are closed; when {@code false}, the scan stops once the connection count
         *     is below {@link #idleScanThreshold} or {@link #maxIdleToClose} connections have
         *     been closed
         */
        synchronized void closeIdle(boolean scanAll) {
            long minLastContact = System.currentTimeMillis() - (long)this.maxIdleTime;
            int closed = 0;
            // The set is backed by a ConcurrentHashMap, so close() may remove entries while
            // this loop iterates.
            for (SimpleServerRpcConnection connection : this.connections) {
                // Stop if connections dropped below the threshold, unless scanning all.
                if (!scanAll && this.size() < this.idleScanThreshold) {
                    break;
                }
                // Close stale idle connections; when not scanning all, stop at the per-scan cap.
                if (connection.isIdle()
                        && connection.getLastContact() < minLastContact
                        && this.close(connection)
                        && !scanAll
                        && ++closed == this.maxIdleToClose) {
                    break;
                }
            }
        }

        /** Closes every connection in a snapshot of the tracked set. */
        void closeAll() {
            for (SimpleServerRpcConnection connection : this.toArray()) {
                this.close(connection);
            }
        }

        /** Schedules the first idle scan. */
        void startIdleScan() {
            this.scheduleIdleScanTask();
        }

        /** Cancels the idle-scan timer. */
        void stopIdleScan() {
            this.idleScanTimer.cancel();
        }

        /**
         * Schedules a one-shot idle scan {@link #idleScanInterval} milliseconds from now; the
         * task reschedules itself after each run. Does nothing once the server is not running.
         */
        private void scheduleIdleScanTask() {
            if (!SimpleRpcServer.this.running) {
                return;
            }
            TimerTask idleScanTask = new TimerTask(){

                @Override
                public void run() {
                    if (!SimpleRpcServer.this.running) {
                        return;
                    }
                    if (RpcServer.LOG.isTraceEnabled()) {
                        RpcServer.LOG.trace("running");
                    }
                    try {
                        ConnectionManager.this.closeIdle(false);
                    }
                    finally {
                        // Reschedule even if the scan threw.
                        ConnectionManager.this.scheduleIdleScanTask();
                    }
                }
            };
            this.idleScanTimer.schedule(idleScanTask, this.idleScanInterval);
        }
    }

    private class Listener
    extends Thread {
        /** Non-blocking server channel bound in the constructor; set to null when run() exits normally. */
        private ServerSocketChannel acceptChannel;
        /** Selector the accept channel is registered with; set to null when run() exits normally. */
        private Selector selector;
        /** Reader tasks, one per configured read thread. */
        private Reader[] readers;
        /** Index of the reader most recently returned by getReader(). */
        private int currentReader;
        /** Capacity of each reader's pending-connection queue. */
        private final int readerPendingConnectionQueueLength;
        /** Fixed-size pool that runs the reader tasks. */
        private ExecutorService readPool;

        /**
         * Opens and binds the non-blocking accept channel, publishes the bound port and address
         * to the enclosing server, starts the reader tasks, and registers the channel for accept
         * events. The thread itself is not started here.
         *
         * @param name initial thread name; replaced by {@code "Listener,port=<port>"} at the end
         * @throws IOException if a channel or selector cannot be opened, or the bind fails
         */
        public Listener(String name) throws IOException {
            super(name);
            final int backlogLength = SimpleRpcServer.this.conf.getInt("hbase.ipc.server.listen.queue.size", 128);
            this.readerPendingConnectionQueueLength = SimpleRpcServer.this.conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);

            // Open and bind the listening socket.
            this.acceptChannel = ServerSocketChannel.open();
            this.acceptChannel.configureBlocking(false);
            ServerSocket serverSocket = this.acceptChannel.socket();
            SimpleRpcServer.bind(serverSocket, SimpleRpcServer.this.bindAddress, backlogLength);
            SimpleRpcServer.this.port = this.acceptChannel.socket().getLocalPort();
            SimpleRpcServer.this.address = (InetSocketAddress)this.acceptChannel.socket().getLocalSocketAddress();
            this.selector = Selector.open();

            // Create the reader pool and submit one task per read thread.
            final int readerCount = SimpleRpcServer.this.readThreads;
            this.readers = new Reader[readerCount];
            String readerNameFormat = "Reader=%d,bindAddress=" + SimpleRpcServer.this.bindAddress.getHostName() + ",port=" + SimpleRpcServer.this.port;
            this.readPool = Executors.newFixedThreadPool(
                readerCount,
                new ThreadFactoryBuilder()
                    .setNameFormat(readerNameFormat)
                    .setDaemon(true)
                    .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER)
                    .build());
            for (int i = 0; i < SimpleRpcServer.this.readThreads; ++i) {
                Reader reader = new Reader();
                this.readers[i] = reader;
                this.readPool.execute(reader);
            }
            RpcServer.LOG.info(this.getName() + ": started " + SimpleRpcServer.this.readThreads + " reader(s) listening on port=" + SimpleRpcServer.this.port);

            // Register for accept events (OP_ACCEPT == 16).
            this.acceptChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            this.setName("Listener,port=" + SimpleRpcServer.this.port);
            this.setDaemon(true);
        }

        /**
         * Accept loop. Starts the idle-connection scan, then repeatedly selects on the accept
         * channel and hands ready keys to {@link #doAccept(SelectionKey)} while the server is
         * running. On normal exit it closes the channel and selector, cancels the idle scan
         * and closes every tracked connection.
         *
         * <p>On {@link OutOfMemoryError}: if an error handler is installed and its
         * {@code checkOOME} returns {@code true} the thread exits immediately (without the
         * normal cleanup); if no handler is installed, idle connections are closed and the
         * thread sleeps for 60 seconds before continuing.
         */
        @Override
        @SuppressWarnings(value={"IS2_INCONSISTENT_SYNC"}, justification="selector access is not synchronized; seems fine but concerned changing it will have per impact")
        public void run() {
            RpcServer.LOG.info(this.getName() + ": starting");
            SimpleRpcServer.this.connectionManager.startIdleScan();
            while (SimpleRpcServer.this.running) {
                // Remembered outside the try so the catch blocks can close its attachment.
                SelectionKey key = null;
                try {
                    this.selector.select();
                    Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        key = ready.next();
                        ready.remove();
                        try {
                            if (key.isValid() && key.isAcceptable()) {
                                this.doAccept(key);
                            }
                        } catch (IOException ignored) {
                            if (RpcServer.LOG.isTraceEnabled()) {
                                RpcServer.LOG.trace("ignored", (Throwable)ignored);
                            }
                        }
                        key = null;
                    }
                } catch (OutOfMemoryError e) {
                    if (SimpleRpcServer.this.errorHandler != null) {
                        if (SimpleRpcServer.this.errorHandler.checkOOME(e)) {
                            RpcServer.LOG.info(this.getName() + ": exiting on OutOfMemoryError");
                            this.closeCurrentConnection(key, e);
                            SimpleRpcServer.this.connectionManager.closeIdle(true);
                            return;
                        }
                    } else {
                        // No handler: free what we can, then back off before selecting again.
                        RpcServer.LOG.warn(this.getName() + ": OutOfMemoryError in server select", (Throwable)e);
                        this.closeCurrentConnection(key, e);
                        SimpleRpcServer.this.connectionManager.closeIdle(true);
                        try {
                            Thread.sleep(60000L);
                        } catch (InterruptedException ex) {
                            RpcServer.LOG.debug("Interrupted while sleeping");
                        }
                    }
                } catch (Exception e) {
                    this.closeCurrentConnection(key, e);
                }
            }
            RpcServer.LOG.info(this.getName() + ": stopping");
            synchronized (this) {
                try {
                    this.acceptChannel.close();
                    this.selector.close();
                } catch (IOException ignored) {
                    if (RpcServer.LOG.isTraceEnabled()) {
                        RpcServer.LOG.trace("ignored", (Throwable)ignored);
                    }
                }
                this.selector = null;
                this.acceptChannel = null;
                SimpleRpcServer.this.connectionManager.stopIdleScan();
                SimpleRpcServer.this.connectionManager.closeAll();
            }
        }

        /**
         * Closes the connection attached to {@code key}, if any, and clears the attachment.
         *
         * @param key the selection key being processed when the failure occurred; may be null
         * @param e the failure that triggered the close; not used by this method
         */
        private void closeCurrentConnection(SelectionKey key, Throwable e) {
            if (key == null) {
                return;
            }
            SimpleServerRpcConnection attached = (SimpleServerRpcConnection)key.attachment();
            if (attached == null) {
                return;
            }
            SimpleRpcServer.this.closeConnection(attached);
            key.attach(null);
        }

        /** @return the enclosing server's {@code address} field, set when the accept channel was bound */
        InetSocketAddress getAddress() {
            InetSocketAddress bound = SimpleRpcServer.this.address;
            return bound;
        }

        /**
         * Accepts every connection currently pending on the server channel behind {@code key},
         * configures each accepted socket, registers it with the connection manager and queues
         * it on the next reader.
         *
         * <p>If configuring a freshly accepted socket fails, the channel is closed before the
         * exception is rethrown: at that point nothing else holds a reference to it, so it would
         * otherwise stay open until garbage-collected.
         *
         * @param key the accept key whose channel is the {@link ServerSocketChannel}
         * @throws IOException if accepting or configuring a channel fails
         * @throws InterruptedException declared for callers; not thrown directly by this body
         */
        void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
            SocketChannel channel;
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            while ((channel = server.accept()) != null) {
                try {
                    channel.configureBlocking(false);
                    channel.socket().setTcpNoDelay(SimpleRpcServer.this.tcpNoDelay);
                    channel.socket().setKeepAlive(SimpleRpcServer.this.tcpKeepAlive);
                }
                catch (IOException e) {
                    // Not yet tracked by the connection manager, so close it here.
                    IOUtils.cleanupWithLogger((Logger)RpcServer.LOG, (Closeable[])new Closeable[]{channel});
                    throw e;
                }
                Reader reader = this.getReader();
                SimpleServerRpcConnection c = SimpleRpcServer.this.connectionManager.register(channel);
                if (c == null) {
                    // No connection object was produced; make sure the channel does not linger.
                    if (!channel.isOpen()) continue;
                    IOUtils.cleanupWithLogger((Logger)RpcServer.LOG, (Closeable[])new Closeable[]{channel});
                    continue;
                }
                // Attach so closeCurrentConnection can find the connection if queuing fails.
                key.attach(c);
                reader.addConnection(c);
            }
        }

        /**
         * Reads from the connection attached to {@code key}. The connection's last-contact
         * time is refreshed before the read and again after a non-negative result; a negative
         * result, or any exception other than {@link InterruptedException}, closes the connection.
         *
         * @param key a readable key whose attachment is the connection; ignored if it has none
         * @throws InterruptedException rethrown from {@code readAndProcess} after logging
         */
        void doRead(SelectionKey key) throws InterruptedException {
            SimpleServerRpcConnection connection = (SimpleServerRpcConnection)key.attachment();
            if (connection == null) {
                return;
            }
            connection.setLastContact(System.currentTimeMillis());
            int result;
            try {
                result = connection.readAndProcess();
            } catch (InterruptedException ieo) {
                RpcServer.LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", (Throwable)ieo);
                throw ieo;
            } catch (Exception e) {
                if (RpcServer.LOG.isDebugEnabled()) {
                    RpcServer.LOG.debug("Caught exception while reading:", (Throwable)e);
                }
                // Treat any other failure like a negative read result.
                result = -1;
            }
            if (result >= 0) {
                connection.setLastContact(System.currentTimeMillis());
            } else {
                SimpleRpcServer.this.closeConnection(connection);
            }
        }

        /**
         * Wakes the accept selector, closes the listening socket and shuts down the reader
         * pool. Fields that have already been cleared are skipped.
         */
        synchronized void doStop() {
            Selector activeSelector = this.selector;
            if (activeSelector != null) {
                activeSelector.wakeup();
                Thread.yield();
            }
            ServerSocketChannel activeChannel = this.acceptChannel;
            if (activeChannel != null) {
                try {
                    activeChannel.socket().close();
                } catch (IOException e) {
                    RpcServer.LOG.info(this.getName() + ": exception in closing listener socket. " + e);
                }
            }
            this.readPool.shutdownNow();
        }

        /**
         * Advances the round-robin index and returns the reader at the new position.
         *
         * @return the next reader in rotation
         */
        Reader getReader() {
            int next = (this.currentReader + 1) % this.readers.length;
            this.currentReader = next;
            return this.readers[next];
        }

        /**
         * Task that owns one read selector. Connections handed over through
         * {@link #addConnection(SimpleServerRpcConnection)} are registered for read events,
         * and readable keys are dispatched to {@code Listener.doRead}.
         */
        private class Reader
        implements Runnable {
            /** Bounded hand-off queue of connections waiting to be registered with the selector. */
            private final LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
            /** Selector watched by this reader for read events. */
            private final Selector readSelector;

            /**
             * @throws IOException if the read selector cannot be opened
             */
            Reader() throws IOException {
                // Explicit type argument instead of a raw LinkedBlockingQueue.
                this.pendingConnections = new LinkedBlockingQueue<SimpleServerRpcConnection>(Listener.this.readerPendingConnectionQueueLength);
                this.readSelector = Selector.open();
            }

            /** Runs the read loop and closes the selector when the loop ends, however it ends. */
            @Override
            public void run() {
                try {
                    this.doRunLoop();
                }
                finally {
                    try {
                        this.readSelector.close();
                    }
                    catch (IOException ioe) {
                        RpcServer.LOG.error(Listener.this.getName() + ": error closing read selector in " + Listener.this.getName(), (Throwable)ioe);
                    }
                }
            }

            /**
             * While the server is running: registers the connections queued at the start of
             * the iteration, blocks in select, then reads from every valid readable key.
             * Exceptions are logged and the loop continues.
             */
            private synchronized void doRunLoop() {
                while (SimpleRpcServer.this.running) {
                    try {
                        // Only drain what is queued right now, then go back to selecting.
                        int queued = this.pendingConnections.size();
                        for (int i = queued; i > 0; --i) {
                            SimpleServerRpcConnection conn = this.pendingConnections.take();
                            conn.channel.register(this.readSelector, SelectionKey.OP_READ, conn);
                        }
                        this.readSelector.select();
                        Iterator<SelectionKey> iter = this.readSelector.selectedKeys().iterator();
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            iter.remove();
                            if (key.isValid() && key.isReadable()) {
                                Listener.this.doRead(key);
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt after the server stopped is expected; the loop condition ends the task.
                        if (!SimpleRpcServer.this.running) continue;
                        RpcServer.LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", (Throwable)e);
                    }
                    catch (CancelledKeyException e) {
                        RpcServer.LOG.error(Listener.this.getName() + ": CancelledKeyException in Reader", (Throwable)e);
                    }
                    catch (IOException ex) {
                        RpcServer.LOG.info(Listener.this.getName() + ": IOException in Reader", (Throwable)ex);
                    }
                }
            }

            /**
             * Queues a connection for registration and wakes the selector so the read loop
             * picks it up.
             *
             * @param conn the connection to hand to this reader
             * @throws IllegalStateException from {@code Queue.add} if the bounded queue is full
             */
            public void addConnection(SimpleServerRpcConnection conn) throws IOException {
                this.pendingConnections.add(conn);
                this.readSelector.wakeup();
            }
        }
    }
}

