/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.CallQueueManager;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.ipc.DefaultRpcScheduler;
import org.apache.hadoop.ipc.FairCallQueue;
import org.apache.hadoop.ipc.IpcException;
import org.apache.hadoop.ipc.ProcessingDetails;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.ResponseBuffer;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.RpcServerException;
import org.apache.hadoop.ipc.RpcWritable;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Server {
    // Set in the constructor from the "hadoop.security.authorization" configuration key.
    private final boolean authorize;
    // Auth methods accepted by this server; computed in the constructor by getAuthMethods().
    private List<SaslRpcServer.AuthMethod> enabledAuthMethods;
    // Negotiate message built once in the constructor from enabledAuthMethods.
    private RpcHeaderProtos.RpcSaslProto negotiateResponse;
    // Holds the terse / suppressed logging classifications consulted by logException().
    private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
    private Tracer tracer;
    // Optional; when non-null, setupResponse() lets it update each response header.
    private AlignmentContext alignmentContext;
    private final String serverName;
    // UTF-8 bytes of the literal "GET ".
    private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap("GET ".getBytes(StandardCharsets.UTF_8));
    // Canned plain-text HTTP 404 reply explaining that this is an IPC port.
    static final String RECEIVED_HTTP_REQ_RESPONSE = "HTTP/1.1 404 Not Found\r\nContent-type: text/plain\r\n\r\nIt looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.\r\n";
    // Initial size of each per-thread ResponseBuffer, and the capacity it is reset to after an oversized response.
    static int INITIAL_RESP_BUF_SIZE = 10240;
    // Registry populated by registerProtocolEngine(); a plain HashMap with no synchronization in this file's visible code.
    static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<RPC.RpcKind, RpcKindMapValue>(4);
    public static final Logger LOG = LoggerFactory.getLogger(Server.class);
    public static final Logger AUDITLOG = LoggerFactory.getLogger((String)("SecurityLogger." + Server.class.getName()));
    private static final String AUTH_FAILED_FOR = "Auth failed for ";
    private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
    // Per-thread Server reference returned by get().
    private static final ThreadLocal<Server> SERVER = new ThreadLocal();
    // Protocol-name to class cache used by getProtocolClass().
    private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap();
    // Per-thread current call; read by the static accessors (getCallId(), getRemoteIp(), ...).
    private static final ThreadLocal<Call> CurCall = new ThreadLocal();
    private String bindAddress;
    // Initially the requested port; the constructor overwrites it with the listener's bound port.
    private int port;
    private int handlerCount;
    private int readThreads;
    private int readerPendingConnectionQueue;
    // When non-null, getRpcRequestWrapper() returns this instead of consulting rpcKindMap.
    private Class<? extends Writable> rpcRequestClass;
    protected final RpcMetrics rpcMetrics;
    protected final RpcDetailedMetrics rpcDetailedMetrics;
    private Configuration conf;
    private String portRangeConfig = null;
    private SecretManager<TokenIdentifier> secretManager;
    private SaslPropertiesResolver saslPropsResolver;
    private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
    // handlerCount multiplied by the per-handler queue size (see constructor and refreshCallQueue()).
    private int maxQueueSize;
    // Responses larger than this are logged as large; buffers grown beyond it are shrunk again.
    private final int maxRespSize;
    // One reusable ResponseBuffer per thread, created lazily with INITIAL_RESP_BUF_SIZE.
    private final ThreadLocal<ResponseBuffer> responseBuffer = new ThreadLocal<ResponseBuffer>(){

        @Override
        protected ResponseBuffer initialValue() {
            return new ResponseBuffer(INITIAL_RESP_BUF_SIZE);
        }
    };
    private int socketSendBufferSize;
    private final int maxDataLength;
    private final boolean tcpNoDelay;
    // Cleared by stop(); join() waits on the server monitor while this is true.
    private volatile boolean running = true;
    private CallQueueManager<Call> callQueue;
    private ConnectionManager connectionManager;
    private Listener listener = null;
    // Lazily created by addAuxiliaryListener(); keyed by the auxiliary listener's bound port.
    private Map<Integer, Listener> auxiliaryListenerMap;
    private Responder responder = null;
    // Allocated in start(); null until then.
    private Handler[] handlers = null;
    private boolean logSlowRPC = false;
    // 15 minutes expressed in nanoseconds.
    private static final long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(15L, TimeUnit.MINUTES);
    private static int NIO_BUFFER_LIMIT = 8192;

    /**
     * Registers exception classes that this server should log tersely
     * (message only; see {@code logException}).
     *
     * @param exceptionClass the exception classes handed to the exceptions handler
     */
    public void addTerseExceptions(Class<?>... exceptionClass) {
        exceptionsHandler.addTerseLoggingExceptions(exceptionClass);
    }

    /**
     * Registers exception classes whose occurrences should not be logged at all
     * by {@code logException}.
     *
     * @param exceptionClass the exception classes handed to the exceptions handler
     */
    public void addSuppressedLoggingExceptions(Class<?>... exceptionClass) {
        exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);
    }

    /**
     * Sets the alignment context. When it is non-null, response header
     * construction calls {@code updateResponseState} on it.
     *
     * @param alignmentContext the context to use; may be {@code null} to disable
     */
    public void setAlignmentContext(AlignmentContext alignmentContext) {
        this.alignmentContext = alignmentContext;
    }

    /**
     * Registers the request wrapper class and invoker for an RPC kind.
     * Each kind may be registered only once.
     *
     * @param rpcKind the RPC kind being registered
     * @param rpcRequestWrapperClass the request wrapper class for that kind
     * @param rpcInvoker the invoker for that kind
     * @throws IllegalArgumentException if {@code rpcKind} already has a registration;
     *         the existing registration is left in place
     */
    public static void registerProtocolEngine(RPC.RpcKind rpcKind, Class<? extends Writable> rpcRequestWrapperClass, RPC.RpcInvoker rpcInvoker) {
        RpcKindMapValue registration = new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker);
        RpcKindMapValue previous = rpcKindMap.put(rpcKind, registration);
        if (previous != null) {
            // Undo the overwrite before rejecting the duplicate.
            rpcKindMap.put(rpcKind, previous);
            throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("rpcKind=" + rpcKind + ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + ", rpcInvoker=" + rpcInvoker);
        }
    }

    /**
     * Returns the request wrapper class for the given RPC kind.
     *
     * @param rpcKind the protobuf RPC kind from the request header
     * @return the class fixed at construction time if one was supplied; otherwise the
     *         class registered for the kind, or {@code null} when nothing is registered
     */
    public Class<? extends Writable> getRpcRequestWrapper(RpcHeaderProtos.RpcKindProto rpcKind) {
        if (rpcRequestClass != null) {
            return rpcRequestClass;
        }
        RpcKindMapValue registered = rpcKindMap.get(ProtoUtil.convert(rpcKind));
        if (registered == null) {
            return null;
        }
        return registered.rpcRequestWrapperClass;
    }

    /**
     * Returns the invoker for the given RPC kind; this implementation simply
     * consults the static registry via {@link #getRpcInvoker(RPC.RpcKind)}.
     *
     * @param rpcKind the RPC kind to look up
     * @return the registered invoker, or {@code null} if none
     */
    protected RPC.RpcInvoker getServerRpcInvoker(RPC.RpcKind rpcKind) {
        return getRpcInvoker(rpcKind);
    }

    /**
     * Looks up the invoker registered for an RPC kind.
     *
     * @param rpcKind the RPC kind to look up
     * @return the registered invoker, or {@code null} when the kind is not registered
     */
    public static RPC.RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) {
        RpcKindMapValue registered = rpcKindMap.get(rpcKind);
        if (registered == null) {
            return null;
        }
        return registered.rpcInvoker;
    }

    /**
     * Resolves a protocol class by name, caching successful lookups.
     *
     * @param protocolName the class name of the protocol
     * @param conf the configuration used to load the class on a cache miss
     * @return the protocol class
     * @throws ClassNotFoundException if the configuration cannot load the class
     */
    static Class<?> getProtocolClass(String protocolName, Configuration conf) throws ClassNotFoundException {
        Class<?> cached = PROTOCOL_CACHE.get(protocolName);
        if (cached != null) {
            return cached;
        }
        Class<?> loaded = conf.getClassByName(protocolName);
        PROTOCOL_CACHE.put(protocolName, loaded);
        return loaded;
    }

    /**
     * Returns the server bound to the calling thread.
     *
     * @return the value of the per-thread {@code SERVER} slot, or {@code null} if unset
     */
    public static Server get() {
        return SERVER.get();
    }

    /**
     * Exposes the thread-local that holds the call currently associated with a thread.
     *
     * @return the {@code CurCall} thread-local itself (not its value)
     */
    @VisibleForTesting
    public static ThreadLocal<Call> getCurCall() {
        return CurCall;
    }

    /**
     * Returns the id of the call bound to the current thread.
     *
     * @return the call id, or {@code -2} when the thread has no current call
     */
    public static int getCallId() {
        Call current = CurCall.get();
        if (current == null) {
            return -2;
        }
        return current.callId;
    }

    /**
     * Returns the retry count of the call bound to the current thread.
     *
     * @return the retry count, or {@code -1} when the thread has no current call
     */
    public static int getCallRetryCount() {
        Call current = CurCall.get();
        if (current == null) {
            return -1;
        }
        return current.retryCount;
    }

    /**
     * Returns the remote address of the call bound to the current thread.
     *
     * @return the caller's address, or {@code null} when the thread has no current call
     */
    public static InetAddress getRemoteIp() {
        Call current = CurCall.get();
        if (current == null) {
            return null;
        }
        return current.getHostInetAddress();
    }

    /**
     * Returns the established QOP of the current call's connection, but only when
     * that connection is on an auxiliary port.
     *
     * @return the connection's established QOP, or {@code null} when the current call
     *         is not an {@code RpcCall} (including no call at all) or its connection
     *         is not on an auxiliary port
     */
    public static String getAuxiliaryPortEstablishedQOP() {
        Call current = CurCall.get();
        if (current instanceof RpcCall) {
            RpcCall rpcCall = (RpcCall) current;
            if (rpcCall.connection.isOnAuxiliaryPort()) {
                return rpcCall.connection.getEstablishedQOP();
            }
        }
        return null;
    }

    /**
     * Returns the client id of the call bound to the current thread.
     *
     * @return the call's client id, or {@code RpcConstants.DUMMY_CLIENT_ID} when the
     *         thread has no current call
     */
    public static byte[] getClientId() {
        Call current = CurCall.get();
        if (current == null) {
            return RpcConstants.DUMMY_CLIENT_ID;
        }
        return current.clientId;
    }

    /**
     * Returns the textual host address of the current call's remote peer.
     *
     * @return the host address string, or {@code null} when {@link #getRemoteIp()}
     *         yields {@code null}
     */
    public static String getRemoteAddress() {
        InetAddress remote = getRemoteIp();
        if (remote == null) {
            return null;
        }
        return remote.getHostAddress();
    }

    /**
     * Returns the remote user of the call bound to the current thread.
     *
     * @return the call's remote user, or {@code null} when the thread has no current call
     */
    public static UserGroupInformation getRemoteUser() {
        Call current = CurCall.get();
        if (current == null) {
            return null;
        }
        return current.getRemoteUser();
    }

    /**
     * Returns the protocol of the call bound to the current thread.
     *
     * @return the call's protocol, or {@code null} when the thread has no current call
     */
    public static String getProtocol() {
        Call current = CurCall.get();
        if (current == null) {
            return null;
        }
        return current.getProtocol();
    }

    /**
     * Tells whether the current thread has a call bound to it.
     *
     * @return {@code true} if the {@code CurCall} thread-local holds a call
     */
    public static boolean isRpcInvocation() {
        Call current = CurCall.get();
        return current != null;
    }

    /**
     * Returns the priority level of the call bound to the current thread.
     *
     * @return the call's priority level, or {@code 0} when the thread has no current call
     */
    public static int getPriorityLevel() {
        Call current = CurCall.get();
        if (current == null) {
            return 0;
        }
        return current.getPriorityLevel();
    }

    /**
     * @return whether slow RPC logging is enabled for this server
     */
    protected boolean isLogSlowRPC() {
        return logSlowRPC;
    }

    /**
     * Enables or disables slow RPC logging.
     *
     * @param logSlowRPCFlag {@code true} to have {@code updateMetrics} check for slow calls
     */
    @VisibleForTesting
    protected void setLogSlowRPC(boolean logSlowRPCFlag) {
        logSlowRPC = logSlowRPCFlag;
    }

    /**
     * Logs a warning and bumps the slow-RPC metric when a call's processing time
     * exceeds the mean plus three standard deviations, provided more than 1024
     * processing samples have been recorded.
     *
     * @param methodName name reported in the log line
     * @param call the call being reported (used only for the log line)
     * @param details timing details; the PROCESSING timing is read in {@code RpcMetrics.TIMEUNIT}
     */
    void logSlowRpcCalls(String methodName, Call call, ProcessingDetails details) {
        final int deviation = 3;
        final int minSampleSize = 1024;
        double threeSigma = rpcMetrics.getProcessingMean() + rpcMetrics.getProcessingStdDev() * deviation;
        long processingTime = details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
        boolean enoughSamples = rpcMetrics.getProcessingSampleCount() > minSampleSize;
        if (enoughSamples && processingTime > threeSigma) {
            LOG.warn("Slow RPC : {} took {} {} to process from client {}, the processing detail is {}",
                    methodName, processingTime, RpcMetrics.TIMEUNIT, call, details.toString());
            rpcMetrics.incrSlowRpc();
        }
    }

    /**
     * Records queue, handler and processing timings for a call into its
     * {@code ProcessingDetails} and into the server metrics.
     *
     * @param call the call whose timings are recorded
     * @param startTime monotonic nanosecond timestamp at which servicing of the call began
     * @param connDropped if {@code true}, only the queue time is reported
     */
    void updateMetrics(Call call, long startTime, boolean connDropped) {
        long elapsedNanos = Time.monotonicNowNanos() - startTime;
        long arrivalNanos = call.timestampNanos;
        ProcessingDetails details = call.getProcessingDetails();

        // Queue time: arrival until servicing began, minus the time spent enqueueing.
        long queueNanos = startTime - arrivalNanos - details.get(ProcessingDetails.Timing.ENQUEUE);
        details.set(ProcessingDetails.Timing.QUEUE, queueNanos);

        // Handler time: what is left of the elapsed time after processing and response.
        elapsedNanos -= details.get(ProcessingDetails.Timing.PROCESSING);
        elapsedNanos -= details.get(ProcessingDetails.Timing.RESPONSE);
        details.set(ProcessingDetails.Timing.HANDLER, elapsedNanos);

        rpcMetrics.addRpcQueueTime(details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT));

        // Deferred responses and dropped connections contribute queue time only.
        if (call.isResponseDeferred() || connDropped) {
            return;
        }

        long processingTime = details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
        long waitTime = details.get(ProcessingDetails.Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
        rpcMetrics.addRpcLockWaitTime(waitTime);
        rpcMetrics.addRpcProcessingTime(processingTime);

        String name = call.getDetailedMetricsName();
        // The detailed per-method metric excludes lock wait time.
        rpcDetailedMetrics.addProcessingTime(name, processingTime - waitTime);
        callQueue.addResponseTime(name, call, details);
        if (isLogSlowRPC()) {
            logSlowRpcCalls(name, call, details);
        }
    }

    /**
     * Reports the processing time of a deferred call to both the aggregate and
     * the detailed metrics.
     *
     * @param name the detailed-metrics name of the call
     * @param processingTime the processing time to record
     */
    void updateDeferredMetrics(String name, long processingTime) {
        rpcMetrics.addDeferredRpcProcessingTime(processingTime);
        rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
    }

    /**
     * Binds a server socket to the given address without any port-range configuration.
     *
     * @param socket the socket to bind
     * @param address the address to bind to
     * @param backlog the listen backlog
     * @throws IOException if binding fails
     */
    public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException {
        bind(socket, address, backlog, null, null);
    }

    /**
     * Binds a server socket, optionally searching a configured port range.
     *
     * <p>The range named by {@code rangeConf} is used only when it is non-empty and
     * {@code address} asks for port 0; otherwise the socket is bound directly to
     * {@code address}. When a range is used, ports are tried in iteration order
     * until one binds.
     *
     * @param socket the socket to bind
     * @param address the requested address
     * @param backlog the listen backlog
     * @param conf configuration holding the range; read only when {@code rangeConf} is non-null
     * @param rangeConf configuration key of the port range, or {@code null} for none
     * @throws IOException if binding fails; a {@code BindException} is raised when no port
     *         in the range is free, and socket exceptions are wrapped by
     *         {@code NetUtils.wrapException}
     */
    public static void bind(ServerSocket socket, InetSocketAddress address, int backlog, Configuration conf, String rangeConf) throws IOException {
        try {
            Configuration.IntegerRanges range = rangeConf == null ? null : conf.getRange(rangeConf, "");
            boolean searchRange = range != null && !range.isEmpty() && address.getPort() == 0;
            if (!searchRange) {
                socket.bind(address, backlog);
                return;
            }
            for (Integer candidate : range) {
                if (socket.isBound()) {
                    break;
                }
                try {
                    socket.bind(new InetSocketAddress(address.getAddress(), candidate), backlog);
                } catch (BindException ignored) {
                    // This port is taken; deliberately fall through to the next candidate.
                }
            }
            if (!socket.isBound()) {
                throw new BindException("Could not find a free port in " + range);
            }
        } catch (SocketException e) {
            throw NetUtils.wrapException(null, 0, address.getHostName(), address.getPort(), e);
        }
    }

    /**
     * @return the aggregate RPC metrics object of this server
     */
    @VisibleForTesting
    public RpcMetrics getRpcMetrics() {
        return rpcMetrics;
    }

    /**
     * @return the detailed (per-name) RPC metrics object of this server
     */
    @VisibleForTesting
    public RpcDetailedMetrics getRpcDetailedMetrics() {
        return rpcDetailedMetrics;
    }

    /**
     * Returns a list view over the handler thread array. The array is allocated in
     * {@code start()}, so this fails with a {@code NullPointerException} before then.
     *
     * @return the handler threads
     */
    @VisibleForTesting
    Iterable<? extends Thread> getHandlers() {
        return Arrays.asList(handlers);
    }

    /**
     * @return the connections currently held by the connection manager, as an array
     */
    @VisibleForTesting
    Connection[] getConnections() {
        return connectionManager.toArray();
    }

    /**
     * Asks the service authorization manager to refresh itself.
     *
     * @param conf the configuration passed to the manager
     * @param provider the policy provider passed to the manager
     */
    public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {
        serviceAuthorizationManager.refresh(conf, provider);
    }

    /**
     * Asks the service authorization manager to refresh itself through its
     * {@code refreshWithLoadedConfiguration} entry point.
     *
     * @param conf the configuration passed to the manager
     * @param provider the policy provider passed to the manager
     */
    @InterfaceAudience.Private
    public void refreshServiceAclWithLoadedConfiguration(Configuration conf, PolicyProvider provider) {
        serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);
    }

    /**
     * @return the service authorization manager owned by this server
     */
    @InterfaceAudience.LimitedPrivate(value={"HDFS", "MapReduce"})
    public ServiceAuthorizationManager getServiceAuthorizationManager() {
        return serviceAuthorizationManager;
    }

    /**
     * @return the configuration key prefix for this server's call queue settings:
     *         {@code "ipc."} followed by the current value of {@code port}
     */
    private String getQueueClassPrefix() {
        return "ipc." + port;
    }

    /**
     * Reads the call queue implementation class from
     * {@code <prefix>.callqueue.impl}, defaulting to {@code LinkedBlockingQueue}.
     *
     * @param prefix the configuration key prefix
     * @param conf the configuration to read
     * @return the queue class, converted by {@code CallQueueManager.convertQueueClass}
     */
    static Class<? extends BlockingQueue<Call>> getQueueClass(String prefix, Configuration conf) {
        String key = prefix + "." + "callqueue.impl";
        Class<?> configured = conf.getClass(key, LinkedBlockingQueue.class);
        return CallQueueManager.convertQueueClass(configured, Call.class);
    }

    /**
     * Reads the RPC scheduler class from {@code <prefix>.scheduler.impl}.
     *
     * <p>If no scheduler is configured but the configured call queue is
     * {@code FairCallQueue}, this method writes {@code DecayRpcScheduler} into
     * {@code conf} under the scheduler key before reading it. Otherwise the default
     * is {@code DefaultRpcScheduler}.
     *
     * @param prefix the configuration key prefix
     * @param conf the configuration to read (and possibly update, as described above)
     * @return the scheduler class, converted by {@code CallQueueManager.convertSchedulerClass}
     */
    static Class<? extends RpcScheduler> getSchedulerClass(String prefix, Configuration conf) {
        String schedulerKeyname = prefix + "." + "scheduler.impl";
        if (conf.getClass(schedulerKeyname, null) == null) {
            String queueKeyName = prefix + "." + "callqueue.impl";
            Class<?> queueClass = conf.getClass(queueKeyName, null);
            boolean fairQueue = queueClass != null
                    && queueClass.getCanonicalName().equals(FairCallQueue.class.getCanonicalName());
            if (fairQueue) {
                conf.setClass(schedulerKeyname, DecayRpcScheduler.class, RpcScheduler.class);
            }
        }
        Class<?> schedulerClass = conf.getClass(schedulerKeyname, DefaultRpcScheduler.class);
        return CallQueueManager.convertSchedulerClass(schedulerClass);
    }

    /**
     * Re-reads call queue settings from {@code conf} and swaps the live queue.
     * Recomputes {@code maxQueueSize} from "ipc.server.handler.queue.size"
     * (default 100) times the handler count, then updates the backoff flag.
     *
     * @param conf the configuration to read the new settings from
     */
    public synchronized void refreshCallQueue(Configuration conf) {
        String prefix = getQueueClassPrefix();
        maxQueueSize = handlerCount * conf.getInt("ipc.server.handler.queue.size", 100);
        // Resolve the scheduler before the queue: getSchedulerClass may write to conf.
        Class<? extends RpcScheduler> schedulerClass = getSchedulerClass(prefix, conf);
        Class<? extends BlockingQueue<Call>> queueClass = getQueueClass(prefix, conf);
        callQueue.swapQueue(schedulerClass, queueClass, maxQueueSize, prefix, conf);
        callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
    }

    /**
     * Reads the boolean {@code <prefix>.backoff.enable} setting.
     *
     * @param prefix the configuration key prefix
     * @param conf the configuration to read
     * @return the configured value, or {@code false} when unset
     */
    static boolean getClientBackoffEnable(String prefix, Configuration conf) {
        return conf.getBoolean(prefix + "." + "backoff.enable", false);
    }

    /**
     * Queues a call for processing, blocking if the underlying queue requires it.
     *
     * <p>An {@code RpcServerException} raised while queueing is unwrapped so that
     * callers see its {@code IOException} cause. If the cause is missing or is not
     * an {@code IOException}, the {@code RpcServerException} itself is rethrown
     * rather than failing with a {@code NullPointerException} or
     * {@code ClassCastException} from a blind cast.
     *
     * @param call the call to enqueue
     * @throws IOException if the call cannot be queued
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    public void queueCall(Call call) throws IOException, InterruptedException {
        try {
            internalQueueCall(call);
        } catch (RpcServerException rse) {
            Throwable cause = rse.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            // No usable IOException cause: surface the server exception itself.
            throw rse;
        }
    }

    /**
     * Queues a call using the blocking path.
     *
     * @param call the call to enqueue
     * @throws IOException if the call cannot be queued
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    private void internalQueueCall(Call call) throws IOException, InterruptedException {
        internalQueueCall(call, true);
    }

    /**
     * Queues a call and records how long enqueueing took.
     *
     * @param call the call to enqueue
     * @param blocking {@code true} to use {@code callQueue.put}, {@code false} to use
     *        {@code callQueue.add}
     * @throws IOException the cause of a queue overflow; the client-backoff metric is
     *         incremented before it is thrown
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    private void internalQueueCall(Call call, boolean blocking) throws IOException, InterruptedException {
        try {
            if (!blocking) {
                callQueue.add(call);
            } else {
                callQueue.put(call);
            }
            // Time from the call's timestamp until it is in the queue.
            long enqueueNanos = Time.monotonicNowNanos() - call.timestampNanos;
            call.getProcessingDetails().set(ProcessingDetails.Timing.ENQUEUE, enqueueNanos, TimeUnit.NANOSECONDS);
        } catch (CallQueueManager.CallQueueOverflowException cqe) {
            rpcMetrics.incrClientBackoff();
            throw cqe.getCause();
        }
    }

    /**
     * Logs an exception raised while handling a call, honouring the server's
     * suppressed and terse exception lists.
     *
     * <ul>
     *   <li>suppressed classes: nothing is logged;</li>
     *   <li>terse classes: INFO, message only (no stack trace);</li>
     *   <li>{@code RuntimeException} / {@code Error}: WARN with stack trace;</li>
     *   <li>anything else: INFO with stack trace.</li>
     * </ul>
     *
     * @param logger the logger to write to
     * @param e the exception to log
     * @param call the call being handled, included in the message
     */
    @VisibleForTesting
    void logException(Logger logger, Throwable e, Call call) {
        if (exceptionsHandler.isSuppressedLog(e.getClass())) {
            return;
        }
        String logMsg = Thread.currentThread().getName() + ", call " + call;
        if (exceptionsHandler.isTerseLog(e.getClass())) {
            logger.info(logMsg + ": " + e);
            return;
        }
        boolean unexpected = e instanceof RuntimeException || e instanceof Error;
        if (unexpected) {
            logger.warn(logMsg, e);
            return;
        }
        logger.info(logMsg, e);
    }

    /**
     * Creates a server with defaults for most settings: reader count and per-handler
     * queue size are passed as {@code -1} (the full constructor then reads them from
     * {@code conf}), the server name is the decimal port number, and there is no
     * secret manager or port-range configuration.
     *
     * @param bindAddress the address to bind to
     * @param port the port to listen on
     * @param paramClass the request class used for all calls, or {@code null}
     * @param handlerCount the number of handler threads
     * @param conf the configuration to read settings from
     * @throws IOException if the full constructor fails
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, Configuration conf) throws IOException {
        this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null, null);
    }

    /**
     * Creates a server without a port-range configuration; delegates to the full
     * constructor with {@code portRangeConfig} set to {@code null}.
     *
     * @param bindAddress the address to bind to
     * @param port the port to listen on
     * @param rpcRequestClass the request class used for all calls, or {@code null}
     * @param handlerCount the number of handler threads
     * @param numReaders the number of reader threads, or {@code -1} to read it from {@code conf}
     * @param queueSizePerHandler the queue size per handler, or {@code -1} to read it from {@code conf}
     * @param conf the configuration to read settings from
     * @param serverName the name of this server
     * @param secretManager the secret manager, or {@code null}
     * @throws IOException if the full constructor fails
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> rpcRequestClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
        this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager, null);
    }

    /**
     * Full constructor: reads settings from {@code conf}, builds the call queue,
     * creates the listener, connection manager, metrics and responder. It does not
     * start any threads; see {@code start()}.
     *
     * @param bindAddress the address to bind to
     * @param port the requested port; replaced by the listener's actual port once the listener exists
     * @param rpcRequestClass the request class used for all calls, or {@code null}
     * @param handlerCount the number of handler threads
     * @param numReaders the number of reader threads, or {@code -1} to use "ipc.server.read.threadpool.size" (default 1)
     * @param queueSizePerHandler the queue size per handler, or {@code -1} to use "ipc.server.handler.queue.size" (default 100)
     * @param conf the configuration to read settings from
     * @param serverName the name of this server
     * @param secretManager the secret manager, or {@code null}
     * @param portRangeConfig the configuration key of a port range, or {@code null}
     * @throws IOException if building the negotiate response or creating the listener fails
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> rpcRequestClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
        this.bindAddress = bindAddress;
        this.conf = conf;
        this.portRangeConfig = portRangeConfig;
        this.port = port;
        this.rpcRequestClass = rpcRequestClass;
        this.handlerCount = handlerCount;
        this.socketSendBufferSize = 0;
        this.serverName = serverName;
        this.auxiliaryListenerMap = null;
        this.maxDataLength = conf.getInt("ipc.maximum.data.length", 0x8000000);
        this.maxQueueSize = queueSizePerHandler != -1 ? handlerCount * queueSizePerHandler : handlerCount * conf.getInt("ipc.server.handler.queue.size", 100);
        this.maxRespSize = conf.getInt("ipc.server.max.response.size", 0x100000);
        this.readThreads = numReaders != -1 ? numReaders : conf.getInt("ipc.server.read.threadpool.size", 1);
        this.readerPendingConnectionQueue = conf.getInt("ipc.server.read.connection-queue.size", 100);
        // NOTE: the prefix is derived from the requested port here, before this.port
        // is replaced by the listener's bound port further down.
        String prefix = this.getQueueClassPrefix();
        this.callQueue = new CallQueueManager(Server.getQueueClass(prefix, conf), Server.getSchedulerClass(prefix, conf), Server.getClientBackoffEnable(prefix, conf), this.maxQueueSize, prefix, conf);
        this.secretManager = secretManager;
        this.authorize = conf.getBoolean("hadoop.security.authorization", false);
        this.enabledAuthMethods = this.getAuthMethods(secretManager, conf);
        this.negotiateResponse = this.buildNegotiateResponse(this.enabledAuthMethods);
        this.listener = new Listener(port);
        // From here on, port holds whatever port the listener reports.
        this.port = this.listener.getAddress().getPort();
        this.connectionManager = new ConnectionManager();
        this.rpcMetrics = RpcMetrics.create(this, conf);
        this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
        this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
        this.setLogSlowRPC(conf.getBoolean("ipc.server.log.slow.rpc", false));
        this.responder = new Responder();
        // SASL is initialised only when tokens are possible or security is enabled.
        if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
            SaslRpcServer.init(conf);
            this.saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
        }
        // These two exception types are always logged tersely.
        this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
        this.exceptionsHandler.addTerseLoggingExceptions(HealthCheckFailedException.class);
    }

    /**
     * Creates an additional listener on the given port, marks it as auxiliary and
     * records it under the port it reports. The listener is not started here.
     *
     * @param auxiliaryPort the port to listen on; {@code 0} is never treated as a duplicate
     * @throws IOException if a listener is already recorded for a non-zero
     *         {@code auxiliaryPort}, or if the listener cannot be created
     */
    public synchronized void addAuxiliaryListener(int auxiliaryPort) throws IOException {
        if (auxiliaryListenerMap == null) {
            auxiliaryListenerMap = new HashMap<Integer, Listener>();
        }
        boolean duplicate = auxiliaryPort != 0 && auxiliaryListenerMap.containsKey(auxiliaryPort);
        if (duplicate) {
            throw new IOException("There is already a listener binding to: " + auxiliaryPort);
        }
        Listener auxiliary = new Listener(auxiliaryPort);
        auxiliary.setIsAuxiliary();
        LOG.info("Adding a server listener on port " + auxiliary.getAddress().getPort());
        auxiliaryListenerMap.put(auxiliary.getAddress().getPort(), auxiliary);
    }

    /**
     * Builds the SASL negotiate message advertised by this server.
     *
     * <p>When SIMPLE is the only auth method the message carries state SUCCESS and no
     * auth entries. Otherwise the state is NEGOTIATE and one auth entry is added per
     * method, with method name, mechanism and, when present, protocol and server id.
     *
     * @param authMethods the auth methods enabled on this server
     * @return the built negotiate message
     * @throws IOException if a {@code SaslRpcServer} cannot be created for a method
     */
    private RpcHeaderProtos.RpcSaslProto buildNegotiateResponse(List<SaslRpcServer.AuthMethod> authMethods) throws IOException {
        RpcHeaderProtos.RpcSaslProto.Builder negotiateBuilder = RpcHeaderProtos.RpcSaslProto.newBuilder();
        boolean simpleOnly = authMethods.size() == 1 && authMethods.contains(SaslRpcServer.AuthMethod.SIMPLE);
        if (simpleOnly) {
            negotiateBuilder.setState(RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS);
            return negotiateBuilder.build();
        }
        negotiateBuilder.setState(RpcHeaderProtos.RpcSaslProto.SaslState.NEGOTIATE);
        for (SaslRpcServer.AuthMethod authMethod : authMethods) {
            SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);
            RpcHeaderProtos.RpcSaslProto.SaslAuth.Builder authBuilder = negotiateBuilder.addAuthsBuilder();
            authBuilder.setMethod(authMethod.toString()).setMechanism(saslRpcServer.mechanism);
            if (saslRpcServer.protocol != null) {
                authBuilder.setProtocol(saslRpcServer.protocol);
            }
            if (saslRpcServer.serverId != null) {
                authBuilder.setServerId(saslRpcServer.serverId);
            }
        }
        return negotiateBuilder.build();
    }

    /**
     * Determines the auth methods this server accepts.
     *
     * <p>The configured authentication method is always included (last). When a
     * secret manager is present and the configured method is not TOKEN, TOKEN is
     * added in front of it.
     *
     * @param secretManager the secret manager, or {@code null}
     * @param conf the configuration naming the authentication method
     * @return the accepted auth methods, in order
     * @throws IllegalArgumentException if TOKEN is configured without a secret manager
     */
    private List<SaslRpcServer.AuthMethod> getAuthMethods(SecretManager<?> secretManager, Configuration conf) {
        UserGroupInformation.AuthenticationMethod configured = SecurityUtil.getAuthenticationMethod(conf);
        List<SaslRpcServer.AuthMethod> authMethods = new ArrayList<SaslRpcServer.AuthMethod>();
        boolean tokenConfigured = configured == UserGroupInformation.AuthenticationMethod.TOKEN;
        if (tokenConfigured && secretManager == null) {
            throw new IllegalArgumentException(UserGroupInformation.AuthenticationMethod.TOKEN + " authentication requires a secret manager");
        }
        if (!tokenConfigured && secretManager != null) {
            LOG.debug(UserGroupInformation.AuthenticationMethod.TOKEN + " authentication enabled for secret manager");
            authMethods.add(UserGroupInformation.AuthenticationMethod.TOKEN.getAuthMethod());
        }
        authMethods.add(configured.getAuthMethod());
        LOG.debug("Server accepts auth methods:" + authMethods);
        return authMethods;
    }

    /**
     * Closes a connection through the connection manager.
     *
     * @param connection the connection to close
     */
    private void closeConnection(Connection connection) {
        connectionManager.close(connection);
    }

    /**
     * Builds the response header for a call and stores the serialized response on it.
     *
     * <p>A FATAL status also marks the call's connection to be closed. For SUCCESS
     * the return value is serialized after the header; if that serialization throws,
     * an ERROR response with {@code ERROR_SERIALIZING_RESPONSE} is set up instead.
     * For any other status the header carries the error class, message and code and
     * no payload.
     *
     * @param call the call to respond to
     * @param status the response status
     * @param erCode the error code (used for non-SUCCESS statuses)
     * @param rv the return value (used for SUCCESS)
     * @param errorClass the exception class name (used for non-SUCCESS statuses)
     * @param error the error message (used for non-SUCCESS statuses)
     * @throws IOException if the response cannot be serialized
     */
    private void setupResponse(RpcCall call, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto status, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto erCode, Writable rv, String errorClass, String error) throws IOException {
        if (status == RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL) {
            call.connection.setShouldClose();
        }
        RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder();
        headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
        headerBuilder.setCallId(call.callId);
        headerBuilder.setRetryCount(call.retryCount);
        headerBuilder.setStatus(status);
        headerBuilder.setServerIpcVersionNum(9);
        if (alignmentContext != null) {
            alignmentContext.updateResponseState(headerBuilder);
        }

        if (status != RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) {
            headerBuilder.setExceptionClassName(errorClass);
            headerBuilder.setErrorMsg(error);
            headerBuilder.setErrorDetail(erCode);
            setupResponse(call, headerBuilder.build(), null);
            return;
        }

        RpcHeaderProtos.RpcResponseHeaderProto header = headerBuilder.build();
        try {
            setupResponse(call, header, rv);
        } catch (Throwable t) {
            LOG.warn("Error serializing call response for call " + call, t);
            // Replace the failed success response with an error response.
            setupResponse(call, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE, null, t.getClass().getName(), StringUtils.stringifyException(t));
        }
    }

    /**
     * Serializes a header and optional payload and sets the result as the call's
     * response. A {@code null} payload or a protobuf wrapper takes the protobuf
     * path; anything else takes the writable path. Responses larger than
     * {@code maxRespSize} are logged as a warning but still sent.
     *
     * @param call the call to respond to
     * @param header the response header
     * @param rv the payload, or {@code null}
     * @throws IOException if serialization fails
     */
    private void setupResponse(RpcCall call, RpcHeaderProtos.RpcResponseHeaderProto header, Writable rv) throws IOException {
        byte[] response;
        if (rv == null || rv instanceof RpcWritable.ProtobufWrapper) {
            response = setupResponseForProtobuf(header, rv);
        } else {
            response = setupResponseForWritable(header, rv);
        }
        if (response.length > maxRespSize) {
            LOG.warn("Large response size " + response.length + " for call " + call.toString());
        }
        call.setResponse(ByteBuffer.wrap(response));
    }

    /**
     * Serializes a header and optional payload into this thread's reusable
     * response buffer and returns the bytes.
     *
     * <p>Whether or not serialization succeeds, a buffer that has grown beyond
     * {@code maxRespSize} is shrunk back to {@code INITIAL_RESP_BUF_SIZE} afterwards.
     *
     * @param header the response header
     * @param rv the payload, or {@code null}
     * @return the serialized response
     * @throws IOException if writing to the buffer fails
     */
    private byte[] setupResponseForWritable(RpcHeaderProtos.RpcResponseHeaderProto header, Writable rv) throws IOException {
        ResponseBuffer buf = responseBuffer.get().reset();
        try {
            RpcWritable.wrap(header).writeTo(buf);
            if (rv != null) {
                RpcWritable.wrap(rv).writeTo(buf);
            }
            return buf.toByteArray();
        } finally {
            boolean oversized = buf.capacity() > maxRespSize;
            if (oversized) {
                buf.setCapacity(INITIAL_RESP_BUF_SIZE);
            }
        }
    }

    /**
     * Serializes a header and optional protobuf payload into a freshly allocated,
     * exactly sized array.
     *
     * <p>Layout: a 4-byte big-endian total length, then the header preceded by its
     * varint size, then (if present) the payload preceded by its varint size.
     *
     * @param header the response header
     * @param rv a protobuf wrapper holding the payload, or {@code null}
     * @return the serialized response
     * @throws IOException if writing fails
     */
    private byte[] setupResponseForProtobuf(RpcHeaderProtos.RpcResponseHeaderProto header, Writable rv) throws IOException {
        Message payload = null;
        if (rv != null) {
            payload = ((RpcWritable.ProtobufWrapper) rv).getMessage();
        }
        int length = getDelimitedLength(header);
        if (payload != null) {
            length += getDelimitedLength(payload);
        }
        byte[] buf = new byte[length + 4];
        CodedOutputStream cos = CodedOutputStream.newInstance(buf);
        // Total length, most significant byte first.
        for (int shift = 24; shift >= 0; shift -= 8) {
            cos.writeRawByte((byte) (length >>> shift & 0xFF));
        }
        cos.writeUInt32NoTag(header.getSerializedSize());
        header.writeTo(cos);
        if (payload != null) {
            cos.writeUInt32NoTag(payload.getSerializedSize());
            payload.writeTo(cos);
        }
        return buf;
    }

    /**
     * Computes the size of a message when written with a varint size prefix.
     *
     * @param message the message to measure
     * @return the serialized size plus the size of its varint length prefix
     */
    private static int getDelimitedLength(Message message) {
        int serializedSize = message.getSerializedSize();
        int prefixSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
        return serializedSize + prefixSize;
    }

    /**
     * Writes a fatal response in the layout used here for old-version clients:
     * call id, status {@code -1}, error class string, error message string.
     * The supplied stream is reset first and its contents become the call's response.
     *
     * @param response scratch stream used to build the response
     * @param call the call to respond to
     * @param rv unused by this method
     * @param errorClass the error class name to send
     * @param error the error message to send
     * @throws IOException if writing to the stream fails
     */
    private void setupResponseOldVersionFatal(ByteArrayOutputStream response, RpcCall call, Writable rv, String errorClass, String error) throws IOException {
        final int OLD_VERSION_FATAL_STATUS = -1;
        response.reset();
        DataOutputStream out = new DataOutputStream(response);
        out.writeInt(call.callId);
        out.writeInt(OLD_VERSION_FATAL_STATUS);
        WritableUtils.writeString(out, errorClass);
        WritableUtils.writeString(out, error);
        byte[] bytes = response.toByteArray();
        call.setResponse(ByteBuffer.wrap(bytes));
    }

    /**
     * Wraps the call's already-built response with the connection's SASL server,
     * when the connection has one, and replaces the response with a SASL WRAP message.
     * Does nothing when the connection has no SASL server.
     *
     * @param call the call whose response should be wrapped
     * @throws IOException if wrapping or re-serializing the response fails
     */
    private void wrapWithSasl(RpcCall call) throws IOException {
        if (call.connection.saslServer == null) {
            return;
        }
        byte[] token = call.rpcResponse.array();
        // The wrap call is guarded by the SASL server's monitor, as in the original code.
        SaslServer lock = call.connection.saslServer;
        synchronized (lock) {
            token = call.connection.saslServer.wrap(token, 0, token.length);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding saslServer wrapped token of size " + token.length + " as call response.");
        }
        RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder();
        headerBuilder.setCallId(AuthProtocol.SASL.callId);
        headerBuilder.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
        RpcHeaderProtos.RpcResponseHeaderProto saslHeader = headerBuilder.build();

        RpcHeaderProtos.RpcSaslProto.Builder messageBuilder = RpcHeaderProtos.RpcSaslProto.newBuilder();
        messageBuilder.setState(RpcHeaderProtos.RpcSaslProto.SaslState.WRAP);
        messageBuilder.setToken(ByteString.copyFrom(token));
        RpcHeaderProtos.RpcSaslProto saslMessage = messageBuilder.build();

        setupResponse(call, saslHeader, RpcWritable.wrap(saslMessage));
    }

    /**
     * Returns the configuration held in this server's {@code conf} field.
     */
    Configuration getConf() {
        return this.conf;
    }

    /**
     * Sets the socket send buffer size. {@code Connection}'s constructor
     * applies this value to the connection's socket when it is non-zero.
     *
     * @param size send buffer size to request; zero leaves sockets untouched
     */
    public void setSocketSendBufSize(int size) {
        this.socketSendBufferSize = size;
    }

    /**
     * Replaces the tracer stored in this server's {@code tracer} field.
     *
     * @param t the tracer to store
     */
    public void setTracer(Tracer t) {
        this.tracer = t;
    }

    /**
     * Starts the server's threads in order: the responder, the main listener,
     * every auxiliary listener (if any), and finally {@code handlerCount}
     * freshly created handler threads.
     */
    public synchronized void start() {
        this.responder.start();
        this.listener.start();
        if (this.auxiliaryListenerMap != null && !this.auxiliaryListenerMap.isEmpty()) {
            for (Listener auxiliary : this.auxiliaryListenerMap.values()) {
                auxiliary.start();
            }
        }
        this.handlers = new Handler[this.handlerCount];
        for (int index = 0; index < this.handlerCount; ++index) {
            final Handler handler = new Handler(index);
            this.handlers[index] = handler;
            handler.start();
        }
    }

    /**
     * Stops the server: clears the {@code running} flag, interrupts every
     * handler thread that was created, interrupts and stops the main and
     * auxiliary listeners, interrupts the responder, wakes any thread blocked
     * in {@link #join()}, and shuts down both metrics objects.
     */
    public synchronized void stop() {
        LOG.info("Stopping server on " + this.port);
        this.running = false;

        if (this.handlers != null) {
            for (int index = 0; index < this.handlerCount; ++index) {
                final Handler handler = this.handlers[index];
                if (handler != null) {
                    handler.interrupt();
                }
            }
        }

        this.listener.interrupt();
        this.listener.doStop();
        if (this.auxiliaryListenerMap != null && !this.auxiliaryListenerMap.isEmpty()) {
            for (Listener auxiliary : this.auxiliaryListenerMap.values()) {
                auxiliary.interrupt();
                auxiliary.doStop();
            }
        }

        this.responder.interrupt();
        // Release callers waiting in join(), which loops on the running flag.
        this.notifyAll();
        this.rpcMetrics.shutdown();
        this.rpcDetailedMetrics.shutdown();
    }

    /**
     * Blocks the calling thread for as long as the {@code running} flag is
     * set, waiting on this object's monitor. {@link #stop()} clears the flag
     * and calls {@code notifyAll()}, which lets waiters return.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void join() throws InterruptedException {
        // Loop guards against wake-ups that happen while still running.
        while (this.running) {
            this.wait();
        }
    }

    /**
     * Returns the address reported by the main listener.
     *
     * @return the result of {@code listener.getAddress()}
     */
    public synchronized InetSocketAddress getListenerAddress() {
        return this.listener.getAddress();
    }

    /**
     * Collects the addresses of all auxiliary listeners.
     *
     * @return a new set with one address per auxiliary listener; empty when
     *         no auxiliary listeners are configured
     */
    public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
        final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
        if (this.auxiliaryListenerMap == null || this.auxiliaryListenerMap.isEmpty()) {
            return addresses;
        }
        for (Listener auxiliary : this.auxiliaryListenerMap.values()) {
            addresses.add(auxiliary.getAddress());
        }
        return addresses;
    }

    /**
     * Legacy entry point that forwards to the four-argument {@code call}
     * using {@code RPC.RpcKind.RPC_BUILTIN} and a {@code null} second argument.
     *
     * @param param       the request parameter
     * @param receiveTime receive time forwarded unchanged to the delegate
     * @return whatever the four-argument {@code call} returns
     * @throws Exception if the delegate throws
     * @deprecated use the four-argument {@code call} instead
     */
    @Deprecated
    public Writable call(Writable param, long receiveTime) throws Exception {
        return this.call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime);
    }

    /**
     * Handles one RPC request; implemented by concrete servers.
     *
     * @param var1 the RPC kind of the request
     * @param var2 a string the deprecated two-argument overload passes as
     *             {@code null}; presumably the protocol name - TODO confirm
     * @param var3 the request parameter
     * @param var4 the receive time of the request
     * @return the response value
     * @throws Exception if handling the request fails
     */
    public abstract Writable call(RPC.RpcKind var1, String var2, Writable var3, long var4) throws Exception;

    /**
     * Checks that {@code user} may use the named protocol from {@code addr}.
     * The check is skipped entirely when the {@code authorize} flag is off.
     *
     * @param user         the user making the request
     * @param protocolName name of the protocol class being accessed
     * @param addr         address the request came from
     * @throws AuthorizationException if the protocol name is null, cannot be
     *         resolved to a class, or the authorization manager rejects it
     */
    private void authorize(UserGroupInformation user, String protocolName, InetAddress addr) throws AuthorizationException {
        if (!this.authorize) {
            return;
        }
        if (protocolName == null) {
            throw new AuthorizationException("Null protocol not authorized");
        }
        final Class<?> protocol;
        try {
            protocol = Server.getProtocolClass(protocolName, this.getConf());
        }
        catch (ClassNotFoundException cfne) {
            throw new AuthorizationException("Unknown protocol: " + protocolName);
        }
        this.serviceAuthorizationManager.authorize(user, protocol, this.getConf(), addr);
    }

    /**
     * Returns the value of this server's {@code port} field.
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Returns the number of connections currently tracked by the connection
     * manager.
     */
    public int getNumOpenConnections() {
        return this.connectionManager.size();
    }

    /**
     * Serializes the connection manager's user-to-connection-count map with
     * an {@code ObjectMapper}.
     *
     * @return the serialized map, or {@code null} if serialization throws an
     *         {@link IOException}
     */
    public String getNumOpenConnectionsPerUser() {
        String serialized = null;
        try {
            serialized = new ObjectMapper().writeValueAsString(this.connectionManager.getUserToConnectionsMap());
        }
        catch (IOException ignored) {
            // Best effort: a serialization failure is reported as null.
        }
        return serialized;
    }

    /**
     * Returns the connection manager's dropped-connection counter.
     */
    public long getNumDroppedConnections() {
        return this.connectionManager.getDroppedConnections();
    }

    /**
     * Returns the current size of the call queue.
     */
    public int getCallQueueLen() {
        return this.callQueue.size();
    }

    /**
     * Reports the call queue's client-backoff setting.
     */
    public boolean isClientBackoffEnabled() {
        return this.callQueue.isClientBackoffEnabled();
    }

    /**
     * Forwards the client-backoff setting to the call queue.
     *
     * @param value the new setting
     */
    public void setClientBackoffEnabled(boolean value) {
        this.callQueue.setClientBackoffEnabled(value);
    }

    /**
     * Returns the value of this server's {@code maxQueueSize} field.
     */
    public int getMaxQueueSize() {
        return this.maxQueueSize;
    }

    /**
     * Returns the value of this server's {@code readThreads} field.
     */
    public int getNumReaders() {
        return this.readThreads;
    }

    /**
     * Writes {@code buffer} to {@code channel}. Buffers with at most
     * {@code NIO_BUFFER_LIMIT} bytes remaining are written directly; larger
     * ones go through {@code channelIO}. A positive byte count is added to
     * the sent-bytes metric.
     *
     * @param channel destination channel
     * @param buffer  data to write
     * @return the value returned by the underlying write
     * @throws IOException if the write fails
     */
    private int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        final int written;
        if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
            written = channel.write(buffer);
        } else {
            written = Server.channelIO(null, channel, buffer);
        }
        if (written > 0) {
            this.rpcMetrics.incrSentBytes(written);
        }
        return written;
    }

    /**
     * Reads from {@code channel} into {@code buffer}. Buffers with at most
     * {@code NIO_BUFFER_LIMIT} bytes remaining are filled directly; larger
     * ones go through {@code channelIO}. A positive byte count is added to
     * the received-bytes metric.
     *
     * @param channel source channel
     * @param buffer  buffer to fill
     * @return the value returned by the underlying read
     * @throws IOException if the read fails
     */
    private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
        final int received;
        if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
            received = channel.read(buffer);
        } else {
            received = Server.channelIO(channel, null, buffer);
        }
        if (received > 0) {
            this.rpcMetrics.incrReceivedBytes(received);
        }
        return received;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Transfers {@code buf} in slices of at most {@code NIO_BUFFER_LIMIT}
     * bytes. Reads from {@code readCh} when it is non-null, otherwise writes
     * to {@code writeCh}. The loop stops as soon as one operation moves fewer
     * bytes than its slice; the buffer's limit is restored after every slice.
     *
     * @param readCh  channel to read from, or {@code null} to write
     * @param writeCh channel to write to; used only when {@code readCh} is null
     * @param buf     buffer to fill or drain
     * @return the total bytes transferred if positive, otherwise the result
     *         of the last channel operation
     * @throws IOException if a channel operation fails
     */
    private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
        final int savedLimit = buf.limit();
        final int startRemaining = buf.remaining();
        int lastResult = 0;
        while (buf.remaining() > 0) {
            try {
                final int sliceSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + sliceSize);
                if (readCh == null) {
                    lastResult = writeCh.write(buf);
                } else {
                    lastResult = readCh.read(buf);
                }
                if (lastResult < sliceSize) {
                    // Short transfer: the channel cannot take or give more now.
                    break;
                }
            }
            finally {
                buf.limit(savedLimit);
            }
        }
        final int transferred = startRemaining - buf.remaining();
        if (transferred > 0) {
            return transferred;
        }
        return lastResult;
    }

    /**
     * Returns the connection manager's {@code maxIdleTime}, which its
     * constructor sets to twice the {@code ipc.client.connection.maxidletime}
     * configuration value.
     */
    protected int getMaxIdleTime() {
        return this.connectionManager.maxIdleTime;
    }

    /**
     * Returns the value of this server's {@code serverName} field.
     */
    public String getServerName() {
        return this.serverName;
    }

    /**
     * Keeps track of the server's open connections: registers and closes
     * them, maintains a per-user connection count, enforces the optional
     * connection limit, and periodically closes idle connections from a
     * daemon timer.
     */
    private class ConnectionManager {
        private final AtomicInteger count = new AtomicInteger();
        private final AtomicLong droppedConnections = new AtomicLong();
        private final Set<Connection> connections;
        private final Map<String, Integer> userToConnectionsMap;
        private final Object userToConnectionsMapLock = new Object();
        private final Timer idleScanTimer;
        private final int idleScanThreshold;
        private final int idleScanInterval;
        private final int maxIdleTime;
        private final int maxIdleToClose;
        private final int maxConnections;

        /**
         * Reads the idle-scan and connection-limit settings from the server
         * configuration and creates the (daemon) scan timer and the
         * concurrent connection set.
         */
        ConnectionManager() {
            this.idleScanTimer = new Timer("IPC Server idle connection scanner for port " + Server.this.getPort(), true);
            this.idleScanThreshold = Server.this.conf.getInt("ipc.client.idlethreshold", 4000);
            this.idleScanInterval = Server.this.conf.getInt("ipc.client.connection.idle-scan-interval.ms", 10000);
            // The effective idle limit is twice the configured client value.
            this.maxIdleTime = 2 * Server.this.conf.getInt("ipc.client.connection.maxidletime", 10000);
            this.maxIdleToClose = Server.this.conf.getInt("ipc.client.kill.max", 10);
            this.maxConnections = Server.this.conf.getInt("ipc.server.max.connections", 0);
            this.connections = Collections.newSetFromMap(
                    new ConcurrentHashMap<Connection, Boolean>(Server.this.maxQueueSize, 0.75f, Server.this.readThreads + 2));
            this.userToConnectionsMap = new ConcurrentHashMap<String, Integer>();
        }

        /** Adds the connection to the set, bumping the counter if it was new. */
        private boolean add(Connection connection) {
            if (!this.connections.add(connection)) {
                return false;
            }
            this.count.getAndIncrement();
            return true;
        }

        /** Removes the connection from the set, lowering the counter if it was present. */
        private boolean remove(Connection connection) {
            if (!this.connections.remove(connection)) {
                return false;
            }
            this.count.getAndDecrement();
            return true;
        }

        /** Increments the connection count recorded for {@code user}, starting at 1. */
        void incrUserConnections(String user) {
            synchronized (this.userToConnectionsMapLock) {
                final Integer current = this.userToConnectionsMap.get(user);
                final int updated = (current == null) ? 1 : current + 1;
                this.userToConnectionsMap.put(user, updated);
            }
        }

        /**
         * Decrements the connection count recorded for {@code user}; the
         * entry is removed when it reaches zero. Unknown users are ignored.
         */
        void decrUserConnections(String user) {
            synchronized (this.userToConnectionsMapLock) {
                final Integer current = this.userToConnectionsMap.get(user);
                if (current == null) {
                    return;
                }
                final int remaining = current - 1;
                if (remaining == 0) {
                    this.userToConnectionsMap.remove(user);
                } else {
                    this.userToConnectionsMap.put(user, remaining);
                }
            }
        }

        /** Returns the live per-user connection count map (not a copy). */
        Map<String, Integer> getUserToConnectionsMap() {
            return this.userToConnectionsMap;
        }

        /** Returns the current value of the dropped-connection counter. */
        long getDroppedConnections() {
            return this.droppedConnections.get();
        }

        /** Returns the number of tracked connections. */
        int size() {
            return this.count.get();
        }

        /** Returns true when a positive limit is configured and has been reached. */
        boolean isFull() {
            if (this.maxConnections <= 0) {
                return false;
            }
            return this.size() >= this.maxConnections;
        }

        /** Returns a snapshot array of the tracked connections. */
        Connection[] toArray() {
            return this.connections.toArray(new Connection[0]);
        }

        /**
         * Creates and tracks a connection for {@code channel}.
         *
         * @return the new connection, or {@code null} when the manager is full
         */
        Connection register(SocketChannel channel, int ingressPort, boolean isOnAuxiliaryPort) {
            if (this.isFull()) {
                return null;
            }
            final Connection created = new Connection(channel, Time.now(), ingressPort, isOnAuxiliaryPort);
            this.add(created);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Server connection from " + created + "; # active connections: " + this.size() + "; # queued calls: " + Server.this.callQueue.size());
            }
            return created;
        }

        /**
         * Stops tracking and closes {@code connection}. If the connection has
         * a user and its connection context was read, that user's count is
         * decremented.
         *
         * @return true if the connection was tracked, false otherwise
         */
        boolean close(Connection connection) {
            if (!this.remove(connection)) {
                return false;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(Thread.currentThread().getName() + ": disconnecting client " + connection + ". Number of active connections: " + this.size());
            }
            connection.close();
            if (connection.user != null && connection.connectionContextRead) {
                this.decrUserConnections(connection.user.getShortUserName());
            }
            return true;
        }

        /**
         * Closes connections that are idle and were last contacted more than
         * {@code maxIdleTime} ago.
         *
         * @param scanAll when true every connection is examined; when false
         *                the scan stops once the connection count is below
         *                {@code idleScanThreshold} or {@code maxIdleToClose}
         *                connections have been closed
         */
        synchronized void closeIdle(boolean scanAll) {
            final long minLastContact = Time.now() - (long)this.maxIdleTime;
            int closed = 0;
            for (Connection candidate : this.connections) {
                // Partial scans end once the count drops below the threshold.
                if (!scanAll && this.size() < this.idleScanThreshold) {
                    break;
                }
                // Partial scans also end after closing the allowed maximum.
                if (candidate.isIdle()
                        && candidate.getLastContact() < minLastContact
                        && this.close(candidate)
                        && !scanAll
                        && ++closed == this.maxIdleToClose) {
                    break;
                }
            }
        }

        /** Closes every tracked connection. */
        void closeAll() {
            final Connection[] snapshot = this.toArray();
            for (Connection connection : snapshot) {
                this.close(connection);
            }
        }

        /** Schedules the first idle scan. */
        void startIdleScan() {
            this.scheduleIdleScanTask();
        }

        /** Cancels the idle-scan timer. */
        void stopIdleScan() {
            this.idleScanTimer.cancel();
        }

        /**
         * Schedules one idle scan after {@code idleScanInterval}; the task
         * reschedules itself after each run. Nothing is scheduled, and a
         * pending task does nothing, once the server is no longer running.
         */
        private void scheduleIdleScanTask() {
            if (!Server.this.running) {
                return;
            }
            final TimerTask scanTask = new TimerTask(){

                @Override
                public void run() {
                    if (!Server.this.running) {
                        return;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": task running");
                    }
                    try {
                        ConnectionManager.this.closeIdle(false);
                    }
                    finally {
                        // Reschedule even if the scan threw.
                        ConnectionManager.this.scheduleIdleScanTask();
                    }
                }
            };
            this.idleScanTimer.schedule(scanTask, this.idleScanInterval);
        }
    }

    private class Handler
    extends Thread {
        public Handler(int instanceNumber) {
            this.setDaemon(true);
            this.setName("IPC Server handler " + instanceNumber + " on default port " + Server.this.port);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        public void run() {
            LOG.debug(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            while (Server.this.running) {
                boolean connDropped;
                long startTimeNanos;
                Call call;
                TraceScope traceScope;
                block16: {
                    traceScope = null;
                    call = null;
                    startTimeNanos = 0L;
                    connDropped = true;
                    call = (Call)Server.this.callQueue.take();
                    startTimeNanos = Time.monotonicNowNanos();
                    if (Server.this.alignmentContext == null || !call.isCallCoordinated() || call.getClientStateId() <= Server.this.alignmentContext.getLastSeenStateId()) break block16;
                    this.requeueCall(call);
                    CurCall.set(null);
                    IOUtils.cleanupWithLogger(LOG, new Closeable[]{traceScope});
                    if (call == null) continue;
                    Server.this.updateMetrics(call, startTimeNanos, connDropped);
                    ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}", new Object[]{call, call.isResponseDeferred() ? ", deferred" : "", call.getDetailedMetricsName(), call.getRemoteUser(), call.getProcessingDetails()});
                    continue;
                }
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + (Object)((Object)call.rpcKind));
                    }
                    CurCall.set(call);
                    if (call.traceScope != null) {
                        call.traceScope.reattach();
                        traceScope = call.traceScope;
                        traceScope.getSpan().addTimelineAnnotation("called");
                    }
                    CallerContext.setCurrent(call.callerContext);
                    UserGroupInformation remoteUser = call.getRemoteUser();
                    boolean bl = connDropped = !call.isOpen();
                    if (remoteUser != null) {
                        remoteUser.doAs(call);
                    } else {
                        call.run();
                    }
                    CurCall.set(null);
                }
                catch (InterruptedException e) {
                    if (Server.this.running) {
                        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", (Throwable)e);
                        if (traceScope != null) {
                            traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " + StringUtils.stringifyException(e));
                        }
                    }
                    CurCall.set(null);
                    IOUtils.cleanupWithLogger(LOG, new Closeable[]{traceScope});
                    if (call == null) continue;
                    Server.this.updateMetrics(call, startTimeNanos, connDropped);
                    ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}", new Object[]{call, call.isResponseDeferred() ? ", deferred" : "", call.getDetailedMetricsName(), call.getRemoteUser(), call.getProcessingDetails()});
                    continue;
                }
                catch (Exception e2) {
                    LOG.info(Thread.currentThread().getName() + " caught an exception", (Throwable)e2);
                    if (traceScope != null) {
                        traceScope.getSpan().addTimelineAnnotation("Exception: " + StringUtils.stringifyException(e2));
                    }
                    CurCall.set(null);
                    {
                        catch (Throwable throwable) {
                            CurCall.set(null);
                            IOUtils.cleanupWithLogger(LOG, new Closeable[]{traceScope});
                            if (call != null) {
                                Server.this.updateMetrics(call, startTimeNanos, connDropped);
                                ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}", new Object[]{call, call.isResponseDeferred() ? ", deferred" : "", call.getDetailedMetricsName(), call.getRemoteUser(), call.getProcessingDetails()});
                            }
                            throw throwable;
                        }
                    }
                    IOUtils.cleanupWithLogger(LOG, new Closeable[]{traceScope});
                    if (call == null) continue;
                    Server.this.updateMetrics(call, startTimeNanos, connDropped);
                    ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}", new Object[]{call, call.isResponseDeferred() ? ", deferred" : "", call.getDetailedMetricsName(), call.getRemoteUser(), call.getProcessingDetails()});
                    continue;
                }
                IOUtils.cleanupWithLogger(LOG, new Closeable[]{traceScope});
                if (call == null) continue;
                Server.this.updateMetrics(call, startTimeNanos, connDropped);
                ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}", new Object[]{call, call.isResponseDeferred() ? ", deferred" : "", call.getDetailedMetricsName(), call.getRemoteUser(), call.getProcessingDetails()});
            }
            LOG.debug(Thread.currentThread().getName() + ": exiting");
        }

        private void requeueCall(Call call) throws IOException, InterruptedException {
            try {
                Server.this.internalQueueCall(call, false);
            }
            catch (RpcServerException rse) {
                call.doResponse(rse.getCause(), rse.getRpcStatusProto());
            }
        }
    }

    public class Connection {
        private boolean connectionHeaderRead = false;
        // Checked by ConnectionManager.close before lowering the per-user count.
        private boolean connectionContextRead = false;
        private SocketChannel channel;
        private ByteBuffer data;
        // Allocated with a capacity of 4 bytes in the constructor.
        private final ByteBuffer dataLengthBuffer;
        private LinkedList<RpcCall> responseQueue;
        // A value of zero means the connection is idle (see isIdle()).
        private AtomicInteger rpcCount = new AtomicInteger();
        // Compared against the idle cut-off by ConnectionManager.closeIdle.
        private long lastContact;
        private int dataLength;
        private Socket socket;
        // "*Unknown*" when the socket reports no InetAddress.
        private String hostAddress;
        private int remotePort;
        private InetAddress addr;
        IpcConnectionContextProtos.IpcConnectionContextProto connectionContext;
        String protocolName;
        // Created during SASL INITIATE handling; reset to null by disposeSasl().
        SaslServer saslServer;
        // Negotiated "javax.security.sasl.qop" value, set by saslProcess().
        private String establishedQOP;
        private SaslRpcServer.AuthMethod authMethod;
        private AuthProtocol authProtocol;
        // Set once the SASL server reports completion and the user is resolved.
        private boolean saslContextEstablished;
        private ByteBuffer connectionHeaderBuf = null;
        private ByteBuffer unwrappedData;
        // Allocated with a capacity of 4 bytes in the constructor.
        private ByteBuffer unwrappedDataLengthBuffer;
        private int serviceClass;
        private boolean shouldClose = false;
        private int ingressPort;
        private boolean isOnAuxiliaryPort;
        // Assigned from getAuthorizedUgi() when SASL negotiation completes.
        UserGroupInformation user = null;
        // Included in the audit log line written on authentication failure.
        public UserGroupInformation attemptingUser = null;
        // Call with id -1 used by doSaslReply(Exception) to report failures.
        private final RpcCall authFailedCall = new RpcCall(this, -1);
        // Checked by processSaslMessage to reject repeated negotiation.
        private boolean sentNegotiate = false;
        // True when the negotiated QOP is non-null and not "auth".
        private boolean useWrap = false;

        /**
         * Creates the connection state for an accepted channel: records the
         * peer's address and port, allocates the two 4-byte length buffers
         * and the response queue, and applies the server's send buffer size
         * to the socket when that size is non-zero.
         *
         * @param channel           the accepted socket channel
         * @param lastContact       initial last-contact timestamp
         * @param ingressPort       port the connection arrived on
         * @param isOnAuxiliaryPort whether that port is an auxiliary port
         */
        public Connection(SocketChannel channel, long lastContact, int ingressPort, boolean isOnAuxiliaryPort) {
            this.channel = channel;
            this.lastContact = lastContact;
            this.ingressPort = ingressPort;
            this.isOnAuxiliaryPort = isOnAuxiliaryPort;

            this.data = null;
            this.unwrappedData = null;
            this.dataLengthBuffer = ByteBuffer.allocate(4);
            this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
            this.responseQueue = new LinkedList<RpcCall>();

            this.socket = channel.socket();
            this.addr = this.socket.getInetAddress();
            if (this.addr == null) {
                this.hostAddress = "*Unknown*";
            } else {
                this.hostAddress = this.addr.getHostAddress();
            }
            this.remotePort = this.socket.getPort();

            if (Server.this.socketSendBufferSize == 0) {
                return;
            }
            try {
                this.socket.setSendBufferSize(Server.this.socketSendBufferSize);
            }
            catch (IOException e) {
                // Non-fatal: the connection proceeds with the default size.
                LOG.warn("Connection: unable to set socket send buffer size to " + Server.this.socketSendBufferSize);
            }
        }

        /**
         * Formats the peer as {@code host:port} using the host address and
         * the remote port.
         */
        @Override
        public String toString() {
            final String host = this.getHostAddress();
            return host + ":" + this.remotePort;
        }

        /**
         * Marks this connection to be closed.
         *
         * @return always {@code true}
         */
        boolean setShouldClose() {
            this.shouldClose = true;
            return true;
        }

        /**
         * Returns whether this connection has been marked to be closed.
         */
        boolean shouldClose() {
            return this.shouldClose;
        }

        /**
         * Returns the peer's host address string captured in the constructor
         * ({@code "*Unknown*"} when the socket had no address).
         */
        public String getHostAddress() {
            return this.hostAddress;
        }

        /**
         * Returns the ingress port passed to the constructor.
         */
        public int getIngressPort() {
            return this.ingressPort;
        }

        /**
         * Returns the socket's {@link InetAddress} captured in the
         * constructor; may be {@code null}.
         */
        public InetAddress getHostInetAddress() {
            return this.addr;
        }

        /**
         * Returns the SASL quality-of-protection value recorded by
         * {@code saslProcess} once the SASL context is established;
         * {@code null} before that.
         */
        public String getEstablishedQOP() {
            return this.establishedQOP;
        }

        /**
         * Returns the auxiliary-port flag passed to the constructor.
         */
        public boolean isOnAuxiliaryPort() {
            return this.isOnAuxiliaryPort;
        }

        /**
         * Updates the last-contact timestamp.
         *
         * @param lastContact the new timestamp
         */
        public void setLastContact(long lastContact) {
            this.lastContact = lastContact;
        }

        /**
         * Returns the last-contact timestamp.
         */
        public long getLastContact() {
            return this.lastContact;
        }

        /**
         * Returns the enclosing server instance.
         */
        public Server getServer() {
            return Server.this;
        }

        /**
         * Returns true when the RPC counter is zero.
         */
        private boolean isIdle() {
            return this.rpcCount.get() == 0;
        }

        /**
         * Atomically decrements the RPC counter.
         */
        private void decRpcCount() {
            this.rpcCount.decrementAndGet();
        }

        /**
         * Atomically increments the RPC counter.
         */
        private void incRpcCount() {
            this.rpcCount.incrementAndGet();
        }

        /**
         * Resolves the authenticated id to a user. For TOKEN authentication
         * the user comes from the token identifier, which is also attached to
         * the returned user; for every other method a remote user is created
         * from the id.
         *
         * @param authorizedId the id reported by the SASL server
         * @return the resolved user
         * @throws SecretManager.InvalidToken if the token identifier cannot be obtained
         * @throws AccessControlException if the token identifier carries no user
         */
        private UserGroupInformation getAuthorizedUgi(String authorizedId) throws SecretManager.InvalidToken, AccessControlException {
            if (this.authMethod != SaslRpcServer.AuthMethod.TOKEN) {
                return UserGroupInformation.createRemoteUser(authorizedId, this.authMethod);
            }
            final TokenIdentifier tokenId = (TokenIdentifier)SaslRpcServer.getIdentifier(authorizedId, Server.this.secretManager);
            final UserGroupInformation tokenUser = tokenId.getUser();
            if (tokenUser == null) {
                throw new AccessControlException("Can't retrieve username from tokenIdentifier.");
            }
            tokenUser.addTokenIdentifier(tokenId);
            return tokenUser;
        }

        /**
         * Decodes a SASL message from {@code buffer} and dispatches it.
         * {@code WRAP} messages are unwrapped and processed as RPCs, which is
         * only allowed once the SASL context is established with wrapping
         * enabled; every other state is handed to {@code saslProcess}.
         *
         * @param buffer the buffer holding the encoded SASL message
         * @throws RpcServerException   if a WRAP message arrives while the
         *                              server is not wrapping, or processing fails
         * @throws IOException          if decoding or processing fails
         * @throws InterruptedException if processing is interrupted
         */
        private void saslReadAndProcess(RpcWritable.Buffer buffer) throws RpcServerException, IOException, InterruptedException {
            final RpcHeaderProtos.RpcSaslProto saslMessage = (RpcHeaderProtos.RpcSaslProto)this.getMessage((Message)RpcHeaderProtos.RpcSaslProto.getDefaultInstance(), buffer);
            switch (saslMessage.getState()) {
                case WRAP: {
                    final boolean wrappingActive = this.saslContextEstablished && this.useWrap;
                    if (!wrappingActive) {
                        throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, new SaslException("Server is not wrapping data"));
                    }
                    final byte[] wrappedPacket = saslMessage.getToken().toByteArray();
                    this.unwrapPacketAndProcessRpcs(wrappedPacket);
                    return;
                }
                default: {
                    this.saslProcess(saslMessage);
                }
            }
        }

        /**
         * Walks the cause chain of {@code e} looking for the exception that
         * should be reported. The first {@code RetriableException} or
         * {@code StandbyException} found is returned as is. For the first
         * {@code SecretManager.InvalidToken} found, its own cause is returned
         * when it has one, otherwise the token exception itself.
         *
         * @param e the exception raised while processing a SASL message
         * @return the selected cause, or {@code e} if nothing in the chain matches
         */
        private Throwable getTrueCause(IOException e) {
            Throwable current = e;
            while (current != null) {
                if (current instanceof RetriableException || current instanceof StandbyException) {
                    return current;
                }
                if (current instanceof SecretManager.InvalidToken) {
                    final Throwable tunnelled = current.getCause();
                    return tunnelled != null ? tunnelled : current;
                }
                current = current.getCause();
            }
            return e;
        }

        /**
         * Runs one step of SASL negotiation for this connection.
         *
         * <p>The message is handed to {@code processSaslMessage}. On failure
         * the authentication-failure metric and the audit log are updated and
         * the "true cause" (see {@code getTrueCause}) is thrown. When the SASL
         * server reports completion, the authorized user is resolved and the
         * context is marked established. Any reply is then sent, and once the
         * context is established the negotiated QOP is recorded; the SASL
         * server is disposed unless that QOP requires wrapping.
         *
         * @param saslMessage the SASL message received from the client
         * @throws RpcServerException   if negotiation was already complete, or
         *                              processing fails (plain I/O failures are
         *                              wrapped as FATAL_UNAUTHORIZED)
         * @throws IOException          if sending the reply fails
         * @throws InterruptedException if processing is interrupted
         */
        private void saslProcess(RpcHeaderProtos.RpcSaslProto saslMessage) throws RpcServerException, IOException, InterruptedException {
            if (this.saslContextEstablished) {
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, new SaslException("Negotiation is already complete"));
            }
            RpcHeaderProtos.RpcSaslProto saslResponse = null;
            try {
                try {
                    saslResponse = this.processSaslMessage(saslMessage);
                }
                catch (IOException e) {
                    Server.this.rpcMetrics.incrAuthenticationFailures();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(StringUtils.stringifyException(e));
                    }
                    Throwable trueCause = this.getTrueCause(e);
                    AUDITLOG.warn(Server.AUTH_FAILED_FOR + this.toString() + ":" + this.attemptingUser + " (" + e.getLocalizedMessage() + ") with true cause: (" + trueCause.getLocalizedMessage() + ")");
                    // The tunnelled cause of an InvalidToken may be any
                    // Throwable; fall back to the original failure instead
                    // of letting a ClassCastException escape.
                    throw trueCause instanceof IOException ? (IOException)trueCause : e;
                }
                if (this.saslServer != null && this.saslServer.isComplete()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("SASL server context established. Negotiated QoP is " + this.saslServer.getNegotiatedProperty("javax.security.sasl.qop"));
                    }
                    this.user = this.getAuthorizedUgi(this.saslServer.getAuthorizationID());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("SASL server successfully authenticated client: " + this.user);
                    }
                    Server.this.rpcMetrics.incrAuthenticationSuccesses();
                    AUDITLOG.info(Server.AUTH_SUCCESSFUL_FOR + this.user);
                    this.saslContextEstablished = true;
                }
            }
            catch (RpcServerException rse) {
                // Already carries an RPC error code; pass it through untouched.
                throw rse;
            }
            catch (IOException ioe) {
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
            }
            if (saslResponse != null) {
                this.doSaslReply((Message)saslResponse);
            }
            if (this.saslContextEstablished) {
                String qop = (String)this.saslServer.getNegotiatedProperty("javax.security.sasl.qop");
                this.establishedQOP = qop;
                // Wrapping is needed for any QOP other than plain "auth".
                this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
                if (!this.useWrap) {
                    this.disposeSasl();
                }
            }
        }

        /**
         * Produces the server's answer to one client SASL message, based on
         * the message state:
         * <ul>
         *   <li>{@code NEGOTIATE}: returns the negotiate response (rejected if
         *       negotiation was already attempted); switches to simple auth
         *       when that response has state {@code SUCCESS}.</li>
         *   <li>{@code INITIATE}: validates the single client mechanism; if it
         *       is not among the advertised ones, re-sends the negotiate
         *       response (or fails if one was already sent). Otherwise selects
         *       the auth method, switches to simple auth for {@code SIMPLE}
         *       (returning {@code null}), or creates the SASL server if needed
         *       and evaluates the client token.</li>
         *   <li>{@code RESPONSE}: evaluates the client token.</li>
         * </ul>
         *
         * @param saslMessage the client's SASL message
         * @return the response to send, or {@code null} when nothing is sent
         * @throws SaslException          on a malformed mechanism, an
         *                                unsupported state, or SASL failure
         * @throws AccessControlException if negotiation is repeated or the
         *                                requested mechanism is not enabled
         * @throws IOException            if building a response fails
         * @throws InterruptedException   if processing is interrupted
         */
        private RpcHeaderProtos.RpcSaslProto processSaslMessage(RpcHeaderProtos.RpcSaslProto saslMessage) throws SaslException, IOException, AccessControlException, InterruptedException {
            final RpcHeaderProtos.RpcSaslProto.SaslState state = saslMessage.getState();
            switch (state) {
                case NEGOTIATE: {
                    if (this.sentNegotiate) {
                        throw new AccessControlException("Client already attempted negotiation");
                    }
                    final RpcHeaderProtos.RpcSaslProto negotiate = this.buildSaslNegotiateResponse();
                    if (negotiate.getState() == RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS) {
                        this.switchToSimple();
                    }
                    return negotiate;
                }
                case INITIATE: {
                    if (saslMessage.getAuthsCount() != 1) {
                        throw new SaslException("Client mechanism is malformed");
                    }
                    final RpcHeaderProtos.RpcSaslProto.SaslAuth clientSaslAuth = saslMessage.getAuths(0);
                    if (!Server.this.negotiateResponse.getAuthsList().contains(clientSaslAuth)) {
                        if (this.sentNegotiate) {
                            throw new AccessControlException(clientSaslAuth.getMethod() + " authentication is not enabled.  Available:" + Server.this.enabledAuthMethods);
                        }
                        return this.buildSaslNegotiateResponse();
                    }
                    this.authMethod = SaslRpcServer.AuthMethod.valueOf(clientSaslAuth.getMethod());
                    if (this.authMethod == SaslRpcServer.AuthMethod.SIMPLE) {
                        this.switchToSimple();
                        return null;
                    }
                    // An existing SASL server is reused only for TOKEN auth.
                    if (this.saslServer == null || this.authMethod != SaslRpcServer.AuthMethod.TOKEN) {
                        this.saslServer = this.createSaslServer(this.authMethod);
                    }
                    return this.processSaslToken(saslMessage);
                }
                case RESPONSE: {
                    return this.processSaslToken(saslMessage);
                }
                default: {
                    throw new SaslException("Client sent unsupported state " + state);
                }
            }
        }

        /**
         * Feeds the client's token to the SASL server and wraps the result in
         * a response whose state is {@code SUCCESS} when the SASL server is
         * complete and {@code CHALLENGE} otherwise.
         *
         * @param saslMessage the client message carrying the token
         * @return the response holding the server's reply token
         * @throws SaslException if the message has no token or evaluation fails
         */
        private RpcHeaderProtos.RpcSaslProto processSaslToken(RpcHeaderProtos.RpcSaslProto saslMessage) throws SaslException {
            if (!saslMessage.hasToken()) {
                throw new SaslException("Client did not send a token");
            }
            final byte[] clientToken = saslMessage.getToken().toByteArray();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Have read input token of size " + clientToken.length + " for processing by saslServer.evaluateResponse()");
            }
            final byte[] serverToken = this.saslServer.evaluateResponse(clientToken);
            final RpcHeaderProtos.RpcSaslProto.SaslState nextState;
            if (this.saslServer.isComplete()) {
                nextState = RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS;
            } else {
                nextState = RpcHeaderProtos.RpcSaslProto.SaslState.CHALLENGE;
            }
            return this.buildSaslResponse(nextState, serverToken);
        }

        /**
         * Switches the connection's auth protocol to {@code AuthProtocol.NONE}
         * and disposes of any SASL server.
         */
        private void switchToSimple() {
            this.authProtocol = AuthProtocol.NONE;
            this.disposeSasl();
        }

        /**
         * Builds a SASL response message with the given state and, when one
         * is supplied, the reply token.
         *
         * @param state      state to set on the response
         * @param replyToken token to include; may be {@code null}
         * @return the built response message
         */
        private RpcHeaderProtos.RpcSaslProto buildSaslResponse(RpcHeaderProtos.RpcSaslProto.SaslState state, byte[] replyToken) {
            if (LOG.isDebugEnabled()) {
                Integer tokenSize = null;
                if (replyToken != null) {
                    tokenSize = Integer.valueOf(replyToken.length);
                }
                LOG.debug("Will send " + state + " token of size " + tokenSize + " from saslServer.");
            }
            final RpcHeaderProtos.RpcSaslProto.Builder builder = RpcHeaderProtos.RpcSaslProto.newBuilder();
            builder.setState(state);
            if (replyToken != null) {
                builder.setToken(ByteString.copyFrom(replyToken));
            }
            return builder.build();
        }

        /**
         * Sends {@code message} to the client as a SUCCESS response on a call
         * that uses the SASL call id.
         *
         * @param message protobuf message to send
         * @throws IOException if setting up or sending the response fails
         */
        private void doSaslReply(Message message) throws IOException {
            final RpcCall reply = new RpcCall(this, AuthProtocol.SASL.callId);
            Server.this.setupResponse(reply, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS, null, RpcWritable.wrap(message), null, null);
            sendResponse(reply);
        }

        /**
         * Reports an authentication failure to the client: a FATAL response with
         * error code FATAL_UNAUTHORIZED, carrying the exception's class name and
         * its {@code toString()} text, sent on this connection's
         * {@code authFailedCall}.
         *
         * @param ioe the failure to report
         * @throws IOException if setting up or sending the response fails
         */
        private void doSaslReply(Exception ioe) throws IOException {
            final String errorClass = ioe.getClass().getName();
            final String errorText = ioe.toString();
            Server.this.setupResponse(authFailedCall, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, null, errorClass, errorText);
            sendResponse(authFailedCall);
        }

        /**
         * Disposes of this connection's SASL server, if any, and clears the
         * reference. Does nothing when no SASL server is held.
         *
         * <p>Disposal is best-effort: a {@link SaslException} from
         * {@code dispose()} never propagates, but it is logged at debug level
         * instead of being dropped silently. The reference is cleared in either
         * case.
         */
        private void disposeSasl() {
            if (this.saslServer == null) {
                return;
            }
            try {
                this.saslServer.dispose();
            }
            catch (SaslException e) {
                // Deliberately not rethrown; record it so a failing dispose is diagnosable.
                LOG.debug("Ignoring exception while disposing SASL server", (Throwable)e);
            }
            finally {
                this.saslServer = null;
            }
        }

        /**
         * Validates a length announced by the client.
         *
         * @param dataLength the announced length
         * @throws IOException if the length is negative or exceeds the server's
         *         {@code maxDataLength}; the same text is logged at warn level
         */
        private void checkDataLength(int dataLength) throws IOException {
            final String error;
            if (dataLength < 0) {
                error = "Unexpected data length " + dataLength + "!! from " + getHostAddress();
            } else if (dataLength > Server.this.maxDataLength) {
                error = "Requested data length " + dataLength + " is longer than maximum configured RPC length " + Server.this.maxDataLength + ".  RPC came from " + getHostAddress();
            } else {
                return;
            }
            LOG.warn(error);
            throw new IOException(error);
        }

        /**
         * Reads from the channel and advances this connection's state.
         *
         * <p>Until the connection header has been read, {@code dataLengthBuffer}
         * is compared against {@code HTTP_GET_BYTES} and {@code RpcConstants.HEADER},
         * followed by a 3-byte header (byte 0: version, byte 1: service class,
         * byte 2: auth protocol). Afterwards, {@code dataLengthBuffer} supplies the
         * length of the next packet, which is read in full and passed to
         * {@code processOneRpc}.
         *
         * @return the result of the last channel read, or -1 when the connection
         *         should close, an HTTP request was detected, or the header or
         *         version was rejected
         * @throws IOException          on read failure or an invalid packet length
         * @throws InterruptedException if thrown while processing a packet
         */
        public int readAndProcess() throws IOException, InterruptedException {
            while (!shouldClose()) {
                int count = -1;
                if (dataLengthBuffer.remaining() > 0) {
                    count = Server.this.channelRead(channel, dataLengthBuffer);
                    if (count < 0 || dataLengthBuffer.remaining() > 0) {
                        return count;
                    }
                }

                if (!connectionHeaderRead) {
                    if (connectionHeaderBuf == null) {
                        connectionHeaderBuf = ByteBuffer.allocate(3);
                    }
                    count = Server.this.channelRead(channel, connectionHeaderBuf);
                    if (count < 0 || connectionHeaderBuf.remaining() > 0) {
                        return count;
                    }
                    final byte version = connectionHeaderBuf.get(0);
                    setServiceClass(connectionHeaderBuf.get(1));
                    dataLengthBuffer.flip();
                    if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
                        setupHttpRequestOnIpcPortResponse();
                        return -1;
                    }
                    if (!RpcConstants.HEADER.equals(dataLengthBuffer)) {
                        LOG.warn("Incorrect RPC Header length from {}:{} expected length: {} got length: {}", new Object[]{hostAddress, remotePort, RpcConstants.HEADER, dataLengthBuffer});
                        setupBadVersionResponse(version);
                        return -1;
                    }
                    if (version != 9) {
                        LOG.warn("Version mismatch from " + hostAddress + ":" + remotePort + " got version " + version + " expected version " + 9);
                        setupBadVersionResponse(version);
                        return -1;
                    }
                    authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
                    // Header accepted: reuse dataLengthBuffer for packet lengths from now on.
                    dataLengthBuffer.clear();
                    connectionHeaderBuf = null;
                    connectionHeaderRead = true;
                    continue;
                }

                if (data == null) {
                    dataLengthBuffer.flip();
                    dataLength = dataLengthBuffer.getInt();
                    checkDataLength(dataLength);
                    data = ByteBuffer.allocate(dataLength);
                }
                count = Server.this.channelRead(channel, data);
                if (data.remaining() != 0) {
                    // Packet still incomplete; resume on the next invocation.
                    return count;
                }

                dataLengthBuffer.clear();
                data.flip();
                final ByteBuffer packet = data;
                // Cleared before processing so the buffer is not reused if processing throws.
                data = null;
                final boolean contextWasRead = connectionContextRead;
                processOneRpc(packet);
                if (contextWasRead) {
                    return count;
                }
                // The connection context had not been read before this packet; loop to read more.
            }
            return -1;
        }

        /**
         * Resolves the auth protocol requested by the client and checks that it
         * is acceptable.
         *
         * @param authType auth protocol id taken from the connection header
         * @return the matching {@link AuthProtocol}
         * @throws IOException if the id matches no protocol, or if NONE was
         *         requested while SIMPLE is not among the server's enabled auth
         *         methods; in both cases the error is first sent to the client
         *         via {@code doSaslReply}
         */
        private AuthProtocol initializeAuthContext(int authType) throws IOException {
            final AuthProtocol requested = AuthProtocol.valueOf(authType);
            if (requested == null) {
                final IpcException unknown = new IpcException("Unknown auth protocol:" + authType);
                doSaslReply(unknown);
                throw unknown;
            }
            final boolean simpleAllowed = Server.this.enabledAuthMethods.contains(SaslRpcServer.AuthMethod.SIMPLE);
            if (requested == AuthProtocol.NONE && !simpleAllowed) {
                final AccessControlException denied = new AccessControlException("SIMPLE authentication is not enabled.  Available:" + Server.this.enabledAuthMethods);
                doSaslReply(denied);
                throw denied;
            }
            return requested;
        }

        /**
         * Produces the SASL negotiate message for this connection and records
         * that it has been sent ({@code sentNegotiate}).
         *
         * <p>When TOKEN is among the server's enabled auth methods, a TOKEN SASL
         * server is created, its initial challenge is obtained by evaluating an
         * empty response, and that challenge is set on the first auth entry of a
         * copy of the server's negotiate response. Otherwise the server's
         * negotiate response is returned as is.
         *
         * @return the negotiate message to send
         */
        private RpcHeaderProtos.RpcSaslProto buildSaslNegotiateResponse() throws InterruptedException, SaslException, IOException {
            RpcHeaderProtos.RpcSaslProto result = Server.this.negotiateResponse;
            final boolean tokenEnabled = Server.this.enabledAuthMethods.contains(SaslRpcServer.AuthMethod.TOKEN);
            if (tokenEnabled) {
                saslServer = createSaslServer(SaslRpcServer.AuthMethod.TOKEN);
                final byte[] initialChallenge = saslServer.evaluateResponse(new byte[0]);
                final RpcHeaderProtos.RpcSaslProto.Builder copy = RpcHeaderProtos.RpcSaslProto.newBuilder(Server.this.negotiateResponse);
                copy.getAuthsBuilder(0).setChallenge(ByteString.copyFrom(initialChallenge));
                result = copy.build();
            }
            sentNegotiate = true;
            return result;
        }

        /**
         * Creates a SASL server for the given auth method, using SASL properties
         * resolved from this connection's address and ingress port and the
         * server's secret manager.
         *
         * @param authMethod auth method the SASL server is created for
         * @return the new SASL server
         */
        private SaslServer createSaslServer(SaslRpcServer.AuthMethod authMethod) throws IOException, InterruptedException {
            final Map<String, String> resolvedProps = Server.this.saslPropsResolver.getServerProperties(addr, ingressPort);
            final SaslRpcServer factory = new SaslRpcServer(authMethod);
            return factory.create(this, resolvedProps, Server.this.secretManager);
        }

        /**
         * Sends a version-mismatch error to the client, in a format chosen by the
         * client's version: the current response format for versions of at
         * least 9, {@code setupResponseOldVersionFatal} for versions 3 to 8, and a
         * hand-written payload for version 2. Nothing is sent for lower versions.
         *
         * @param clientVersion version reported by the client
         * @throws IOException if building or sending the response fails
         */
        private void setupBadVersionResponse(int clientVersion) throws IOException {
            final String errMsg = "Server IPC version 9 cannot communicate with client version " + clientVersion;
            final String errClass = RPC.VersionMismatch.class.getName();
            if (clientVersion >= 9) {
                final RpcCall reply = new RpcCall(this, -1);
                Server.this.setupResponse(reply, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_VERSION_MISMATCH, null, errClass, errMsg);
                sendResponse(reply);
                return;
            }
            if (clientVersion >= 3) {
                final RpcCall reply = new RpcCall(this, -1);
                Server.this.setupResponseOldVersionFatal(new ByteArrayOutputStream(), reply, null, errClass, errMsg);
                sendResponse(reply);
                return;
            }
            if (clientVersion == 2) {
                final RpcCall reply = new RpcCall(this, 0);
                final ByteArrayOutputStream payload = new ByteArrayOutputStream();
                final DataOutputStream out = new DataOutputStream(payload);
                out.writeInt(0);
                out.writeBoolean(true);
                WritableUtils.writeString(out, errClass);
                WritableUtils.writeString(out, errMsg);
                reply.setResponse(ByteBuffer.wrap(payload.toByteArray()));
                sendResponse(reply);
            }
        }

        /**
         * Sends the UTF-8 bytes of {@code Server.RECEIVED_HTTP_REQ_RESPONSE} to the
         * client on a call with id 0.
         *
         * @throws IOException if sending the response fails
         */
        private void setupHttpRequestOnIpcPortResponse() throws IOException {
            final byte[] body = Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8);
            final RpcCall reply = new RpcCall(this, 0);
            reply.setResponse(ByteBuffer.wrap(body));
            sendResponse(reply);
        }

        /**
         * Decodes the connection context, determines the effective user,
         * authorizes the connection and marks the context as read.
         *
         * <p>With auth protocol NONE the user is the one named in the context.
         * Otherwise the already-set user keeps its identity; if the context names
         * a different user, a proxy user is created on top of the current user,
         * except for TOKEN authentication where the mismatch is rejected.
         *
         * @param buffer buffer holding the encoded connection context
         * @throws RpcServerException if the context was already processed, cannot
         *         be decoded, names a mismatching user under TOKEN auth, or
         *         authorization fails
         */
        private void processConnectionContext(RpcWritable.Buffer buffer) throws RpcServerException {
            if (connectionContextRead) {
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection context already processed");
            }
            connectionContext = getMessage(IpcConnectionContextProtos.IpcConnectionContextProto.getDefaultInstance(), buffer);
            if (connectionContext.hasProtocol()) {
                protocolName = connectionContext.getProtocol();
            } else {
                protocolName = null;
            }
            final UserGroupInformation claimedUser = ProtoUtil.getUgi(connectionContext);
            if (authProtocol == AuthProtocol.NONE) {
                user = claimedUser;
            } else {
                user.setAuthenticationMethod(authMethod);
                final boolean namesDifferentUser = claimedUser != null && !claimedUser.getUserName().equals(user.getUserName());
                if (namesDifferentUser) {
                    if (authMethod == SaslRpcServer.AuthMethod.TOKEN) {
                        throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, new AccessControlException("Authenticated user (" + user + ") doesn't match what the client claims to be (" + claimedUser + ")"));
                    }
                    // The current user becomes the real user behind the claimed one.
                    user = UserGroupInformation.createProxyUser(claimedUser.getUserName(), user);
                }
            }
            authorizeConnection();
            connectionContextRead = true;
            if (user != null) {
                Server.this.connectionManager.incrUserConnections(user.getShortUserName());
            }
        }

        /**
         * Unwraps a SASL-wrapped buffer and processes each length-prefixed RPC it
         * contains.
         *
         * <p>Partial state is kept in {@code unwrappedDataLengthBuffer} and
         * {@code unwrappedData}, so a length prefix or packet that ends mid-way
         * through {@code inBuf} is resumed on the next invocation.
         *
         * @param inBuf SASL-wrapped bytes received from the client
         */
        private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException, InterruptedException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Have read input token of size " + inBuf.length + " for processing by saslServer.unwrap()");
            }
            final byte[] plain = saslServer.unwrap(inBuf, 0, inBuf.length);
            final ReadableByteChannel source = Channels.newChannel(new ByteArrayInputStream(plain));
            while (!shouldClose()) {
                if (unwrappedDataLengthBuffer.remaining() > 0) {
                    final int lengthRead = Server.this.channelRead(source, unwrappedDataLengthBuffer);
                    if (lengthRead <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
                        return;
                    }
                }
                if (unwrappedData == null) {
                    unwrappedDataLengthBuffer.flip();
                    final int packetLength = unwrappedDataLengthBuffer.getInt();
                    unwrappedData = ByteBuffer.allocate(packetLength);
                }
                final int bodyRead = Server.this.channelRead(source, unwrappedData);
                if (bodyRead <= 0 || unwrappedData.remaining() > 0) {
                    return;
                }
                // The packet is complete here: remaining() is zero.
                unwrappedDataLengthBuffer.clear();
                unwrappedData.flip();
                final ByteBuffer packet = unwrappedData;
                unwrappedData = null;
                processOneRpc(packet);
            }
        }

        /**
         * Decodes the request header of one RPC packet and dispatches it:
         * negative call ids go to {@code processRpcOutOfBandRequest}, all others
         * to {@code processRpcRequest} (which requires the connection context to
         * have been read).
         *
         * <p>An {@link RpcServerException} raised along the way is not rethrown;
         * it is turned into an error response carrying the exception's status and
         * error code, and the class name and message of its cause (or of the
         * exception itself when it has no cause).
         *
         * @param bb buffer holding exactly one RPC packet
         */
        private void processOneRpc(ByteBuffer bb) throws IOException, InterruptedException {
            // Declared outside the try so the error reply can use whatever was decoded.
            int callId = -1;
            int retry = -1;
            try {
                final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
                final RpcHeaderProtos.RpcRequestHeaderProto header = getMessage(RpcHeaderProtos.RpcRequestHeaderProto.getDefaultInstance(), buffer);
                callId = header.getCallId();
                retry = header.getRetryCount();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(" got #" + callId);
                }
                checkRpcHeaders(header);
                if (callId >= 0) {
                    if (!connectionContextRead) {
                        throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection context not established");
                    }
                    processRpcRequest(header, buffer);
                } else {
                    processRpcOutOfBandRequest(header, buffer);
                }
            } catch (RpcServerException rse) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(Thread.currentThread().getName() + ": processOneRpc from client " + this + " threw exception [" + rse + "]");
                }
                final Throwable cause = rse.getCause();
                final Throwable reported = cause != null ? cause : rse;
                final String reportedMessage = reported.getMessage();
                final String detail = reportedMessage != null ? reportedMessage : reported.toString();
                final RpcCall errorCall = new RpcCall(this, callId, retry);
                Server.this.setupResponse(errorCall, rse.getRpcStatusProto(), rse.getRpcErrorCodeProto(), null, reported.getClass().getName(), detail);
                sendResponse(errorCall);
            }
        }

        /**
         * Verifies that a request header has an rpc op, that the op is
         * RPC_FINAL_PACKET, and that it has an rpc kind, in that order.
         *
         * @param header request header to check
         * @throws RpcServerException fatal, with code FATAL_INVALID_RPC_HEADER, on
         *         the first check that fails
         */
        private void checkRpcHeaders(RpcHeaderProtos.RpcRequestHeaderProto header) throws RpcServerException {
            final String err;
            if (!header.hasRpcOp()) {
                err = " IPC Server: No rpc op in rpcRequestHeader";
            } else if (header.getRpcOp() != RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
                err = "IPC Server does not implement rpc header operation" + header.getRpcOp();
            } else if (!header.hasRpcKind()) {
                err = " IPC Server: No rpc kind in rpcRequestHeader";
            } else {
                return;
            }
            throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
        }

        /**
         * Deserializes one RPC request, wraps it in an {@code RpcCall} and queues
         * it for handling.
         *
         * <p>Steps: resolve the request wrapper class for the header's rpc kind;
         * instantiate the request from the buffer; open and detach a trace scope
         * when the header carries trace info and a tracer is configured; build the
         * caller context when present; assign the call's priority level; when an
         * alignment context is configured and the request is a protobuf request
         * for a coordinated call, record the client state id; finally queue the
         * call and increment this connection's rpc count.
         *
         * @param header decoded request header
         * @param buffer buffer positioned at the request body
         * @throws RpcServerException   if the rpc kind is unknown, the request
         *         cannot be deserialized, the alignment context fails, or the
         *         call cannot be queued
         * @throws InterruptedException if thrown by the calls made while queueing
         */
        private void processRpcRequest(RpcHeaderProtos.RpcRequestHeaderProto header, RpcWritable.Buffer buffer) throws RpcServerException, InterruptedException {
            Writable rpcRequest;
            Class<? extends Writable> rpcRequestClass = Server.this.getRpcRequestWrapper(header.getRpcKind());
            if (rpcRequestClass == null) {
                LOG.warn("Unknown rpc kind " + header.getRpcKind() + " from client " + this.getHostAddress());
                String err = "Unknown rpc kind in rpc header" + header.getRpcKind();
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
            }
            try {
                rpcRequest = buffer.newInstance(rpcRequestClass, Server.this.conf);
            }
            catch (RpcServerException rse) {
                // Must stay ahead of the Throwable handler so it is not re-wrapped.
                throw rse;
            }
            catch (Throwable t) {
                // Fixed: a space was missing between the host address and "on connection protocol".
                LOG.warn("Unable to read call parameters for client " + this.getHostAddress() + " on connection protocol " + this.protocolName + " for rpcKind " + header.getRpcKind(), t);
                String err = "IPC server unable to read call parameters: " + t.getMessage();
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
            }
            TraceScope traceScope = null;
            if (header.hasTraceInfo() && Server.this.tracer != null) {
                SpanId parentSpanId = new SpanId(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId());
                traceScope = Server.this.tracer.newScope(RpcClientUtil.toTraceName(rpcRequest.toString()), parentSpanId);
                traceScope.detach();
            }
            CallerContext callerContext = null;
            if (header.hasCallerContext()) {
                callerContext = new CallerContext.Builder(header.getCallerContext().getContext()).setSignature(header.getCallerContext().getSignature().toByteArray()).build();
            }
            RpcCall call = new RpcCall(this, header.getCallId(), header.getRetryCount(), rpcRequest, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceScope, callerContext);
            call.setPriorityLevel(Server.this.callQueue.getPriorityLevel(call));
            call.markCallCoordinated(false);
            // instanceof is false for null, so no separate null check is needed.
            if (Server.this.alignmentContext != null && call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest) {
                ProtobufRpcEngine2.RpcProtobufRequest req = (ProtobufRpcEngine2.RpcProtobufRequest)call.rpcRequest;
                try {
                    String methodName = req.getRequestHeader().getMethodName();
                    String protoName = req.getRequestHeader().getDeclaringClassProtocolName();
                    if (Server.this.alignmentContext.isCoordinatedCall(protoName, methodName)) {
                        call.markCallCoordinated(true);
                        long stateId = Server.this.alignmentContext.receiveRequestState(header, Server.this.getMaxIdleTime());
                        call.setClientStateId(stateId);
                    }
                }
                catch (IOException ioe) {
                    throw new RpcServerException("Processing RPC request caught ", ioe);
                }
            }
            try {
                Server.this.internalQueueCall(call);
            }
            catch (RpcServerException rse) {
                // Rethrown untouched; only other IOExceptions are wrapped as fatal.
                throw rse;
            }
            catch (IOException ioe) {
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
            }
            this.incRpcCount();
        }

        /**
         * Handles a request whose call id is negative, dispatching on the id:
         * -3 is processed as the connection context, {@code AuthProtocol.SASL.callId}
         * as a SASL message, and -4 is only logged as a ping. Any other id is
         * rejected.
         *
         * @param header decoded request header
         * @param buffer buffer positioned after the header
         * @throws RpcServerException fatal, with code FATAL_INVALID_RPC_HEADER, if
         *         the connection context arrives while SASL negotiation is still
         *         in progress, a SASL message arrives on a non-SASL connection,
         *         or the call id is unknown
         */
        private void processRpcOutOfBandRequest(RpcHeaderProtos.RpcRequestHeaderProto header, RpcWritable.Buffer buffer) throws RpcServerException, IOException, InterruptedException {
            final int callId = header.getCallId();
            if (callId == -3) {
                final boolean stillNegotiating = authProtocol == AuthProtocol.SASL && !saslContextEstablished;
                if (stillNegotiating) {
                    throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection header sent during SASL negotiation");
                }
                processConnectionContext(buffer);
                return;
            }
            if (callId == AuthProtocol.SASL.callId) {
                if (authProtocol != AuthProtocol.SASL) {
                    throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "SASL protocol not requested by client");
                }
                saslReadAndProcess(buffer);
                return;
            }
            if (callId == -4) {
                LOG.debug("Received ping message");
                return;
            }
            throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Unknown out of band call #" + callId);
        }

        /**
         * Authorizes this connection's user for the requested protocol and
         * updates the authorization success/failure metrics.
         *
         * <p>When the user has a real user behind it and the auth method is not
         * TOKEN, the proxy relationship is checked with {@code ProxyUsers} first.
         *
         * @throws RpcServerException fatal, with code FATAL_UNAUTHORIZED, when an
         *         {@code AuthorizationException} is raised
         */
        private void authorizeConnection() throws RpcServerException {
            try {
                final boolean needsProxyCheck = user != null && user.getRealUser() != null && authMethod != SaslRpcServer.AuthMethod.TOKEN;
                if (needsProxyCheck) {
                    ProxyUsers.authorize(user, getHostAddress());
                }
                Server.this.authorize(user, protocolName, getHostInetAddress());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully authorized " + connectionContext);
                }
                Server.this.rpcMetrics.incrAuthorizationSuccesses();
            } catch (AuthorizationException ae) {
                LOG.info("Connection from " + this + " for protocol " + connectionContext.getProtocol() + " is unauthorized for user " + user);
                Server.this.rpcMetrics.incrAuthorizationFailures();
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
            }
        }

        /**
         * Decodes a protobuf message of the same type as {@code message} from
         * {@code buffer}.
         *
         * @param message prototype instance passed to {@code buffer.getValue}
         * @param buffer  buffer to decode from
         * @param <T>     message type expected by the caller (unchecked cast)
         * @return the decoded message
         * @throws RpcServerException fatal, with code FATAL_DESERIALIZING_REQUEST,
         *         if decoding throws any exception
         */
        <T extends Message> T getMessage(Message message, RpcWritable.Buffer buffer) throws RpcServerException {
            try {
                final T decoded = (T)buffer.getValue(message);
                return decoded;
            } catch (Exception ioe) {
                final String typeName = message.getClass().getSimpleName();
                throw new FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, "Error decoding " + typeName + ": " + ioe);
            }
        }

        /**
         * Passes a call with a prepared response to the server's responder.
         *
         * @param call call whose response should be sent
         * @throws IOException if thrown by the responder
         */
        private void sendResponse(RpcCall call) throws IOException {
            final Responder serverResponder = Server.this.responder;
            serverResponder.doRespond(call);
        }

        /**
         * @return the service class currently recorded for this connection
         */
        public int getServiceClass() {
            return serviceClass;
        }

        /**
         * Records the service class for this connection.
         *
         * @param serviceClass the value to store
         */
        public void setServiceClass(final int serviceClass) {
            this.serviceClass = serviceClass;
        }

        /**
         * Releases this connection's resources: disposes of the SASL server,
         * drops the pending data buffer and, if the channel is still open, shuts
         * down socket output and closes the channel and socket.
         *
         * <p>Exceptions from shutting down socket output are logged at debug
         * level and otherwise ignored.
         */
        private synchronized void close() {
            disposeSasl();
            data = null;
            if (channel.isOpen()) {
                try {
                    socket.shutdownOutput();
                } catch (Exception e) {
                    LOG.debug("Ignoring socket shutdown exception", (Throwable)e);
                }
                // Re-checked because the channel may no longer be open after the shutdown attempt.
                if (channel.isOpen()) {
                    IOUtils.cleanupWithLogger(LOG, channel);
                }
                IOUtils.cleanupWithLogger(LOG, socket);
            }
        }
    }

    /**
     * An {@link RpcServerException} that always reports the FATAL rpc status
     * together with a specific error code. Its string form is that of its
     * cause.
     */
    private static class FatalRpcServerException
    extends RpcServerException {
        /** Error code returned by {@link #getRpcErrorCodeProto()}. */
        private final RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode;

        /**
         * @param errCode error code to report
         * @param ioe     underlying failure; becomes the cause, and its
         *                {@code toString()} becomes the message
         */
        public FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode, IOException ioe) {
            super(ioe.toString(), ioe);
            this.errCode = errCode;
        }

        /**
         * Wraps {@code message} in a plain {@link RpcServerException} and uses it
         * as the cause.
         *
         * @param errCode error code to report
         * @param message description of the failure
         */
        public FatalRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode, String message) {
            this(errCode, new RpcServerException(message));
        }

        /** @return the error code supplied at construction */
        @Override
        public RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto getRpcErrorCodeProto() {
            return errCode;
        }

        /** @return always FATAL */
        @Override
        public RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto getRpcStatusProto() {
            return RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL;
        }

        /** @return the string form of the cause (both constructors set one) */
        @Override
        public String toString() {
            final Throwable cause = getCause();
            return cause.toString();
        }
    }

    @InterfaceAudience.Private
    public static enum AuthProtocol {
        NONE(0),
        SASL(-33);

        public final int callId;

        private AuthProtocol(int callId) {
            this.callId = callId;
        }

        static AuthProtocol valueOf(int callId) {
            for (AuthProtocol authType : AuthProtocol.values()) {
                if (authType.callId != callId) continue;
                return authType;
            }
            return null;
        }
    }

    private class Responder
    extends Thread {
        private final Selector writeSelector;
        private int pending;

        /**
         * Creates the responder as a daemon thread named "IPC Server Responder"
         * with its own write selector and no pending registrations.
         *
         * @throws IOException if the selector cannot be opened
         */
        Responder() throws IOException {
            setName("IPC Server Responder");
            setDaemon(true);
            pending = 0;
            writeSelector = Selector.open();
        }

        /**
         * Thread body: publishes the enclosing server through {@code SERVER},
         * runs the responder loop, and closes the write selector when the loop
         * ends, whether normally or by exception.
         */
        @Override
        public void run() {
            LOG.info(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            try {
                doRunLoop();
            } finally {
                LOG.info("Stopping " + Thread.currentThread().getName());
                try {
                    writeSelector.close();
                } catch (IOException closeFailure) {
                    LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), (Throwable)closeFailure);
                }
            }
        }

        /**
         * Main loop of the responder; runs while the server's {@code running}
         * flag is set.
         *
         * <p>Each iteration waits for pending registrations, selects on the
         * write selector for at most the purge interval, writes to every key
         * reported writable, and, once per purge interval, collects the calls
         * attached to the selector's keys and passes each to {@code doPurge}.
         * An {@link OutOfMemoryError} is logged and followed by a 60 second
         * sleep; any other exception is logged and the loop continues.
         *
         * <p>NOTE: recovered by a decompiler (CFR), which flagged this method
         * with "Removed try catching itself - possible behaviour change".
         */
        private void doRunLoop() {
            long lastPurgeTimeNanos = 0L;
            while (Server.this.running) {
                try {
                    waitPending();
                    writeSelector.select(TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
                    final Iterator<SelectionKey> ready = writeSelector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        final SelectionKey key = ready.next();
                        ready.remove();
                        try {
                            if (key.isWritable()) {
                                doAsyncWrite(key);
                            }
                        } catch (CancelledKeyException cke) {
                            final RpcCall aborted = (RpcCall)key.attachment();
                            if (aborted != null) {
                                LOG.info(Thread.currentThread().getName() + ": connection aborted from " + aborted.connection);
                            }
                        } catch (IOException e) {
                            LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
                        }
                    }

                    final long nowNanos = Time.monotonicNowNanos();
                    if (nowNanos >= lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
                        lastPurgeTimeNanos = nowNanos;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Checking for old call responses.");
                        }
                        final ArrayList<RpcCall> candidates;
                        final Set<SelectionKey> keySet = writeSelector.keys();
                        // Snapshot the calls under the key-set lock; purge after releasing it.
                        synchronized (keySet) {
                            candidates = new ArrayList<RpcCall>(writeSelector.keys().size());
                            for (SelectionKey registered : writeSelector.keys()) {
                                final RpcCall attached = (RpcCall)registered.attachment();
                                if (attached != null && registered.channel() == attached.connection.channel) {
                                    candidates.add(attached);
                                }
                            }
                        }
                        for (RpcCall candidate : candidates) {
                            doPurge(candidate, nowNanos);
                        }
                    }
                } catch (OutOfMemoryError e) {
                    LOG.warn("Out of Memory in server select", (Throwable)e);
                    try {
                        Thread.sleep(60000L);
                    } catch (Exception exception) {
                    }
                } catch (Exception e) {
                    LOG.warn("Exception in Responder", (Throwable)e);
                }
            }
        }

        /**
         * Writes queued responses for the connection behind a writable key and,
         * when {@code processResponse} reports the queue as done, clears the
         * key's interest set.
         *
         * <p>NOTE: recovered by a decompiler (CFR), which flagged this method
         * with "Removed try catching itself - possible behaviour change".
         *
         * @param key writable selection key; a key without an attached call is
         *            ignored
         * @throws IOException if the key's channel is not the attached call's
         *         connection channel, or if writing fails
         */
        private void doAsyncWrite(SelectionKey key) throws IOException {
            final RpcCall call = (RpcCall)key.attachment();
            if (call == null) {
                return;
            }
            if (key.channel() != call.connection.channel) {
                throw new IOException("doAsyncWrite: bad channel");
            }
            synchronized (call.connection.responseQueue) {
                final boolean queueDone = processResponse(call.connection.responseQueue, false);
                if (!queueDone) {
                    return;
                }
                try {
                    key.interestOps(0);
                } catch (CancelledKeyException e) {
                    LOG.warn("Exception while changing ops : " + e);
                }
            }
        }

        /**
         * Closes the connection of {@code call} if any response in that
         * connection's queue has a timestamp older than the purge interval.
         * Only the first such response is acted on.
         *
         * <p>NOTE: recovered by a decompiler (CFR), which flagged this method
         * with "Removed try catching itself - possible behaviour change".
         *
         * @param call a call identifying the connection to inspect
         * @param now  current time in nanoseconds, on the same clock as
         *             {@code responseTimestampNanos}
         */
        private void doPurge(RpcCall call, long now) {
            final LinkedList<RpcCall> queue = call.connection.responseQueue;
            synchronized (queue) {
                for (RpcCall queued : queue) {
                    if (now > queued.responseTimestampNanos + PURGE_INTERVAL_NANOS) {
                        Server.this.closeConnection(queued.connection);
                        break;
                    }
                }
            }
        }

        /**
         * Writes the first queued response of a connection to its channel
         * without blocking.
         *
         * <p>If the response is written completely, the call's response buffer is
         * released and the connection's rpc count is decremented. If it is only
         * partially written, the call is put back at the head of the queue and,
         * when invoked from a handler ({@code inHandler}), the channel is
         * registered with the write selector for {@code OP_WRITE}.
         *
         * <p>The decompiled form of this method had its {@code synchronized}
         * block on {@code responseQueue} reduced to comments and its try/finally
         * narrowed so that a failure in {@code channelWrite} skipped the error
         * handling. Both are restored here: the whole body runs under the queue's
         * monitor, and any exit with {@code error} still set after a call was
         * dequeued logs "output error" and closes the connection.
         *
         * @param responseQueue the connection's response queue
         * @param inHandler     {@code true} when called from a handler rather
         *                      than from the responder's own loop
         * @return {@code true} when there is nothing more to write for this
         *         channel (queue empty, last response fully written, write
         *         returned a negative count, or the channel was closed);
         *         {@code false} otherwise
         * @throws IOException if writing to the channel fails
         */
        private boolean processResponse(LinkedList<RpcCall> responseQueue, boolean inHandler) throws IOException {
            boolean error = true;
            boolean done = false;
            int numElements = 0;
            RpcCall call = null;
            try {
                synchronized (responseQueue) {
                    numElements = responseQueue.size();
                    if (numElements == 0) {
                        // Nothing queued for this channel.
                        error = false;
                        return true;
                    }
                    call = responseQueue.removeFirst();
                    SocketChannel channel = call.connection.channel;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
                    }
                    int numBytes = Server.this.channelWrite(channel, call.rpcResponse);
                    if (numBytes < 0) {
                        // error is still true, so the finally block closes the connection.
                        return true;
                    }
                    if (!call.rpcResponse.hasRemaining()) {
                        // Fully written: release the buffer.
                        call.rpcResponse = null;
                        call.connection.decRpcCount();
                        done = numElements == 1;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(Thread.currentThread().getName() + ": responding to " + call + " Wrote " + numBytes + " bytes.");
                        }
                    } else {
                        // Partially written: keep it first in line.
                        call.connection.responseQueue.addFirst(call);
                        if (inHandler) {
                            call.responseTimestampNanos = Time.monotonicNowNanos();
                            this.incPending();
                            try {
                                // Wake the selector before registering the channel with it.
                                this.writeSelector.wakeup();
                                channel.register(this.writeSelector, SelectionKey.OP_WRITE, call);
                            }
                            catch (ClosedChannelException e) {
                                // Channel already closed; nothing more to write.
                                done = true;
                            }
                            finally {
                                this.decPending();
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(Thread.currentThread().getName() + ": responding to " + call + " Wrote partial " + numBytes + " bytes.");
                        }
                    }
                    error = false;
                }
            }
            finally {
                if (error && call != null) {
                    LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
                    done = true;
                    Server.this.closeConnection(call.connection);
                }
            }
            return done;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Appends {@code call} to its connection's response queue and, if it is the
         * only entry, immediately tries to write it via
         * {@code processResponse(queue, true)}.
         *
         * <p>Everything happens while holding the monitor of the connection's
         * response queue. When the connection has {@code useWrap} set, the call is
         * passed through {@code wrapWithSasl} before it is queued.
         *
         * @param call the call whose response should be queued and possibly sent
         * @throws IOException propagated from wrapping or from the write attempt
         */
        void doRespond(RpcCall call) throws IOException {
            final Connection conn = call.connection;
            synchronized (conn.responseQueue) {
                if (conn.useWrap) {
                    Server.this.wrapWithSasl(call);
                }
                conn.responseQueue.addLast(call);
                if (conn.responseQueue.size() != 1) {
                    // Earlier responses are still queued ahead of this one.
                    return;
                }
                this.processResponse(conn.responseQueue, true);
            }
        }

        /** Increments the {@code pending} counter while holding this object's monitor. */
        private synchronized void incPending() {
            this.pending++;
        }

        /**
         * Decrements the {@code pending} counter and notifies one thread waiting on
         * this object's monitor.
         */
        private synchronized void decPending() {
            this.pending -= 1;
            this.notify();
        }

        /**
         * Blocks on this object's monitor until the {@code pending} counter is no
         * longer positive.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private synchronized void waitPending() throws InterruptedException {
            for (;;) {
                if (this.pending <= 0) {
                    return;
                }
                this.wait();
            }
        }
    }

    private class Listener
    extends Thread {
        private ServerSocketChannel acceptChannel = null;
        private Selector selector = null;
        private Reader[] readers = null;
        private int currentReader = 0;
        private InetSocketAddress address;
        private int listenPort;
        private int backlogLength = Server.access$500(Server.this).getInt("ipc.server.listen.queue.size", 256);
        private boolean reuseAddr = Server.access$500(Server.this).getBoolean("ipc.server.reuseaddr", true);
        private boolean isOnAuxiliaryPort;

        Listener(int port) throws IOException {
            this.address = new InetSocketAddress(Server.this.bindAddress, port);
            this.acceptChannel = ServerSocketChannel.open();
            this.acceptChannel.configureBlocking(false);
            this.acceptChannel.setOption((SocketOption)StandardSocketOptions.SO_REUSEADDR, (Object)this.reuseAddr);
            Server.bind(this.acceptChannel.socket(), this.address, this.backlogLength, Server.this.conf, Server.this.portRangeConfig);
            this.listenPort = this.acceptChannel.socket().getLocalPort();
            Thread.currentThread().setName("Listener at " + Server.this.bindAddress + "/" + this.listenPort);
            this.selector = Selector.open();
            this.readers = new Reader[Server.this.readThreads];
            for (int i = 0; i < Server.this.readThreads; ++i) {
                Reader reader;
                this.readers[i] = reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port);
                reader.start();
            }
            this.acceptChannel.register(this.selector, 16);
            this.setName("IPC Server listener on " + port);
            this.setDaemon(true);
            this.isOnAuxiliaryPort = false;
        }

        void setIsAuxiliary() {
            this.isOnAuxiliaryPort = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            LOG.info(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            Server.this.connectionManager.startIdleScan();
            while (Server.this.running) {
                SelectionKey key = null;
                try {
                    this.getSelector().select();
                    Iterator<SelectionKey> iter = this.getSelector().selectedKeys().iterator();
                    while (iter.hasNext()) {
                        key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid() && key.isAcceptable()) {
                                this.doAccept(key);
                            }
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                        key = null;
                    }
                }
                catch (OutOfMemoryError e) {
                    LOG.warn("Out of Memory in server select", (Throwable)e);
                    this.closeCurrentConnection(key, e);
                    Server.this.connectionManager.closeIdle(true);
                    try {
                        Thread.sleep(60000L);
                    }
                    catch (Exception exception) {}
                }
                catch (Exception e) {
                    this.closeCurrentConnection(key, e);
                }
            }
            LOG.info("Stopping " + Thread.currentThread().getName());
            Listener listener = this;
            synchronized (listener) {
                try {
                    this.acceptChannel.close();
                    this.selector.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                this.selector = null;
                this.acceptChannel = null;
                Server.this.connectionManager.stopIdleScan();
                Server.this.connectionManager.closeAll();
            }
        }

        private void closeCurrentConnection(SelectionKey key, Throwable e) {
            Connection c;
            if (key != null && (c = (Connection)key.attachment()) != null) {
                Server.this.closeConnection(c);
                Object var3_3 = null;
            }
        }

        InetSocketAddress getAddress() {
            return (InetSocketAddress)this.acceptChannel.socket().getLocalSocketAddress();
        }

        void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
            SocketChannel channel;
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            while ((channel = server.accept()) != null) {
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(Server.this.tcpNoDelay);
                channel.socket().setKeepAlive(true);
                Reader reader = this.getReader();
                Connection c = Server.this.connectionManager.register(channel, this.listenPort, this.isOnAuxiliaryPort);
                if (c == null) {
                    if (channel.isOpen()) {
                        IOUtils.cleanupWithLogger(LOG, channel);
                    }
                    Server.this.connectionManager.droppedConnections.getAndIncrement();
                    continue;
                }
                key.attach(c);
                reader.addConnection(c);
            }
        }

        void doRead(SelectionKey key) throws InterruptedException {
            int count;
            Connection c = (Connection)key.attachment();
            if (c == null) {
                return;
            }
            c.setLastContact(Time.now());
            try {
                count = c.readAndProcess();
            }
            catch (InterruptedException ieo) {
                LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", (Throwable)ieo);
                throw ieo;
            }
            catch (Exception e) {
                LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c + " threw exception [" + e + "]", (Throwable)e);
                count = -1;
            }
            if (count < 0 || c.shouldClose()) {
                Server.this.closeConnection(c);
                c = null;
            } else {
                c.setLastContact(Time.now());
            }
        }

        synchronized void doStop() {
            if (this.selector != null) {
                this.selector.wakeup();
                Thread.yield();
            }
            if (this.acceptChannel != null) {
                try {
                    this.acceptChannel.socket().close();
                }
                catch (IOException e) {
                    LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket. " + e);
                }
            }
            for (Reader r : this.readers) {
                r.shutdown();
            }
        }

        synchronized Selector getSelector() {
            return this.selector;
        }

        Reader getReader() {
            this.currentReader = (this.currentReader + 1) % this.readers.length;
            return this.readers[this.currentReader];
        }

        private class Reader
        extends Thread {
            private final BlockingQueue<Connection> pendingConnections;
            private final Selector readSelector;

            Reader(String name) throws IOException {
                super(name);
                this.pendingConnections = new LinkedBlockingQueue<Connection>(Server.this.readerPendingConnectionQueue);
                this.readSelector = Selector.open();
            }

            @Override
            public void run() {
                LOG.info("Starting " + Thread.currentThread().getName());
                try {
                    this.doRunLoop();
                }
                finally {
                    try {
                        this.readSelector.close();
                    }
                    catch (IOException ioe) {
                        LOG.error("Error closing read selector in " + Thread.currentThread().getName(), (Throwable)ioe);
                    }
                }
            }

            private synchronized void doRunLoop() {
                while (Server.this.running) {
                    SelectionKey key = null;
                    try {
                        int size;
                        for (int i = size = this.pendingConnections.size(); i > 0; --i) {
                            Connection conn = this.pendingConnections.take();
                            conn.channel.register(this.readSelector, 1, conn);
                        }
                        this.readSelector.select();
                        Iterator<SelectionKey> iter = this.readSelector.selectedKeys().iterator();
                        while (iter.hasNext()) {
                            key = iter.next();
                            iter.remove();
                            try {
                                if (key.isReadable()) {
                                    Listener.this.doRead(key);
                                }
                            }
                            catch (CancelledKeyException cke) {
                                LOG.info(Thread.currentThread().getName() + ": connection aborted from " + key.attachment());
                            }
                            key = null;
                        }
                    }
                    catch (InterruptedException e) {
                        if (!Server.this.running) continue;
                        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", (Throwable)e);
                    }
                    catch (IOException ex) {
                        LOG.error("Error in Reader", (Throwable)ex);
                    }
                    catch (Throwable re) {
                        LOG.error("Bug in read selector!", re);
                        ExitUtil.terminate(1, "Bug in read selector!");
                    }
                }
            }

            public void addConnection(Connection conn) throws InterruptedException {
                this.pendingConnections.put(conn);
                this.readSelector.wakeup();
            }

            void shutdown() {
                assert (!Server.this.running);
                this.readSelector.wakeup();
                try {
                    super.interrupt();
                    super.join();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * A {@link Call} bound to a {@link Connection}: carries the request, the
     * values needed to build the response, and the serialized response buffer.
     */
    private class RpcCall extends Call {
        final Connection connection;
        final Writable rpcRequest;
        ByteBuffer rpcResponse;
        private ResponseParams responseParams;
        private Writable rv;

        /**
         * Copies the identity, connection, request, return value and response
         * parameters of {@code call}. The response buffer is not copied.
         */
        RpcCall(RpcCall call) {
            super(call);
            this.connection = call.connection;
            this.rpcRequest = call.rpcRequest;
            this.rv = call.rv;
            this.responseParams = call.responseParams;
        }

        /** Creates a call with the given id and a retry count of {@code -1}. */
        RpcCall(Connection connection, int id) {
            this(connection, id, -1);
        }

        /**
         * Creates a call with no request, kind {@code RPC_BUILTIN}, the dummy
         * client id, and neither trace scope nor caller context.
         */
        RpcCall(Connection connection, int id, int retryCount) {
            this(connection, id, retryCount, null, RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID, null, null);
        }

        /**
         * @param connection connection the call arrived on
         * @param id         call id
         * @param retryCount retry count reported for the call
         * @param param      the request payload
         * @param kind       RPC kind of the request
         * @param clientId   id of the calling client
         * @param traceScope trace scope for the call; may be {@code null}
         * @param context    caller context; may be {@code null}
         */
        RpcCall(Connection connection, int id, int retryCount, Writable param, RPC.RpcKind kind, byte[] clientId, TraceScope traceScope, CallerContext context) {
            super(id, retryCount, kind, clientId, traceScope, context);
            this.connection = connection;
            this.rpcRequest = param;
        }

        /** Returns whether the connection's channel is still open. */
        @Override
        boolean isOpen() {
            return this.connection.channel.isOpen();
        }

        /** Stores the return value and response parameters used to build the response. */
        void setResponseFields(Writable returnValue, ResponseParams responseParams) {
            this.rv = returnValue;
            this.responseParams = responseParams;
        }

        @Override
        public String getProtocol() {
            return "rpc";
        }

        @Override
        public UserGroupInformation getRemoteUser() {
            return this.connection.user;
        }

        @Override
        public InetAddress getHostInetAddress() {
            return this.connection.getHostInetAddress();
        }

        /**
         * Executes the call through {@code Server.call(...)}. Any throwable is
         * converted into error response parameters. Unless the response has been
         * deferred, timing details are recorded and the response is sent.
         *
         * <p>If the connection's channel is already closed the call is skipped.
         *
         * @return always {@code null}
         * @throws Exception propagated from sending the response
         */
        @Override
        public Void run() throws Exception {
            if (!this.connection.channel.isOpen()) {
                LOG.info(Thread.currentThread().getName() + ": skipped " + this);
                return null;
            }
            long startNanos = Time.monotonicNowNanos();
            Writable value = null;
            ResponseParams responseParams = new ResponseParams();
            try {
                value = Server.this.call(this.rpcKind, this.connection.protocolName, this.rpcRequest, this.timestampNanos);
            }
            catch (Throwable e) {
                this.populateResponseParamsOnError(e, responseParams);
            }
            if (!this.isResponseDeferred()) {
                long deltaNanos = Time.monotonicNowNanos() - startNanos;
                ProcessingDetails details = this.getProcessingDetails();
                details.set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
                // LOCKFREE = processing time minus the three lock-related timings.
                deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
                deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
                deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
                details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
                startNanos = Time.monotonicNowNanos();
                this.setResponseFields(value, responseParams);
                this.sendResponse();
                deltaNanos = Time.monotonicNowNanos() - startNanos;
                details.set(ProcessingDetails.Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Deferring response for callId: " + this.callId);
            }
            return null;
        }

        /**
         * Logs {@code t} and fills {@code responseParams} with the status, error
         * code, error class name and stringified error describing it.
         *
         * <p>An {@link UndeclaredThrowableException} is replaced by its cause when
         * it has one. If the stringified error starts with
         * {@code "<errorClass>: "}, that prefix is stripped.
         *
         * @param t              the failure to report
         * @param responseParams the parameters object to populate
         */
        private void populateResponseParamsOnError(Throwable t, ResponseParams responseParams) {
            // Only unwrap when there is a cause; otherwise t would become null and
            // the t.getClass() call below would throw.
            if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
                t = t.getCause();
            }
            Server.this.logException(LOG, t, this);
            if (t instanceof RpcServerException) {
                RpcServerException rse = (RpcServerException)t;
                responseParams.returnStatus = rse.getRpcStatusProto();
                responseParams.detailedErr = rse.getRpcErrorCodeProto();
            } else {
                responseParams.returnStatus = RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR;
                responseParams.detailedErr = RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_APPLICATION;
            }
            responseParams.errorClass = t.getClass().getName();
            responseParams.error = StringUtils.stringifyException(t);
            String exceptionHdr = responseParams.errorClass + ": ";
            if (responseParams.error.startsWith(exceptionHdr)) {
                responseParams.error = responseParams.error.substring(exceptionHdr.length());
            }
        }

        /** Sets the serialized response buffer. */
        void setResponse(ByteBuffer response) throws IOException {
            this.rpcResponse = response;
        }

        /**
         * Builds the response and sends it on the connection.
         *
         * <p>With a non-null {@code t}, the error response is set up on a copy of
         * this call, using {@code status} (or {@code FATAL} when {@code status} is
         * {@code null}). Otherwise the stored return value and response parameters
         * are used.
         *
         * @param t      failure to report, or {@code null} for the normal response
         * @param status status for the error response; only used when {@code t} is non-null
         * @throws IOException propagated from setting up or sending the response
         */
        @Override
        void doResponse(Throwable t, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto status) throws IOException {
            RpcCall call = this;
            if (t != null) {
                if (status == null) {
                    status = RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL;
                }
                call = new RpcCall(this);
                Server.this.setupResponse(call, status, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER, null, t.getClass().getName(), StringUtils.stringifyException(t));
            } else {
                Server.this.setupResponse(call, call.responseParams.returnStatus, call.responseParams.detailedErr, call.rv, call.responseParams.errorClass, call.responseParams.error);
            }
            this.connection.sendResponse(call);
        }

        /** Sends this call on its connection; failures are logged, not propagated. */
        private void sendDeferedResponse() {
            try {
                this.connection.sendResponse(this);
            }
            catch (Exception e) {
                LOG.error("Failed to send deferred response. ThreadName=" + Thread.currentThread().getName() + ", CallId=" + this.callId + ", hostname=" + this.getHostAddress(), (Throwable)e);
            }
        }

        /**
         * Sets up a {@code SUCCESS} response carrying {@code response} and sends
         * it, provided the server is still running. If setting up the response
         * fails, the failure is logged and nothing is sent.
         */
        @Override
        public void setDeferredResponse(Writable response) {
            if (this.connection.getServer().running) {
                try {
                    Server.this.setupResponse(this, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS, null, response, null, null);
                }
                catch (IOException e) {
                    LOG.error("Failed to setup deferred successful response. ThreadName=" + Thread.currentThread().getName() + ", Call=" + this, (Throwable)e);
                    return;
                }
                this.sendDeferedResponse();
            }
        }

        /**
         * Sets up an error response describing {@code t} and sends it, provided
         * the server is still running. A {@code null} {@code t} is replaced by a
         * generic {@link IOException}. Note that the send is attempted even when
         * setting up the response failed.
         */
        @Override
        public void setDeferredError(Throwable t) {
            if (this.connection.getServer().running) {
                if (t == null) {
                    t = new IOException("User code indicated an error without an exception");
                }
                try {
                    ResponseParams responseParams = new ResponseParams();
                    this.populateResponseParamsOnError(t, responseParams);
                    Server.this.setupResponse(this, responseParams.returnStatus, responseParams.detailedErr, null, responseParams.errorClass, responseParams.error);
                }
                catch (IOException e) {
                    LOG.error("Failed to setup deferred error response. ThreadName=" + Thread.currentThread().getName() + ", Call=" + this, (Throwable)e);
                }
                this.sendDeferedResponse();
            }
        }

        @Override
        public String toString() {
            return super.toString() + " " + this.rpcRequest + " from " + this.connection;
        }

        /** Status, error code and error text used when building a response; defaults to {@code SUCCESS}. */
        private class ResponseParams {
            String errorClass = null;
            String error = null;
            RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto detailedErr = null;
            RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto returnStatus = RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS;

            private ResponseParams() {
            }
        }
    }

    /**
     * Base representation of a queued call: identity (call id, retry count, RPC
     * kind, client id), timestamps, processing details, scheduling priority and
     * the bookkeeping that decides when the response is sent. Most accessors
     * return neutral defaults that subclasses override.
     */
    public static class Call implements Schedulable, PrivilegedExceptionAction<Void> {
        private final ProcessingDetails processingDetails = new ProcessingDetails(TimeUnit.NANOSECONDS);
        private volatile String detailedMetricsName = "";
        final int callId;
        final int retryCount;
        long timestampNanos;
        long responseTimestampNanos;
        // Starts at 1; the response is sent when it drops to 0 (see sendResponse).
        private AtomicInteger responseWaitCount = new AtomicInteger(1);
        final RPC.RpcKind rpcKind;
        final byte[] clientId;
        private final TraceScope traceScope;
        private final CallerContext callerContext;
        private boolean deferredResponse = false;
        private int priorityLevel;
        private long clientStateId;
        private boolean isCallCoordinated;

        /** Creates a call with id {@code -2}, retry count {@code -1}, kind {@code RPC_BUILTIN} and the dummy client id. */
        Call() {
            this(-2, -1, RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
        }

        /** Creates a call with the same id, retry count, kind, client id, trace scope and caller context as {@code call}. */
        Call(Call call) {
            this(call.callId, call.retryCount, call.rpcKind, call.clientId, call.traceScope, call.callerContext);
        }

        /** Creates a call without trace scope or caller context. */
        Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
            this(id, retryCount, kind, clientId, null, null);
        }

        /** Test-visible constructor; {@code ignore1} and {@code ignore2} are not used. */
        @VisibleForTesting
        public Call(int id, int retryCount, Void ignore1, Void ignore2, RPC.RpcKind kind, byte[] clientId) {
            this(id, retryCount, kind, clientId, null, null);
        }

        /**
         * Creates a call and stamps both timestamps with the current monotonic
         * time. The client state id starts at {@link Long#MIN_VALUE} and the call
         * is initially not coordinated.
         */
        Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId, TraceScope traceScope, CallerContext callerContext) {
            final long now = Time.monotonicNowNanos();
            this.callId = id;
            this.retryCount = retryCount;
            this.timestampNanos = now;
            this.responseTimestampNanos = now;
            this.rpcKind = kind;
            this.clientId = clientId;
            this.traceScope = traceScope;
            this.callerContext = callerContext;
            this.clientStateId = Long.MIN_VALUE;
            this.isCallCoordinated = false;
        }

        /** Always {@code true} in this base class. */
        boolean isOpen() {
            return true;
        }

        String getDetailedMetricsName() {
            return detailedMetricsName;
        }

        void setDetailedMetricsName(String name) {
            detailedMetricsName = name;
        }

        public ProcessingDetails getProcessingDetails() {
            return processingDetails;
        }

        @Override
        public String toString() {
            return "Call#" + callId + " Retry#" + retryCount;
        }

        /** Does nothing in this base class. */
        @Override
        public Void run() throws Exception {
            return null;
        }

        /** Returns {@code null} in this base class. */
        public UserGroupInformation getRemoteUser() {
            return null;
        }

        /** Returns {@code null} in this base class. */
        public InetAddress getHostInetAddress() {
            return null;
        }

        /** Returns the textual host address, or {@code null} when no host address is known. */
        public String getHostAddress() {
            InetAddress hostInetAddress = getHostInetAddress();
            if (hostInetAddress == null) {
                return null;
            }
            return hostInetAddress.getHostAddress();
        }

        /** Returns {@code null} in this base class. */
        public String getProtocol() {
            return null;
        }

        /**
         * Increments the response wait count, so that one additional
         * {@link #sendResponse()} call is needed before the response goes out.
         */
        @InterfaceStability.Unstable
        @InterfaceAudience.LimitedPrivate(value={"HDFS"})
        public final void postponeResponse() {
            int count = responseWaitCount.incrementAndGet();
            assert (count > 0) : "response has already been sent";
        }

        /**
         * Decrements the response wait count and, when it reaches zero, performs
         * the response via {@code doResponse(null)}.
         *
         * @throws IOException propagated from {@code doResponse}
         */
        @InterfaceStability.Unstable
        @InterfaceAudience.LimitedPrivate(value={"HDFS"})
        public final void sendResponse() throws IOException {
            int count = responseWaitCount.decrementAndGet();
            assert (count >= 0) : "response has already been sent";
            if (count == 0) {
                doResponse(null);
            }
        }

        /**
         * Sets the response wait count to {@code -1} and, if it was positive
         * before, performs the response with {@code t} as the failure.
         *
         * @throws IOException propagated from {@code doResponse}
         */
        @InterfaceStability.Unstable
        @InterfaceAudience.LimitedPrivate(value={"HDFS"})
        public final void abortResponse(Throwable t) throws IOException {
            int previous = responseWaitCount.getAndSet(-1);
            if (previous > 0) {
                doResponse(t);
            }
        }

        /** Delegates to {@link #doResponse(Throwable, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto)} with status {@code FATAL}. */
        void doResponse(Throwable t) throws IOException {
            doResponse(t, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL);
        }

        /** Does nothing in this base class. */
        void doResponse(Throwable t, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto proto) throws IOException {
        }

        /** Returns {@link #getRemoteUser()}. */
        @Override
        public UserGroupInformation getUserGroupInformation() {
            return getRemoteUser();
        }

        @Override
        public int getPriorityLevel() {
            return priorityLevel;
        }

        public void setPriorityLevel(int priorityLevel) {
            this.priorityLevel = priorityLevel;
        }

        public long getClientStateId() {
            return clientStateId;
        }

        public void setClientStateId(long stateId) {
            clientStateId = stateId;
        }

        public void markCallCoordinated(boolean flag) {
            isCallCoordinated = flag;
        }

        public boolean isCallCoordinated() {
            return isCallCoordinated;
        }

        /** Marks the response of this call as deferred. */
        @InterfaceStability.Unstable
        public void deferResponse() {
            deferredResponse = true;
        }

        /** Returns whether {@link #deferResponse()} has been called. */
        @InterfaceStability.Unstable
        public boolean isResponseDeferred() {
            return deferredResponse;
        }

        /** Does nothing in this base class. */
        public void setDeferredResponse(Writable response) {
        }

        /** Does nothing in this base class. */
        public void setDeferredError(Throwable t) {
        }
    }

    /** Immutable pair of the request wrapper class and the invoker stored for one RPC kind. */
    static class RpcKindMapValue {
        final Class<? extends Writable> rpcRequestWrapperClass;
        final RPC.RpcInvoker rpcInvoker;

        /**
         * @param rpcRequestWrapperClass class of the request wrapper
         * @param rpcInvoker             invoker to store alongside it
         */
        RpcKindMapValue(Class<? extends Writable> rpcRequestWrapperClass, RPC.RpcInvoker rpcInvoker) {
            this.rpcRequestWrapperClass = rpcRequestWrapperClass;
            this.rpcInvoker = rpcInvoker;
        }
    }

    /**
     * Keeps two sets of exception classes, identified by {@link Class#toString()}:
     * those to be logged tersely and those whose logging is suppressed. Each
     * update replaces the corresponding set with a new unmodifiable copy.
     */
    static class ExceptionsHandler {
        private volatile Set<String> terseExceptions = new HashSet<String>();
        private volatile Set<String> suppressedExceptions = new HashSet<String>();

        ExceptionsHandler() {
        }

        /** Adds the given classes to the terse-logging set. */
        void addTerseLoggingExceptions(Class<?> ... exceptionClass) {
            Set<String> updated = ExceptionsHandler.addExceptions(terseExceptions, exceptionClass);
            terseExceptions = updated;
        }

        /** Adds the given classes to the suppressed-logging set. */
        void addSuppressedLoggingExceptions(Class<?> ... exceptionClass) {
            Set<String> updated = ExceptionsHandler.addExceptions(suppressedExceptions, exceptionClass);
            suppressedExceptions = updated;
        }

        /** Returns whether {@code t} was registered for terse logging. */
        boolean isTerseLog(Class<?> t) {
            String key = t.toString();
            return terseExceptions.contains(key);
        }

        /** Returns whether {@code t} was registered for suppressed logging. */
        boolean isSuppressedLog(Class<?> t) {
            String key = t.toString();
            return suppressedExceptions.contains(key);
        }

        /**
         * Returns an unmodifiable set containing everything in
         * {@code exceptionsSet} plus the {@code toString()} of each given class.
         * The input set is not modified.
         */
        private static Set<String> addExceptions(Set<String> exceptionsSet, Class<?>[] exceptionClass) {
            Set<String> merged = new HashSet<String>(exceptionsSet);
            for (int i = 0; i < exceptionClass.length; i++) {
                merged.add(exceptionClass[i].toString());
            }
            return Collections.unmodifiableSet(merged);
        }
    }
}

