/*
 * Decompiled with CFR 0.152.
 */
package com.pingcap.tikv;

import com.pingcap.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.pingcap.tikv.BytePairWrapper;
import com.pingcap.tikv.ByteWrapper;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.TiSession;
import com.pingcap.tikv.codec.KeyUtils;
import com.pingcap.tikv.exception.GrpcException;
import com.pingcap.tikv.exception.TiBatchWriteException;
import com.pingcap.tikv.region.RegionManager;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.txn.TxnKVClient;
import com.pingcap.tikv.txn.type.BatchKeys;
import com.pingcap.tikv.txn.type.ClientRPCResult;
import com.pingcap.tikv.util.BackOffFunction;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ClientUtils;
import com.pingcap.tikv.util.ConcreteBackOffer;
import com.pingcap.tikv.util.LogDesensitization;
import com.pingcap.tikv.util.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import shade.com.google.protobuf.ByteString;

public class TwoPhaseCommitter {
    private static final int WRITE_BUFFER_SIZE = 32768;
    private static final int TXN_COMMIT_BATCH_SIZE = 786432;
    private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000L;
    private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitter.class);
    private final long startTs;
    private final long lockTTL;
    private final boolean retryCommitSecondaryKeys;
    private final TxnKVClient kvClient;
    private final RegionManager regionManager;
    private final long txnPrewriteBatchSize;
    private final long txnCommitBatchSize;
    private final int writeBufferSize;
    private final int writeThreadPerTask;
    private final int prewriteMaxRetryTimes;
    private final ExecutorService executorService;

    public TwoPhaseCommitter(TiConfiguration conf, long startTime) {
        this.kvClient = TiSession.getInstance(conf).createTxnClient();
        this.regionManager = this.kvClient.getRegionManager();
        this.startTs = startTime;
        this.lockTTL = 3600000L;
        this.retryCommitSecondaryKeys = true;
        this.txnPrewriteBatchSize = 786432L;
        this.txnCommitBatchSize = 786432L;
        this.writeBufferSize = 32768;
        this.writeThreadPerTask = 1;
        this.prewriteMaxRetryTimes = 3;
        this.executorService = this.createExecutorService();
    }

    public TwoPhaseCommitter(TiConfiguration conf, long startTime, long lockTTL, long txnPrewriteBatchSize, long txnCommitBatchSize, int writeBufferSize, int writeThreadPerTask, boolean retryCommitSecondaryKeys, int prewriteMaxRetryTimes) {
        this.kvClient = TiSession.getInstance(conf).createTxnClient();
        this.regionManager = this.kvClient.getRegionManager();
        this.startTs = startTime;
        this.lockTTL = lockTTL;
        this.retryCommitSecondaryKeys = retryCommitSecondaryKeys;
        this.txnPrewriteBatchSize = txnPrewriteBatchSize;
        this.txnCommitBatchSize = txnCommitBatchSize;
        this.writeBufferSize = writeBufferSize;
        this.writeThreadPerTask = writeThreadPerTask;
        this.prewriteMaxRetryTimes = prewriteMaxRetryTimes;
        this.executorService = this.createExecutorService();
    }

    private ExecutorService createExecutorService() {
        return Executors.newFixedThreadPool(this.writeThreadPerTask, new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
    }

    public void close() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void prewritePrimaryKey(BackOffer backOffer, byte[] primaryKey, byte[] value) throws TiBatchWriteException {
        this.doPrewritePrimaryKeyWithRetry(backOffer, ByteString.copyFrom(primaryKey), ByteString.copyFrom(value));
    }

    private void doPrewritePrimaryKeyWithRetry(BackOffer backOffer, ByteString key, ByteString value) throws TiBatchWriteException {
        long lockTTL;
        Pair<TiRegion, Metapb.Store> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
        TiRegion tiRegion = (TiRegion)pair.first;
        Kvrpcpb.Mutation mutation = !value.isEmpty() ? Kvrpcpb.Mutation.newBuilder().setKey(key).setValue(value).setOp(Kvrpcpb.Op.Put).build() : Kvrpcpb.Mutation.newBuilder().setKey(key).setOp(Kvrpcpb.Op.Del).build();
        List<Kvrpcpb.Mutation> mutationList = Collections.singletonList(mutation);
        ClientRPCResult prewriteResult = this.kvClient.prewrite(backOffer, mutationList, key, lockTTL = this.getTxnLockTTL(this.startTs), this.startTs, tiRegion);
        if (!prewriteResult.isSuccess() && !prewriteResult.isRetry()) {
            throw new TiBatchWriteException("prewrite primary key error", prewriteResult.getException());
        }
        if (prewriteResult.isRetry()) {
            try {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn prewrite primary key failed, regionId=%s", tiRegion.getId()), prewriteResult.getException()));
                this.doPrewritePrimaryKeyWithRetry(backOffer, key, value);
            }
            catch (GrpcException e) {
                String errorMsg = String.format("Txn prewrite primary key error, re-split commit failed, regionId=%s, detail=%s", tiRegion.getId(), e.getMessage());
                throw new TiBatchWriteException(errorMsg, e);
            }
        }
        LOG.info("prewrite primary key {} successfully", (Object)LogDesensitization.hide(KeyUtils.formatBytes(key)));
    }

    public void commitPrimaryKey(BackOffer backOffer, byte[] key, long commitTs) throws TiBatchWriteException {
        this.doCommitPrimaryKeyWithRetry(backOffer, ByteString.copyFrom(key), commitTs);
    }

    private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, long commitTs) throws TiBatchWriteException {
        Pair<TiRegion, Metapb.Store> pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
        TiRegion tiRegion = (TiRegion)pair.first;
        ArrayList<ByteString> keys = new ArrayList<ByteString>();
        keys.add(key);
        ClientRPCResult commitResult = this.kvClient.commit(backOffer, keys, this.startTs, commitTs, tiRegion);
        if (!commitResult.isSuccess()) {
            if (!commitResult.isRetry()) {
                throw new TiBatchWriteException("commit primary key error", commitResult.getException());
            }
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn commit primary key failed, regionId=%s", tiRegion.getId()), commitResult.getException()));
            this.doCommitPrimaryKeyWithRetry(backOffer, key, commitTs);
        }
        LOG.info("commit primary key {} successfully", (Object)LogDesensitization.hide(KeyUtils.formatBytes(key)));
    }

    public void prewriteSecondaryKeys(byte[] primaryKey, final Iterator<BytePairWrapper> pairs, int maxBackOfferMS) throws TiBatchWriteException {
        Iterator<Pair<ByteString, ByteString>> byteStringKeys = new Iterator<Pair<ByteString, ByteString>>(){

            @Override
            public boolean hasNext() {
                return pairs.hasNext();
            }

            @Override
            public Pair<ByteString, ByteString> next() {
                BytePairWrapper pair = (BytePairWrapper)pairs.next();
                return new Pair<ByteString, ByteString>(ByteString.copyFrom(pair.getKey()), ByteString.copyFrom(pair.getValue()));
            }
        };
        this.doPrewriteSecondaryKeys(ByteString.copyFrom(primaryKey), byteStringKeys, maxBackOfferMS);
    }

    private void doPrewriteSecondaryKeys(ByteString primaryKey, Iterator<Pair<ByteString, ByteString>> pairs, int maxBackOfferMS) throws TiBatchWriteException {
        try {
            int taskBufferSize = this.writeThreadPerTask * 2;
            int totalSize = 0;
            int cnt = 0;
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(this.executorService);
            while (pairs.hasNext()) {
                ArrayList keyBytes = new ArrayList(this.writeBufferSize);
                ArrayList valueBytes = new ArrayList(this.writeBufferSize);
                while (keyBytes.size() < this.writeBufferSize && pairs.hasNext()) {
                    Pair<ByteString, ByteString> pair = pairs.next();
                    keyBytes.add(pair.first);
                    valueBytes.add(pair.second);
                }
                int curSize = keyBytes.size();
                if (++cnt > taskBufferSize) {
                    completionService.take().get();
                }
                ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(maxBackOfferMS);
                completionService.submit(() -> {
                    this.doPrewriteSecondaryKeysInBatchesWithRetry(backOffer, primaryKey, keyBytes, valueBytes, curSize, 0);
                    return null;
                });
                totalSize += keyBytes.size();
            }
            for (int i = 0; i < Math.min(taskBufferSize, cnt); ++i) {
                completionService.take().get();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TiBatchWriteException("Current thread interrupted.", e);
        }
        catch (ExecutionException e) {
            throw new TiBatchWriteException("Execution exception met.", e);
        }
    }

    private void doPrewriteSecondaryKeysInBatchesWithRetry(BackOffer backOffer, ByteString primaryKey, List<ByteString> keys, List<ByteString> values, int size, int level) throws TiBatchWriteException {
        if (keys == null || keys.isEmpty() || values == null || values.isEmpty() || size <= 0) {
            return;
        }
        LinkedHashMap<ByteString, Kvrpcpb.Mutation> mutations = new LinkedHashMap<ByteString, Kvrpcpb.Mutation>();
        for (int i = 0; i < size; ++i) {
            ByteString key = keys.get(i);
            ByteString value = values.get(i);
            Kvrpcpb.Mutation mutation = !value.isEmpty() ? Kvrpcpb.Mutation.newBuilder().setKey(key).setValue(value).setOp(Kvrpcpb.Op.Put).build() : Kvrpcpb.Mutation.newBuilder().setKey(key).setOp(Kvrpcpb.Op.Del).build();
            mutations.put(key, mutation);
        }
        Map<TiRegion, List<ByteString>> groupResult = ClientUtils.groupKeysByRegion(this.regionManager, keys, backOffer);
        ArrayList<BatchKeys> batchKeyList = new ArrayList<BatchKeys>();
        for (Map.Entry<TiRegion, List<ByteString>> entry : groupResult.entrySet()) {
            TiRegion tiRegion = entry.getKey();
            this.appendBatchBySize(batchKeyList, tiRegion, entry.getValue(), true, mutations);
        }
        for (BatchKeys batchKeys : batchKeyList) {
            TiRegion currentRegion;
            TiRegion oldRegion = batchKeys.getRegion();
            if (oldRegion.equals(currentRegion = this.regionManager.getRegionByKey(oldRegion.getStartKey(), backOffer))) {
                this.doPrewriteSecondaryKeySingleBatchWithRetry(backOffer, primaryKey, batchKeys, mutations);
                continue;
            }
            if (level > this.prewriteMaxRetryTimes) {
                throw new TiBatchWriteException(String.format("> max retry number %s, oldRegion=%s, currentRegion=%s", this.prewriteMaxRetryTimes, oldRegion, currentRegion));
            }
            LOG.info(String.format("oldRegion=%s != currentRegion=%s, will re-fetch region info and retry", oldRegion, currentRegion));
            this.retryPrewriteBatch(backOffer, primaryKey, batchKeys, mutations, level <= 0 ? 1 : level + 1);
        }
    }

    private void retryPrewriteBatch(BackOffer backOffer, ByteString primaryKey, BatchKeys batchKeys, Map<ByteString, Kvrpcpb.Mutation> mutations, int level) {
        int size = batchKeys.getKeys().size();
        ArrayList<ByteString> keyBytes = new ArrayList<ByteString>(size);
        ArrayList<ByteString> valueBytes = new ArrayList<ByteString>(size);
        for (ByteString k : batchKeys.getKeys()) {
            keyBytes.add(k);
            valueBytes.add(mutations.get(k).getValue());
        }
        this.doPrewriteSecondaryKeysInBatchesWithRetry(backOffer, primaryKey, keyBytes, valueBytes, size, level);
    }

    private void doPrewriteSecondaryKeySingleBatchWithRetry(BackOffer backOffer, ByteString primaryKey, BatchKeys batchKeys, Map<ByteString, Kvrpcpb.Mutation> mutations) throws TiBatchWriteException {
        LOG.info("start prewrite secondary key, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
        List<ByteString> keyList = batchKeys.getKeys();
        int batchSize = keyList.size();
        ArrayList<Kvrpcpb.Mutation> mutationList = new ArrayList<Kvrpcpb.Mutation>(batchSize);
        for (ByteString key : keyList) {
            mutationList.add(mutations.get(key));
        }
        int txnSize = batchKeys.getKeys().size();
        long lockTTL = this.getTxnLockTTL(this.startTs, txnSize);
        ClientRPCResult prewriteResult = this.kvClient.prewrite(backOffer, mutationList, primaryKey, lockTTL, this.startTs, batchKeys.getRegion());
        if (!prewriteResult.isSuccess() && !prewriteResult.isRetry()) {
            throw new TiBatchWriteException("prewrite secondary key error", prewriteResult.getException());
        }
        if (prewriteResult.isRetry()) {
            LOG.info("prewrite secondary key fail, will backoff and retry");
            try {
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(String.format("Txn prewrite secondary key SingleBatch failed, regionId=%s", batchKeys.getRegion().getId()), prewriteResult.getException()));
                this.retryPrewriteBatch(backOffer, primaryKey, batchKeys, mutations, 0);
            }
            catch (GrpcException e) {
                String errorMsg = String.format("Txn prewrite secondary key SingleBatch error, re-split commit failed, regionId=%s, detail=%s", batchKeys.getRegion().getId(), e.getMessage());
                throw new TiBatchWriteException(errorMsg, e);
            }
        }
        LOG.info("prewrite secondary key successfully, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
    }

    private void appendBatchBySize(List<BatchKeys> batchKeyList, TiRegion tiRegion, List<ByteString> keys, boolean sizeIncludeValue, Map<ByteString, Kvrpcpb.Mutation> mutations) {
        long commitBatchSize;
        long l = commitBatchSize = sizeIncludeValue ? this.txnPrewriteBatchSize : this.txnCommitBatchSize;
        if (keys == null) {
            return;
        }
        int len = keys.size();
        int start = 0;
        while (start < len) {
            int end;
            int sizeInBytes = 0;
            for (end = start; end < len && (long)sizeInBytes < commitBatchSize; ++end) {
                sizeInBytes = sizeIncludeValue ? (int)((long)sizeInBytes + this.keyValueSize(keys.get(end), mutations)) : (int)((long)sizeInBytes + this.keySize(keys.get(end)));
            }
            BatchKeys batchKeys = new BatchKeys(tiRegion, keys.subList(start, end), sizeInBytes);
            batchKeyList.add(batchKeys);
            start = end;
        }
    }

    private long keyValueSize(ByteString key, Map<ByteString, Kvrpcpb.Mutation> mutations) {
        long size = key.size();
        Kvrpcpb.Mutation mutation = mutations.get(key);
        if (mutation != null) {
            size += (long)mutation.getValue().toByteArray().length;
        }
        return size;
    }

    private long keySize(ByteString key) {
        return key.size();
    }

    public void commitSecondaryKeys(final Iterator<ByteWrapper> keys, long commitTs, int commitBackOfferMS) throws TiBatchWriteException {
        Iterator<ByteString> byteStringKeys = new Iterator<ByteString>(){

            @Override
            public boolean hasNext() {
                return keys.hasNext();
            }

            @Override
            public ByteString next() {
                return ByteString.copyFrom(((ByteWrapper)keys.next()).getBytes());
            }
        };
        this.doCommitSecondaryKeys(byteStringKeys, commitTs, commitBackOfferMS);
    }

    private void doCommitSecondaryKeys(Iterator<ByteString> keys, long commitTs, int commitBackOfferMS) throws TiBatchWriteException {
        try {
            int taskBufferSize = this.writeThreadPerTask * 2;
            int totalSize = 0;
            int cnt = 0;
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(this.executorService);
            while (keys.hasNext()) {
                ArrayList<ByteString> keyBytes = new ArrayList<ByteString>(this.writeBufferSize);
                while (keyBytes.size() < this.writeBufferSize && keys.hasNext()) {
                    keyBytes.add(keys.next());
                }
                int curSize = keyBytes.size();
                if (++cnt > taskBufferSize) {
                    completionService.take().get();
                }
                ConcreteBackOffer backOffer = ConcreteBackOffer.newCustomBackOff(commitBackOfferMS);
                completionService.submit(() -> {
                    this.doCommitSecondaryKeysWithRetry(backOffer, keyBytes, curSize, commitTs);
                    return null;
                });
                totalSize += keyBytes.size();
            }
            for (int i = 0; i < Math.min(taskBufferSize, cnt); ++i) {
                completionService.take().get();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TiBatchWriteException("Current thread interrupted.", e);
        }
        catch (ExecutionException e) {
            throw new TiBatchWriteException("Execution exception met.", e);
        }
    }

    private void doCommitSecondaryKeysWithRetry(BackOffer backOffer, List<ByteString> keys, int size, long commitTs) throws TiBatchWriteException {
        if (keys == null || keys.isEmpty() || size <= 0) {
            return;
        }
        Map<TiRegion, List<ByteString>> groupResult = ClientUtils.groupKeysByRegion(this.regionManager, keys, backOffer);
        ArrayList<BatchKeys> batchKeyList = new ArrayList<BatchKeys>();
        for (Map.Entry<TiRegion, List<ByteString>> entry : groupResult.entrySet()) {
            TiRegion tiRegion = entry.getKey();
            this.appendBatchBySize(batchKeyList, tiRegion, entry.getValue(), false, null);
        }
        for (BatchKeys batchKeys : batchKeyList) {
            this.doCommitSecondaryKeySingleBatchWithRetry(backOffer, batchKeys, commitTs);
        }
    }

    private void doCommitSecondaryKeySingleBatchWithRetry(BackOffer backOffer, BatchKeys batchKeys, long commitTs) throws TiBatchWriteException {
        LOG.info("start commit secondary key, row={}, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
        List<ByteString> keysCommit = batchKeys.getKeys();
        ClientRPCResult commitResult = this.kvClient.commit(backOffer, keysCommit, this.startTs, commitTs, batchKeys.getRegion());
        if (this.retryCommitSecondaryKeys && commitResult.isRetry()) {
            this.doCommitSecondaryKeysWithRetry(backOffer, keysCommit, keysCommit.size(), commitTs);
        } else if (!commitResult.isSuccess()) {
            String error = String.format("Txn commit secondary key error, regionId=%s", batchKeys.getRegion());
            LOG.warn(error);
            throw new TiBatchWriteException("commit secondary key error", commitResult.getException());
        }
        LOG.info("commit {} rows successfully, size={}KB, regionId={}", new Object[]{batchKeys.getKeys().size(), Float.valueOf(batchKeys.getSizeInKB()), batchKeys.getRegion().getId()});
    }

    private long getTxnLockTTL(long startTime) {
        return this.lockTTL;
    }

    private long getTxnLockTTL(long startTime, int txnSize) {
        return this.lockTTL;
    }
}

