/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.server.rpc;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import javax.net.ssl.SSLServerSocket;
import javax.security.auth.callback.CallbackHandler;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.rpc.ClientInfoProcessorFactory;
import org.apache.accumulo.server.rpc.CustomNonBlockingServer;
import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
import org.apache.accumulo.server.rpc.SaslServerDigestCallbackHandler;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TNonblockingServerSocket;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.rpc.TimedProcessor;
import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TServerUtils {
    private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
    public static final ThreadLocal<String> clientAddress = new ThreadLocal();

    public static ServerAddress startServer(AccumuloServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName, String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty) throws UnknownHostException {
        AccumuloConfiguration config = service.getConfiguration();
        int portHint = config.getPort(portHintProperty);
        int minThreads = 2;
        if (minThreadProperty != null) {
            minThreads = config.getCount(minThreadProperty);
        }
        long timeBetweenThreadChecks = 1000L;
        if (timeBetweenThreadChecksProperty != null) {
            timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
        }
        long maxMessageSize = 10000000L;
        if (maxMessageSizeProperty != null) {
            maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
        }
        boolean portSearch = false;
        if (portSearchProperty != null) {
            portSearch = config.getBoolean(portSearchProperty);
        }
        int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
        ThriftServerType serverType = service.getThriftServerType();
        if (ThriftServerType.SASL == serverType) {
            processor = TServerUtils.updateSaslProcessor(serverType, processor);
        }
        TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
        Random random = new Random();
        block2: for (int j = 0; j < 100; ++j) {
            int portsToSearch = 1;
            if (portSearch) {
                portsToSearch = 1000;
            }
            for (int i = 0; i < portsToSearch; ++i) {
                int port = portHint + i;
                if (portHint != 0 && i > 0) {
                    port = 1024 + random.nextInt(64511);
                }
                if (port > 65535) {
                    port = 1024 + port % 64511;
                }
                try {
                    HostAndPort addr = HostAndPort.fromParts((String)hostname, (int)port);
                    return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis());
                }
                catch (TTransportException ex) {
                    log.error("Unable to start TServer", (Throwable)ex);
                    if (ex.getCause() != null && ex.getCause().getClass() != BindException.class) {
                        log.error("Unable to start TServer", (Throwable)ex);
                        continue block2;
                    }
                    log.info("Unable to use port {}, retrying. (Thread Name = {})", (Object)port, (Object)threadName);
                    UtilWaitThread.sleep((long)250L);
                    continue;
                }
            }
        }
        throw new UnknownHostException("Unable to find a listen port");
    }

    public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
        TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
        THsHaServer.Args options = new THsHaServer.Args((TNonblockingServerTransport)transport);
        options.protocolFactory(protocolFactory);
        options.transportFactory(ThriftUtil.transportFactory((long)maxMessageSize));
        options.maxReadBufferBytes = maxMessageSize;
        options.stopTimeoutVal(5);
        ThreadPoolExecutor pool = TServerUtils.createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
        options.executorService((ExecutorService)pool);
        options.processorFactory(new TProcessorFactory(processor));
        if (address.getPort() == 0) {
            address = HostAndPort.fromParts((String)address.getHostText(), (int)transport.getPort());
        }
        return new ServerAddress((TServer)new CustomNonBlockingServer(options), address);
    }

    public static ThreadPoolExecutor createSelfResizingThreadPool(String serverName, int executorThreads, int simpleTimerThreads, long timeBetweenThreadChecks) {
        SimpleThreadPool pool = new SimpleThreadPool(executorThreads, "ClientPool");
        SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable((ThreadPoolExecutor)pool, serverName, executorThreads){
            final /* synthetic */ ThreadPoolExecutor val$pool;
            final /* synthetic */ String val$serverName;
            final /* synthetic */ int val$executorThreads;
            {
                this.val$pool = threadPoolExecutor;
                this.val$serverName = string;
                this.val$executorThreads = n;
            }

            @Override
            public void run() {
                int smaller;
                if (this.val$pool.getCorePoolSize() <= this.val$pool.getActiveCount()) {
                    int larger = this.val$pool.getCorePoolSize() + Math.min(this.val$pool.getQueue().size(), 2);
                    log.info("Increasing server thread pool size on {} to {}", (Object)this.val$serverName, (Object)larger);
                    this.val$pool.setMaximumPoolSize(larger);
                    this.val$pool.setCorePoolSize(larger);
                } else if (this.val$pool.getCorePoolSize() > this.val$pool.getActiveCount() + 3 && (smaller = Math.max(this.val$executorThreads, this.val$pool.getCorePoolSize() - 1)) != this.val$pool.getCorePoolSize()) {
                    log.info("Decreasing server thread pool size on {} to {}", (Object)this.val$serverName, (Object)smaller);
                    this.val$pool.setCorePoolSize(smaller);
                }
            }
        }, timeBetweenThreadChecks, timeBetweenThreadChecks);
        return pool;
    }

    public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
        TServerSocket transport = new TServerSocket(address.getPort());
        ThreadPoolExecutor pool = TServerUtils.createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
        TThreadPoolServer server = TServerUtils.createTThreadPoolServer((TServerTransport)transport, processor, ThriftUtil.transportFactory((long)maxMessageSize), protocolFactory, pool);
        if (address.getPort() == 0) {
            address = HostAndPort.fromParts((String)address.getHostText(), (int)transport.getServerSocket().getLocalPort());
            log.info("Blocking Server bound on {}", (Object)address);
        }
        return new ServerAddress((TServer)server, address);
    }

    public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory, TProtocolFactory protocolFactory) {
        return TServerUtils.createTThreadPoolServer(transport, processor, transportFactory, protocolFactory, null);
    }

    public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory, TProtocolFactory protocolFactory, ExecutorService service) {
        TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
        options.protocolFactory(protocolFactory);
        options.transportFactory(transportFactory);
        options.processorFactory((TProcessorFactory)new ClientInfoProcessorFactory(clientAddress, processor));
        if (null != service) {
            options.executorService(service);
        }
        return new TThreadPoolServer(options);
    }

    public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
        TServerSocket tServerSock = params.useJsse() ? TSSLTransportFactory.getServerSocket((int)port, (int)timeout, (boolean)params.isClientAuth(), (InetAddress)address) : TSSLTransportFactory.getServerSocket((int)port, (int)timeout, (InetAddress)address, (TSSLTransportFactory.TSSLTransportParameters)params.getTTransportParams());
        ServerSocket serverSock = tServerSock.getServerSocket();
        if (serverSock instanceof SSLServerSocket) {
            SSLServerSocket sslServerSock = (SSLServerSocket)serverSock;
            Object[] protocols = params.getServerProtocols();
            HashSet<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
            socketEnabledProtocols.retainAll(Arrays.asList(protocols));
            if (socketEnabledProtocols.isEmpty()) {
                throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: " + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
            }
            sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
        }
        return tServerSock;
    }

    public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams, String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
        TServerSocket transport;
        try {
            transport = TServerUtils.getSslServerSocket(address.getPort(), (int)socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
        }
        catch (UnknownHostException e) {
            throw new TTransportException((Throwable)e);
        }
        if (address.getPort() == 0) {
            address = HostAndPort.fromParts((String)address.getHostText(), (int)transport.getServerSocket().getLocalPort());
            log.info("SSL Thread Pool Server bound on {}", (Object)address);
        }
        ThreadPoolExecutor pool = TServerUtils.createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
        return new ServerAddress((TServer)TServerUtils.createTThreadPoolServer((TServerTransport)transport, processor, ThriftUtil.transportFactory(), protocolFactory, pool), address);
    }

    public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks) throws TTransportException {
        UserGroupInformation serverUser;
        String fqdn;
        String hostname;
        log.info("Creating SASL thread pool thrift server on listening on {}:{}", (Object)address.getHostText(), (Object)address.getPort());
        TServerSocket transport = new TServerSocket(address.getPort(), (int)socketTimeout);
        try {
            hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
            fqdn = InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (UnknownHostException e) {
            throw new TTransportException((Throwable)e);
        }
        if ("0.0.0.0".equals(hostname)) {
            hostname = fqdn;
        }
        if (!hostname.equals(fqdn)) {
            log.error("Expected hostname of '{}' but got '{}'. Ensure the entries in the Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when using SASL.", (Object)fqdn, (Object)hostname);
            throw new RuntimeException("SASL requires that the address the thrift server listens on is the same as the FQDN for this host");
        }
        try {
            serverUser = UserGroupInformation.getLoginUser();
        }
        catch (IOException e) {
            throw new TTransportException((Throwable)e);
        }
        log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", new Object[]{serverUser, params.getKerberosServerPrimary(), hostname});
        TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
        saslTransportFactory.addServerDefinition("GSSAPI", params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), (CallbackHandler)new SaslRpcServer.SaslGssCallbackHandler());
        if (null != params.getSecretManager()) {
            log.info("Adding DIGEST-MD5 server definition for delegation tokens");
            saslTransportFactory.addServerDefinition("DIGEST-MD5", params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), (CallbackHandler)((Object)new SaslServerDigestCallbackHandler(params.getSecretManager())));
        } else {
            log.info("SecretManager is null, not adding support for delegation token authentication");
        }
        UGIAssumingTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory((TTransportFactory)saslTransportFactory, serverUser);
        if (address.getPort() == 0) {
            address = HostAndPort.fromParts((String)address.getHostText(), (int)transport.getServerSocket().getLocalPort());
            log.info("SASL thrift server bound on {}", (Object)address);
        }
        ThreadPoolExecutor pool = TServerUtils.createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
        TThreadPoolServer server = TServerUtils.createTThreadPoolServer((TServerTransport)transport, processor, (TTransportFactory)ugiTransportFactory, protocolFactory, pool);
        return new ServerAddress((TServer)server, address);
    }

    public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
        if (ThriftServerType.SASL == serverType) {
            processor = TServerUtils.updateSaslProcessor(serverType, processor);
        }
        return TServerUtils.startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
    }

    public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
        return TServerUtils.startTServer(address, serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
    }

    public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
        ServerAddress serverAddress;
        Preconditions.checkArgument((sslParams == null || saslParams == null ? 1 : 0) != 0, (Object)"Cannot start a Thrift server using both SSL and SASL");
        switch (serverType) {
            case SSL: {
                log.debug("Instantiating SSL Thrift server");
                serverAddress = TServerUtils.createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
                break;
            }
            case SASL: {
                log.debug("Instantiating SASL Thrift server");
                serverAddress = TServerUtils.createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks);
                break;
            }
            case THREADPOOL: {
                log.debug("Instantiating unsecure TThreadPool Thrift server");
                serverAddress = TServerUtils.createBlockingServer(address, processor, protocolFactory, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
                break;
            }
            default: {
                log.debug("Instantiating default, unsecure custom half-async Thrift server");
                serverAddress = TServerUtils.createNonBlockingServer(address, processor, protocolFactory, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
            }
        }
        final TServer finalServer = serverAddress.server;
        Runnable serveTask = new Runnable(){

            @Override
            public void run() {
                try {
                    finalServer.serve();
                }
                catch (Error e) {
                    Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
                }
            }
        };
        serveTask = new LoggingRunnable(log, serveTask);
        Daemon thread = new Daemon(serveTask, threadName);
        thread.start();
        if (serverAddress.address.getHostText().equals("0.0.0.0")) {
            try {
                serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts((String)InetAddress.getLocalHost().getHostName(), (int)serverAddress.address.getPort()));
            }
            catch (UnknownHostException e) {
                throw new TTransportException((Throwable)e);
            }
        }
        return serverAddress;
    }

    public static void stopTServer(TServer s) {
        if (s == null) {
            return;
        }
        s.stop();
        try {
            Field f = s.getClass().getDeclaredField("executorService_");
            f.setAccessible(true);
            ExecutorService es = (ExecutorService)f.get(s);
            es.shutdownNow();
        }
        catch (Exception e) {
            log.error("Unable to call shutdownNow", (Throwable)e);
        }
    }

    private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
        Preconditions.checkArgument((ThriftServerType.SASL == serverType ? 1 : 0) != 0);
        log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
        return new UGIAssumingProcessor(processor);
    }
}

