/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.hbase;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator;
import org.apache.hudi.index.hbase.HBaseIndexQPSResourceAllocator;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class HBaseIndex<T extends HoodieRecordPayload>
extends HoodieIndex<T> {
    public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME = "spark.executor.instances";
    public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME = "spark.dynamicAllocation.enabled";
    public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME = "spark.dynamicAllocation.maxExecutors";
    private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes((String)"_s");
    private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes((String)"commit_ts");
    private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes((String)"file_name");
    private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes((String)"partition_path");
    private static final int SLEEP_TIME_MILLISECONDS = 100;
    private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
    private static Connection hbaseConnection = null;
    private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
    private float qpsFraction;
    private int maxQpsPerRegionServer;
    private Integer multiPutBatchSize;
    private Integer numRegionServersForTable;
    private final String tableName;
    private HbasePutBatchSizeCalculator putBatchSizeCalculator;

    public HBaseIndex(HoodieWriteConfig config) {
        super(config);
        this.tableName = config.getHbaseTableName();
        this.addShutDownHook();
        this.init(config);
    }

    private void init(HoodieWriteConfig config) {
        this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
        this.qpsFraction = config.getHbaseIndexQPSFraction();
        this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
        this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
        this.hBaseIndexQPSResourceAllocator = this.createQPSResourceAllocator(this.config);
    }

    @VisibleForTesting
    public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
        try {
            LOG.info((Object)("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass()));
            return (HBaseIndexQPSResourceAllocator)ReflectionUtils.loadClass(config.getHBaseQPSResourceAllocatorClass(), config);
        }
        catch (Exception e) {
            LOG.warn((Object)"error while instantiating HBaseIndexQPSResourceAllocator", (Throwable)e);
            return new DefaultHBaseQPSResourceAllocator(config);
        }
    }

    @Override
    public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
        throw new UnsupportedOperationException("HBase index does not implement check exist");
    }

    private Connection getHBaseConnection() {
        Configuration hbaseConfig = HBaseConfiguration.create();
        String quorum = this.config.getHbaseZkQuorum();
        hbaseConfig.set("hbase.zookeeper.quorum", quorum);
        String zkZnodeParent = this.config.getHBaseZkZnodeParent();
        if (zkZnodeParent != null) {
            hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent);
        }
        String port = String.valueOf(this.config.getHbaseZkPort());
        hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
        try {
            return ConnectionFactory.createConnection((Configuration)hbaseConfig);
        }
        catch (IOException e) {
            throw new HoodieDependentSystemUnavailableException("HBASE", quorum + ":" + port);
        }
    }

    private void addShutDownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                hbaseConnection.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }));
    }

    @Override
    public void close() {
        this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
    }

    private Get generateStatement(String key) throws IOException {
        return new Get(Bytes.toBytes((String)key)).setMaxVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
    }

    private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
        HoodieTimeline commitTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
        return !commitTimeline.empty() && (commitTimeline.containsInstant(new HoodieInstant(false, "commit", commitTs)) || HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), commitTs, HoodieTimeline.GREATER));
    }

    private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(HoodieTableMetaClient metaClient) {
        return (Function2 & Serializable)(partitionNum, hoodieRecordIterator) -> {
            int multiGetBatchSize = this.config.getHbaseIndexGetBatchSize();
            Class<HBaseIndex> clazz = HBaseIndex.class;
            synchronized (HBaseIndex.class) {
                if (hbaseConnection == null || hbaseConnection.isClosed()) {
                    hbaseConnection = this.getHBaseConnection();
                }
                // ** MonitorExit[var5_5] (shouldn't be in output)
                ArrayList taggedRecords = new ArrayList();
                HTable hTable = null;
                try {
                    hTable = (HTable)hbaseConnection.getTable(TableName.valueOf((String)this.tableName));
                    ArrayList<Get> statements = new ArrayList<Get>();
                    LinkedList<HoodieRecord> currentBatchOfRecords = new LinkedList<HoodieRecord>();
                    while (hoodieRecordIterator.hasNext()) {
                        HoodieRecord rec = (HoodieRecord)hoodieRecordIterator.next();
                        statements.add(this.generateStatement(rec.getRecordKey()));
                        currentBatchOfRecords.add(rec);
                        if (statements.size() < multiGetBatchSize && hoodieRecordIterator.hasNext()) continue;
                        Result[] results = this.doGet(hTable, statements);
                        statements.clear();
                        for (Result result : results) {
                            HoodieRecord currentRecord = (HoodieRecord)currentBatchOfRecords.remove(0);
                            if (result.getRow() != null) {
                                String keyFromResult = Bytes.toString((byte[])result.getRow());
                                String commitTs = Bytes.toString((byte[])result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
                                String fileId = Bytes.toString((byte[])result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
                                String partitionPath = Bytes.toString((byte[])result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                                if (this.checkIfValidCommit(metaClient, commitTs)) {
                                    currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath), currentRecord.getData());
                                    currentRecord.unseal();
                                    currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                                    currentRecord.seal();
                                    taggedRecords.add(currentRecord);
                                    assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
                                    continue;
                                }
                                taggedRecords.add(currentRecord);
                                continue;
                            }
                            taggedRecords.add(currentRecord);
                        }
                    }
                }
                catch (IOException e) {
                    throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
                }
                finally {
                    if (hTable != null) {
                        try {
                            hTable.close();
                        }
                        catch (IOException iOException) {}
                    }
                }
                return taggedRecords.iterator();
            }
        };
    }

    private Result[] doGet(HTable hTable, List<Get> keys2) throws IOException {
        HBaseIndex.sleepForTime(100);
        return hTable.get(keys2);
    }

    @Override
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
        return recordRDD.mapPartitionsWithIndex(this.locationTagFunction(hoodieTable.getMetaClient()), true);
    }

    private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
        return (Function2 & Serializable)(partition, statusIterator) -> {
            ArrayList<WriteStatus> writeStatusList = new ArrayList<WriteStatus>();
            Class<HBaseIndex> clazz = HBaseIndex.class;
            synchronized (HBaseIndex.class) {
                if (hbaseConnection == null || hbaseConnection.isClosed()) {
                    hbaseConnection = this.getHBaseConnection();
                }
                // ** MonitorExit[var4_4] (shouldn't be in output)
                try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf((String)this.tableName));){
                    while (statusIterator.hasNext()) {
                        WriteStatus writeStatus = (WriteStatus)statusIterator.next();
                        ArrayList<Mutation> mutations = new ArrayList<Mutation>();
                        try {
                            for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
                                if (!writeStatus.isErrored(rec.getKey())) {
                                    Option<HoodieRecordLocation> loc = rec.getNewLocation();
                                    if (loc.isPresent()) {
                                        if (rec.getCurrentLocation() != null) continue;
                                        Put put = new Put(Bytes.toBytes((String)rec.getRecordKey()));
                                        put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes((String)loc.get().getInstantTime()));
                                        put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes((String)loc.get().getFileId()));
                                        put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes((String)rec.getPartitionPath()));
                                        mutations.add((Mutation)put);
                                    } else {
                                        Delete delete = new Delete(Bytes.toBytes((String)rec.getRecordKey()));
                                        mutations.add((Mutation)delete);
                                    }
                                }
                                if (mutations.size() < this.multiPutBatchSize) continue;
                                this.doMutations(mutator, mutations);
                            }
                            this.doMutations(mutator, mutations);
                        }
                        catch (Exception e) {
                            Exception we = new Exception("Error updating index for " + writeStatus, e);
                            LOG.error((Object)we);
                            writeStatus.setGlobalError(we);
                        }
                        writeStatusList.add(writeStatus);
                    }
                }
                catch (IOException e) {
                    throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
                }
                return writeStatusList.iterator();
            }
        };
    }

    private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
        if (mutations.isEmpty()) {
            return;
        }
        mutator.mutate(mutations);
        mutator.flush();
        mutations.clear();
        HBaseIndex.sleepForTime(100);
    }

    private static void sleepForTime(int sleepTimeMs) {
        try {
            Thread.sleep(sleepTimeMs);
        }
        catch (InterruptedException e) {
            LOG.error((Object)"Sleep interrupted during throttling", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
        HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = this.createQPSResourceAllocator(this.config);
        this.setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
        LOG.info((Object)("multiPutBatchSize: before hbase puts" + this.multiPutBatchSize));
        JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(this.updateLocationFunction(), true);
        writeStatusJavaRDD = writeStatusJavaRDD.persist(this.config.getWriteStatusStorageLevel());
        return writeStatusJavaRDD;
    }

    private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD, HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, JavaSparkContext jsc) {
        if (this.config.getHbaseIndexPutBatchSizeAutoCompute().booleanValue()) {
            SparkConf conf = jsc.getConf();
            int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
            if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) {
                maxExecutors = Math.max(maxExecutors, conf.getInt(DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1));
            }
            Tuple2<Long, Integer> numPutsParallelismTuple = this.getHBasePutAccessParallelism(writeStatusRDD);
            long numPuts = (Long)numPutsParallelismTuple._1;
            int hbasePutsParallelism = (Integer)numPutsParallelismTuple._2;
            this.numRegionServersForTable = this.getNumRegionServersAliveForTable();
            float desiredQPSFraction = hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable);
            LOG.info((Object)("Desired QPSFraction :" + desiredQPSFraction));
            LOG.info((Object)("Number HBase puts :" + numPuts));
            LOG.info((Object)("Hbase Puts Parallelism :" + hbasePutsParallelism));
            float availableQpsFraction = hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts);
            LOG.info((Object)("Allocated QPS Fraction :" + availableQpsFraction));
            this.multiPutBatchSize = this.putBatchSizeCalculator.getBatchSize(this.numRegionServersForTable, this.maxQpsPerRegionServer, hbasePutsParallelism, maxExecutors, 100, availableQpsFraction);
            LOG.info((Object)("multiPutBatchSize :" + this.multiPutBatchSize));
        }
    }

    @VisibleForTesting
    public Tuple2<Long, Integer> getHBasePutAccessParallelism(JavaRDD<WriteStatus> writeStatusRDD) {
        JavaPairRDD insertOnlyWriteStatusRDD = writeStatusRDD.filter((Function & Serializable)w -> w.getStat().getNumInserts() > 0L).mapToPair((PairFunction & Serializable)w -> new Tuple2((Object)w.getStat().getNumInserts(), (Object)1));
        return (Tuple2)insertOnlyWriteStatusRDD.fold((Object)new Tuple2((Object)0L, (Object)0), (Function2 & Serializable)(w, c) -> new Tuple2((Object)((Long)w._1 + (Long)c._1), (Object)((Integer)w._2 + (Integer)c._2)));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Integer getNumRegionServersAliveForTable() {
        if (this.numRegionServersForTable != null) return this.numRegionServersForTable;
        try (Connection conn = this.getHBaseConnection();){
            RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf((String)this.tableName));
            Integer n = this.numRegionServersForTable = Integer.valueOf(Math.toIntExact(regionLocator.getAllRegionLocations().stream().map(HRegionLocation::getServerName).distinct().count()));
            return n;
        }
        catch (IOException e) {
            LOG.error((Object)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean rollbackCommit(String commitTime) {
        return true;
    }

    @Override
    public boolean isGlobal() {
        return true;
    }

    @Override
    public boolean canIndexLogFiles() {
        return true;
    }

    @Override
    public boolean isImplicitWithStorage() {
        return false;
    }

    @VisibleForTesting
    public void setHbaseConnection(Connection hbaseConnection) {
        HBaseIndex.hbaseConnection = hbaseConnection;
    }

    public static class HbasePutBatchSizeCalculator
    implements Serializable {
        private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
        private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class);

        public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut, int maxExecutors, int sleepTimeMs, float qpsFraction) {
            int numRSAlive = numRegionServersForTable;
            int maxReqPerSec = (int)(qpsFraction * (float)numRSAlive * (float)maxQpsPerRegionServer);
            int numTasks = numTasksDuringPut;
            int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
            int maxReqsSentPerTaskPerSec = 1000 / sleepTimeMs;
            int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
            LOG.info((Object)("HbaseIndexThrottling: qpsFraction :" + qpsFraction));
            LOG.info((Object)("HbaseIndexThrottling: numRSAlive :" + numRSAlive));
            LOG.info((Object)("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec));
            LOG.info((Object)("HbaseIndexThrottling: numTasks :" + numTasks));
            LOG.info((Object)("HbaseIndexThrottling: maxExecutors :" + maxExecutors));
            LOG.info((Object)("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts));
            LOG.info((Object)("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec));
            LOG.info((Object)("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable));
            LOG.info((Object)("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize));
            return multiPutBatchSize;
        }
    }
}

