/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionNullException;
import org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RouterRpcClient {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(RouterRpcClient.class);
    /** Router that owns this client; its id is included in the error messages built here. */
    private final Router router;
    /** Resolver used to list the namenodes of a nameservice or block pool and to record the active one after a failover. */
    private final ActiveNamenodeResolver namenodeResolver;
    /** Source of the connections used to reach the namenodes; started in the constructor, closed in shutdown(). */
    private final ConnectionManager connectionManager;
    /** Thread pool that runs the per-location calls of the concurrent invocations. */
    private final ThreadPoolExecutor executorService;
    /** Policy consulted to decide whether a failed call is retried, failed over or given up. */
    private final RetryPolicy retryPolicy;
    /** Metrics monitor; may be null, every use in this class is guarded by a null check. */
    private final RouterRpcMonitor rpcMonitor;
    /** Matches one "\tat class.method(file:line)" stack frame embedded in an exception message. */
    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

    /**
     * Creates the client the Router uses to proxy calls to the namenodes.
     *
     * <p>Starts the connection manager, builds the fixed-size thread pool used for
     * concurrent invocations and sets up the failover/retry policy from the configuration.
     *
     * @param conf configuration to read the client settings from
     * @param router router that owns this client
     * @param resolver resolver used to find the namenodes of each nameservice
     * @param monitor optional metrics monitor, may be null
     */
    public RouterRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
        this.router = router;
        this.namenodeResolver = resolver;
        Configuration clientConf = this.getClientConfiguration(conf);
        this.connectionManager = new ConnectionManager(clientConf);
        this.connectionManager.start();
        int numThreads = conf.getInt("dfs.federation.router.client.thread-size", 32);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RPC Router Client-%d").build();
        // A bounded queue makes the pool reject work when overloaded; the unbounded one queues it.
        // Typed as BlockingQueue<Runnable> so no raw type or unchecked cast is needed.
        BlockingQueue<Runnable> workQueue;
        if (conf.getBoolean("dfs.federation.router.client.reject.overload", false)) {
            workQueue = new ArrayBlockingQueue<Runnable>(numThreads);
        } else {
            workQueue = new LinkedBlockingQueue<Runnable>();
        }
        this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
        this.rpcMonitor = monitor;
        int maxFailoverAttempts = conf.getInt("dfs.client.failover.max.attempts", 15);
        int maxRetryAttempts = conf.getInt("dfs.federation.router.client.retry.max.attempts", 3);
        int failoverSleepBaseMillis = conf.getInt("dfs.client.failover.sleep.base.millis", 500);
        int failoverSleepMaxMillis = conf.getInt("dfs.client.failover.sleep.max.millis", 15000);
        this.retryPolicy = RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis, failoverSleepMaxMillis);
    }

    /**
     * Builds the configuration handed to the connection manager.
     *
     * <p>Copies {@code conf} and, when the Router-specific values are not negative,
     * overrides the IPC connect retry count and connect timeout with them.
     *
     * @param conf configuration of the Router
     * @return a copy of {@code conf} with the IPC client overrides applied
     */
    private Configuration getClientConfiguration(Configuration conf) {
        Configuration routerClientConf = new Configuration(conf);

        int timeoutRetries = conf.getInt("dfs.federation.router.connect.max.retries.on.timeouts", 0);
        if (timeoutRetries >= 0) {
            routerClientConf.setInt("ipc.client.connect.max.retries.on.timeouts", timeoutRetries);
        }

        long timeoutMs = conf.getTimeDuration("dfs.federation.router.connect.timeout", RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
        if (timeoutMs >= 0L) {
            routerClientConf.setLong("ipc.client.connect.timeout", timeoutMs);
        }

        return routerClientConf;
    }

    /**
     * Returns the resolver this client uses to look up namenodes.
     *
     * @return the namenode resolver given at construction time
     */
    public ActiveNamenodeResolver getNamenodeResolver() {
        return namenodeResolver;
    }

    /**
     * Closes the connection manager and then stops the client thread pool
     * with {@code shutdownNow()}.
     */
    public void shutdown() {
        ConnectionManager manager = this.connectionManager;
        if (manager != null) {
            manager.close();
        }
        ThreadPoolExecutor pool = this.executorService;
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    /**
     * Returns the number of connections reported by the connection manager.
     *
     * @return value of {@code ConnectionManager#getNumConnections()}
     */
    public int getNumConnections() {
        return connectionManager.getNumConnections();
    }

    /**
     * Returns the number of active connections reported by the connection manager.
     *
     * @return value of {@code ConnectionManager#getNumActiveConnections()}
     */
    public int getNumActiveConnections() {
        return connectionManager.getNumActiveConnections();
    }

    /**
     * Returns the number of connection pools reported by the connection manager.
     *
     * @return value of {@code ConnectionManager#getNumConnectionPools()}
     */
    public int getNumConnectionPools() {
        return connectionManager.getNumConnectionPools();
    }

    /**
     * Returns the number of connections being created, as reported by the connection manager.
     *
     * @return value of {@code ConnectionManager#getNumCreatingConnections()}
     */
    public int getNumCreatingConnections() {
        return connectionManager.getNumCreatingConnections();
    }

    /**
     * Returns the JSON description produced by the connection manager.
     *
     * @return value of {@code ConnectionManager#getJSON()}
     */
    public String getJSON() {
        return connectionManager.getJSON();
    }

    /**
     * Describes the client thread pool as JSON.
     *
     * @return JSON object with the keys "active", "total" and "max" holding the
     *         active thread count, current pool size and maximum pool size
     */
    public String getAsyncCallerPoolJson() {
        ThreadPoolExecutor pool = this.executorService;
        // LinkedHashMap keeps the keys in insertion order.
        Map<String, Integer> poolInfo = new LinkedHashMap<String, Integer>();
        poolInfo.put("active", pool.getActiveCount());
        poolInfo.put("total", pool.getPoolSize());
        poolInfo.put("max", pool.getMaximumPoolSize());
        return JSON.toString(poolInfo);
    }

    /**
     * Obtains a connection to a namenode on behalf of a user.
     *
     * <p>When security is enabled the connection is requested for a proxy user
     * that impersonates {@code ugi} through the Router login user. Any failure
     * while obtaining the connection is logged and reported as a missing connection.
     *
     * @param ugi user the call is made for
     * @param nsId nameservice identifier (not used by this method)
     * @param rpcAddress RPC address of the namenode
     * @param proto protocol of the connection
     * @return the connection, never null
     * @throws IOException a {@code ConnectionNullException} if no connection could be obtained
     */
    private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, String rpcAddress, Class<?> proto) throws IOException {
        ConnectionContext conn = null;
        try {
            UserGroupInformation effectiveUgi;
            if (!UserGroupInformation.isSecurityEnabled()) {
                effectiveUgi = ugi;
            } else {
                UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
                effectiveUgi = UserGroupInformation.createProxyUser(ugi.getUserName(), loginUser);
            }
            conn = this.connectionManager.getConnection(effectiveUgi, rpcAddress, proto);
            LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, conn);
        } catch (Exception ex) {
            // Deliberately not rethrown: the null check below turns it into ConnectionNullException.
            LOG.error("Cannot open NN client to address: {}", rpcAddress, ex);
        }

        if (conn != null) {
            return conn;
        }
        throw new ConnectionNullException("Cannot get a connection to " + rpcAddress);
    }

    /**
     * Converts any exception into an {@link IOException}.
     *
     * @param e exception to convert
     * @return the unwrapped exception for a {@code RemoteException}, {@code e} itself
     *         if it already is an IOException, otherwise a new IOException wrapping it
     */
    private static IOException toIOException(Exception e) {
        if (e instanceof RemoteException) {
            RemoteException remote = (RemoteException) e;
            return remote.unwrapRemoteException();
        }
        return (e instanceof IOException) ? (IOException) e : new IOException(e);
    }

    /**
     * Decides what to do after a proxied call failed with an IOException.
     *
     * <p>If the nameservice has no active namenode, the call is retried once
     * (when {@code retryCount} is 0) and otherwise fails with
     * {@code NoNamenodesAvailableException}. In every other case the configured
     * retry policy decides.
     *
     * @param ioe exception thrown by the call
     * @param retryCount number of retries already done
     * @param nsId nameservice the call was sent to
     * @return the retry decision
     * @throws IOException if there is no namenode available after the first retry,
     *         or if the retry policy itself throws
     */
    private RetryPolicy.RetryAction.RetryDecision shouldRetry(IOException ioe, int retryCount, String nsId) throws IOException {
        if (!this.isClusterUnAvailable(nsId)) {
            try {
                return this.retryPolicy.shouldRetry(ioe, retryCount, 0, true).action;
            } catch (Exception ex) {
                LOG.error("Re-throwing API exception, no more retries", ex);
                throw RouterRpcClient.toIOException(ex);
            }
        }

        // No active namenode in the nameservice.
        if (retryCount != 0) {
            throw new NoNamenodesAvailableException(nsId, ioe);
        }
        return RetryPolicy.RetryAction.RetryDecision.RETRY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    private Object invokeMethod(UserGroupInformation ugi, List<? extends FederationNamenodeContext> namenodes, Class<?> protocol, Method method, Object ... params) throws ConnectException, StandbyException, IOException {
        void var10_12;
        if (namenodes == null || namenodes.isEmpty()) {
            throw new IOException("No namenodes to invoke " + method.getName() + " with params " + Arrays.deepToString(params) + " from " + this.router.getRouterId());
        }
        Object ret = null;
        if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOp();
        }
        boolean failover = false;
        LinkedHashMap<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<FederationNamenodeContext, IOException>();
        for (FederationNamenodeContext federationNamenodeContext : namenodes) {
            ConnectionContext connection = null;
            String nsId = federationNamenodeContext.getNameserviceId();
            String rpcAddress = federationNamenodeContext.getRpcAddress();
            try {
                Object address;
                connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
                NameNodeProxiesClient.ProxyAndInfo<?> client = connection.getClient();
                Object proxy = client.getProxy();
                ret = this.invoke(nsId, 0, method, proxy, params);
                if (failover) {
                    address = client.getAddress();
                    this.namenodeResolver.updateActiveNamenode(nsId, (InetSocketAddress)address);
                }
                if (this.rpcMonitor != null) {
                    this.rpcMonitor.proxyOpComplete(true);
                }
                address = ret;
                return address;
            }
            catch (IOException ioe) {
                ioes.put(federationNamenodeContext, ioe);
                if (ioe instanceof StandbyException) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpFailureStandby();
                    }
                    failover = true;
                    continue;
                }
                if (RouterRpcClient.isUnavailableException(ioe)) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpFailureCommunicate();
                    }
                    failover = true;
                    continue;
                }
                if (ioe instanceof RemoteException) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpComplete(true);
                    }
                    RemoteException re = (RemoteException)ioe;
                    ioe = re.unwrapRemoteException();
                    ioe = RouterRpcClient.getCleanException(ioe);
                    throw ioe;
                }
                if (ioe instanceof ConnectionNullException) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpFailureCommunicate();
                    }
                    LOG.error("Get connection for {} {} error: {}", new Object[]{nsId, rpcAddress, ioe.getMessage()});
                    StandbyException se = new StandbyException(ioe.getMessage());
                    se.initCause((Throwable)ioe);
                    throw se;
                }
                if (ioe instanceof NoNamenodesAvailableException) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpNoNamenodes();
                    }
                    LOG.error("Cannot get available namenode for {} {} error: {}", new Object[]{nsId, rpcAddress, ioe.getMessage()});
                    throw new RetriableException((Exception)ioe);
                }
                if (this.rpcMonitor != null) {
                    this.rpcMonitor.proxyOpFailureCommunicate();
                    this.rpcMonitor.proxyOpComplete(false);
                }
                throw ioe;
            }
            finally {
                if (connection == null) continue;
                connection.release();
            }
        }
        if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpComplete(false);
        }
        String msg = "No namenode available to invoke " + method.getName() + " " + Arrays.deepToString(params) + " in " + namenodes + " from " + this.router.getRouterId();
        LOG.error(msg);
        boolean bl = false;
        for (Map.Entry entry : ioes.entrySet()) {
            FederationNamenodeContext namenode = (FederationNamenodeContext)entry.getKey();
            String nnKey = namenode.getNamenodeKey();
            String addr = namenode.getRpcAddress();
            IOException ioe = (IOException)entry.getValue();
            if (ioe instanceof StandbyException) {
                LOG.error("{} at {} is in Standby: {}", new Object[]{nnKey, addr, ioe.getMessage()});
                continue;
            }
            if (RouterRpcClient.isUnavailableException(ioe)) {
                ++var10_12;
                LOG.error("{} at {} cannot be reached: {}", new Object[]{nnKey, addr, ioe.getMessage()});
                continue;
            }
            LOG.error("{} at {} error: \"{}\"", new Object[]{nnKey, addr, ioe.getMessage()});
        }
        if (var10_12 == ioes.size()) {
            throw new ConnectException(msg);
        }
        throw new StandbyException(msg);
    }

    /**
     * Invokes a method on a namenode proxy through reflection, retrying on the
     * same proxy for as long as {@code shouldRetry} answers RETRY.
     *
     * @param nsId nameservice the proxy belongs to
     * @param retryCount number of retries already done
     * @param method method to invoke
     * @param obj proxy to invoke the method on
     * @param params parameters of the method
     * @return the value returned by the method, or null if the reflective call
     *         was rejected as illegal (the error is only logged)
     * @throws IOException the exception thrown by the call when it is not retried;
     *         a {@code StandbyException} when a failover is required for an exception
     *         that is neither standby nor unavailable; a wrapping IOException when
     *         the method threw something that is not an IOException
     */
    private Object invoke(String nsId, int retryCount, Method method, Object obj, Object ... params) throws IOException {
        int attempt = retryCount;
        while (true) {
            try {
                return method.invoke(obj, params);
            } catch (IllegalAccessException | IllegalArgumentException e) {
                LOG.error("Unexpected exception while proxying API", e);
                return null;
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                if (!(cause instanceof IOException)) {
                    throw new IOException(e);
                }
                IOException ioe = (IOException) cause;

                RetryPolicy.RetryAction.RetryDecision decision = this.shouldRetry(ioe, attempt, nsId);
                if (decision == RetryPolicy.RetryAction.RetryDecision.RETRY) {
                    if (this.rpcMonitor != null) {
                        this.rpcMonitor.proxyOpRetries();
                    }
                    ++attempt;
                    continue;
                }

                // The caller fails over on standby/unavailable exceptions, so any other
                // exception that needs a failover is reported as a StandbyException.
                boolean needsStandbyWrapper = decision == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY
                    && !(ioe instanceof StandbyException)
                    && !RouterRpcClient.isUnavailableException(ioe);
                if (needsStandbyWrapper) {
                    throw new StandbyException(ioe.getMessage());
                }
                throw ioe;
            }
        }
    }

    /**
     * Tells whether an exception means the namenode could not serve the call.
     *
     * @param ioe exception to check
     * @return true for connect, connect-timeout, EOF and standby exceptions, and for
     *         a {@code RetriableException} caused by {@code NoNamenodesAvailableException}
     */
    public static boolean isUnavailableException(IOException ioe) {
        boolean connectivityOrStandby = ioe instanceof ConnectException
            || ioe instanceof ConnectTimeoutException
            || ioe instanceof EOFException
            || ioe instanceof StandbyException;
        if (connectivityOrStandby) {
            return true;
        }
        if (!(ioe instanceof RetriableException)) {
            return false;
        }
        return ioe.getCause() instanceof NoNamenodesAvailableException;
    }

    /**
     * Tells whether a nameservice currently has no active namenode.
     *
     * @param nsId nameservice identifier
     * @return false if the resolver lists at least one namenode in ACTIVE state,
     *         true otherwise (including when the resolver returns null)
     * @throws IOException if the resolver cannot list the namenodes
     */
    private boolean isClusterUnAvailable(String nsId) throws IOException {
        List<? extends FederationNamenodeContext> candidates = this.namenodeResolver.getNamenodesForNameserviceId(nsId);
        if (candidates == null) {
            return true;
        }
        return candidates.stream()
            .noneMatch(nn -> nn.getState() == FederationNamenodeServiceState.ACTIVE);
    }

    /**
     * Rebuilds an exception whose message carries a textual stack trace.
     *
     * <p>When the message spans several lines, only the first line is kept as the
     * message and the following lines that look like stack frames become the stack
     * trace of the returned exception. The returned exception has the same class as
     * the input when it can be created from a single String; otherwise the input
     * exception itself is returned with the cleaned stack trace.
     *
     * @param ioe exception to clean
     * @return the cleaned exception, never null
     */
    private static IOException getCleanException(IOException ioe) {
        String msg = ioe.getMessage();
        Throwable cause = ioe.getCause();
        StackTraceElement[] stackTrace = ioe.getStackTrace();

        // An exception without message has nothing to clean.
        int index = (msg == null) ? -1 : msg.indexOf("\n");
        if (index > 0) {
            String[] msgSplit = msg.split("\n");
            msg = msgSplit[0];
            List<StackTraceElement> elements = new ArrayList<StackTraceElement>();
            for (int i = 1; i < msgSplit.length; ++i) {
                Matcher matcher = STACK_TRACE_PATTERN.matcher(msgSplit[i]);
                if (!matcher.find()) {
                    continue;
                }
                String declaringClass = matcher.group(1);
                String methodName = matcher.group(2);
                String fileName = matcher.group(3);
                int lineNumber;
                try {
                    lineNumber = Integer.parseInt(matcher.group(4));
                } catch (NumberFormatException e) {
                    // The pattern accepts an empty or oversized number; a negative
                    // value is how StackTraceElement marks an unknown line.
                    lineNumber = -1;
                }
                elements.add(new StackTraceElement(declaringClass, methodName, fileName, lineNumber));
            }
            stackTrace = elements.toArray(new StackTraceElement[elements.size()]);
        }

        IOException ret;
        if (ioe instanceof RemoteException) {
            RemoteException re = (RemoteException) ioe;
            ret = new RemoteException(re.getClassName(), msg);
        } else {
            Class<? extends IOException> ioeClass = ioe.getClass();
            try {
                Constructor<? extends IOException> constructor = ioeClass.getDeclaredConstructor(String.class);
                ret = constructor.newInstance(msg);
            } catch (ReflectiveOperationException e) {
                LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e);
                // Fall back to the input exception. It already carries its cause, and
                // initCause() on it would throw IllegalStateException if that cause
                // had been set, so only the stack trace is updated.
                ioe.setStackTrace(stackTrace);
                return ioe;
            }
        }
        ret.initCause(cause);
        ret.setStackTrace(stackTrace);
        return ret;
    }

    /**
     * Invokes a method in the nameservice that owns the block pool of a block.
     *
     * @param block block whose block pool id selects the nameservice
     * @param method method to invoke
     * @return the value returned by the namenode
     * @throws IOException if the nameservice cannot be found or the call fails
     */
    public Object invokeSingle(ExtendedBlock block, RemoteMethod method) throws IOException {
        return this.invokeSingleBlockPool(block.getBlockPoolId(), method);
    }

    /**
     * Invokes a method in the nameservice that owns a block pool.
     *
     * @param bpId block pool identifier
     * @param method method to invoke
     * @return the value returned by the namenode
     * @throws IOException if the nameservice cannot be found or the call fails
     */
    public Object invokeSingleBlockPool(String bpId, RemoteMethod method) throws IOException {
        String owningNameservice = this.getNameserviceForBlockPoolId(bpId);
        return this.invokeSingle(owningNameservice, method);
    }

    /**
     * Invokes a method in one nameservice on behalf of the remote user.
     *
     * <p>The method parameters are built for a location whose paths are both "/".
     *
     * @param nsId nameservice identifier
     * @param method method to invoke
     * @return the value returned by the namenode
     * @throws IOException if no namenode is registered for the nameservice or the call fails
     */
    public Object invokeSingle(String nsId, RemoteMethod method) throws IOException {
        UserGroupInformation caller = RouterRpcServer.getRemoteUser();
        List<? extends FederationNamenodeContext> targets = this.getNamenodesForNameservice(nsId);
        RemoteLocation root = new RemoteLocation(nsId, "/", "/");
        return this.invokeMethod(caller, targets, method.getProtocol(), method.getMethod(), method.getParams(root));
    }

    /**
     * Invokes a method in one nameservice and returns the result as {@code T}.
     *
     * @param <T> type the caller expects
     * @param nsId nameservice identifier
     * @param method method to invoke
     * @param clazz class of the expected result; only fixes the type {@code T}
     * @return the value returned by the namenode, cast without a runtime check
     * @throws IOException if the call fails
     */
    public <T> T invokeSingle(String nsId, RemoteMethod method, Class<T> clazz) throws IOException {
        @SuppressWarnings("unchecked")
        T typed = (T) this.invokeSingle(nsId, method);
        return typed;
    }

    /**
     * Invokes a method in the nameservice that owns the block pool of a block and
     * returns the result as {@code T}.
     *
     * @param <T> type the caller expects
     * @param extendedBlock block whose block pool id selects the nameservice
     * @param method method to invoke
     * @param clazz class of the expected result; only fixes the type {@code T}
     * @return the value returned by the namenode, cast without a runtime check
     * @throws IOException if the nameservice cannot be found or the call fails
     */
    public <T> T invokeSingle(ExtendedBlock extendedBlock, RemoteMethod method, Class<T> clazz) throws IOException {
        String blockPoolId = extendedBlock.getBlockPoolId();
        String owningNameservice = this.getNameserviceForBlockPoolId(blockPoolId);
        @SuppressWarnings("unchecked")
        T typed = (T) this.invokeSingle(owningNameservice, method);
        return typed;
    }

    /**
     * Invokes a method in a single location and returns the result as {@code T}.
     *
     * @param <T> type the caller expects
     * @param location location to invoke the method in
     * @param remoteMethod method to invoke
     * @param clazz class of the expected result; only fixes the type {@code T}
     * @return the value returned by the namenode, cast without a runtime check
     * @throws IOException if the call fails
     */
    public <T> T invokeSingle(RemoteLocationContext location, RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
        List<RemoteLocationContext> onlyLocation = Collections.singletonList(location);
        @SuppressWarnings("unchecked")
        T typed = (T) this.invokeSequential(onlyLocation, remoteMethod);
        return typed;
    }

    /**
     * Invokes a method in the given locations one after another, without any
     * expectation on the result.
     *
     * @param locations locations to try, in order
     * @param remoteMethod method to invoke
     * @return the result selected by the four-argument overload when both the
     *         expected class and the expected value are null
     * @throws IOException if the call fails
     */
    public Object invokeSequential(List<? extends RemoteLocationContext> locations, RemoteMethod remoteMethod) throws IOException {
        return invokeSequential(locations, remoteMethod, null, null);
    }

    /**
     * Invokes a method in the given locations one after another until a result
     * matches the expected class and value.
     *
     * <p>A null expected class or value matches anything. If no result matches and
     * some location failed, an exception is thrown: the first unavailable exception
     * if there is one, otherwise the first exception. If nothing failed, the first
     * non-null result (or null) is returned.
     *
     * @param <T> type the caller expects
     * @param locations locations to try, in order
     * @param remoteMethod method to invoke
     * @param expectedResultClass class the result must be an instance of, or null
     * @param expectedResultValue value the result must equal, or null
     * @return the first matching result, or the first non-null result if none matched
     * @throws IOException if no result matched and at least one location failed, or
     *         if the namenodes of a location cannot be found
     */
    public <T> T invokeSequential(List<? extends RemoteLocationContext> locations, RemoteMethod remoteMethod, Class<T> expectedResultClass, Object expectedResultValue) throws IOException {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        Method m = remoteMethod.getMethod();
        List<IOException> failures = new ArrayList<IOException>();
        Object fallback = null;

        for (RemoteLocationContext loc : locations) {
            String ns = loc.getNameserviceId();
            // Outside the try on purpose: a nameservice without namenodes aborts the whole call.
            List<? extends FederationNamenodeContext> namenodes = this.getNamenodesForNameservice(ns);
            try {
                Class<?> proto = remoteMethod.getProtocol();
                Object[] params = remoteMethod.getParams(loc);
                Object result = this.invokeMethod(ugi, namenodes, proto, m, params);
                boolean matches = RouterRpcClient.isExpectedClass(expectedResultClass, result)
                    && RouterRpcClient.isExpectedValue(expectedResultValue, result);
                if (matches) {
                    @SuppressWarnings("unchecked")
                    T matched = (T) result;
                    return matched;
                }
                if (fallback == null) {
                    fallback = result;
                }
            } catch (IOException ioe) {
                // Rewrite the paths in the message before remembering the failure.
                failures.add(this.processException(ioe, loc));
            } catch (Exception e) {
                LOG.error("Unexpected exception {} proxying {} to {}", e.getClass(), m.getName(), ns, e);
                failures.add(new IOException("Unexpected exception proxying API " + e.getMessage(), e));
            }
        }

        if (!failures.isEmpty()) {
            // Unavailable exceptions take precedence over the others.
            for (IOException failure : failures) {
                if (RouterRpcClient.isUnavailableException(failure)) {
                    throw failure;
                }
            }
            throw failures.get(0);
        }

        @SuppressWarnings("unchecked")
        T unmatched = (T) fallback;
        return unmatched;
    }

    /**
     * Rewrites the message of an exception so that the destination path of a
     * location is shown as its source path.
     *
     * <p>Only remote, file-not-found and snapshot exceptions are rebuilt; the new
     * exception keeps the stack trace of the original. Any other exception is
     * returned untouched.
     *
     * @param ioe exception to process
     * @param loc location the failed call was sent to
     * @return the rebuilt exception, or {@code ioe} itself when it is not rewritten
     */
    private IOException processException(IOException ioe, RemoteLocationContext loc) {
        boolean rewritable = ioe instanceof RemoteException
            || ioe instanceof FileNotFoundException
            || ioe instanceof SnapshotException;
        if (!rewritable) {
            return ioe;
        }

        String clientMsg = RouterRpcClient.processExceptionMsg(ioe.getMessage(), loc.getDest(), loc.getSrc());
        IOException translated;
        if (ioe instanceof RemoteException) {
            translated = new RemoteException(((RemoteException) ioe).getClassName(), clientMsg);
        } else if (ioe instanceof FileNotFoundException) {
            translated = new FileNotFoundException(clientMsg);
        } else {
            translated = new SnapshotException(clientMsg);
        }
        translated.setStackTrace(ioe.getStackTrace());
        return translated;
    }

    @VisibleForTesting
    /*
     * Replaces the first occurrence of the destination path in a message with the
     * source path. If the full destination path does not occur, progressively
     * shorter prefixes of both paths (same number of trailing characters removed)
     * are tried. The message is returned unchanged when both paths are equal, when
     * either does not start with "/", or when nothing matches.
     *
     * Paths are matched and inserted literally: they are quoted so that characters
     * such as '(', '+', '$' or '\' are not read as regex or replacement syntax.
     */
    static String processExceptionMsg(String msg, String dst, String src) {
        if (dst.equals(src) || !dst.startsWith("/") || !src.startsWith("/")) {
            return msg;
        }
        String newMsg = msg.replaceFirst(Pattern.quote(dst), Matcher.quoteReplacement(src));
        int minLen = Math.min(dst.length(), src.length());
        for (int i = 0; newMsg.equals(msg) && i < minLen; ++i) {
            String dst1 = dst.substring(0, dst.length() - 1 - i);
            if (dst1.isEmpty()) {
                // An empty pattern matches at offset 0 and would prepend a piece of
                // the source path to a message that never mentioned the destination.
                break;
            }
            String src1 = src.substring(0, src.length() - 1 - i);
            newMsg = msg.replaceFirst(Pattern.quote(dst1), Matcher.quoteReplacement(src1));
        }
        return newMsg;
    }

    /**
     * Checks a result against an expected class.
     *
     * @param expectedClass class to check against, or null to accept anything
     * @param clazz result object to check
     * @return true if there is no expected class, or if the object is a non-null
     *         instance of it
     */
    private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) {
        return expectedClass == null || (clazz != null && expectedClass.isInstance(clazz));
    }

    /**
     * Checks a result against an expected value.
     *
     * @param expectedValue value to check against, or null to accept anything
     * @param value result object to check
     * @return true if there is no expected value, or if the object is non-null and
     *         equal to it
     */
    private static boolean isExpectedValue(Object expectedValue, Object value) {
        return expectedValue == null || (value != null && value.equals(expectedValue));
    }

    /**
     * Invokes a boolean method in all the locations concurrently and tells whether
     * any of them returned true.
     *
     * @param <T> type of the locations
     * @param <R> unused
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @return true if at least one location returned true
     * @throws IOException if the concurrent invocation fails
     */
    public <T extends RemoteLocationContext, R> boolean invokeAll(Collection<T> locations, RemoteMethod method) throws IOException {
        Map<T, Boolean> outcomes = this.invokeConcurrent(locations, method, false, false, Boolean.class);
        boolean anySucceeded = false;
        for (Boolean outcome : outcomes.values()) {
            // Non-short-circuit OR: every value is unboxed, as in a plain loop over all of them.
            anySucceeded |= outcome;
        }
        return anySucceeded;
    }

    /**
     * Invokes a method in all the locations concurrently and discards the results.
     *
     * @param <T> type of the locations
     * @param <R> unused
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @throws IOException if the concurrent invocation fails
     */
    public <T extends RemoteLocationContext, R> void invokeConcurrent(Collection<T> locations, RemoteMethod method) throws IOException {
        invokeConcurrent(locations, method, void.class);
    }

    /**
     * Invokes a method in all the locations concurrently, without requiring a
     * response from every location and without targeting each namenode separately.
     *
     * @param <T> type of the locations
     * @param <R> type of the results
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @param clazz class of the results
     * @return the result of each location that returned one
     * @throws IOException if the concurrent invocation fails
     */
    public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(Collection<T> locations, RemoteMethod method, Class<R> clazz) throws IOException {
        final boolean requireResponse = false;
        final boolean standby = false;
        return invokeConcurrent(locations, method, requireResponse, standby, clazz);
    }

    /**
     * Invokes a method in all the locations concurrently and discards the results.
     *
     * @param <T> type of the locations
     * @param <R> unused
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @param requireResponse if true, a failure in any location fails the call
     * @param standby if true, every namenode of each nameservice is invoked
     * @throws IOException if the concurrent invocation fails
     */
    public <T extends RemoteLocationContext, R> void invokeConcurrent(Collection<T> locations, RemoteMethod method, boolean requireResponse, boolean standby) throws IOException {
        invokeConcurrent(locations, method, requireResponse, standby, void.class);
    }

    /**
     * Invokes a method in all the locations concurrently without a timeout.
     *
     * @param <T> type of the locations
     * @param <R> type of the results
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @param requireResponse if true, a failure in any location fails the call
     * @param standby if true, every namenode of each nameservice is invoked
     * @param clazz class of the results
     * @return the result of each location that returned one
     * @throws IOException if the concurrent invocation fails
     */
    public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(Collection<T> locations, RemoteMethod method, boolean requireResponse, boolean standby, Class<R> clazz) throws IOException {
        // A non-positive timeout means "wait for all the calls".
        final long noTimeout = -1L;
        return invokeConcurrent(locations, method, requireResponse, standby, noTimeout, clazz);
    }

    /**
     * Invokes a method in all the locations concurrently and collects the results
     * in a map sorted by location.
     *
     * <p>If some location failed and either a response is required from all of
     * them or no location returned a result, an exception is thrown: the first
     * unavailable exception if there is one, otherwise the first exception. This
     * is the same precedence {@code invokeSequential} applies.
     *
     * @param <T> type of the locations
     * @param <R> type of the results
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @param requireResponse if true, a failure in any location fails the call
     * @param standby if true, every namenode of each nameservice is invoked
     * @param timeOutMs time to wait for the calls in milliseconds; no limit if not positive
     * @param clazz class of the results
     * @return the result of each location that returned one
     * @throws IOException if the invocation fails as described above
     */
    public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(Collection<T> locations, RemoteMethod method, boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz) throws IOException {
        List<RemoteResult<T, R>> results = this.invokeConcurrent(locations, method, standby, timeOutMs, clazz);
        Map<T, R> ret = new TreeMap<T, R>();
        List<IOException> thrownExceptions = new ArrayList<IOException>();
        IOException firstUnavailableException = null;
        for (RemoteResult<T, R> result : results) {
            if (result.hasException()) {
                IOException ioe = result.getException();
                thrownExceptions.add(ioe);
                // Keep the first one only; later matches must not overwrite it.
                if (firstUnavailableException == null && RouterRpcClient.isUnavailableException(ioe)) {
                    firstUnavailableException = ioe;
                }
            }
            if (result.hasResult()) {
                ret.put(result.getLocation(), result.getResult());
            }
        }
        if (!thrownExceptions.isEmpty() && (requireResponse || ret.isEmpty())) {
            if (firstUnavailableException != null) {
                throw firstUnavailableException;
            }
            throw thrownExceptions.get(0);
        }
        return ret;
    }

    /**
     * Invokes a method in all the locations concurrently and returns one
     * {@code RemoteResult} per invocation, holding either the value or the
     * exception of that invocation.
     *
     * <p>A single location without timeout is invoked directly in the calling
     * thread. Otherwise one task per location (or per namenode of each location
     * when {@code standby} is true) is submitted to the client thread pool; tasks
     * cancelled by the timeout are reported as {@code SubClusterTimeoutException}.
     *
     * @param <T> type of the locations
     * @param <R> type of the results
     * @param locations locations to invoke the method in
     * @param method method to invoke
     * @param standby if true, every namenode of each nameservice is invoked
     * @param timeOutMs time to wait for the calls in milliseconds; no limit if not positive
     * @param clazz class of the results; only fixes the type {@code R}
     * @return the results, in the order the invocations were submitted
     * @throws IOException if there are no locations, the single direct call fails,
     *         the namenodes of a location cannot be found, the thread pool rejects
     *         the tasks (reported as {@code StandbyException}) or the wait is interrupted
     */
    public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeConcurrent(Collection<T> locations, RemoteMethod method, boolean standby, long timeOutMs, Class<R> clazz) throws IOException {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        Method m = method.getMethod();
        if (locations.isEmpty()) {
            throw new IOException("No remote locations available");
        }
        if (locations.size() == 1 && timeOutMs <= 0L) {
            // Fast path: no need to go through the thread pool.
            T location = locations.iterator().next();
            String ns = location.getNameserviceId();
            List<? extends FederationNamenodeContext> namenodes = this.getNamenodesForNameservice(ns);
            try {
                Class<?> proto = method.getProtocol();
                Object[] paramList = method.getParams(location);
                @SuppressWarnings("unchecked")
                R result = (R) this.invokeMethod(ugi, namenodes, proto, m, paramList);
                RemoteResult<T, R> remoteResult = new RemoteResult<T, R>(location, result);
                return Collections.singletonList(remoteResult);
            } catch (IOException ioe) {
                throw this.processException(ioe, location);
            }
        }

        // orderedLocations.get(i) is the location callables.get(i) is sent to.
        List<T> orderedLocations = new ArrayList<T>();
        List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
        for (T location : locations) {
            String nsId = location.getNameserviceId();
            List<? extends FederationNamenodeContext> namenodes = this.getNamenodesForNameservice(nsId);
            Class<?> proto = method.getProtocol();
            Object[] paramList = method.getParams(location);
            if (standby) {
                // One task per namenode so that standby namenodes are called too.
                for (FederationNamenodeContext nn : namenodes) {
                    String nnId = nn.getNamenodeId();
                    List<FederationNamenodeContext> nnList = Collections.singletonList(nn);
                    T nnLocation = location;
                    if (location instanceof RemoteLocation) {
                        // NOTE(review): elsewhere in this class the three-String constructor is
                        // called as (nsId, "/", "/"); here the second argument is the namenode
                        // id - confirm against RemoteLocation that this is intended.
                        @SuppressWarnings("unchecked")
                        T perNamenode = (T) new RemoteLocation(nsId, nnId, location.getDest());
                        nnLocation = perNamenode;
                    }
                    orderedLocations.add(nnLocation);
                    callables.add(() -> this.invokeMethod(ugi, nnList, proto, m, paramList));
                }
            } else {
                orderedLocations.add(location);
                callables.add(() -> this.invokeMethod(ugi, namenodes, proto, m, paramList));
            }
        }
        if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOp();
        }
        try {
            List<Future<Object>> futures;
            if (timeOutMs > 0L) {
                futures = this.executorService.invokeAll(callables, timeOutMs, TimeUnit.MILLISECONDS);
            } else {
                futures = this.executorService.invokeAll(callables);
            }
            List<RemoteResult<T, R>> results = new ArrayList<RemoteResult<T, R>>();
            for (int i = 0; i < futures.size(); ++i) {
                T location = orderedLocations.get(i);
                try {
                    Future<Object> future = futures.get(i);
                    @SuppressWarnings("unchecked")
                    R result = (R) future.get();
                    results.add(new RemoteResult<T, R>(location, result));
                } catch (CancellationException ce) {
                    // invokeAll cancels the tasks that did not finish within the timeout.
                    String msg = "Invocation to \"" + location + "\" for \"" + method.getMethodName() + "\" timed out";
                    LOG.error(msg);
                    IOException timeout = new SubClusterTimeoutException(msg);
                    results.add(new RemoteResult<T, R>(location, timeout));
                } catch (ExecutionException ex) {
                    Throwable cause = ex.getCause();
                    LOG.debug("Canot execute {} in {}: {}", m.getName(), location, cause.getMessage());
                    IOException ioe;
                    if (cause instanceof IOException) {
                        ioe = (IOException) cause;
                    } else {
                        ioe = new IOException("Unhandled exception while proxying API " + m.getName() + ": " + cause.getMessage(), cause);
                    }
                    results.add(new RemoteResult<T, R>(location, ioe));
                }
            }
            return results;
        } catch (RejectedExecutionException e) {
            if (this.rpcMonitor != null) {
                this.rpcMonitor.proxyOpFailureClientOverloaded();
            }
            int active = this.executorService.getActiveCount();
            int total = this.executorService.getMaximumPoolSize();
            String msg = "Not enough client threads " + active + "/" + total;
            LOG.error(msg);
            throw new StandbyException("Router " + this.router.getRouterId() + " is overloaded: " + msg);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the code up the stack can see the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
            throw new IOException("Unexpected error while invoking API " + ex.getMessage(), ex);
        }
    }

    /**
     * Lists the namenodes registered for a nameservice.
     *
     * @param nsId nameservice identifier
     * @return the namenodes, never null or empty
     * @throws IOException if the resolver knows no namenode for the nameservice
     */
    private List<? extends FederationNamenodeContext> getNamenodesForNameservice(String nsId) throws IOException {
        List<? extends FederationNamenodeContext> registered = this.namenodeResolver.getNamenodesForNameserviceId(nsId);
        if (registered != null && !registered.isEmpty()) {
            return registered;
        }
        throw new IOException("Cannot locate a registered namenode for " + nsId + " from " + this.router.getRouterId());
    }

    /**
     * Lists the namenodes registered for a block pool.
     *
     * @param bpId block pool identifier
     * @return the namenodes, never null or empty
     * @throws IOException if the resolver knows no namenode for the block pool
     */
    private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(String bpId) throws IOException {
        List<? extends FederationNamenodeContext> registered = this.namenodeResolver.getNamenodesForBlockPoolId(bpId);
        if (registered != null && !registered.isEmpty()) {
            return registered;
        }
        throw new IOException("Cannot locate a registered namenode for " + bpId + " from " + this.router.getRouterId());
    }

    /**
     * Finds the nameservice that owns a block pool.
     *
     * @param bpId block pool identifier
     * @return nameservice identifier of the first namenode registered for the block pool
     * @throws IOException if the resolver knows no namenode for the block pool
     */
    private String getNameserviceForBlockPoolId(String bpId) throws IOException {
        return this.getNamenodesForBlockPoolId(bpId).get(0).getNameserviceId();
    }
}

