/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.AlterTableRequest;
import org.apache.kudu.client.AlterTableResponse;
import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.AsyncKuduSession;
import org.apache.kudu.client.AuthnTokenReacquirer;
import org.apache.kudu.client.AuthzTokenCache;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CallResponse;
import org.apache.kudu.client.ConnectToCluster;
import org.apache.kudu.client.ConnectToClusterResponse;
import org.apache.kudu.client.Connection;
import org.apache.kudu.client.ConnectionCache;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.CreateTableRequest;
import org.apache.kudu.client.CreateTableResponse;
import org.apache.kudu.client.DeleteTableRequest;
import org.apache.kudu.client.DeleteTableResponse;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.GetTableLocationsRequest;
import org.apache.kudu.client.GetTableSchemaRequest;
import org.apache.kudu.client.HiveMetastoreConfig;
import org.apache.kudu.client.HostAndPort;
import org.apache.kudu.client.IsAlterTableDoneRequest;
import org.apache.kudu.client.IsAlterTableDoneResponse;
import org.apache.kudu.client.IsCreateTableDoneRequest;
import org.apache.kudu.client.IsCreateTableDoneResponse;
import org.apache.kudu.client.KeyRange;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduRpc;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.ListTablesRequest;
import org.apache.kudu.client.ListTablesResponse;
import org.apache.kudu.client.ListTabletServersRequest;
import org.apache.kudu.client.ListTabletServersResponse;
import org.apache.kudu.client.LocatedTablet;
import org.apache.kudu.client.NoLeaderFoundException;
import org.apache.kudu.client.NonCoveredRangeException;
import org.apache.kudu.client.NonRecoverableException;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.ProtobufHelper;
import org.apache.kudu.client.RecoverableException;
import org.apache.kudu.client.RemoteTablet;
import org.apache.kudu.client.RequestTracker;
import org.apache.kudu.client.RpcProxy;
import org.apache.kudu.client.RpcTraceFrame;
import org.apache.kudu.client.SecurityContext;
import org.apache.kudu.client.ServerInfo;
import org.apache.kudu.client.SplitKeyRangeRequest;
import org.apache.kudu.client.SplitKeyRangeResponse;
import org.apache.kudu.client.Statistics;
import org.apache.kudu.client.Status;
import org.apache.kudu.client.TableLocationsCache;
import org.apache.kudu.client.TimeoutTracker;
import org.apache.kudu.master.Master;
import org.apache.kudu.security.Token;
import org.apache.kudu.shaded.com.google.common.base.Joiner;
import org.apache.kudu.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.apache.kudu.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kudu.shaded.com.google.protobuf.ByteString;
import org.apache.kudu.shaded.com.google.protobuf.Message;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.apache.kudu.shaded.org.jboss.netty.util.HashedWheelTimer;
import org.apache.kudu.shaded.org.jboss.netty.util.Timeout;
import org.apache.kudu.shaded.org.jboss.netty.util.Timer;
import org.apache.kudu.shaded.org.jboss.netty.util.TimerTask;
import org.apache.kudu.util.AsyncUtil;
import org.apache.kudu.util.NetUtil;
import org.apache.kudu.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class AsyncKuduClient
implements AutoCloseable {
    public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduClient.class);
    public static final int SLEEP_TIME = 500;
    public static final byte[] EMPTY_ARRAY = new byte[0];
    public static final long NO_TIMESTAMP = -1L;
    public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000L;
    public static final long DEFAULT_KEEP_ALIVE_PERIOD_MS = 15000L;
    private static final long MAX_RPC_ATTEMPTS = 100L;
    private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
    static int FETCH_TABLETS_PER_RANGE_LOOKUP = 1000;
    private final ClientSocketChannelFactory channelFactory;
    private final ConcurrentHashMap<String, TableLocationsCache> tableLocations = new ConcurrentHashMap();
    private final ConnectionCache connectionCache;
    @GuardedBy(value="sessions")
    private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>();
    @GuardedBy(value="this")
    private HiveMetastoreConfig hiveMetastoreConfig = null;
    static final String MASTER_TABLE_NAME_PLACEHOLDER = "Kudu Master";
    private final KuduTable masterTable;
    private final List<HostAndPort> masterAddresses;
    private final HashedWheelTimer timer;
    private long lastPropagatedTimestamp = -1L;
    private volatile boolean hasConnectedToMaster = false;
    private String location = "";
    private final Semaphore masterLookups = new Semaphore(50);
    private final Random sleepRandomizer = new Random();
    private final long defaultOperationTimeoutMs;
    private final long defaultAdminOperationTimeoutMs;
    private final Statistics statistics;
    private final boolean statisticsDisabled;
    private final RequestTracker requestTracker;
    @InterfaceAudience.LimitedPrivate(value={"Test"})
    final SecurityContext securityContext;
    private final AuthnTokenReacquirer tokenReacquirer;
    private final AuthzTokenCache authzTokenCache;
    private volatile boolean closed;

    private AsyncKuduClient(AsyncKuduClientBuilder b) {
        this.channelFactory = b.createChannelFactory();
        this.masterAddresses = b.masterAddresses;
        this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER, MASTER_TABLE_NAME_PLACEHOLDER, null, null, 1, null);
        this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
        this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
        this.statisticsDisabled = b.statisticsDisabled;
        this.statistics = this.statisticsDisabled ? null : new Statistics();
        this.timer = b.timer;
        this.requestTracker = new RequestTracker(UUID.randomUUID().toString().replace("-", ""));
        this.securityContext = new SecurityContext();
        this.connectionCache = new ConnectionCache(this.securityContext, this.timer, this.channelFactory);
        this.tokenReacquirer = new AuthnTokenReacquirer(this);
        this.authzTokenCache = new AuthzTokenCache(this);
    }

    @Nonnull
    RpcProxy newRpcProxy(ServerInfo serverInfo) {
        return this.newRpcProxy(serverInfo, Connection.CredentialsPolicy.ANY_CREDENTIALS);
    }

    @Nonnull
    private RpcProxy newRpcProxy(ServerInfo serverInfo, Connection.CredentialsPolicy credentialsPolicy) {
        Connection connection = this.connectionCache.getConnection(serverInfo, credentialsPolicy);
        return new RpcProxy(this, connection);
    }

    @Nullable
    RpcProxy newMasterRpcProxy(HostAndPort hostPort, Connection.CredentialsPolicy credentialsPolicy) {
        InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
        if (inetAddress == null) {
            return null;
        }
        return this.newRpcProxy(new ServerInfo(AsyncKuduClient.getFakeMasterUuid(hostPort), hostPort, inetAddress, ""), credentialsPolicy);
    }

    static String getFakeMasterUuid(HostAndPort hostPort) {
        return "master-" + hostPort.toString();
    }

    void reconnectToCluster(Callback<Void, Boolean> cb, Callback<Void, Exception> eb) {
        final class ReconnectToClusterCB
        implements Callback<Void, ConnectToClusterResponse> {
            private final Callback<Void, Boolean> cb;

            ReconnectToClusterCB(Callback<Void, Boolean> cb) {
                this.cb = Preconditions.checkNotNull(cb);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public Void call(ConnectToClusterResponse resp) throws Exception {
                Master.ConnectToMasterResponsePB masterResponsePB = resp.getConnectResponse();
                if (masterResponsePB.hasAuthnToken()) {
                    LOG.info("connect to master: received a new authn token");
                    AsyncKuduClient.this.securityContext.setAuthenticationToken(masterResponsePB.getAuthnToken());
                    this.cb.call((Object)true);
                } else {
                    LOG.warn("connect to master: received no authn token");
                    AsyncKuduClient.this.securityContext.setAuthenticationToken(null);
                    this.cb.call((Object)false);
                }
                AsyncKuduClient asyncKuduClient = AsyncKuduClient.this;
                synchronized (asyncKuduClient) {
                    AsyncKuduClient.this.location = masterResponsePB.getClientLocation();
                }
                return null;
            }
        }
        ConnectToCluster.run(this.masterTable, this.masterAddresses, null, this.defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.PRIMARY_CREDENTIALS).addCallbacks((Callback)new ReconnectToClusterCB(cb), eb);
    }

    public synchronized void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) {
        if (this.lastPropagatedTimestamp == -1L || this.lastPropagatedTimestamp < lastPropagatedTimestamp) {
            this.lastPropagatedTimestamp = lastPropagatedTimestamp;
        }
    }

    public synchronized long getLastPropagatedTimestamp() {
        return this.lastPropagatedTimestamp;
    }

    public synchronized boolean hasLastPropagatedTimestamp() {
        return this.lastPropagatedTimestamp != -1L;
    }

    public String getLocationString() {
        return this.location;
    }

    Timer getTimer() {
        return this.timer;
    }

    public KuduClient syncClient() {
        return new KuduClient(this);
    }

    public Deferred<KuduTable> createTable(String name, Schema schema, CreateTableOptions builder) {
        this.checkIsClosed();
        if (builder == null) {
            throw new IllegalArgumentException("CreateTableOptions may not be null");
        }
        if (!builder.getBuilder().getPartitionSchema().hasRangeSchema() && builder.getBuilder().getPartitionSchema().getHashBucketSchemasCount() == 0) {
            throw new IllegalArgumentException("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");
        }
        CreateTableRequest create = new CreateTableRequest(this.masterTable, name, schema, builder, this.timer, this.defaultAdminOperationTimeoutMs);
        Deferred<CreateTableResponse> createTableD = this.sendRpcToTablet(create);
        Deferred kuduTableD = createTableD.addCallbackDeferring(resp -> this.getTableSchema(name, resp.getTableId(), create));
        if (!builder.shouldWait()) {
            return kuduTableD;
        }
        return kuduTableD.addCallbackDeferring(tableResp -> {
            Master.TableIdentifierPB.Builder table = Master.TableIdentifierPB.newBuilder().setTableId(ByteString.copyFromUtf8(tableResp.getTableId()));
            return this.getDelayedIsCreateTableDoneDeferred(table, create, (KuduTable)tableResp);
        });
    }

    public Deferred<IsCreateTableDoneResponse> isCreateTableDone(String name) {
        return this.doIsCreateTableDone(Master.TableIdentifierPB.newBuilder().setTableName(name), null);
    }

    private Deferred<IsCreateTableDoneResponse> doIsCreateTableDone(@Nonnull Master.TableIdentifierPB.Builder table, @Nullable KuduRpc<?> parent) {
        this.checkIsClosed();
        IsCreateTableDoneRequest request = new IsCreateTableDoneRequest(this.masterTable, table, this.timer, this.defaultAdminOperationTimeoutMs);
        if (parent != null) {
            request.setParentRpc(parent);
        }
        return this.sendRpcToTablet(request);
    }

    public Deferred<DeleteTableResponse> deleteTable(String name) {
        this.checkIsClosed();
        DeleteTableRequest delete = new DeleteTableRequest(this.masterTable, name, this.timer, this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(delete);
    }

    public Deferred<AlterTableResponse> alterTable(String name, AlterTableOptions ato) {
        this.checkIsClosed();
        AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato, this.timer, this.defaultAdminOperationTimeoutMs);
        Deferred responseD = this.sendRpcToTablet(alter);
        if (ato.hasAddDropRangePartitions()) {
            responseD = responseD.addCallback((Callback)new Callback<AlterTableResponse, AlterTableResponse>(){

                public AlterTableResponse call(AlterTableResponse resp) {
                    AsyncKuduClient.this.tableLocations.remove(resp.getTableId());
                    return resp;
                }

                public String toString() {
                    return "ClearTableLocationsCacheCB";
                }
            }).addErrback((Callback)new Callback<Exception, Exception>(){

                public Exception call(Exception e) {
                    AsyncKuduClient.this.tableLocations.clear();
                    return e;
                }

                public String toString() {
                    return "ClearTableLocationsCacheEB";
                }
            });
        }
        if (!ato.shouldWait()) {
            return responseD;
        }
        return responseD.addCallbackDeferring(resp -> {
            Master.TableIdentifierPB.Builder table = Master.TableIdentifierPB.newBuilder().setTableId(ByteString.copyFromUtf8(resp.getTableId()));
            return this.getDelayedIsAlterTableDoneDeferred(table, alter, (AlterTableResponse)resp);
        });
    }

    public Deferred<IsAlterTableDoneResponse> isAlterTableDone(String name) {
        return this.doIsAlterTableDone(Master.TableIdentifierPB.newBuilder().setTableName(name), null);
    }

    private Deferred<IsAlterTableDoneResponse> doIsAlterTableDone(@Nonnull Master.TableIdentifierPB.Builder table, @Nullable KuduRpc<?> parent) {
        this.checkIsClosed();
        IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(this.masterTable, table, this.timer, this.defaultAdminOperationTimeoutMs);
        request.setParentRpc(parent);
        return this.sendRpcToTablet(request);
    }

    public Deferred<ListTabletServersResponse> listTabletServers() {
        this.checkIsClosed();
        ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable, this.timer, this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(rpc);
    }

    private Deferred<KuduTable> getTableSchema(@Nullable String tableName, @Nullable String tableId, @Nullable KuduRpc<?> parent) {
        Preconditions.checkArgument(tableId != null || tableName != null);
        GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, tableId, tableId != null ? null : tableName, this.timer, this.defaultAdminOperationTimeoutMs, false);
        rpc.setParentRpc(parent);
        return this.sendRpcToTablet(rpc).addCallback(resp -> {
            Token.SignedTokenPB authzToken;
            TableLocationsCache cache = this.tableLocations.get(resp.getTableId());
            if (cache != null) {
                cache.clearNonCoveredRangeEntries();
            }
            if ((authzToken = resp.getAuthzToken()) != null) {
                this.authzTokenCache.put(resp.getTableId(), authzToken);
            }
            LOG.debug("Opened table {}", (Object)resp.getTableId());
            return new KuduTable(this, resp.getTableName(), resp.getTableId(), resp.getSchema(), resp.getPartitionSchema(), resp.getNumReplicas(), resp.getExtraConfig());
        });
    }

    public Deferred<ListTablesResponse> getTablesList() {
        return this.getTablesList(null);
    }

    public Deferred<ListTablesResponse> getTablesList(String nameFilter) {
        ListTablesRequest rpc = new ListTablesRequest(this.masterTable, nameFilter, this.timer, this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(rpc);
    }

    public Deferred<Boolean> tableExists(String name) {
        if (name == null) {
            throw new IllegalArgumentException("The table name cannot be null");
        }
        return AsyncUtil.addCallbacksDeferring(this.getTableSchema(name, null, null), _table -> Deferred.fromResult((Object)true), e -> {
            Status status;
            if (e instanceof NonRecoverableException && (status = ((NonRecoverableException)e).getStatus()).isNotFound()) {
                return Deferred.fromResult((Object)false);
            }
            return Deferred.fromError((Exception)e);
        });
    }

    Deferred<KuduTable> openTableById(String id) {
        this.checkIsClosed();
        return this.getTableSchema(null, id, null);
    }

    public Deferred<KuduTable> openTable(String name) {
        this.checkIsClosed();
        return this.getTableSchema(name, null, null);
    }

    @InterfaceStability.Unstable
    public Deferred<byte[]> exportAuthenticationCredentials() {
        KuduRpc<byte[]> fakeRpc = this.buildFakeRpc("exportAuthenticationCredentials", null);
        Deferred fakeRpcD = fakeRpc.getDeferred();
        this.doExportAuthenticationCredentials(fakeRpc);
        return fakeRpcD;
    }

    private void doExportAuthenticationCredentials(KuduRpc<byte[]> fakeRpc) {
        if (this.hasConnectedToMaster) {
            fakeRpc.callback(this.securityContext.exportAuthenticationCredentials());
            return;
        }
        ++fakeRpc.attempt;
        this.getMasterTableLocationsPB(null).addCallback((Callback)new MasterLookupCB(this.masterTable, null, 1)).addCallback(ignored -> {
            assert (this.hasConnectedToMaster);
            this.doExportAuthenticationCredentials(fakeRpc);
            return null;
        }).addErrback(new RetryTaskErrback<byte[]>(fakeRpc, _ignored -> this.doExportAuthenticationCredentials(fakeRpc)));
    }

    @InterfaceAudience.LimitedPrivate(value={"Test"})
    public AuthzTokenCache getAuthzTokenCache() {
        return this.authzTokenCache;
    }

    @InterfaceAudience.LimitedPrivate(value={"Impala"})
    @InterfaceStability.Unstable
    public Deferred<HiveMetastoreConfig> getHiveMetastoreConfig() {
        KuduRpc<HiveMetastoreConfig> fakeRpc = this.buildFakeRpc("getHiveMetastoreConfig", null);
        Deferred fakeRpcD = fakeRpc.getDeferred();
        this.doGetHiveMetastoreConfig(fakeRpc);
        return fakeRpcD;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doGetHiveMetastoreConfig(KuduRpc<HiveMetastoreConfig> fakeRpc) {
        if (this.hasConnectedToMaster) {
            HiveMetastoreConfig c;
            AsyncKuduClient asyncKuduClient = this;
            synchronized (asyncKuduClient) {
                c = this.hiveMetastoreConfig;
            }
            fakeRpc.callback(c);
            return;
        }
        ++fakeRpc.attempt;
        this.getMasterTableLocationsPB(null).addCallback((Callback)new MasterLookupCB(this.masterTable, null, 1)).addCallback(ignored -> {
            assert (this.hasConnectedToMaster);
            this.doGetHiveMetastoreConfig(fakeRpc);
            return null;
        }).addErrback(new RetryTaskErrback<HiveMetastoreConfig>(fakeRpc, ignored -> this.doGetHiveMetastoreConfig(fakeRpc)));
    }

    @InterfaceStability.Unstable
    public void importAuthenticationCredentials(byte[] authnData) {
        this.securityContext.importAuthenticationCredentials(authnData);
    }

    public long getDefaultOperationTimeoutMs() {
        return this.defaultOperationTimeoutMs;
    }

    public long getDefaultAdminOperationTimeoutMs() {
        return this.defaultAdminOperationTimeoutMs;
    }

    @Deprecated
    public long getDefaultSocketReadTimeoutMs() {
        LOG.info("getDefaultSocketReadTimeoutMs is deprecated");
        return 0L;
    }

    public String getMasterAddressesAsString() {
        return Joiner.on(",").join(this.masterAddresses);
    }

    public boolean isStatisticsEnabled() {
        return !this.statisticsDisabled;
    }

    public Statistics getStatistics() {
        if (this.statisticsDisabled) {
            throw new IllegalStateException("This client's statistics is disabled");
        }
        return this.statistics;
    }

    RequestTracker getRequestTracker() {
        return this.requestTracker;
    }

    @InterfaceAudience.LimitedPrivate(value={"Test"})
    KuduTable getMasterTable() {
        return this.masterTable;
    }

    public AsyncKuduScanner.AsyncKuduScannerBuilder newScannerBuilder(KuduTable table) {
        this.checkIsClosed();
        return new AsyncKuduScanner.AsyncKuduScannerBuilder(this, table);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public AsyncKuduSession newSession() {
        this.checkIsClosed();
        AsyncKuduSession session = new AsyncKuduSession(this);
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            this.sessions.add(session);
        }
        return session;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeSession(AsyncKuduSession session) {
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            boolean removed = this.sessions.remove(session);
            assert (removed);
        }
    }

    Deferred<AsyncKuduScanner.Response> scanNextRows(AsyncKuduScanner scanner) {
        RemoteTablet tablet = Preconditions.checkNotNull(scanner.currentTablet());
        KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
        ++nextRequest.attempt;
        ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection(), this.location);
        if (info == null) {
            return this.delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(String.format("No information on servers hosting tablet %s, will retry later", tablet.getTabletId()))));
        }
        Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
        RpcProxy.sendRpc(this, this.connectionCache.getConnection(info, Connection.CredentialsPolicy.ANY_CREDENTIALS), nextRequest);
        return d;
    }

    Deferred<AsyncKuduScanner.Response> closeScanner(AsyncKuduScanner scanner) {
        RemoteTablet tablet = scanner.currentTablet();
        if (tablet == null) {
            return Deferred.fromResult(null);
        }
        KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
        ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection(), this.location);
        if (info == null) {
            return Deferred.fromResult(null);
        }
        Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
        ++closeRequest.attempt;
        RpcProxy.sendRpc(this, this.connectionCache.getConnection(info, Connection.CredentialsPolicy.ANY_CREDENTIALS), closeRequest);
        return d;
    }

    Deferred<Void> keepAlive(AsyncKuduScanner scanner) {
        this.checkIsClosed();
        RemoteTablet tablet = scanner.currentTablet();
        if (tablet == null) {
            return Deferred.fromResult(null);
        }
        KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest();
        ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection(), this.location);
        if (info == null) {
            return Deferred.fromResult(null);
        }
        Deferred<Void> d = keepAliveRequest.getDeferred();
        ++keepAliveRequest.attempt;
        RpcProxy.sendRpc(this, this.connectionCache.getConnection(info, Connection.CredentialsPolicy.ANY_CREDENTIALS), keepAliveRequest);
        return d;
    }

    <R> Deferred<R> sendRpcToTablet(KuduRpc<R> request) {
        RemoteTablet tablet;
        ServerInfo info;
        byte[] partitionKey;
        if (AsyncKuduClient.cannotRetryRequest(request)) {
            return AsyncKuduClient.tooManyAttemptsOrTimeout(request, null);
        }
        ++request.attempt;
        String tableId = request.getTable().getTableId();
        TableLocationsCache.Entry entry = this.getTableLocationEntry(tableId, partitionKey = request.partitionKey());
        if (entry != null && entry.isNonCoveredRange()) {
            NonCoveredRangeException e = new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), entry.getUpperBoundPartitionKey());
            Deferred<R> d = request.getDeferred();
            request.errback(e);
            return d;
        }
        long lastPropagatedTs = this.getLastPropagatedTimestamp();
        if (request.getExternalConsistencyMode() == ExternalConsistencyMode.CLIENT_PROPAGATED && lastPropagatedTs != -1L) {
            request.setPropagatedTimestamp(lastPropagatedTs);
        }
        if (entry != null && (info = (tablet = entry.getTablet()).getReplicaSelectedServerInfo(request.getReplicaSelection(), this.location)) != null) {
            Deferred<R> d = request.getDeferred();
            request.setTablet(tablet);
            RpcProxy.sendRpc(this, this.connectionCache.getConnection(info, Connection.CredentialsPolicy.ANY_CREDENTIALS), request);
            return d;
        }
        request.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(request.method(), RpcTraceFrame.Action.QUERY_MASTER).build());
        RetryRpcCB cb = new RetryRpcCB(request);
        RetryRpcErrback<R> eb = new RetryRpcErrback<R>(request);
        Deferred<Master.GetTableLocationsResponsePB> returnedD = this.locateTablet(request.getTable(), partitionKey, 10, request);
        return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
    }

    private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(KuduRpc<R> rpc) {
        return e -> {
            rpc.errback((Exception)e);
            return e;
        };
    }

    private <R> KuduRpc<R> buildFakeRpc(final @Nonnull String method, @Nullable KuduRpc<?> parent, long timeoutMs) {
        KuduRpc rpc = new KuduRpc<R>(null, this.timer, timeoutMs){

            @Override
            Message createRequestPB() {
                return null;
            }

            @Override
            String serviceName() {
                return null;
            }

            @Override
            String method() {
                return method;
            }

            @Override
            Pair<R, Object> deserialize(CallResponse callResponse, String tsUUID) throws KuduException {
                return null;
            }
        };
        rpc.setParentRpc(parent);
        return rpc;
    }

    private <R> KuduRpc<R> buildFakeRpc(@Nonnull String method, @Nullable KuduRpc<?> parent) {
        return this.buildFakeRpc(method, parent, this.defaultAdminOperationTimeoutMs);
    }

    Deferred<AlterTableResponse> getDelayedIsAlterTableDoneDeferred(@Nonnull Master.TableIdentifierPB.Builder table, @Nullable KuduRpc<?> parent, @Nullable AlterTableResponse resp) {
        KuduRpc<AlterTableResponse> fakeRpc = this.buildFakeRpc("IsAlterTableDone", parent);
        Deferred fakeRpcD = fakeRpc.getDeferred();
        this.delayedIsAlterTableDone(table, fakeRpc, this.getDelayedIsAlterTableDoneCB(fakeRpc, table, resp), this.getDelayedIsTableDoneEB(fakeRpc));
        return fakeRpcD;
    }

    Deferred<KuduTable> getDelayedIsCreateTableDoneDeferred(@Nonnull Master.TableIdentifierPB.Builder table, @Nullable KuduRpc<?> parent, @Nullable KuduTable resp) {
        KuduRpc<KuduTable> fakeRpc = this.buildFakeRpc("IsCreateTableDone", parent);
        Deferred fakeRpcD = fakeRpc.getDeferred();
        this.delayedIsCreateTableDone(table, fakeRpc, this.getDelayedIsCreateTableDoneCB(fakeRpc, table, resp), this.getDelayedIsTableDoneEB(fakeRpc));
        return fakeRpcD;
    }

    private Callback<Deferred<AlterTableResponse>, IsAlterTableDoneResponse> getDelayedIsAlterTableDoneCB(@Nonnull KuduRpc<AlterTableResponse> rpc, @Nonnull Master.TableIdentifierPB.Builder table, @Nullable AlterTableResponse alterResp) {
        return resp -> {
            Deferred d = rpc.getDeferred();
            if (resp.isDone()) {
                rpc.callback(alterResp);
            } else {
                ++rpc.attempt;
                this.delayedIsAlterTableDone(table, rpc, this.getDelayedIsAlterTableDoneCB(rpc, table, alterResp), this.getDelayedIsTableDoneEB(rpc));
            }
            return d;
        };
    }

    private Callback<Deferred<KuduTable>, IsCreateTableDoneResponse> getDelayedIsCreateTableDoneCB(KuduRpc<KuduTable> rpc, Master.TableIdentifierPB.Builder table, KuduTable tableResp) {
        return resp -> {
            Deferred d = rpc.getDeferred();
            if (resp.isDone()) {
                rpc.callback(tableResp);
            } else {
                ++rpc.attempt;
                this.delayedIsCreateTableDone(table, rpc, this.getDelayedIsCreateTableDoneCB(rpc, table, tableResp), this.getDelayedIsTableDoneEB(rpc));
            }
            return d;
        };
    }

    private void delayedIsCreateTableDone(final Master.TableIdentifierPB.Builder table, final KuduRpc<KuduTable> rpc, final Callback<Deferred<KuduTable>, IsCreateTableDoneResponse> callback, final Callback<Exception, Exception> errback) {
        long sleepTimeMillis = this.getSleepTimeForRpcMillis(rpc);
        if (rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
            AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, null);
            return;
        }
        final class RetryTimer
        implements TimerTask {
            RetryTimer() {
            }

            @Override
            public void run(Timeout timeout) {
                AsyncKuduClient.this.doIsCreateTableDone(table, rpc).addCallbacks(callback, errback);
            }
        }
        AsyncKuduClient.newTimeout(this.timer, new RetryTimer(), sleepTimeMillis);
    }

    private void delayedIsAlterTableDone(final Master.TableIdentifierPB.Builder table, final KuduRpc<AlterTableResponse> rpc, final Callback<Deferred<AlterTableResponse>, IsAlterTableDoneResponse> callback, final Callback<Exception, Exception> errback) {
        long sleepTimeMillis = this.getSleepTimeForRpcMillis(rpc);
        if (rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
            AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, null);
            return;
        }
        final class RetryTimer
        implements TimerTask {
            RetryTimer() {
            }

            @Override
            public void run(Timeout timeout) {
                AsyncKuduClient.this.doIsAlterTableDone(table, rpc).addCallbacks(callback, errback);
            }
        }
        AsyncKuduClient.newTimeout(this.timer, new RetryTimer(), sleepTimeMillis);
    }

    private long getSleepTimeForRpcMillis(KuduRpc<?> rpc) {
        int attemptCount = rpc.attempt;
        if (attemptCount == 0) {
            return 0L;
        }
        long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) * this.sleepRandomizer.nextDouble());
        if (LOG.isTraceEnabled()) {
            LOG.trace("Going to sleep for {} at retry {}", (Object)sleepTime, (Object)rpc.attempt);
        }
        return sleepTime;
    }

    @InterfaceAudience.LimitedPrivate(value={"Test"})
    void emptyTabletsCacheForTable(String tableId) {
        this.tableLocations.remove(tableId);
    }

    private static boolean cannotRetryRequest(KuduRpc<?> rpc) {
        return rpc.timeoutTracker.timedOut() || (long)rpc.attempt > 100L;
    }

    private static <R> Deferred<R> tooManyAttemptsOrTimeout(KuduRpc<R> request, KuduException cause) {
        String message = (long)request.attempt > 100L ? "too many attempts: " : "cannot complete before timeout: ";
        Status statusTimedOut = Status.TimedOut(message + request);
        LOG.debug("Cannot continue with RPC because of: {}", (Object)statusTimedOut);
        Deferred<R> d = request.getDeferred();
        request.errback(new NonRecoverableException(statusTimedOut, (Throwable)cause));
        return d;
    }

    private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable table, byte[] partitionKey, int fetchBatchSize, KuduRpc<?> parentRpc) {
        Deferred<Master.GetTableLocationsResponsePB> d;
        TableLocationsCache.Entry entry;
        boolean hasPermit = this.acquireMasterLookupPermit();
        String tableId = table.getTableId();
        if (!hasPermit && (entry = this.getTableLocationEntry(tableId, partitionKey)) != null && !entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() != null) {
            return Deferred.fromResult(null);
        }
        if (AsyncKuduClient.isMasterTable(tableId)) {
            d = this.getMasterTableLocationsPB(parentRpc);
        } else {
            long timeoutMillis = parentRpc == null ? this.defaultAdminOperationTimeoutMs : parentRpc.timeoutTracker.getMillisBeforeTimeout();
            GetTableLocationsRequest rpc = new GetTableLocationsRequest(this.masterTable, partitionKey, null, tableId, fetchBatchSize, this.timer, timeoutMillis);
            rpc.setParentRpc(parentRpc);
            d = this.sendRpcToTablet(rpc);
        }
        d.addCallback((Callback)new MasterLookupCB(table, partitionKey, fetchBatchSize));
        if (hasPermit) {
            d.addBoth(new ReleaseMasterLookupPermit());
        }
        return d;
    }

    Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
        return ConnectToCluster.run(this.masterTable, this.masterAddresses, parentRpc, this.defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback(resp -> {
            List<ByteString> caCerts;
            if (resp.getConnectResponse().hasAuthnToken()) {
                this.securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
            }
            if (!(caCerts = resp.getConnectResponse().getCaCertDerList()).isEmpty()) {
                try {
                    this.securityContext.trustCertificates(caCerts);
                }
                catch (CertificateException e) {
                    LOG.warn("Ignoring invalid CA cert from leader {}: {}", (Object)resp.getLeaderHostAndPort(), (Object)e.getMessage());
                }
            }
            HiveMetastoreConfig hiveMetastoreConfig = null;
            Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse();
            if (respPb.hasHmsConfig()) {
                Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig();
                hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(), metastoreConf.getHmsSaslEnabled(), metastoreConf.getHmsUuid());
            }
            AsyncKuduClient asyncKuduClient = this;
            synchronized (asyncKuduClient) {
                this.hiveMetastoreConfig = hiveMetastoreConfig;
                this.location = respPb.getClientLocation();
            }
            this.hasConnectedToMaster = true;
            return resp.getAsTableLocations();
        });
    }

    List<LocatedTablet> syncLocateTable(KuduTable table, byte[] startPartitionKey, byte[] endPartitionKey, int fetchBatchSize, long deadline) throws Exception {
        return (List)this.locateTable(table, startPartitionKey, endPartitionKey, fetchBatchSize, deadline).join();
    }

    private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table, byte[] startPartitionKey, final byte[] endPartitionKey, final int fetchBatchSize, final List<LocatedTablet> ret, final TimeoutTracker timeoutTracker) {
        Preconditions.checkArgument(startPartitionKey == null || startPartitionKey.length > 0, "use null for unbounded start partition key");
        Preconditions.checkArgument(endPartitionKey == null || endPartitionKey.length > 0, "use null for unbounded end partition key");
        byte[] partitionKey = startPartitionKey;
        String tableId = table.getTableId();
        while (partitionKey == null || partitionKey.length > 0 && (endPartitionKey == null || Bytes.memcmp(partitionKey, endPartitionKey) < 0)) {
            byte[] key = partitionKey == null ? EMPTY_ARRAY : partitionKey;
            TableLocationsCache.Entry entry = this.getTableLocationEntry(tableId, key);
            if (entry != null) {
                if (!entry.isNonCoveredRange()) {
                    ret.add(new LocatedTablet(entry.getTablet()));
                }
                partitionKey = entry.getUpperBoundPartitionKey();
                continue;
            }
            if (timeoutTracker.timedOut()) {
                Status statusTimedOut = Status.TimedOut("Took too long getting the list of tablets, " + timeoutTracker);
                return Deferred.fromError((Exception)new NonRecoverableException(statusTimedOut));
            }
            final byte[] lookupKey = partitionKey;
            KuduRpc fakeRpc = this.buildFakeRpc("loopLocateTable", null, timeoutTracker.getMillisBeforeTimeout());
            return this.locateTablet(table, key, fetchBatchSize, fakeRpc).addCallbackDeferring((Callback)new Callback<Deferred<List<LocatedTablet>>, Master.GetTableLocationsResponsePB>(){

                public Deferred<List<LocatedTablet>> call(Master.GetTableLocationsResponsePB resp) {
                    return AsyncKuduClient.this.loopLocateTable(table, lookupKey, endPartitionKey, fetchBatchSize, ret, timeoutTracker);
                }

                public String toString() {
                    return "LoopLocateTableCB";
                }
            });
        }
        return Deferred.fromResult(ret);
    }

    Deferred<List<LocatedTablet>> locateTable(KuduTable table, byte[] startPartitionKey, byte[] endPartitionKey, int fetchBatchSize, long deadline) {
        ArrayList<LocatedTablet> ret = Lists.newArrayList();
        TimeoutTracker timeoutTracker = new TimeoutTracker();
        timeoutTracker.setTimeout(deadline);
        return this.loopLocateTable(table, startPartitionKey, endPartitionKey, fetchBatchSize, ret, timeoutTracker);
    }

    private Deferred<SplitKeyRangeResponse> getTabletKeyRanges(KuduTable table, byte[] startPrimaryKey, byte[] endPrimaryKey, byte[] partitionKey, long splitSizeBytes, KuduRpc<?> parentRpc) {
        long timeoutMillis = parentRpc == null ? this.defaultAdminOperationTimeoutMs : parentRpc.timeoutTracker.getMillisBeforeTimeout();
        SplitKeyRangeRequest rpc = new SplitKeyRangeRequest(table, startPrimaryKey, endPrimaryKey, partitionKey, splitSizeBytes, this.timer, timeoutMillis);
        rpc.setParentRpc(parentRpc);
        return this.sendRpcToTablet(rpc);
    }

    Deferred<List<KeyRange>> getTableKeyRanges(KuduTable table, byte[] startPrimaryKey, byte[] endPrimaryKey, byte[] startPartitionKey, byte[] endPartitionKey, int fetchBatchSize, long splitSizeBytes, long deadline) {
        TimeoutTracker timeoutTracker = new TimeoutTracker();
        timeoutTracker.setTimeout(deadline);
        Callback locateTabletCB = tablets -> {
            if (splitSizeBytes <= 0L) {
                ArrayList<KeyRange> keyRanges = Lists.newArrayList();
                for (LocatedTablet tablet : tablets) {
                    keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1L));
                }
                return Deferred.fromResult(keyRanges);
            }
            ArrayList<Deferred> deferreds = new ArrayList<Deferred>();
            for (LocatedTablet tablet : tablets) {
                KuduRpc fakeRpc = this.buildFakeRpc("getTableKeyRanges", null, timeoutTracker.getMillisBeforeTimeout());
                deferreds.add(this.getTabletKeyRanges(table, startPrimaryKey, endPrimaryKey, tablet.getPartition().getPartitionKeyStart(), splitSizeBytes, fakeRpc).addCallbackDeferring(resp -> {
                    ArrayList<KeyRange> ranges = Lists.newArrayList();
                    for (Common.KeyRangePB pb : resp.getKeyRanges()) {
                        KeyRange newRange = new KeyRange(tablet, pb.getStartPrimaryKey().toByteArray(), pb.getStopPrimaryKey().toByteArray(), pb.getSizeBytesEstimates());
                        ranges.add(newRange);
                        LOG.debug("Add key range {}", (Object)newRange);
                    }
                    return Deferred.fromResult(ranges);
                }));
            }
            return Deferred.groupInOrder(deferreds).addCallbackDeferring(rangeLists -> {
                ArrayList ret = Lists.newArrayList();
                for (List ranges : rangeLists) {
                    ret.addAll(ranges);
                }
                return Deferred.fromResult(ret);
            });
        };
        ArrayList<LocatedTablet> tablets2 = Lists.newArrayList();
        return this.loopLocateTable(table, startPartitionKey, endPartitionKey, fetchBatchSize, tablets2, timeoutTracker).addCallbackDeferring(locateTabletCB);
    }

    <R> void handleTabletNotFound(KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
        this.invalidateTabletCache(rpc.getTablet(), info, ex.getMessage());
        this.handleRetryableError(rpc, ex);
    }

    <R> void handleNotLeader(KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
        rpc.getTablet().demoteLeader(info.getUuid());
        this.handleRetryableError(rpc, ex);
    }

    <R> void handleRetryableError(KuduRpc<R> rpc, KuduException ex) {
        this.delayedSendRpcToTablet(rpc, ex);
    }

    <R> void handleRetryableErrorNoDelay(KuduRpc<R> rpc, KuduException ex) {
        if (AsyncKuduClient.cannotRetryRequest(rpc)) {
            AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, ex);
            return;
        }
        this.sendRpcToTablet(rpc);
    }

    <R> void handleInvalidAuthnToken(KuduRpc<R> rpc) {
        this.tokenReacquirer.handleAuthnTokenExpiration(rpc);
    }

    <R> void handleInvalidAuthzToken(KuduRpc<R> rpc, KuduException ex) {
        this.authzTokenCache.retrieveAuthzToken(rpc, ex);
    }

    Token.SignedTokenPB getAuthzToken(String tableId) {
        return this.authzTokenCache.get(tableId);
    }

    private <R> Deferred<R> delayedSendRpcToTablet(KuduRpc<R> rpc, KuduException ex) {
        assert (ex != null);
        Status reasonForRetry = ex.getStatus();
        rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(rpc.method(), RpcTraceFrame.Action.SLEEP_THEN_RETRY).callStatus(reasonForRetry).build());
        long sleepTime = this.getSleepTimeForRpcMillis(rpc);
        if (AsyncKuduClient.cannotRetryRequest(rpc) || rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
            return AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, ex);
        }
        AsyncKuduClient.newTimeout(this.timer, _timeout -> this.sendRpcToTablet(rpc), sleepTime);
        return rpc.getDeferred();
    }

    private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info, String errorMessage) {
        String uuid = info.getUuid();
        LOG.info("Invalidating location {} for tablet {}: {}", new Object[]{info, tablet.getTabletId(), errorMessage});
        tablet.removeTabletClient(uuid);
    }

    private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
        List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
        String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
        if (addresses.isEmpty()) {
            LOG.warn("Received a tablet server with no addresses, UUID: {}", (Object)uuid);
            return null;
        }
        HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
        InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
        if (inetAddress == null) {
            throw new UnknownHostException("Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
        }
        return new ServerInfo(uuid, hostPort, inetAddress, tsInfoPB.getLocation());
    }

    private boolean acquireMasterLookupPermit() {
        try {
            return this.masterLookups.tryAcquire(5L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void releaseMasterLookupPermit() {
        this.masterLookups.release();
    }

    @InterfaceAudience.LimitedPrivate(value={"Test"})
    void discoverTablets(KuduTable table, byte[] requestPartitionKey, int requestedBatchSize, List<Master.TabletLocationsPB> locations, List<Master.TSInfoPB> tsInfosList, long ttl) throws KuduException {
        TableLocationsCache existingLocationsCache;
        String tableId = table.getTableId();
        String tableName = table.getName();
        TableLocationsCache locationsCache = this.tableLocations.get(tableId);
        if (locationsCache == null && (existingLocationsCache = this.tableLocations.putIfAbsent(tableId, locationsCache = new TableLocationsCache())) != null) {
            locationsCache = existingLocationsCache;
        }
        int numTsInfos = tsInfosList.size();
        ArrayList<RemoteTablet> tablets = new ArrayList<RemoteTablet>(locations.size());
        for (Master.TabletLocationsPB tabletPb : locations) {
            ArrayList<NonRecoverableException> lookupExceptions = new ArrayList<NonRecoverableException>(tabletPb.getReplicasCount());
            ArrayList<ServerInfo> servers = new ArrayList<ServerInfo>(tabletPb.getReplicasCount());
            Consumer<Master.TSInfoPB> updateServersAndCollectExceptions = tsInfo -> {
                try {
                    ServerInfo serverInfo = this.resolveTS((Master.TSInfoPB)tsInfo);
                    if (serverInfo != null) {
                        servers.add(serverInfo);
                    }
                }
                catch (UnknownHostException ex) {
                    lookupExceptions.add((NonRecoverableException)((Object)ex));
                }
            };
            for (Master.TabletLocationsPB.ReplicaPB replicaPB : tabletPb.getReplicasList()) {
                updateServersAndCollectExceptions.accept(replicaPB.getTsInfo());
            }
            ArrayList<Master.TabletLocationsPB.ReplicaPB> replicas = new ArrayList<Master.TabletLocationsPB.ReplicaPB>();
            for (Master.TabletLocationsPB.InternedReplicaPB replica3 : tabletPb.getInternedReplicasList()) {
                int tsInfoIdx = replica3.getTsInfoIdx();
                if (tsInfoIdx >= numTsInfos) {
                    lookupExceptions.add(new NonRecoverableException(Status.Corruption(String.format("invalid response from master: referenced tablet idx %d but only %d present", tsInfoIdx, numTsInfos))));
                    continue;
                }
                Master.TSInfoPB tsInfo2 = tsInfosList.get(tsInfoIdx);
                updateServersAndCollectExceptions.accept(tsInfo2);
                Master.TabletLocationsPB.ReplicaPB.Builder builder = Master.TabletLocationsPB.ReplicaPB.newBuilder();
                builder.setRole(replica3.getRole());
                builder.setTsInfo(tsInfo2);
                replicas.add(builder.build());
            }
            if (!lookupExceptions.isEmpty() && lookupExceptions.size() == tabletPb.getReplicasCount()) {
                Status status = Status.IOError("Couldn't find any valid locations, exceptions: " + lookupExceptions);
                throw new NonRecoverableException(status);
            }
            RemoteTablet remoteTablet = new RemoteTablet(tableId, tabletPb.getTabletId().toStringUtf8(), ProtobufHelper.pbToPartition(tabletPb.getPartition()), replicas.isEmpty() ? tabletPb.getReplicasList() : replicas, servers);
            LOG.debug("Learned about tablet {} for table '{}' with partition {}", new Object[]{remoteTablet.getTabletId(), tableName, remoteTablet.getPartition()});
            tablets.add(remoteTablet);
        }
        locationsCache.cacheTabletLocations(tablets, requestPartitionKey, requestedBatchSize, ttl);
        TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
        if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) {
            throw new NoLeaderFoundException(Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
        }
    }

    TableLocationsCache.Entry getTableLocationEntry(String tableId, byte[] partitionKey) {
        TableLocationsCache cache = this.tableLocations.get(tableId);
        if (cache == null) {
            return null;
        }
        return cache.get(partitionKey);
    }

    Deferred<LocatedTablet> getTabletLocation(KuduTable table, byte[] partitionKey, LookupType lookupType, long timeoutMs) {
        byte[] endPartitionKey;
        byte[] startPartitionKey;
        if (partitionKey.length == 0) {
            startPartitionKey = null;
            endPartitionKey = new byte[]{0};
        } else {
            startPartitionKey = partitionKey;
            endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1);
        }
        TimeoutTracker timeoutTracker = new TimeoutTracker();
        timeoutTracker.setTimeout(timeoutMs);
        Deferred<List<LocatedTablet>> locatedTablets = this.locateTable(table, startPartitionKey, endPartitionKey, 10, timeoutMs);
        return locatedTablets.addCallbackDeferring(tablets -> {
            Preconditions.checkArgument(tablets.size() <= 1, "found more than one tablet for a single partition key");
            if (tablets.isEmpty()) {
                TableLocationsCache.Entry entry = this.getTableLocationEntry(table.getTableId(), partitionKey);
                if (entry == null) {
                    LOG.debug("Table location expired before it could be processed; retrying.");
                    return Deferred.fromError((Exception)new RecoverableException(Status.NotFound("Table location expired before it could be processed")));
                }
                if (entry.isNonCoveredRange()) {
                    if (lookupType == LookupType.POINT || entry.getUpperBoundPartitionKey().length == 0) {
                        return Deferred.fromError((Exception)new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), entry.getUpperBoundPartitionKey()));
                    }
                    return this.getTabletLocation(table, entry.getUpperBoundPartitionKey(), LookupType.POINT, timeoutTracker.getMillisBeforeTimeout());
                }
                return Deferred.fromResult((Object)new LocatedTablet(entry.getTablet()));
            }
            return Deferred.fromResult((Object)((LocatedTablet)tablets.get(0)));
        });
    }

    @Override
    public void close() throws Exception {
        this.shutdown().join();
    }

    public Deferred<ArrayList<Void>> shutdown() {
        this.checkIsClosed();
        this.closed = true;
        final class DisconnectCB
        implements Callback<Deferred<ArrayList<Void>>, ArrayList<List<OperationResponse>>> {
            DisconnectCB() {
            }

            public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> ignoredResponses) {
                final class ReleaseResourcesCB
                implements Callback<ArrayList<Void>, ArrayList<Void>> {
                    ReleaseResourcesCB() {
                    }

                    public ArrayList<Void> call(ArrayList<Void> arg) {
                        LOG.debug("Releasing all remaining resources");
                        AsyncKuduClient.this.timer.stop();
                        final class ShutdownThread
                        extends Thread {
                            ShutdownThread() {
                                super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
                            }

                            @Override
                            public void run() {
                                AsyncKuduClient.this.channelFactory.releaseExternalResources();
                            }
                        }
                        new ShutdownThread().start();
                        return arg;
                    }

                    public String toString() {
                        return "release resources callback";
                    }
                }
                return AsyncKuduClient.this.connectionCache.disconnectEverything().addCallback((Callback)new ReleaseResourcesCB());
            }

            public String toString() {
                return "disconnect callback";
            }
        }
        return this.closeAllSessions().addCallbackDeferring((Callback)new DisconnectCB());
    }

    private void checkIsClosed() {
        if (this.closed) {
            throw new IllegalStateException("Cannot proceed, the client has already been closed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Deferred<ArrayList<List<OperationResponse>>> closeAllSessions() {
        HashSet<AsyncKuduSession> copyOfSessions;
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            copyOfSessions = new HashSet<AsyncKuduSession>(this.sessions);
        }
        if (copyOfSessions.isEmpty()) {
            return Deferred.fromResult(null);
        }
        ArrayList<Deferred<List<OperationResponse>>> deferreds = new ArrayList<Deferred<List<OperationResponse>>>(copyOfSessions.size());
        for (AsyncKuduSession session : copyOfSessions) {
            deferreds.add(session.close());
        }
        return Deferred.group(deferreds);
    }

    private static boolean isMasterTable(String tableId) {
        return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
    }

    static Timeout newTimeout(Timer timer, TimerTask task, long timeoutMillis) {
        Preconditions.checkNotNull(timer);
        try {
            return timer.newTimeout(task, timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (IllegalStateException e) {
            LOG.warn("Failed to schedule timer. Ignore this if we're shutting down.", (Throwable)e);
            return null;
        }
    }

    @InterfaceAudience.LimitedPrivate(value={"Test"})
    List<Connection> getConnectionListCopy() {
        return this.connectionCache.getConnectionListCopy();
    }

    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public static final class AsyncKuduClientBuilder {
        private static final int DEFAULT_MASTER_PORT = 7051;
        private static final int DEFAULT_BOSS_COUNT = 1;
        private static final int DEFAULT_WORKER_COUNT = 2 * Runtime.getRuntime().availableProcessors();
        private final List<HostAndPort> masterAddresses;
        private long defaultAdminOperationTimeoutMs = 30000L;
        private long defaultOperationTimeoutMs = 30000L;
        private final HashedWheelTimer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20L, TimeUnit.MILLISECONDS);
        private Executor bossExecutor;
        private Executor workerExecutor;
        private int bossCount = 1;
        private int workerCount = DEFAULT_WORKER_COUNT;
        private boolean statisticsDisabled = false;

        public AsyncKuduClientBuilder(String masterAddresses) {
            this.masterAddresses = NetUtil.parseStrings(masterAddresses, 7051);
        }

        public AsyncKuduClientBuilder(List<String> masterAddresses) {
            this.masterAddresses = Lists.newArrayListWithCapacity(masterAddresses.size());
            for (String address : masterAddresses) {
                this.masterAddresses.add(NetUtil.parseString(address, 7051));
            }
        }

        public AsyncKuduClientBuilder defaultAdminOperationTimeoutMs(long timeoutMs) {
            this.defaultAdminOperationTimeoutMs = timeoutMs;
            return this;
        }

        public AsyncKuduClientBuilder defaultOperationTimeoutMs(long timeoutMs) {
            this.defaultOperationTimeoutMs = timeoutMs;
            return this;
        }

        @Deprecated
        public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
            LOG.info("defaultSocketReadTimeoutMs is deprecated");
            return this;
        }

        public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor workerExecutor) {
            this.bossExecutor = bossExecutor;
            this.workerExecutor = workerExecutor;
            return this;
        }

        public AsyncKuduClientBuilder bossCount(int bossCount) {
            Preconditions.checkArgument(bossCount > 0, "bossCount should be greater than 0");
            this.bossCount = bossCount;
            return this;
        }

        public AsyncKuduClientBuilder workerCount(int workerCount) {
            Preconditions.checkArgument(workerCount > 0, "workerCount should be greater than 0");
            this.workerCount = workerCount;
            return this;
        }

        private NioClientSocketChannelFactory createChannelFactory() {
            Executor boss = this.bossExecutor;
            Executor worker = this.workerExecutor;
            if (boss == null || worker == null) {
                ExecutorService defaultExec = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kudu-nio-%d").setDaemon(true).build());
                if (boss == null) {
                    boss = defaultExec;
                }
                if (worker == null) {
                    worker = defaultExec;
                }
            }
            return new NioClientSocketChannelFactory(boss, this.bossCount, new NioWorkerPool(worker, this.workerCount), this.timer);
        }

        public AsyncKuduClientBuilder disableStatistics() {
            this.statisticsDisabled = true;
            return this;
        }

        public AsyncKuduClient build() {
            return new AsyncKuduClient(this);
        }
    }

    static enum LookupType {
        POINT,
        LOWER_BOUND;

    }

    private final class MasterLookupCB
    implements Callback<Object, Master.GetTableLocationsResponsePB> {
        final KuduTable table;
        private final byte[] partitionKey;
        private final int requestedBatchSize;

        MasterLookupCB(KuduTable table, byte[] partitionKey, int requestedBatchSize) {
            this.table = table;
            this.partitionKey = partitionKey;
            this.requestedBatchSize = requestedBatchSize;
        }

        public Object call(Master.GetTableLocationsResponsePB response) {
            if (response.hasError()) {
                Status status = Status.fromMasterErrorPB(response.getError());
                return new NonRecoverableException(status);
            }
            try {
                AsyncKuduClient.this.discoverTablets(this.table, this.partitionKey, this.requestedBatchSize, response.getTabletLocationsList(), response.getTsInfosList(), response.getTtlMillis());
            }
            catch (KuduException e) {
                return e;
            }
            return null;
        }

        public String toString() {
            return "get tablet locations from the master for table " + this.table.getName();
        }
    }

    private final class ReleaseMasterLookupPermit<T>
    implements Callback<T, T> {
        private ReleaseMasterLookupPermit() {
        }

        public T call(T arg) {
            AsyncKuduClient.this.releaseMasterLookupPermit();
            return arg;
        }

        public String toString() {
            return "release master lookup permit";
        }
    }

    final class RetryRpcErrback<R>
    implements Callback<Deferred<R>, Exception> {
        private final KuduRpc<R> request;

        public RetryRpcErrback(KuduRpc<R> request) {
            this.request = request;
        }

        public Deferred<R> call(Exception arg) {
            if (arg instanceof RecoverableException) {
                return AsyncKuduClient.this.delayedSendRpcToTablet(this.request, (KuduException)arg);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Notify RPC %s after lookup exception", this.request), (Throwable)arg);
            }
            this.request.errback(arg);
            return Deferred.fromError((Exception)arg);
        }

        public String toString() {
            return "retry RPC after error";
        }
    }

    final class RetryRpcCB<R, D>
    implements Callback<Deferred<R>, D> {
        private final KuduRpc<R> request;

        RetryRpcCB(KuduRpc<R> request) {
            this.request = request;
        }

        public Deferred<R> call(D arg) {
            LOG.debug("Retrying sending RPC {} after lookup", this.request);
            return AsyncKuduClient.this.sendRpcToTablet(this.request);
        }

        public String toString() {
            return "retry RPC";
        }
    }

    class RetryTaskErrback<R>
    implements Callback<Void, Exception> {
        private final KuduRpc<R> fakeRpc;
        private final TimerTask retryTask;

        public RetryTaskErrback(KuduRpc<R> fakeRpc, TimerTask retryTask) {
            this.fakeRpc = fakeRpc;
            this.retryTask = retryTask;
        }

        public Void call(Exception arg) {
            if (!(arg instanceof RecoverableException)) {
                this.fakeRpc.errback(arg);
                return null;
            }
            RecoverableException ex = (RecoverableException)arg;
            long sleepTime = AsyncKuduClient.this.getSleepTimeForRpcMillis(this.fakeRpc);
            if (AsyncKuduClient.cannotRetryRequest(this.fakeRpc) || this.fakeRpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
                AsyncKuduClient.tooManyAttemptsOrTimeout(this.fakeRpc, ex);
                return null;
            }
            this.fakeRpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(this.fakeRpc.method(), RpcTraceFrame.Action.SLEEP_THEN_RETRY).callStatus(ex.getStatus()).build());
            AsyncKuduClient.newTimeout(AsyncKuduClient.this.timer, this.retryTask, sleepTime);
            return null;
        }

        public String toString() {
            return "retry task after error";
        }
    }
}

