/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.hbase;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RateLimiter;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator;
import org.apache.hudi.index.hbase.HBaseIndexQPSResourceAllocator;
import org.apache.hudi.org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hudi.org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hudi.org.apache.hadoop.hbase.TableName;
import org.apache.hudi.org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hudi.org.apache.hadoop.hbase.client.Connection;
import org.apache.hudi.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hudi.org.apache.hadoop.hbase.client.Delete;
import org.apache.hudi.org.apache.hadoop.hbase.client.Get;
import org.apache.hudi.org.apache.hadoop.hbase.client.HTable;
import org.apache.hudi.org.apache.hadoop.hbase.client.Mutation;
import org.apache.hudi.org.apache.hadoop.hbase.client.Put;
import org.apache.hudi.org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hudi.org.apache.hadoop.hbase.client.Result;
import org.apache.hudi.org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hudi.org.apache.hadoop.hbase.client.Scan;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkHoodieHBaseIndex
extends HoodieIndex<Object, Object> {
    // Spark configuration keys read in acquireQPSResourcesAndSetBatchSize to estimate the executor count.
    public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME = "spark.executor.instances";
    public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME = "spark.dynamicAllocation.enabled";
    public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME = "spark.dynamicAllocation.maxExecutors";
    // HBase column family and qualifiers under which an index entry (commit time, file id, partition path) is stored.
    private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
    private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
    private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
    private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
    private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieHBaseIndex.class);
    // Static connection shared by all instances; (re)created under the class monitor when null or closed.
    private static Connection hbaseConnection = null;
    // Allocator instantiated in init() via createQPSResourceAllocator.
    private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
    // Read from the write config in init().
    private int maxQpsPerRegionServer;
    // Both counters are populated by calculateQPSFraction when batch-size auto-compute is enabled.
    private long totalNumInserts;
    private int numWriteStatusWithInserts;
    // Shutdown hook registered at most once by addShutDownHook.
    // NOTE(review): `transient` has no effect on a static field, since static fields are not serialized.
    private static transient Thread shutdownThread;
    // Initialised from the write config; overwritten by acquireQPSResourcesAndSetBatchSize when auto-compute is on.
    private Integer multiPutBatchSize;
    // Cached by getNumRegionServersAliveForTable after the first lookup.
    private Integer numRegionServersForTable;
    // Name of the HBase table that backs the index, taken from the write config.
    private final String tableName;
    private HBasePutBatchSizeCalculator putBatchSizeCalculator;

    /**
     * Builds the index from the given write config: records the HBase table name,
     * registers the JVM shutdown hook and initialises the throttling state.
     *
     * @param config write configuration that supplies the HBase index settings
     */
    public SparkHoodieHBaseIndex(HoodieWriteConfig config) {
        super(config);
        tableName = config.getHbaseTableName();
        addShutDownHook();
        init(config);
    }

    /**
     * Initialises the put batch size, the per-region-server QPS cap, the batch size
     * calculator and the QPS resource allocator.
     *
     * @param config write configuration the batch size and QPS cap are read from
     */
    private void init(HoodieWriteConfig config) {
        multiPutBatchSize = config.getHbaseIndexPutBatchSize();
        maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
        putBatchSizeCalculator = new HBasePutBatchSizeCalculator();
        // The allocator is built from the config field of this instance, not from the parameter.
        hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
    }

    /**
     * Instantiates the QPS resource allocator class named in the config via reflection.
     * If instantiation fails for any reason, the failure is logged and a
     * {@link DefaultHBaseQPSResourceAllocator} is returned instead.
     *
     * @param config write configuration naming the allocator class; also passed to the allocator
     * @return the configured allocator, or the default allocator on failure
     */
    public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
        HBaseIndexQPSResourceAllocator allocator;
        try {
            LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
            Object loaded = ReflectionUtils.loadClass(config.getHBaseQPSResourceAllocatorClass(), config);
            allocator = (HBaseIndexQPSResourceAllocator) loaded;
        } catch (Exception loadFailure) {
            LOG.warn("error while instantiating HBaseIndexQPSResourceAllocator", (Throwable) loadFailure);
            allocator = new DefaultHBaseQPSResourceAllocator(config);
        }
        return allocator;
    }

    /**
     * Opens a new HBase connection using the ZooKeeper quorum, client port and (optional)
     * znode parent from the write config. When the configured authentication is
     * {@code "kerberos"}, the Kerberos settings are applied and the connection is created
     * as the user logged in from the configured principal and keytab.
     *
     * @return a newly created connection; the caller owns it
     * @throws HoodieDependentSystemUnavailableException if the connection cannot be created
     *         or the calling thread is interrupted while creating it
     */
    private Connection getHBaseConnection() {
        Configuration hbaseConfig = HBaseConfiguration.create();
        String quorum = this.config.getHbaseZkQuorum();
        hbaseConfig.set("hbase.zookeeper.quorum", quorum);
        String zkZnodeParent = this.config.getHBaseZkZnodeParent();
        if (zkZnodeParent != null) {
            hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent);
        }
        String port = String.valueOf(this.config.getHbaseZkPort());
        hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
        try {
            // Constant-first comparison so a missing authentication setting means "not kerberos" instead of an NPE.
            if (!"kerberos".equals(this.config.getHBaseIndexSecurityAuthentication())) {
                return ConnectionFactory.createConnection(hbaseConfig);
            }
            hbaseConfig.set("hbase.security.authentication", "kerberos");
            hbaseConfig.set("hadoop.security.authentication", "kerberos");
            hbaseConfig.set("hbase.security.authorization", "true");
            hbaseConfig.set("hbase.regionserver.kerberos.principal", this.config.getHBaseIndexRegionserverPrincipal());
            hbaseConfig.set("hbase.master.kerberos.principal", this.config.getHBaseIndexMasterPrincipal());
            String principal = this.config.getHBaseIndexKerberosUserPrincipal();
            // The keytab path is resolved through SparkFiles.
            String keytab = SparkFiles.get(this.config.getHBaseIndexKerberosUserKeytab());
            UserGroupInformation.setConfiguration(hbaseConfig);
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
            // The explicit cast picks the exception-throwing doAs overload; a bare lambda is ambiguous.
            return ugi.doAs((java.security.PrivilegedExceptionAction<Connection>) () -> ConnectionFactory.createConnection(hbaseConfig));
        } catch (IOException e) {
            throw new HoodieDependentSystemUnavailableException("HBASE", quorum + ":" + port, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new HoodieDependentSystemUnavailableException("HBASE", quorum + ":" + port, e);
        }
    }

    /**
     * Registers, at most once, a JVM shutdown hook that closes the shared static HBase
     * connection. Closing is best effort: any failure during shutdown is ignored.
     */
    private void addShutDownHook() {
        if (shutdownThread != null) {
            return;
        }
        shutdownThread = new Thread(() -> {
            Connection connection = hbaseConnection;
            if (connection == null) {
                // No connection was ever opened, so there is nothing to close.
                return;
            }
            try {
                connection.close();
            } catch (Exception ignored) {
                // Deliberately ignored: the JVM is shutting down and nothing can act on the failure.
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownThread);
    }

    /**
     * Logs that there is nothing to release. This method does not close the shared static
     * HBase connection; that is done by the shutdown hook registered in addShutDownHook.
     */
    @Override
    public void close() {
        LOG.info("No resources to release from Hbase index");
    }

    /**
     * Builds a {@link Get} for the HBase row derived from {@code key} that reads one version
     * of the commit timestamp, file name and partition path columns.
     *
     * @param key record key; mapped to the HBase row key through {@link #getHBaseKey(String)}
     * @return the prepared get statement
     * @throws IOException if the get statement rejects the requested version count
     */
    private Get generateStatement(String key) throws IOException {
        byte[] rowKey = Bytes.toBytes(getHBaseKey(key));
        Get singleVersionGet = new Get(rowKey).readVersions(1);
        return singleVersionGet
            .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
            .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)
            .addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
    }

    /**
     * Builds the same {@link Get} as {@link #generateStatement(String)}, restricted to the
     * time range given by {@code startTime} and {@code endTime}.
     *
     * @param key record key
     * @param startTime first bound passed to {@code setTimeRange}
     * @param endTime second bound passed to {@code setTimeRange}
     * @return the prepared, time-restricted get statement
     * @throws IOException if the statement cannot be built or the time range is rejected
     */
    private Get generateStatement(String key, long startTime, long endTime) throws IOException {
        Get statement = generateStatement(key);
        return statement.setTimeRange(startTime, endTime);
    }

    /**
     * Maps a record key to the string used as the HBase row key. This implementation returns
     * the key unchanged; being protected, it can be overridden by subclasses.
     *
     * @param key record key
     * @return the row key string for {@code key}
     */
    protected String getHBaseKey(String key) {
        return key;
    }

    /**
     * Returns the per-partition function that tags incoming records with the location stored
     * for them in the HBase index.
     *
     * <p>Records are looked up in batches of the configured get batch size. For each record:
     * <ul>
     *   <li>with no HBase row, or whose stored commit is not valid on the completed commits
     *       timeline, the record is emitted untouched;</li>
     *   <li>if partition path updates are enabled and the stored partition differs from the
     *       record's, an empty-payload record carrying the stored location (flagged to skip
     *       the index update) is emitted for the stored partition, followed by a fresh copy
     *       of the incoming record without a location;</li>
     *   <li>otherwise a copy of the record keyed by the stored partition path and carrying
     *       the stored location is emitted.</li>
     * </ul>
     *
     * <p>The class monitor guards only the lazy (re)creation of the shared connection; the
     * HBase reads run outside it so tasks in the same JVM are not serialised.
     *
     * @param metaClient meta client whose completed commits timeline validates stored commits
     * @return function from (partition number, record iterator) to the tagged record iterator
     * @throws HoodieIndexException (from the returned function) when an HBase call fails with an IOException
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <R> Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<R>>> locationTagFunction(HoodieTableMetaClient metaClient) {
        Integer multiGetBatchSize = this.config.getHbaseIndexGetBatchSize();
        return (partitionNum, hoodieRecordIterator) -> {
            boolean updatePartitionPath = this.config.getHbaseIndexUpdatePartitionPath();
            Connection connection;
            synchronized (SparkHoodieHBaseIndex.class) {
                if (hbaseConnection == null || hbaseConnection.isClosed()) {
                    hbaseConnection = this.getHBaseConnection();
                }
                connection = hbaseConnection;
            }
            // The limiter permits ten times the get batch size per second; it is created only
            // once the connection exists so a connection failure cannot leave it running.
            RateLimiter limiter = RateLimiter.create(multiGetBatchSize * 10, TimeUnit.SECONDS);
            List<HoodieRecord<R>> taggedRecords = new ArrayList<>();
            try (HTable hTable = (HTable) connection.getTable(TableName.valueOf(this.tableName))) {
                List<Get> statements = new ArrayList<>();
                List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
                HoodieTimeline completedCommitsTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
                while (hoodieRecordIterator.hasNext()) {
                    HoodieRecord rec = hoodieRecordIterator.next();
                    statements.add(this.generateStatement(rec.getRecordKey()));
                    currentBatchOfRecords.add(rec);
                    // Keep accumulating until the batch is full or the input is exhausted.
                    if (hoodieRecordIterator.hasNext() && statements.size() < multiGetBatchSize) {
                        continue;
                    }
                    Result[] results = this.doGet(hTable, statements, limiter);
                    statements.clear();
                    for (Result result : results) {
                        // Results are consumed in request order, so the head of the batch matches this result.
                        HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
                        if (result.getRow() == null) {
                            // Not present in the index.
                            taggedRecords.add(currentRecord);
                            continue;
                        }
                        String keyFromResult = Bytes.toString(result.getRow());
                        String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
                        String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
                        String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                        if (!HoodieIndexUtils.checkIfValidCommit(completedCommitsTimeline, commitTs)) {
                            // The stored commit is not valid on the timeline: treat the record as untagged.
                            taggedRecords.add(currentRecord);
                            continue;
                        }
                        if (updatePartitionPath && !partitionPath.equals(currentRecord.getPartitionPath())) {
                            // Partition changed: emit an empty-payload record for the stored partition ...
                            HoodieRecord emptyRecord = new HoodieAvroRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
                            emptyRecord.unseal();
                            emptyRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                            emptyRecord.setIgnoreIndexUpdate(true);
                            emptyRecord.seal();
                            // ... followed by a location-less copy of the record in its new partition.
                            currentRecord = new HoodieAvroRecord(new HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()), (HoodieRecordPayload) currentRecord.getData());
                            taggedRecords.add(emptyRecord);
                            taggedRecords.add(currentRecord);
                            continue;
                        }
                        currentRecord = new HoodieAvroRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath), (HoodieRecordPayload) currentRecord.getData());
                        currentRecord.unseal();
                        currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                        currentRecord.seal();
                        taggedRecords.add(currentRecord);
                        assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
                    }
                }
            } catch (IOException e) {
                throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
            } finally {
                limiter.stop();
            }
            return taggedRecords.iterator();
        };
    }

    /**
     * Executes a batch of gets against the table, first requesting one limiter permit per get.
     * The outcome of {@code tryAcquire} is not inspected.
     *
     * @param hTable table to read from
     * @param keys2 get statements to execute; may be empty
     * @param limiter rate limiter asked for {@code keys2.size()} permits
     * @return one result per get statement, or an empty array when there is nothing to fetch
     * @throws IOException if the HBase read fails
     */
    private Result[] doGet(HTable hTable, List<Get> keys2, RateLimiter limiter) throws IOException {
        if (keys2.isEmpty()) {
            return new Result[0];
        }
        limiter.tryAcquire(keys2.size());
        return hTable.get(keys2);
    }

    /**
     * Tags every record with its location from the HBase index by running
     * {@link #locationTagFunction(HoodieTableMetaClient)} over each partition of the
     * underlying RDD, preserving partitioning.
     *
     * @param records records to tag
     * @param context engine context (not used by this implementation)
     * @param hoodieTable table whose meta client supplies the commit timeline
     * @return the tagged records
     */
    @Override
    public <R> HoodieData<HoodieRecord<R>> tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, HoodieTable hoodieTable) {
        JavaRDD<HoodieRecord<R>> inputRdd = HoodieJavaRDD.getJavaRDD(records);
        Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<R>>> tagger = locationTagFunction(hoodieTable.getMetaClient());
        JavaRDD<HoodieRecord<R>> taggedRdd = inputRdd.mapPartitionsWithIndex(tagger, true);
        return HoodieJavaRDD.of(taggedRdd);
    }

    /**
     * Returns the per-partition function that writes index changes to HBase for each
     * {@link WriteStatus}.
     *
     * <p>For every written record delegate that is not errored and not flagged to skip the
     * index update: a record with a new location but no current location produces a
     * {@link Put}; a record with both is skipped; a record without a new location produces a
     * {@link Delete}. Mutations are flushed whenever the batch reaches the put batch size and
     * once more at the end of each write status. A failure while processing one write status
     * is logged and recorded on it as its global error instead of failing the task.
     *
     * <p>The class monitor guards only the lazy (re)creation of the shared connection; the
     * HBase writes run outside it so tasks in the same JVM are not serialised.
     *
     * @return function from (partition number, write status iterator) to the processed statuses
     * @throws HoodieIndexException (from the returned function) when the buffered mutator
     *         cannot be opened or closed
     */
    private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
        return (partition, statusIterator) -> {
            List<WriteStatus> writeStatusList = new ArrayList<>();
            Connection connection;
            synchronized (SparkHoodieHBaseIndex.class) {
                if (hbaseConnection == null || hbaseConnection.isClosed()) {
                    hbaseConnection = this.getHBaseConnection();
                }
                connection = hbaseConnection;
            }
            long startTimeForPutsTask = DateTime.now().getMillis();
            LOG.info("startTimeForPutsTask for this task: " + startTimeForPutsTask);
            RateLimiter limiter = RateLimiter.create(this.multiPutBatchSize, TimeUnit.SECONDS);
            try (BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(this.tableName))) {
                while (statusIterator.hasNext()) {
                    WriteStatus writeStatus = statusIterator.next();
                    List<Mutation> mutations = new ArrayList<>();
                    try {
                        long numOfInserts = writeStatus.getStat().getNumInserts();
                        LOG.info("Num of inserts in this WriteStatus: " + numOfInserts);
                        LOG.info("Total inserts in this job: " + this.totalNumInserts);
                        LOG.info("multiPutBatchSize for this job: " + this.multiPutBatchSize);
                        for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
                            if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
                                if (recordDelegate.getIgnoreIndexUpdate()) {
                                    continue;
                                }
                                Option<HoodieRecordLocation> loc = recordDelegate.getNewLocation();
                                if (loc.isPresent()) {
                                    if (recordDelegate.getCurrentLocation().isPresent()) {
                                        // Already had a location: no index entry is written.
                                        continue;
                                    }
                                    Put put = new Put(Bytes.toBytes(this.getHBaseKey(recordDelegate.getRecordKey())));
                                    put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
                                    put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
                                    put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(recordDelegate.getPartitionPath()));
                                    mutations.add(put);
                                } else {
                                    // No new location: remove the index entry.
                                    mutations.add(new Delete(Bytes.toBytes(this.getHBaseKey(recordDelegate.getRecordKey()))));
                                }
                            }
                            if (mutations.size() < this.multiPutBatchSize) {
                                continue;
                            }
                            this.doMutations(mutator, mutations, limiter);
                        }
                        // Flush whatever is left for this write status.
                        this.doMutations(mutator, mutations, limiter);
                    } catch (Exception e) {
                        Exception we = new Exception("Error updating index for " + writeStatus, e);
                        LOG.error(we.getMessage(), (Throwable) e);
                        writeStatus.setGlobalError(we);
                    }
                    writeStatusList.add(writeStatus);
                }
                long endPutsTime = DateTime.now().getMillis();
                LOG.info("hbase puts task time for this task: " + (endPutsTime - startTimeForPutsTask));
            } catch (IOException e) {
                throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
            } finally {
                limiter.stop();
            }
            return writeStatusList.iterator();
        };
    }

    /**
     * Sends the pending mutations through the buffered mutator, flushes it and empties the
     * list. One limiter permit per mutation is requested first; the outcome of
     * {@code tryAcquire} is not inspected. Does nothing when the list is empty.
     *
     * @param mutator buffered mutator to write through
     * @param mutations pending mutations; cleared after a successful flush
     * @param limiter rate limiter asked for {@code mutations.size()} permits
     * @throws IOException if writing or flushing fails
     */
    private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
        if (!mutations.isEmpty()) {
            limiter.tryAcquire(mutations.size());
            mutator.mutate(mutations);
            mutator.flush();
            mutations.clear();
        }
    }

    /**
     * Collects the file ids of all write statuses that contain at least one insert and
     * assigns each a partition index in collection order, starting at 0.
     *
     * @param writeStatusRDD write statuses to inspect
     * @return map from file id to its assigned partition index
     */
    Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) {
        List<String> fileIdsWithInserts = writeStatusRDD
            .filter(status -> status.getStat().getNumInserts() > 0L)
            .map(WriteStatus::getFileId)
            .collect();
        Map<String, Integer> fileIdPartitionMap = new HashMap<>();
        for (int index = 0; index < fileIdsWithInserts.size(); index++) {
            fileIdPartitionMap.put(fileIdsWithInserts.get(index), index);
        }
        return fileIdPartitionMap;
    }

    /**
     * Writes the index changes described by the write statuses to HBase.
     *
     * <p>Steps: compute the desired QPS fraction, map each file with inserts to its own
     * partition, repartition the statuses accordingly (skipped when no status with inserts
     * was counted), acquire QPS resources and derive the put batch size, then run
     * {@link #updateLocationFunction()} over each partition. The resulting RDD is persisted
     * and counted to force the HBase writes to execute. QPS resources are released even when
     * the Spark job fails.
     *
     * @param writeStatus write statuses produced by the write
     * @param context engine context used to obtain the Spark context
     * @param hoodieTable table being written (not used by this implementation)
     * @return the write statuses after the index update
     */
    @Override
    public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieTable hoodieTable) {
        JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(writeStatus);
        Option<Float> desiredQPSFraction = this.calculateQPSFraction(writeStatusRDD);
        Map<String, Integer> fileIdPartitionMap = this.mapFileWithInsertsToUniquePartition(writeStatusRDD);
        JavaRDD<WriteStatus> partitionedRDD;
        if (this.numWriteStatusWithInserts == 0) {
            partitionedRDD = writeStatusRDD;
        } else {
            partitionedRDD = writeStatusRDD
                .mapToPair(w -> new Tuple2<String, WriteStatus>(w.getFileId(), w))
                .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap, this.numWriteStatusWithInserts))
                .map(pair -> pair._2());
        }
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        this.acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
        JavaRDD<WriteStatus> writeStatusJavaRDD;
        try {
            writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(this.updateLocationFunction(), true);
            writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(this.config.getProps()));
            // count() is the action that actually triggers the HBase writes.
            writeStatusJavaRDD.count();
        } finally {
            // Always hand the QPS allocation back, including when the job above throws.
            this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
        }
        return HoodieJavaRDD.of(writeStatusJavaRDD);
    }

    /**
     * When put batch size auto-compute is enabled, counts the inserts and the write statuses
     * containing inserts, looks up the number of region servers for the table, and asks the
     * allocator for the desired QPS fraction. The counts and the region server number are
     * stored on this instance.
     *
     * @param writeStatusRDD write statuses to analyse
     * @return the desired QPS fraction, or an empty option when auto-compute is disabled
     */
    private Option<Float> calculateQPSFraction(JavaRDD<WriteStatus> writeStatusRDD) {
        if (!this.config.getHbaseIndexPutBatchSizeAutoCompute()) {
            return Option.empty();
        }
        Tuple2<Long, Integer> insertsAndStatuses = getHBasePutAccessParallelism(writeStatusRDD);
        totalNumInserts = insertsAndStatuses._1();
        numWriteStatusWithInserts = insertsAndStatuses._2();
        numRegionServersForTable = getNumRegionServersAliveForTable();
        float desiredQPSFraction = hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(totalNumInserts, numRegionServersForTable);
        LOG.info("Desired QPSFraction :" + desiredQPSFraction);
        LOG.info("Number HBase puts :" + totalNumInserts);
        LOG.info("Number of WriteStatus with inserts :" + numWriteStatusWithInserts);
        return Option.of(Float.valueOf(desiredQPSFraction));
    }

    /**
     * When put batch size auto-compute is enabled, acquires QPS resources from the allocator
     * and recomputes {@code multiPutBatchSize} from the granted fraction. The executor count
     * comes from {@code spark.executor.instances}, raised to
     * {@code spark.dynamicAllocation.maxExecutors} when dynamic allocation is enabled and
     * that value is larger. Does nothing when auto-compute is disabled.
     *
     * @param desiredQPSFraction fraction to request; read only when auto-compute is enabled
     * @param jsc Spark context whose configuration supplies the executor settings
     */
    private void acquireQPSResourcesAndSetBatchSize(Option<Float> desiredQPSFraction, JavaSparkContext jsc) {
        if (!this.config.getHbaseIndexPutBatchSizeAutoCompute()) {
            return;
        }
        SparkConf sparkConf = jsc.getConf();
        int maxExecutors = sparkConf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
        boolean dynamicAllocation = sparkConf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false);
        if (dynamicAllocation) {
            int dynamicMax = sparkConf.getInt(DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1);
            maxExecutors = Math.max(maxExecutors, dynamicMax);
        }
        float availableQpsFraction = hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction.get().floatValue(), totalNumInserts);
        LOG.info("Allocated QPS Fraction :" + availableQpsFraction);
        multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer, numWriteStatusWithInserts, maxExecutors, availableQpsFraction);
        LOG.info("multiPutBatchSize :" + multiPutBatchSize);
    }

    /**
     * Sums, over all write statuses with at least one insert, the number of inserts and the
     * number of such write statuses.
     *
     * @param writeStatusRDD write statuses to aggregate
     * @return tuple of (total inserts, count of write statuses with inserts); (0, 0) if none
     */
    Tuple2<Long, Integer> getHBasePutAccessParallelism(JavaRDD<WriteStatus> writeStatusRDD) {
        JavaPairRDD<Long, Integer> insertsPerStatus = writeStatusRDD
            .filter(status -> status.getStat().getNumInserts() > 0L)
            .mapToPair(status -> new Tuple2<Long, Integer>(status.getStat().getNumInserts(), 1));
        Tuple2<Long, Integer> zero = new Tuple2<Long, Integer>(0L, 0);
        return insertsPerStatus.fold(zero, (left, right) -> new Tuple2<Long, Integer>(left._1() + right._1(), left._2() + right._2()));
    }

    /**
     * Returns the number of distinct region servers that host regions of the index table.
     * The value is looked up once over a dedicated, short-lived connection and cached on
     * this instance; later calls return the cached value.
     *
     * @return number of distinct servers across all region locations of the table
     * @throws RuntimeException wrapping the IOException if the region lookup fails
     */
    private Integer getNumRegionServersAliveForTable() {
        if (this.numRegionServersForTable != null) {
            return this.numRegionServersForTable;
        }
        // Both the connection and the region locator are closeable; close them when done.
        try (Connection conn = this.getHBaseConnection();
             RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf(this.tableName))) {
            long distinctServers = regionLocator.getAllRegionLocations().stream()
                .map(HRegionLocation::getServerName)
                .distinct()
                .count();
            this.numRegionServersForTable = Math.toIntExact(distinctServers);
            return this.numRegionServersForTable;
        } catch (IOException e) {
            LOG.error("Get region locator error", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Rolls the HBase index back to its state before {@code instantTime}.
     *
     * <p>Returns {@code true} immediately when rollback sync is disabled in the config.
     * Otherwise every row with cells written between the instant time and now is scanned,
     * and for each row the version written before the instant is fetched. Rows without an
     * earlier version are deleted; rows with one get that version's commit timestamp, file
     * name and partition path written back.
     *
     * <p>The class monitor guards only the lazy (re)creation of the shared connection; the
     * scan and the writes run outside it.
     *
     * @param instantTime instant being rolled back
     * @return {@code true} on success or when rollback sync is disabled; {@code false} if
     *         any exception occurred (the exception is logged)
     */
    @Override
    public boolean rollbackCommit(String instantTime) {
        int multiGetBatchSize = this.config.getHbaseIndexGetBatchSize();
        boolean rollbackSync = this.config.getHBaseIndexRollbackSync();
        if (!rollbackSync) {
            return true;
        }
        Connection connection;
        synchronized (SparkHoodieHBaseIndex.class) {
            if (hbaseConnection == null || hbaseConnection.isClosed()) {
                hbaseConnection = this.getHBaseConnection();
            }
            connection = hbaseConnection;
        }
        RateLimiter limiter = RateLimiter.create(this.multiPutBatchSize, TimeUnit.SECONDS);
        try (HTable hTable = (HTable) connection.getTable(TableName.valueOf(this.tableName));
             BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(this.tableName))) {
            long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
            long currentTime = new Date().getTime();
            Scan scan = new Scan();
            scan.addFamily(SYSTEM_COLUMN_FAMILY);
            scan.setTimeRange(rollbackTime, currentTime);
            // The scanner is closeable; close it when the scan is finished.
            try (ResultScanner scanner = hTable.getScanner(scan)) {
                Iterator<Result> scannerIterator = scanner.iterator();
                List<Get> statements = new ArrayList<>();
                List<Result> currentVersionResults = new ArrayList<>();
                List<Mutation> mutations = new ArrayList<>();
                while (scannerIterator.hasNext()) {
                    Result result = scannerIterator.next();
                    currentVersionResults.add(result);
                    // Fetch the version of this row as it was strictly before the rolled-back instant.
                    statements.add(this.generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1L));
                    if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
                        continue;
                    }
                    Result[] lastVersionResults = hTable.get(statements);
                    for (int i = 0; i < lastVersionResults.length; ++i) {
                        Result lastVersionResult = lastVersionResults[i];
                        Result currentVersionResult = currentVersionResults.get(i);
                        if (lastVersionResult.getRow() == null) {
                            // No earlier version: the row was created by the rolled-back instant.
                            if (rollbackSync) {
                                mutations.add(new Delete(currentVersionResult.getRow()));
                            }
                            continue;
                        }
                        // Decode with HBase's Bytes helper rather than the platform default charset.
                        String oldPath = Bytes.toString(lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                        String nowPath = Bytes.toString(currentVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                        if (oldPath.equals(nowPath) && !rollbackSync) {
                            continue;
                        }
                        Put put = new Put(lastVersionResult.getRow());
                        put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
                        put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
                        put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                        mutations.add(put);
                    }
                    // doMutations empties the mutation list once it has been flushed.
                    this.doMutations(mutator, mutations, limiter);
                    currentVersionResults.clear();
                    statements.clear();
                }
            }
        } catch (Exception e) {
            LOG.error("hbase index roll back failed", (Throwable) e);
            return false;
        } finally {
            limiter.stop();
        }
        return true;
    }

    /**
     * Always reports {@code true} for this index.
     * NOTE(review): presumably means record keys are looked up across all partitions — confirm against HoodieIndex.
     */
    @Override
    public boolean isGlobal() {
        return true;
    }

    /**
     * Always reports {@code true} for this index.
     * NOTE(review): exact contract is defined by HoodieIndex — confirm there.
     */
    @Override
    public boolean canIndexLogFiles() {
        return true;
    }

    /**
     * Always reports {@code false} for this index.
     * NOTE(review): exact contract is defined by HoodieIndex — confirm there.
     */
    @Override
    public boolean isImplicitWithStorage() {
        return false;
    }

    /**
     * Replaces the static HBase connection shared by all instances of this class.
     * Although this is an instance method, it assigns the static field, so the change is
     * visible to every instance in the JVM. The assignment is not synchronized.
     *
     * @param hbaseConnection connection to use from now on
     */
    public void setHbaseConnection(Connection hbaseConnection) {
        SparkHoodieHBaseIndex.hbaseConnection = hbaseConnection;
    }

    public static class WriteStatusPartitioner
    extends Partitioner {
        private int totalPartitions;
        final Map<String, Integer> fileIdPartitionMap;

        public WriteStatusPartitioner(Map<String, Integer> fileIdPartitionMap, int totalPartitions) {
            this.totalPartitions = totalPartitions;
            this.fileIdPartitionMap = fileIdPartitionMap;
        }

        public int numPartitions() {
            return this.totalPartitions;
        }

        public int getPartition(Object key) {
            String fileId = (String)key;
            if (!this.fileIdPartitionMap.containsKey(fileId)) {
                LOG.info("This writestatus(fileId: " + fileId + ") is not mapped because it doesn't have any inserts. In this case, we can assign a random partition to this WriteStatus.");
                return Math.abs(fileId.hashCode()) % this.totalPartitions;
            }
            return this.fileIdPartitionMap.get(fileId);
        }
    }

    /**
     * Computes how many puts each task may send per second so that all concurrently running
     * put tasks together stay within the allotted fraction of the cluster's QPS capacity.
     */
    public static class HBasePutBatchSizeCalculator
    implements Serializable {
        private static final Logger LOG = LoggerFactory.getLogger(HBasePutBatchSizeCalculator.class);

        /**
         * Divides the allowed requests per second evenly among the put tasks that can run at
         * the same time.
         *
         * @param numRegionServersForTable number of region servers serving the table
         * @param maxQpsPerRegionServer QPS cap per region server
         * @param numTasksDuringPut number of put tasks
         * @param maxExecutors maximum number of executors
         * @param qpsFraction fraction of the total QPS capacity that may be used
         * @return per-task batch size per second; never less than 1
         */
        public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut, int maxExecutors, float qpsFraction) {
            int maxReqPerSec = getMaxReqPerSec(numRegionServersForTable, maxQpsPerRegionServer, qpsFraction);
            // At least one task; at most as many as there are tasks or executors, whichever is smaller.
            int maxParallelPutsTask = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
            // Integer division: the quotient is truncated, then floored at 1.
            int multiPutBatchSizePerSecPerTask = Math.max(1, maxReqPerSec / maxParallelPutsTask);
            LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
            LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable);
            LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
            LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
            LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
            LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPutsTask);
            LOG.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable);
            LOG.info("HbaseIndexThrottling: multiPutBatchSizePerSecPerTask :" + multiPutBatchSizePerSecPerTask);
            return multiPutBatchSizePerSecPerTask;
        }

        /**
         * Computes the total requests per second allowed across all region servers.
         *
         * @param numRegionServersForTable number of region servers serving the table
         * @param maxQpsPerRegionServer QPS cap per region server
         * @param qpsFraction fraction of the total QPS capacity that may be used
         * @return {@code qpsFraction * numRegionServersForTable * maxQpsPerRegionServer}, truncated to an int
         */
        public int getMaxReqPerSec(int numRegionServersForTable, int maxQpsPerRegionServer, float qpsFraction) {
            float totalAllowedQps = qpsFraction * (float) numRegionServersForTable * (float) maxQpsPerRegionServer;
            return (int) totalAllowedQps;
        }
    }
}

