/*
 * Decompiled with CFR 0.152.
 */
package com.pingcap.tikv.region;

import com.pingcap.com.google.common.annotations.VisibleForTesting;
import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.SelectResponse;
import com.pingcap.tikv.PDClient;
import com.pingcap.tikv.StoreVersion;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.exception.GrpcException;
import com.pingcap.tikv.exception.KeyException;
import com.pingcap.tikv.exception.LockException;
import com.pingcap.tikv.exception.RegionException;
import com.pingcap.tikv.exception.SelectException;
import com.pingcap.tikv.exception.TiClientInternalException;
import com.pingcap.tikv.exception.TiKVException;
import com.pingcap.tikv.operation.KVErrorHandler;
import com.pingcap.tikv.region.AbstractRegionStoreClient;
import com.pingcap.tikv.region.RegionManager;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.region.TiStoreType;
import com.pingcap.tikv.streaming.StreamingResponse;
import com.pingcap.tikv.txn.AbstractLockResolverClient;
import com.pingcap.tikv.txn.Lock;
import com.pingcap.tikv.txn.ResolveLockResult;
import com.pingcap.tikv.util.BackOffFunction;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ChannelFactory;
import com.pingcap.tikv.util.ConcreteBackOffer;
import com.pingcap.tikv.util.Pair;
import com.pingcap.tikv.util.RangeSplitter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import shade.com.google.protobuf.ByteString;
import shade.com.google.protobuf.InvalidProtocolBufferException;
import shade.io.grpc.ManagedChannel;

