/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.hbase;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Tuple2;

@TestMethodOrder(value=MethodOrderer.Alphanumeric.class)
@Tag(value="functional")
public class TestSparkHoodieHBaseIndex
extends SparkClientFunctionalTestHarness {
    private static final String TABLE_NAME = "test_table";
    private static HBaseTestingUtility utility;
    private static Configuration hbaseConfig;
    private StorageConfiguration<Configuration> storageConf;
    private HoodieTestDataGenerator dataGen;
    private HoodieTableMetaClient metaClient;
    private HoodieSparkEngineContext context;
    private String basePath;

    @BeforeAll
    public static void init() throws Exception {
        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
        hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");
        utility = new HBaseTestingUtility(hbaseConfig);
        utility.startMiniCluster();
        hbaseConfig = utility.getConnection().getConfiguration();
        utility.createTable(TableName.valueOf((String)TABLE_NAME), StringUtils.getUTF8Bytes((String)"_s"), 2);
    }

    @AfterAll
    public static void clean() throws Exception {
        utility.shutdownMiniHBaseCluster();
        utility.shutdownMiniDFSCluster();
        utility.shutdownMiniMapReduceCluster();
        utility = null;
    }

    @BeforeEach
    public void setUp() throws Exception {
        this.storageConf = HadoopFSUtils.getStorageConf((Configuration)this.jsc().hadoopConfiguration());
        ((Configuration)this.storageConf.unwrap()).addResource(utility.getConfiguration());
        this.context = new HoodieSparkEngineContext(this.jsc());
        this.basePath = utility.getDataTestDirOnTestFS(TABLE_NAME).toString();
        this.metaClient = this.getHoodieMetaClient(this.storageConf, this.basePath);
        this.dataGen = new HoodieTestDataGenerator();
    }

    @AfterEach
    public void cleanUpTableData() throws IOException {
        utility.cleanupDataTestDirOnTestFS(TABLE_NAME);
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception {
        this.metaClient = HoodieTestUtils.init(this.storageConf, (String)this.basePath, (HoodieTableType)tableType);
        String newCommitTime = "001";
        int numRecords = 10;
        List records = this.dataGen.generateInserts("001", Integer.valueOf(10));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> records1 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records1.filter((Function & Serializable)record -> record.isCurrentLocationKnown()).count());
            writeClient.startCommitWithTime("001");
            JavaRDD writeStatues = writeClient.upsert(writeRecords, "001");
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            JavaRDD<HoodieRecord> records2 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records2.filter((Function & Serializable)record -> record.isCurrentLocationKnown()).count());
            writeClient.commit("001", (Object)writeStatues);
            String secondCommitTime = writeClient.createNewInstantTime();
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List newRecords = this.dataGen.generateUpdatesForAllRecords(secondCommitTime);
            String newPartitionPath = "2022/11/04";
            newRecords = newRecords.stream().map(rec -> new HoodieAvroRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath), (HoodieRecordPayload)rec.getData())).collect(Collectors.toList());
            JavaRDD newWriteRecords = this.jsc().parallelize(newRecords, 1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records4 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records4.stream().filter(record -> record.isCurrentLocationKnown()).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records4.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records4.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records4.stream().filter(record -> record.getKey().getPartitionPath().equalsIgnoreCase(newPartitionPath)).count());
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records3 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
        }
    }

    @Test
    public void testTagLocationAndPartitionPathUpdateDisabled() throws Exception {
        String newCommitTime = "001";
        String oldPartitionPath = "1970/01/01";
        int numRecords = 10;
        List newRecords = this.dataGen.generateInserts("001", Integer.valueOf(10));
        LinkedList<HoodieAvroRecord> oldRecords = new LinkedList<HoodieAvroRecord>();
        for (HoodieRecord newRecord : newRecords) {
            HoodieKey key = new HoodieKey(newRecord.getRecordKey(), "1970/01/01");
            HoodieAvroRecord hoodieRecord = new HoodieAvroRecord(key, (HoodieRecordPayload)newRecord.getData());
            oldRecords.add(hoodieRecord);
        }
        JavaRDD newWriteRecords = this.jsc().parallelize(newRecords, 1);
        JavaRDD oldWriteRecords = this.jsc().parallelize(oldRecords, 1);
        HoodieWriteConfig config = this.getConfigBuilder(100, false, false).build();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);
        writeClient.startCommitWithTime("001");
        JavaRDD writeStatues = writeClient.upsert(oldWriteRecords, "001");
        writeClient.commit("001", (Object)writeStatues);
        Assertions.assertNoWriteErrors((List)writeStatues.collect());
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        List notAllowPathChangeRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
        org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)notAllowPathChangeRecords.stream().count());
        String newCommitTime1 = "002";
        writeClient.startCommitWithTime(newCommitTime1);
        JavaRDD writeStatues1 = writeClient.upsert(newWriteRecords, newCommitTime1);
        writeClient.commit(newCommitTime1, (Object)writeStatues1);
        Assertions.assertNoWriteErrors((List)writeStatues1.collect());
        org.junit.jupiter.api.Assertions.assertEquals((long)10L, (Long)((Long)writeStatues1.map((Function & Serializable)writeStatus -> writeStatus.getTotalRecords()).reduce(Long::sum)));
        org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)writeStatues1.filter((Function & Serializable)writeStatus -> !writeStatus.getPartitionPath().equals("1970/01/01")).count());
    }

    @Test
    public void testTagLocationAndPartitionPathUpdateEnabled() throws Exception {
        String newCommitTime = "001";
        String oldPartitionPath = "1970/01/01";
        int numRecords = 10;
        List newRecords = this.dataGen.generateInserts("001", Integer.valueOf(10));
        LinkedList<HoodieAvroRecord> oldRecords = new LinkedList<HoodieAvroRecord>();
        for (HoodieRecord newRecord : newRecords) {
            HoodieKey key = new HoodieKey(newRecord.getRecordKey(), "1970/01/01");
            HoodieAvroRecord hoodieRecord = new HoodieAvroRecord(key, (HoodieRecordPayload)newRecord.getData());
            oldRecords.add(hoodieRecord);
        }
        JavaRDD newWriteRecords = this.jsc().parallelize(newRecords, 1);
        JavaRDD oldWriteRecords = this.jsc().parallelize(oldRecords, 1);
        HoodieWriteConfig config = this.getConfigBuilder(100, true, false).build();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);
        writeClient.startCommitWithTime("001");
        JavaRDD writeStatues = writeClient.upsert(oldWriteRecords, "001");
        writeClient.commit("001", (Object)writeStatues);
        Assertions.assertNoWriteErrors((List)writeStatues.collect());
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        List pathChangeRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
        org.junit.jupiter.api.Assertions.assertEquals((long)20L, (long)pathChangeRecords.stream().count());
        org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)pathChangeRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
        String newCommitTime1 = "002";
        writeClient.startCommitWithTime(newCommitTime1);
        JavaRDD writeStatues1 = writeClient.upsert(newWriteRecords, newCommitTime1);
        writeClient.commit(newCommitTime1, (Object)writeStatues1);
        Assertions.assertNoWriteErrors((List)writeStatues1.collect());
        org.junit.jupiter.api.Assertions.assertEquals((long)20L, (Long)((Long)writeStatues1.map((Function & Serializable)writeStatus -> writeStatus.getTotalRecords()).reduce(Long::sum)));
        org.junit.jupiter.api.Assertions.assertNotEquals((long)0L, (long)writeStatues1.filter((Function & Serializable)writeStatus -> writeStatus.getPartitionPath().equals("1970/01/01")).count());
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        List pathChangeRecords1 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
        org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)pathChangeRecords1.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
    }

    @Test
    public void testTagLocationAndDuplicateUpdate() throws Exception {
        String newCommitTime = "001";
        int numRecords = 10;
        List records = this.dataGen.generateInserts("001", Integer.valueOf(10));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            writeClient.startCommitWithTime("001");
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD writeStatues = writeClient.upsert(writeRecords, "001");
            this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            this.metaClient.getStorage().deleteDirectory(new StoragePath(this.metaClient.getTimelinePath(), "001.inflight"));
            writeClient.upsert(writeRecords, "001");
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            writeClient.commit("001", (Object)writeStatues);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List taggedRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)taggedRecords.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
        }
    }

    @Disabled(value="HUDI-6460")
    public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws Exception {
        int numRecords = 10;
        String oldPartitionPath = "1970/01/01";
        String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
        HoodieWriteConfig config = this.getConfigBuilder(100, true, true).withRollbackUsingMarkers(false).build();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String firstCommitTime = writeClient.startCommit();
            List newRecords = this.dataGen.generateInserts(firstCommitTime, Integer.valueOf(10));
            LinkedList<HoodieAvroRecord> oldRecords = new LinkedList<HoodieAvroRecord>();
            for (HoodieRecord newRecord : newRecords) {
                HoodieKey key = new HoodieKey(newRecord.getRecordKey(), "1970/01/01");
                HoodieAvroRecord hoodieRecord = new HoodieAvroRecord(key, (HoodieRecordPayload)newRecord.getData());
                oldRecords.add(hoodieRecord);
            }
            JavaRDD newWriteRecords = this.jsc().parallelize(newRecords, 1);
            JavaRDD oldWriteRecords = this.jsc().parallelize(oldRecords, 1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List beforeFirstTaggedRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)oldWriteRecords, (HoodieTable)hoodieTable).collect();
            JavaRDD oldWriteStatues = writeClient.upsert(oldWriteRecords, firstCommitTime);
            this.updateLocation((HoodieIndex)index, (JavaRDD<WriteStatus>)oldWriteStatues, (HoodieTable)hoodieTable);
            writeClient.commit(firstCommitTime, (Object)oldWriteStatues);
            List afterFirstTaggedRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)oldWriteRecords, (HoodieTable)hoodieTable).collect();
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            String secondCommitTime = writeClient.startCommit();
            List beforeSecondTaggedRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
            JavaRDD newWriteStatues = writeClient.upsert(newWriteRecords, secondCommitTime);
            this.updateLocation((HoodieIndex)index, (JavaRDD<WriteStatus>)newWriteStatues, (HoodieTable)hoodieTable);
            writeClient.commit(secondCommitTime, (Object)newWriteStatues);
            List afterSecondTaggedRecords = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
            writeClient.rollback(secondCommitTime);
            List afterRollback = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)newWriteRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)beforeFirstTaggedRecords.stream().filter(record -> record.getCurrentLocation() == null).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)afterFirstTaggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)beforeSecondTaggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals("1970/01/01") && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)20L, (long)beforeSecondTaggedRecords.stream().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)afterSecondTaggedRecords.stream().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)afterSecondTaggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals("1970/01/01")).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)afterRollback.stream().filter(record -> record.getKey().getPartitionPath().equals("1970/01/01") && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)20L, (long)beforeSecondTaggedRecords.stream().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)afterRollback.stream().filter(HoodieRecord::isCurrentLocationKnown).filter(record -> record.getCurrentLocation().getInstantTime().equals(firstCommitTime)).count());
        }
    }

    @Test
    public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
        HoodieWriteConfig config = this.getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = writeClient.startCommit();
            int numRecords = 10;
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(10));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            writeClient.commit(newCommitTime, (Object)writeStatues);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records2 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
            List fileIds = writeStatues.map(WriteStatus::getFileId).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records2.stream().filter(record -> record.getCurrentLocation().getFileId() == null).count());
            List taggedFileIds = records2.stream().map(record -> record.getCurrentLocation().getFileId()).distinct().collect(Collectors.toList());
            org.junit.jupiter.api.Assertions.assertTrue((taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds) ? 1 : 0) != 0);
            writeClient.rollback(newCommitTime);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records3 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records3.stream().filter(record -> record.getCurrentLocation() != null).count());
        }
    }

    @Test
    public void testSimpleTagLocationWithInvalidCommit() throws Exception {
        HoodieWriteConfig config = this.getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = writeClient.startCommit();
            JavaRDD<HoodieRecord> writeRecords = this.generateAndCommitRecords(writeClient, 199, newCommitTime);
            String invalidCommit = writeClient.startCommit();
            JavaRDD<HoodieRecord> invalidWriteRecords = this.generateAndCommitRecords(writeClient, 1, invalidCommit);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> javaRDD0 = this.tagLocation((HoodieIndex)index, invalidWriteRecords, (HoodieTable)hoodieTable);
            assert (javaRDD0.collect().size() == 1);
            assert (javaRDD0.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 1);
            assert (((HoodieRecord)javaRDD0.collect().get(0)).getCurrentLocation().getInstantTime().equals(invalidCommit));
            writeClient.rollback(invalidCommit);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> javaRDD1 = this.tagLocation((HoodieIndex)index, writeRecords, (HoodieTable)hoodieTable);
            assert (javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 199);
            JavaRDD<HoodieRecord> javaRDD2 = this.tagLocation((HoodieIndex)index, invalidWriteRecords, (HoodieTable)hoodieTable);
            assert (javaRDD2.collect().size() == 1);
            assert (javaRDD2.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
        }
    }

    @Test
    public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
        HoodieWriteConfig config = this.getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String commitTime1 = writeClient.startCommit();
            JavaRDD<HoodieRecord> writeRecords1 = this.generateAndCommitRecords(writeClient, 20, commitTime1);
            writeClient.rollback(commitTime1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            this.generateAndCommitRecords(writeClient, 20);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> javaRDD1 = this.tagLocation((HoodieIndex)index, writeRecords1, (HoodieTable)hoodieTable);
            assert (javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 20);
        }
    }

    private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient, int numRecs) throws Exception {
        String commitTime = writeClient.startCommit();
        return this.generateAndCommitRecords(writeClient, numRecs, commitTime);
    }

    private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient, int numRecs, String commitTime) throws Exception {
        List records = this.dataGen.generateInserts(commitTime, Integer.valueOf(numRecs));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        JavaRDD writeStatues = writeClient.upsert(writeRecords, commitTime);
        Assertions.assertNoWriteErrors((List)writeStatues.collect());
        writeClient.commit(commitTime, (Object)writeStatues);
        return writeRecords;
    }

    @Test
    public void testHbaseTagLocationForArchivedCommits() throws Exception {
        HashMap<String, String> params = new HashMap<String, String>();
        params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
        params.put(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "3");
        params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
        params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4");
        HoodieWriteConfig config = this.getConfigBuilder(100, false, false).withProps(params).build();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            JavaRDD<HoodieRecord> writeRecords1 = this.generateAndCommitRecords(writeClient, 20);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            String commit1 = ((HoodieInstant)this.metaClient.getActiveTimeline().firstInstant().get()).requestedTime();
            for (int nCommit = 0; nCommit < 6; ++nCommit) {
                this.generateAndCommitRecords(writeClient, 20);
            }
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.metaClient.getArchivedTimeline().containsInstant(commit1));
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> javaRDD1 = this.tagLocation((HoodieIndex)index, writeRecords1, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((int)20, (int)javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
        }
    }

    @Test
    public void testTotalGetsBatching() throws Exception {
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        Connection hbaseConnection = (Connection)Mockito.mock(Connection.class);
        HTable table = (HTable)Mockito.mock(HTable.class);
        Mockito.when((Object)hbaseConnection.getTable(TableName.valueOf((String)TABLE_NAME))).thenReturn((Object)table);
        Mockito.when((Object)table.get((List)ArgumentMatchers.any())).thenReturn((Object)new Result[0]);
        index.setHbaseConnection(hbaseConnection);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = writeClient.startCommit();
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(250));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            ((HTable)Mockito.verify((Object)table, (VerificationMode)Mockito.times((int)3))).get((List)ArgumentMatchers.any());
        }
    }

    @Test
    public void testTotalPutsBatching() throws Exception {
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = writeClient.startCommit();
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(250));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
            writeClient.commit(newCommitTime, (Object)writeStatues);
            Connection hbaseConnection = (Connection)Mockito.mock(Connection.class);
            HTable table = (HTable)Mockito.mock(HTable.class);
            Mockito.when((Object)hbaseConnection.getTable(TableName.valueOf((String)TABLE_NAME))).thenReturn((Object)table);
            Mockito.when((Object)table.get((List)ArgumentMatchers.any())).thenReturn((Object)new Result[0]);
            index.setHbaseConnection(hbaseConnection);
            int numberOfDataFileIds = (int)writeStatues.map((Function & Serializable)status -> status.getFileId()).distinct().count();
            this.updateLocation((HoodieIndex)index, (JavaRDD<WriteStatus>)writeStatues, (HoodieTable)hoodieTable);
            ((HTable)Mockito.verify((Object)table, (VerificationMode)Mockito.atMost((int)numberOfDataFileIds))).put((List)ArgumentMatchers.any());
        }
    }

    @Test
    public void testsHBasePutAccessParallelism() {
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        JavaRDD writeStatusRDD = this.jsc().parallelize(Arrays.asList(this.getSampleWriteStatus(0, 2), this.getSampleWriteStatus(2, 3), this.getSampleWriteStatus(4, 3), this.getSampleWriteStatus(6, 3), this.getSampleWriteStatus(8, 0)), 10);
        Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
        int hbasePutAccessParallelism = Integer.parseInt(((Integer)tuple._2).toString());
        int hbaseNumPuts = Integer.parseInt(((Long)tuple._1).toString());
        org.junit.jupiter.api.Assertions.assertEquals((int)10, (int)writeStatusRDD.getNumPartitions());
        org.junit.jupiter.api.Assertions.assertEquals((int)4, (int)hbasePutAccessParallelism);
        org.junit.jupiter.api.Assertions.assertEquals((int)20, (int)hbaseNumPuts);
    }

    @Test
    public void testsWriteStatusPartitioner() {
        List[] writeStatuses;
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        int parallelism = 4;
        JavaRDD writeStatusRDD = this.jsc().parallelize(Arrays.asList(this.getSampleWriteStatusWithFileId(0, 2), this.getSampleWriteStatusWithFileId(2, 3), this.getSampleWriteStatusWithFileId(4, 3), this.getSampleWriteStatusWithFileId(0, 3), this.getSampleWriteStatusWithFileId(11, 0)), parallelism);
        Map fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
        int numWriteStatusWithInserts = (Integer)index.getHBasePutAccessParallelism((JavaRDD)writeStatusRDD)._2;
        JavaRDD partitionedRDD = writeStatusRDD.mapToPair((PairFunction & Serializable)w -> new Tuple2((Object)w.getFileId(), w)).partitionBy((Partitioner)new SparkHoodieHBaseIndex.WriteStatusPartitioner(fileIdPartitionMap, numWriteStatusWithInserts)).map((Function & Serializable)w -> (WriteStatus)w._2());
        org.junit.jupiter.api.Assertions.assertEquals((int)numWriteStatusWithInserts, (int)partitionedRDD.getNumPartitions());
        int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
        org.junit.jupiter.api.Assertions.assertEquals((int)parallelism, (int)partitionIndexesBeforeRepartition.length);
        int[] partitionIndexesAfterRepartition = partitionedRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
        org.junit.jupiter.api.Assertions.assertEquals((int)numWriteStatusWithInserts, (int)partitionIndexesAfterRepartition.length);
        for (List list : writeStatuses = partitionedRDD.collectPartitions(partitionIndexesAfterRepartition)) {
            int count = 0;
            for (WriteStatus w2 : list) {
                if (w2.getStat().getNumInserts() <= 0L) continue;
                ++count;
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)count);
        }
    }

    @Test
    public void testsWriteStatusPartitionerWithNoInserts() {
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        int parallelism = 3;
        JavaRDD writeStatusRDD = this.jsc().parallelize(Arrays.asList(this.getSampleWriteStatusWithFileId(0, 2), this.getSampleWriteStatusWithFileId(0, 3), this.getSampleWriteStatusWithFileId(0, 0)), parallelism);
        Map fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
        int numWriteStatusWithInserts = (Integer)index.getHBasePutAccessParallelism((JavaRDD)writeStatusRDD)._2;
        JavaRDD partitionedRDD = writeStatusRDD.mapToPair((PairFunction & Serializable)w -> new Tuple2((Object)w.getFileId(), w)).partitionBy((Partitioner)new SparkHoodieHBaseIndex.WriteStatusPartitioner(fileIdPartitionMap, numWriteStatusWithInserts)).map((Function & Serializable)w -> (WriteStatus)w._2());
        org.junit.jupiter.api.Assertions.assertEquals((int)numWriteStatusWithInserts, (int)partitionedRDD.getNumPartitions());
        int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
        org.junit.jupiter.api.Assertions.assertEquals((int)parallelism, (int)partitionIndexesBeforeRepartition.length);
        int[] partitionIndexesAfterRepartition = partitionedRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
        org.junit.jupiter.api.Assertions.assertEquals((int)numWriteStatusWithInserts, (int)partitionIndexesAfterRepartition.length);
        org.junit.jupiter.api.Assertions.assertEquals((int)partitionIndexesBeforeRepartition.length, (int)parallelism);
    }

    private WriteStatus getSampleWriteStatusWithFileId(int numInserts, int numUpdateWrites) {
        WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
        HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
        hoodieWriteStat.setNumInserts((long)numInserts);
        hoodieWriteStat.setNumUpdateWrites((long)numUpdateWrites);
        writeStatus.setStat(hoodieWriteStat);
        writeStatus.setFileId(UUID.randomUUID().toString());
        return writeStatus;
    }

    @Test
    public void testsHBasePutAccessParallelismWithNoInserts() {
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        JavaRDD writeStatusRDD = this.jsc().parallelize(Arrays.asList(this.getSampleWriteStatus(0, 2), this.getSampleWriteStatus(0, 1)), 10);
        Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
        int hbasePutAccessParallelism = Integer.parseInt(((Integer)tuple._2).toString());
        int hbaseNumPuts = Integer.parseInt(((Long)tuple._1).toString());
        org.junit.jupiter.api.Assertions.assertEquals((int)10, (int)writeStatusRDD.getNumPartitions());
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)hbasePutAccessParallelism);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)hbaseNumPuts);
    }

    @Test
    public void testSmallBatchSize() throws Exception {
        String newCommitTime = "001";
        int numRecords = 10;
        List records = this.dataGen.generateInserts("001", Integer.valueOf(10));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        HoodieWriteConfig config = this.getConfig(2);
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> records1 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records1.filter((Function & Serializable)record -> record.isCurrentLocationKnown()).count());
            writeClient.startCommitWithTime("001");
            JavaRDD writeStatues = writeClient.upsert(writeRecords, "001");
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            JavaRDD<HoodieRecord> records2 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records2.filter((Function & Serializable)record -> record.isCurrentLocationKnown()).count());
            writeClient.commit("001", (Object)writeStatues);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records3 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
        }
    }

    @Test
    public void testDelete() throws Exception {
        String newCommitTime = "001";
        int numRecords = 10;
        List records = this.dataGen.generateInserts("001", Integer.valueOf(10));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        HoodieWriteConfig config = this.getConfig();
        SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            JavaRDD<HoodieRecord> records1 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records1.filter((Function & Serializable)record -> record.isCurrentLocationKnown()).count());
            writeClient.startCommitWithTime("001");
            JavaRDD writeStatues = writeClient.upsert(writeRecords, "001");
            Assertions.assertNoWriteErrors((List)writeStatues.collect());
            writeClient.commit("001", (Object)writeStatues);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            List records2 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records2.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
            JavaRDD deleteWriteStatues = writeStatues.map((Function & Serializable)w -> {
                WriteStatus newWriteStatus = new WriteStatus(Boolean.valueOf(true), Double.valueOf(1.0));
                w.getWrittenRecordDelegates().forEach(r -> newWriteStatus.markSuccess(HoodieRecordDelegate.create((HoodieKey)r.getHoodieKey()), Option.empty()));
                org.junit.jupiter.api.Assertions.assertEquals((long)w.getTotalRecords(), (long)newWriteStatus.getTotalRecords());
                newWriteStatus.setStat(new HoodieWriteStat());
                return newWriteStatus;
            });
            deleteWriteStatues.cache();
            JavaRDD<WriteStatus> deleteStatus = this.updateLocation((HoodieIndex)index, (JavaRDD<WriteStatus>)deleteWriteStatues, (HoodieTable)hoodieTable);
            org.junit.jupiter.api.Assertions.assertEquals((long)deleteStatus.count(), (long)deleteWriteStatues.count());
            Assertions.assertNoWriteErrors((List)deleteStatus.collect());
            List records3 = this.tagLocation((HoodieIndex)index, (JavaRDD<HoodieRecord>)writeRecords, (HoodieTable)hoodieTable).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)10L, (long)records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)records3.stream().filter(record -> record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals("001")).distinct().count());
        }
    }

    private WriteStatus getSampleWriteStatus(int numInserts, int numUpdateWrites) {
        WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.1));
        HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
        hoodieWriteStat.setNumInserts((long)numInserts);
        hoodieWriteStat.setNumUpdateWrites((long)numUpdateWrites);
        writeStatus.setStat(hoodieWriteStat);
        return writeStatus;
    }

    private HoodieWriteConfig getConfig() {
        return this.getConfigBuilder(100, false, false).build();
    }

    private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
        return this.getConfigBuilder(hbaseIndexBatchSize, false, false).build();
    }

    private HoodieWriteConfig getConfig(boolean updatePartitionPath, boolean rollbackSync) {
        return this.getConfigBuilder(100, updatePartitionPath, rollbackSync).build();
    }

    private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(1, 1).withDeleteParallelism(1).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0x100000L).withInlineCompaction(Boolean.valueOf(false)).build()).withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x100000L).parquetMaxFileSize(0x100000L).build()).forTable("test-trip-table").withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE).withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort"))).hbaseIndexPutBatchSizeAutoCompute(true).hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", "")).hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME).hbaseIndexUpdatePartitionPath(updatePartitionPath).hbaseIndexRollbackSync(rollbackSync).hbaseIndexGetBatchSize(hbaseIndexBatchSize).build()).build());
    }
}