public class RegionStoreClient
extends AbstractRegionStoreClient {
    private static final Logger logger = LoggerFactory.getLogger(RegionStoreClient.class);
    @VisibleForTesting
    public final AbstractLockResolverClient lockResolverClient;
    private final TiStoreType storeType;
    private final Map<Long, Set<Long>> resolvedLocks = new HashMap<Long, Set<Long>>();
    private final PDClient pdClient;
    private Boolean isV4 = null;

    private synchronized Boolean getIsV4() {
        if (this.isV4 == null) {
            this.isV4 = StoreVersion.minTiKVVersion("4.0.0", this.pdClient);
        }
        return this.isV4;
    }

    private RegionStoreClient(TiConfiguration conf, TiRegion region, Metapb.Store store, TiStoreType storeType, ChannelFactory channelFactory, TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvStub asyncStub, RegionManager regionManager, PDClient pdClient, RegionStoreClientBuilder clientBuilder) {
        super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
        this.storeType = storeType;
        if (this.storeType == TiStoreType.TiKV) {
            this.lockResolverClient = AbstractLockResolverClient.getInstance(store, conf, region, (TikvGrpc.TikvBlockingStub)this.blockingStub, (TikvGrpc.TikvStub)this.asyncStub, channelFactory, regionManager, pdClient, clientBuilder);
        } else {
            Metapb.Store tikvStore = (Metapb.Store)regionManager.getRegionStorePairByKey((ByteString)region.getStartKey(), (TiStoreType)TiStoreType.TiKV).second;
            String addressStr = tikvStore.getAddress();
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Create region store client on address %s", addressStr));
            }
            ManagedChannel channel = channelFactory.getChannel(addressStr);
            TikvGrpc.TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
            TikvGrpc.TikvStub tikvAsyncStub = TikvGrpc.newStub(channel);
            this.lockResolverClient = AbstractLockResolverClient.getInstance(tikvStore, conf, region, tikvBlockingStub, tikvAsyncStub, channelFactory, regionManager, pdClient, clientBuilder);
        }
        this.pdClient = pdClient;
    }

    public synchronized boolean addResolvedLocks(Long version, Set<Long> locks) {
        Set<Long> oldList = this.resolvedLocks.get(version);
        if (oldList != null) {
            oldList.addAll(locks);
        } else {
            this.resolvedLocks.put(version, new HashSet<Long>(locks));
        }
        return true;
    }

    public synchronized Set<Long> getResolvedLocks(Long version) {
        return this.resolvedLocks.getOrDefault(version, Collections.emptySet());
    }

    public ByteString get(BackOffer backOffer, ByteString key, long version) throws TiClientInternalException, KeyException {
        boolean forWrite = false;
        Supplier<Kvrpcpb.GetRequest> factory = () -> Kvrpcpb.GetRequest.newBuilder().setContext(this.region.getContext(this.getResolvedLocks(version))).setKey(key).setVersion(version).build();
        KVErrorHandler<Kvrpcpb.GetResponse> handler = new KVErrorHandler<Kvrpcpb.GetResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> resp.hasError() ? resp.getError() : null, resolveLockResult -> this.addResolvedLocks(version, resolveLockResult.getResolvedLocks()), version, forWrite);
        Kvrpcpb.GetResponse resp2 = this.callWithRetry(backOffer, TikvGrpc.getKvGetMethod(), factory, handler);
        this.handleGetResponse(resp2);
        return resp2.getValue();
    }

    private void handleGetResponse(Kvrpcpb.GetResponse resp) throws TiClientInternalException, KeyException {
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("GetResponse failed without a cause");
        }
        if (resp.hasRegionError()) {
            throw new RegionException(resp.getRegionError());
        }
        if (resp.hasError()) {
            throw new KeyException(resp.getError());
        }
    }

    public List<Kvrpcpb.KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long version) {
        boolean forWrite = false;
        Supplier<Kvrpcpb.BatchGetRequest> request = () -> Kvrpcpb.BatchGetRequest.newBuilder().setContext(this.region.getContext(this.getResolvedLocks(version))).addAllKeys(keys).setVersion(version).build();
        KVErrorHandler<Kvrpcpb.BatchGetResponse> handler = new KVErrorHandler<Kvrpcpb.BatchGetResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> null, resolveLockResult -> this.addResolvedLocks(version, resolveLockResult.getResolvedLocks()), version, forWrite);
        Kvrpcpb.BatchGetResponse resp2 = this.callWithRetry(backOffer, TikvGrpc.getKvBatchGetMethod(), request, handler);
        try {
            return this.handleBatchGetResponse(backOffer, resp2, version);
        }
        catch (TiKVException e) {
            if ("locks not resolved, retry".equals(e.getMessage())) {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoTxnLock, e);
                return this.batchGet(backOffer, keys, version);
            }
            throw e;
        }
    }

    private List<Kvrpcpb.KvPair> handleBatchGetResponse(BackOffer backOffer, Kvrpcpb.BatchGetResponse resp, long version) {
        boolean forWrite = false;
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("BatchGetResponse failed without a cause");
        }
        if (resp.hasRegionError()) {
            throw new RegionException(resp.getRegionError());
        }
        ArrayList<Lock> locks = new ArrayList<Lock>();
        for (Kvrpcpb.KvPair pair : resp.getPairsList()) {
            if (!pair.hasError()) continue;
            if (pair.getError().hasLocked()) {
                Lock lock = new Lock(pair.getError().getLocked());
                locks.add(lock);
                continue;
            }
            throw new KeyException(pair.getError());
        }
        if (!locks.isEmpty()) {
            ResolveLockResult resolveLockResult = this.lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
            this.addResolvedLocks(version, resolveLockResult.getResolvedLocks());
            throw new TiKVException("locks not resolved, retry");
        }
        return resp.getPairsList();
    }

    public List<Kvrpcpb.KvPair> scan(BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
        KVErrorHandler<Kvrpcpb.ScanResponse> handler;
        Supplier<Kvrpcpb.ScanRequest> request;
        Kvrpcpb.ScanResponse resp2;
        boolean forWrite = false;
        do {
            this.region = this.regionManager.getRegionByKey(startKey);
            request = () -> Kvrpcpb.ScanRequest.newBuilder().setContext(this.region.getContext(this.getResolvedLocks(version))).setStartKey(startKey).setVersion(version).setKeyOnly(keyOnly).setLimit(this.getConf().getScanBatchSize()).build();
            handler = new KVErrorHandler<Kvrpcpb.ScanResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> null, resolveLockResult -> this.addResolvedLocks(version, resolveLockResult.getResolvedLocks()), version, forWrite);
        } while (!this.isScanSuccess(backOffer, resp2 = this.callWithRetry(backOffer, TikvGrpc.getKvScanMethod(), request, handler)));
        return this.doScan(resp2);
    }

    private boolean isScanSuccess(BackOffer backOffer, Kvrpcpb.ScanResponse resp) {
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("ScanResponse failed without a cause");
        }
        if (resp.hasRegionError()) {
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new RegionException(resp.getRegionError()));
            return false;
        }
        return true;
    }

    private List<Kvrpcpb.KvPair> doScan(Kvrpcpb.ScanResponse resp) {
        List<Kvrpcpb.KvPair> kvPairs = resp.getPairsList();
        ArrayList<Kvrpcpb.KvPair> newKvPairs = new ArrayList<Kvrpcpb.KvPair>();
        for (Kvrpcpb.KvPair kvPair : kvPairs) {
            if (kvPair.hasError()) {
                Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError());
                newKvPairs.add(Kvrpcpb.KvPair.newBuilder().setError(kvPair.getError()).setValue(kvPair.getValue()).setKey(lock.getKey()).build());
                continue;
            }
            newKvPairs.add(kvPair);
        }
        return Collections.unmodifiableList(newKvPairs);
    }

    public List<Kvrpcpb.KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {
        return this.scan(backOffer, startKey, version, false);
    }

    public void prewrite(BackOffer backOffer, ByteString primary, Iterable<Kvrpcpb.Mutation> mutations, long startTs, long lockTTL) throws TiClientInternalException, KeyException, RegionException {
        this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null);
    }

    public void prewrite(BackOffer bo, ByteString primaryLock, Iterable<Kvrpcpb.Mutation> mutations, long startTs, long ttl, boolean skipConstraintCheck, boolean useAsyncCommit, Iterable<ByteString> secondaries) throws TiClientInternalException, KeyException, RegionException {
        KVErrorHandler<Kvrpcpb.PrewriteResponse> handler;
        Supplier<Kvrpcpb.PrewriteRequest> factory;
        Kvrpcpb.PrewriteResponse resp2;
        boolean forWrite = true;
        do {
            factory = () -> {
                Kvrpcpb.PrewriteRequest.Builder builder = Kvrpcpb.PrewriteRequest.newBuilder().setContext(this.region.getContext()).setStartVersion(startTs).setPrimaryLock(primaryLock).addAllMutations(mutations).setLockTtl(ttl).setSkipConstraintCheck(skipConstraintCheck).setTxnSize(16L);
                if (this.getIsV4().booleanValue()) {
                    builder.setMinCommitTs(startTs);
                }
                if (useAsyncCommit) {
                    builder.setUseAsyncCommit(true);
                    if (secondaries != null) {
                        builder.addAllSecondaries(secondaries);
                    }
                }
                return builder.build();
            };
            handler = new KVErrorHandler<Kvrpcpb.PrewriteResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> null, resolveLockResult -> null, startTs, forWrite);
        } while (!this.isPrewriteSuccess(bo, resp2 = this.callWithRetry(bo, TikvGrpc.getKvPrewriteMethod(), factory, handler), startTs));
    }

    private boolean isPrewriteSuccess(BackOffer backOffer, Kvrpcpb.PrewriteResponse resp, long startTs) throws TiClientInternalException, KeyException, RegionException {
        boolean forWrite = true;
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("Prewrite Response failed without a cause");
        }
        if (resp.hasRegionError()) {
            throw new RegionException(resp.getRegionError());
        }
        boolean isSuccess = true;
        ArrayList<Lock> locks = new ArrayList<Lock>();
        for (Kvrpcpb.KeyError err : resp.getErrorsList()) {
            if (err.hasLocked()) {
                isSuccess = false;
                Lock lock = new Lock(err.getLocked());
                locks.add(lock);
                continue;
            }
            throw new KeyException(err, err.toString());
        }
        if (isSuccess) {
            return true;
        }
        ResolveLockResult resolveLockResult = this.lockResolverClient.resolveLocks(backOffer, startTs, locks, forWrite);
        this.addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
        long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
        if (msBeforeExpired > 0L) {
            backOffer.doBackOffWithMaxSleep(BackOffFunction.BackOffFuncType.BoTxnLock, msBeforeExpired, new KeyException(resp.getErrorsList().get(0)));
        }
        return false;
    }

    public void txnHeartBeat(BackOffer bo, ByteString primaryLock, long startTs, long ttl) {
        KVErrorHandler<Kvrpcpb.TxnHeartBeatResponse> handler;
        Supplier<Kvrpcpb.TxnHeartBeatRequest> factory;
        Kvrpcpb.TxnHeartBeatResponse resp2;
        boolean forWrite = false;
        do {
            factory = () -> Kvrpcpb.TxnHeartBeatRequest.newBuilder().setContext(this.region.getContext()).setStartVersion(startTs).setPrimaryLock(primaryLock).setAdviseLockTtl(ttl).build();
            handler = new KVErrorHandler<Kvrpcpb.TxnHeartBeatResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> resp.hasError() ? resp.getError() : null, resolveLockResult -> null, startTs, forWrite);
        } while (!this.isTxnHeartBeatSuccess(resp2 = this.callWithRetry(bo, TikvGrpc.getKvTxnHeartBeatMethod(), factory, handler)));
    }

    private boolean isTxnHeartBeatSuccess(Kvrpcpb.TxnHeartBeatResponse resp) throws TiClientInternalException, RegionException {
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("TxnHeartBeat Response failed without a cause");
        }
        if (resp.hasRegionError()) {
            throw new RegionException(resp.getRegionError());
        }
        if (resp.hasError()) {
            throw new TiClientInternalException("TxnHeartBeat fail, " + resp.getError().getAbort());
        }
        return true;
    }

    public void commit(BackOffer backOffer, Iterable<ByteString> keys, long startTs, long commitTs) throws KeyException {
        boolean forWrite = true;
        Supplier<Kvrpcpb.CommitRequest> factory = () -> Kvrpcpb.CommitRequest.newBuilder().setStartVersion(startTs).setCommitVersion(commitTs).addAllKeys(keys).setContext(this.region.getContext()).build();
        KVErrorHandler<Kvrpcpb.CommitResponse> handler = new KVErrorHandler<Kvrpcpb.CommitResponse>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> resp.hasError() ? resp.getError() : null, resolveLockResult -> null, startTs, forWrite);
        Kvrpcpb.CommitResponse resp2 = this.callWithRetry(backOffer, TikvGrpc.getKvCommitMethod(), factory, handler);
        this.handleCommitResponse(resp2);
    }

    private void handleCommitResponse(Kvrpcpb.CommitResponse resp) throws TiClientInternalException, RegionException, KeyException {
        if (resp == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("CommitResponse failed without a cause");
        }
        if (resp.hasRegionError()) {
            throw new RegionException(resp.getRegionError());
        }
        if (resp.hasError()) {
            throw new KeyException(resp.getError());
        }
    }

    public List<RangeSplitter.RegionTask> coprocess(BackOffer backOffer, DAGRequest req, List<Coprocessor.KeyRange> ranges, Queue<SelectResponse> responseQueue, long startTs) {
        boolean forWrite = false;
        if (req == null || ranges == null || req.getExecutorsCount() < 1) {
            throw new IllegalArgumentException("Invalid coprocessor argument!");
        }
        Supplier<Coprocessor.Request> reqToSend = () -> Coprocessor.Request.newBuilder().setContext(this.region.getContext(this.getResolvedLocks(startTs))).setTp(RequestTypes.REQ_TYPE_DAG.getValue()).setStartTs(startTs).setData(req.toByteString()).addAllRanges(ranges).build();
        KVErrorHandler<Coprocessor.Response> handler = new KVErrorHandler<Coprocessor.Response>(this.regionManager, this, this.lockResolverClient, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> null, resolveLockResult -> this.addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()), startTs, forWrite);
        Coprocessor.Response resp2 = this.callWithRetry(backOffer, TikvGrpc.getCoprocessorMethod(), reqToSend, handler);
        return this.handleCopResponse(backOffer, resp2, ranges, responseQueue, startTs);
    }

    private List<RangeSplitter.RegionTask> handleCopResponse(BackOffer backOffer, Coprocessor.Response response, List<Coprocessor.KeyRange> ranges, Queue<SelectResponse> responseQueue, long startTs) {
        boolean forWrite = false;
        if (response == null) {
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException("TiKV down or Network partition"));
            logger.warn("Re-splitting region task due to region error: TiKV down or Network partition");
            return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, this.storeType);
        }
        if (response.hasRegionError()) {
            Errorpb.Error regionError = response.getRegionError();
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(regionError.toString()));
            logger.warn("Re-splitting region task due to region error:" + regionError.getMessage());
            return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, this.storeType);
        }
        if (response.hasLocked()) {
            Lock lock = new Lock(response.getLocked());
            logger.debug(String.format("coprocessor encounters locks: %s", lock));
            ResolveLockResult resolveLockResult = this.lockResolverClient.resolveLocks(backOffer, startTs, Collections.singletonList(lock), forWrite);
            this.addResolvedLocks(startTs, resolveLockResult.getResolvedLocks());
            long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
            if (msBeforeExpired > 0L) {
                backOffer.doBackOffWithMaxSleep(BackOffFunction.BackOffFuncType.BoTxnLockFast, msBeforeExpired, new LockException(lock));
            }
            return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, this.storeType);
        }
        String otherError = response.getOtherError();
        if (otherError != null && !otherError.isEmpty()) {
            logger.warn(String.format("Other error occurred, message: %s", otherError));
            throw new GrpcException(otherError);
        }
        responseQueue.offer(this.doCoprocessor(response));
        return null;
    }

    private Iterator<SelectResponse> doCoprocessor(StreamingResponse response) {
        final Iterator<Coprocessor.Response> responseIterator = response.iterator();
        if (!responseIterator.hasNext()) {
            return null;
        }
        return new Iterator<SelectResponse>(){

            @Override
            public boolean hasNext() {
                return responseIterator.hasNext();
            }

            @Override
            public SelectResponse next() {
                return RegionStoreClient.this.doCoprocessor((Coprocessor.Response)responseIterator.next());
            }
        };
    }

    private SelectResponse doCoprocessor(Coprocessor.Response resp) {
        try {
            SelectResponse selectResp = SelectResponse.parseFrom(resp.getData());
            if (selectResp.hasError()) {
                throw new SelectException(selectResp.getError(), selectResp.getError().getMsg());
            }
            return selectResp;
        }
        catch (InvalidProtocolBufferException e) {
            throw new TiClientInternalException("Error parsing protobuf for coprocessor response.", e);
        }
    }

    public Iterator<SelectResponse> coprocessStreaming(DAGRequest req, List<Coprocessor.KeyRange> ranges, long startTs) {
        boolean forWrite = false;
        Supplier<Coprocessor.Request> reqToSend = () -> Coprocessor.Request.newBuilder().setContext(this.region.getContext(this.getResolvedLocks(startTs))).setTp(RequestTypes.REQ_TYPE_DAG.getValue()).setData(req.toByteString()).addAllRanges(ranges).build();
        KVErrorHandler<StreamingResponse> handler = new KVErrorHandler<StreamingResponse>(this.regionManager, this, this.lockResolverClient, this.region, StreamingResponse::getFirstRegionError, resp -> null, resolveLockResult -> this.addResolvedLocks(startTs, resolveLockResult.getResolvedLocks()), startTs, forWrite);
        StreamingResponse responseIterator = this.callServerStreamingWithRetry(ConcreteBackOffer.newCopNextMaxBackOff(), TikvGrpc.getCoprocessorStreamMethod(), reqToSend, handler);
        return this.doCoprocessor(responseIterator);
    }

    public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) {
        Supplier<Kvrpcpb.SplitRegionRequest> request = () -> Kvrpcpb.SplitRegionRequest.newBuilder().setContext(this.region.getContext()).addAllSplitKeys(splitKeys).build();
        KVErrorHandler<Kvrpcpb.SplitRegionResponse> handler = new KVErrorHandler<Kvrpcpb.SplitRegionResponse>(this.regionManager, this, null, this.region, resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> null, resolveLockResult -> null, 0L, false);
        Kvrpcpb.SplitRegionResponse resp2 = this.callWithRetry(ConcreteBackOffer.newGetBackOff(), TikvGrpc.getSplitRegionMethod(), request, handler);
        if (resp2 == null) {
            this.regionManager.onRequestFail(this.region);
            throw new TiClientInternalException("SplitRegion Response failed without a cause");
        }
        if (resp2.hasRegionError()) {
            throw new TiClientInternalException(String.format("failed to split region %d because %s", this.region.getId(), resp2.getRegionError().toString()));
        }
        return resp2.getRegionsList().stream().map(region -> new TiRegion((Metapb.Region)region, null, this.conf.getIsolationLevel(), this.conf.getCommandPriority())).collect(Collectors.toList());
    }

    public static class RegionStoreClientBuilder {
        private final TiConfiguration conf;
        private final ChannelFactory channelFactory;
        private final RegionManager regionManager;
        private final PDClient pdClient;

        public RegionStoreClientBuilder(TiConfiguration conf, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient) {
            Objects.requireNonNull(conf, "conf is null");
            Objects.requireNonNull(channelFactory, "channelFactory is null");
            Objects.requireNonNull(regionManager, "regionManager is null");
            this.conf = conf;
            this.channelFactory = channelFactory;
            this.regionManager = regionManager;
            this.pdClient = pdClient;
        }

        public RegionStoreClient build(TiRegion region, Metapb.Store store, TiStoreType storeType) throws GrpcException {
            Objects.requireNonNull(region, "region is null");
            Objects.requireNonNull(store, "store is null");
            Objects.requireNonNull(storeType, "storeType is null");
            String addressStr = store.getAddress();
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Create region store client on address %s", addressStr));
            }
            ManagedChannel channel = this.channelFactory.getChannel(addressStr);
            TikvGrpc.TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
            TikvGrpc.TikvStub asyncStub = TikvGrpc.newStub(channel);
            return new RegionStoreClient(this.conf, region, store, storeType, this.channelFactory, blockingStub, asyncStub, this.regionManager, this.pdClient, this);
        }

        public RegionStoreClient build(TiRegion region, Metapb.Store store) throws GrpcException {
            return this.build(region, store, TiStoreType.TiKV);
        }

        public RegionStoreClient build(ByteString key) throws GrpcException {
            return this.build(key, TiStoreType.TiKV);
        }

        public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
            Pair<TiRegion, Metapb.Store> pair = this.regionManager.getRegionStorePairByKey(key, storeType);
            return this.build((TiRegion)pair.first, (Metapb.Store)pair.second, storeType);
        }

        public RegionStoreClient build(TiRegion region) throws GrpcException {
            Metapb.Store store = this.regionManager.getStoreById(region.getLeader().getStoreId());
            return this.build(region, store, TiStoreType.TiKV);
        }

        public RegionManager getRegionManager() {
            return this.regionManager;
        }
    }

    public static enum RequestTypes {
        REQ_TYPE_SELECT(101),
        REQ_TYPE_INDEX(102),
        REQ_TYPE_DAG(103),
        REQ_TYPE_ANALYZE(104),
        BATCH_ROW_COUNT(64);

        private final int value;

        private RequestTypes(int value) {
            this.value = value;
        }

        public int getValue() {
            return this.value;
        }
    }
}

