/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.functional;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.ClusteringTestUtils;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="functional")
public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
    // Parameter map holding a single entry: "sortColumn" -> "record_key".
    // NOTE(review): presumably handed to a clustering strategy by tests outside this view — confirm.
    private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>(){
        {
            this.put("sortColumn", "record_key");
        }
    };
    // Row-count query template; <TABLE_NAME> is a placeholder token
    // (presumably substituted by the pre-commit validator — confirm).
    private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from <TABLE_NAME>";
    // Turns a list into an RDD with one slice using this test's Spark context.
    private final Function<List, JavaRDD> list2Rdd = recordList -> this.jsc.parallelize(recordList, 1);
    // Collects an RDD into a list.
    private final Function<JavaRDD, List> rdd2List = AbstractJavaRDDLike::collect;
    // Collects the write-status RDD carried by the metadata and passes the resulting list to metadata.clone(...).
    private final Function<HoodieWriteMetadata, HoodieWriteMetadata<List<WriteStatus>>> clusteringMetadataRdd2List = metadata -> metadata.clone((Object)((JavaRDD)metadata.getWriteStatuses()).collect());
    // Creates a key generator from the write config's properties.
    private final Function<HoodieWriteConfig, KeyGenerator> createKeyGenerator = config -> HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)config.getProps());
    // Function-object adapter that forwards to this class's getHoodieTable(metaClient, config) method.
    private final HoodieWriterClientTestHarness.Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> getHoodieTable = (metaClient, config) -> this.getHoodieTable((HoodieTableMetaClient)metaClient, (HoodieWriteConfig)config);

    /**
     * Supplies two single-argument parameter sets: {@code true} first, then {@code false}.
     *
     * @return a stream of one-element {@link Arguments}
     */
    private static Stream<Arguments> smallInsertHandlingParams() {
        return Stream.of(true, false).map(flag -> Arguments.of(flag));
    }

    /**
     * Supplies two single-argument parameter sets: {@code true} first, then {@code false}.
     * Referenced by name from the {@code @MethodSource} annotations in this class.
     *
     * @return a stream of one-element {@link Arguments}
     */
    private static Stream<Arguments> populateMetaFieldsParams() {
        return Stream.of(true, false).map(flag -> Arguments.of(flag));
    }

    /**
     * Supplies every combination of two boolean flags, in the order
     * (true, true), (true, false), (false, true), (false, false).
     *
     * @return a stream of two-element {@link Arguments}
     */
    private static Stream<Arguments> rollbackAfterConsistencyCheckFailureParams() {
        return Stream.of(
            Arguments.of(true, true),
            Arguments.of(true, false),
            Arguments.of(false, true),
            Arguments.of(false, false));
    }

    /**
     * Builds a pre-commit validator config that registers
     * {@link SqlQuerySingleResultPreCommitValidator} with the count query followed by
     * {@code #} and the expected row count.
     *
     * @param expectedNumberOfRows value appended after the {@code #} separator
     * @return the built validator config
     */
    private static HoodiePreCommitValidatorConfig createPreCommitValidatorConfig(int expectedNumberOfRows) {
        String validatorClassName = SqlQuerySingleResultPreCommitValidator.class.getName();
        String queryWithExpectedResult = COUNT_SQL_QUERY_FOR_VALIDATION + "#" + expectedNumberOfRows;
        return HoodiePreCommitValidatorConfig.newBuilder()
            .withPreCommitValidator(validatorClassName)
            .withPrecommitValidatorSingleResultSqlQueries(queryWithExpectedResult)
            .build();
    }

    /**
     * Starts a clustering config builder with max 10 groups, 0 target partitions, the given
     * inline settings, and the properties returned by {@code getDisabledRowWriterProperties()}.
     *
     * @param isInline value passed to {@code withInlineClustering}
     * @param inlineNumCommits value passed to {@code withInlineClusteringNumCommits}
     * @return the builder, not yet built, so callers can add further settings
     */
    private static HoodieClusteringConfig.Builder createClusteringBuilder(boolean isInline, int inlineNumCommits) {
        return HoodieClusteringConfig.newBuilder()
            .withClusteringMaxNumGroups(10)
            .withClusteringTargetPartitions(0)
            .withInlineClustering(isInline)
            .withInlineClusteringNumCommits(inlineNumCommits)
            .fromProperties(getDisabledRowWriterProperties());
    }

    /**
     * Builds a lock config that uses {@link FileSystemBasedLockProviderTestClass} as the lock
     * provider together with the supplied conflict resolution strategy.
     *
     * @param conflictResolutionStrategy strategy to register on the lock config
     * @return the built lock config
     */
    private static HoodieLockConfig createLockConfig(ConflictResolutionStrategy conflictResolutionStrategy) {
        return HoodieLockConfig.newBuilder()
            .withLockProvider(FileSystemBasedLockProviderTestClass.class)
            .withConflictResolutionStrategy(conflictResolutionStrategy)
            .build();
    }

    /**
     * Builds a clean config with the given failed-writes cleaning policy and auto-clean flag.
     *
     * @param policy failed-writes cleaning policy to set
     * @param autoClean value passed to {@code withAutoClean}
     * @return the built clean config
     */
    private static HoodieCleanConfig createCleanConfig(HoodieFailedWritesCleaningPolicy policy, boolean autoClean) {
        return HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(policy)
            .withAutoClean(autoClean)
            .build();
    }

    /** Recreates {@code testTable} from the current meta client before each test. */
    @BeforeEach
    public void setUpTestTable() {
        HoodieTableMetaClient currentMetaClient = this.metaClient;
        this.testTable = HoodieSparkWriteableTestTable.of(currentMetaClient);
    }

    /**
     * Returns a factory that creates a {@code WriteClientBrokenClustering} from this test's
     * engine context, the config handed to the factory, and the given throwable.
     * NOTE(review): presumably the client raises {@code throwable} during clustering — confirm
     * against the {@code WriteClientBrokenClustering} definition.
     *
     * @param throwable forwarded to the client constructor
     * @return a config-to-client factory
     */
    private Function<HoodieWriteConfig, BaseHoodieWriteClient> createBrokenClusteringClient(Throwable throwable) {
        return writeConfig -> {
            // Read the context when the factory is applied, not when it is created.
            HoodieEngineContext engineContext = this.context;
            return new WriteClientBrokenClustering(engineContext, writeConfig, throwable);
        };
    }

    /**
     * Returns a factory that creates a {@code WriteClientBrokenClean} from this test's engine
     * context, the config handed to the factory, and the given throwable.
     * NOTE(review): presumably the client raises {@code throwable} during cleaning — confirm
     * against the {@code WriteClientBrokenClean} definition.
     *
     * @param throwable forwarded to the client constructor
     * @return a config-to-client factory
     */
    private Function<HoodieWriteConfig, BaseHoodieWriteClient> createBrokenCleaningClient(Throwable throwable) {
        return writeConfig -> {
            // Read the context when the factory is applied, not when it is created.
            HoodieEngineContext engineContext = this.context;
            return new WriteClientBrokenClean(engineContext, writeConfig, throwable);
        };
    }

    /**
     * Engine-agnostic entry point that narrows the client to {@link SparkRDDWriteClient}, adapts
     * the untyped write function to the RDD-typed one, and delegates to {@code insertFirstBatch}.
     *
     * @return whatever {@code insertFirstBatch} returns
     * @throws Exception propagated from {@code insertFirstBatch}
     */
    protected Object castInsertFirstBatch(HoodieWriteConfig writeConfig, BaseHoodieWriteClient client, String newCommitTime, String initCommitTime, int numRecordsInThisCommit, HoodieWriterClientTestHarness.Function3<Object, BaseHoodieWriteClient, Object, String> writeFn, boolean isPreppedAPI, boolean assertForCommit, int expRecordsInThisCommit, boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
        SparkRDDWriteClient sparkClient = (SparkRDDWriteClient) client;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> sparkWriteFn =
            (writeClient, records, commitTime) -> (JavaRDD<WriteStatus>) writeFn.apply(writeClient, records, commitTime);
        return this.insertFirstBatch(writeConfig, sparkClient, newCommitTime, initCommitTime, numRecordsInThisCommit,
            sparkWriteFn, isPreppedAPI, assertForCommit, expRecordsInThisCommit, filterForCommitTimeWithAssert, instantGenerator);
    }

    /**
     * Engine-agnostic entry point that narrows the client to {@link SparkRDDWriteClient}, adapts
     * the untyped write function to the RDD-typed one, and delegates to {@code writeBatch}.
     *
     * @return whatever {@code writeBatch} returns
     * @throws Exception propagated from {@code writeBatch}
     */
    protected Object castWriteBatch(BaseHoodieWriteClient client, String newCommitTime, String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordGenFunction, HoodieWriterClientTestHarness.Function3<Object, BaseHoodieWriteClient, Object, String> writeFn, boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit, boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
        SparkRDDWriteClient sparkClient = (SparkRDDWriteClient) client;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> sparkWriteFn =
            (writeClient, records, commitTime) -> (JavaRDD<WriteStatus>) writeFn.apply(writeClient, records, commitTime);
        return this.writeBatch(sparkClient, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
            numRecordsInThisCommit, recordGenFunction, sparkWriteFn, assertForCommit, expRecordsInThisCommit,
            expTotalRecords, expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
    }

    /**
     * Engine-agnostic entry point for an update batch. Wraps the data generator's
     * {@code generateUniqueUpdates} through {@code generateWrapRecordsFn}, adapts the untyped
     * write function to the RDD-typed one, and delegates to {@code writeBatch}.
     * The {@code doCommit} argument of {@code writeBatch} is always passed as {@code false} here.
     *
     * @return whatever {@code writeBatch} returns
     * @throws Exception propagated from {@code writeBatch}
     */
    protected Object castUpdateBatch(HoodieWriteConfig writeConfig, BaseHoodieWriteClient client, String newCommitTime, String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, HoodieWriterClientTestHarness.Function3<Object, BaseHoodieWriteClient, Object, String> writeFn, boolean isPreppedAPI, boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> uniqueUpdatesFn =
            (instantTime, count) -> this.dataGen.generateUniqueUpdates(instantTime, count);
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
            this.generateWrapRecordsFn(isPreppedAPI, writeConfig, uniqueUpdatesFn);

        SparkRDDWriteClient sparkClient = (SparkRDDWriteClient) client;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> sparkWriteFn =
            (writeClient, records, commitTime) -> (JavaRDD<WriteStatus>) writeFn.apply(writeClient, records, commitTime);
        return this.writeBatch(sparkClient, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
            numRecordsInThisCommit, recordGenFunction, sparkWriteFn, assertForCommit, expRecordsInThisCommit,
            expTotalRecords, expTotalCommits, false, filterForCommitTimeWithAssert, instantGenerator);
    }

    /**
     * Engine-agnostic entry point that narrows the client to {@link SparkRDDWriteClient} and
     * delegates to {@code deleteBatch} with all other arguments unchanged.
     *
     * @return whatever {@code deleteBatch} returns
     * @throws Exception propagated from {@code deleteBatch}
     */
    protected Object castDeleteBatch(HoodieWriteConfig writeConfig, BaseHoodieWriteClient client, String newCommitTime, String prevCommitTime, String initCommitTime, int numRecordsInThisCommit, boolean isPreppedAPI, boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, boolean filterForCommitTimeWithAssert, TimelineFactory timelineFactory, InstantGenerator instantGenerator) throws Exception {
        SparkRDDWriteClient sparkClient = (SparkRDDWriteClient) client;
        return this.deleteBatch(writeConfig, sparkClient, newCommitTime, prevCommitTime, initCommitTime,
            numRecordsInThisCommit, isPreppedAPI, assertForCommit, expRecordsInThisCommit, expTotalRecords,
            filterForCommitTimeWithAssert, timelineFactory, instantGenerator);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code insert}, once for each value supplied by {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnInsert(boolean populateMetaFields) throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.insert(records, commitTime),
            false,
            populateMetaFields,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code insertPreppedRecords}; both boolean arguments are {@code true}.
     */
    @Test
    public void testAutoCommitOnInsertPrepped() throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.insertPreppedRecords(records, commitTime),
            true,
            true,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code upsert}, once for each value supplied by {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnUpsert(boolean populateMetaFields) throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.upsert(records, commitTime),
            false,
            populateMetaFields,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code upsertPreppedRecords}, once for each value supplied by {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnUpsertPrepped(boolean populateMetaFields) throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.upsertPreppedRecords(records, commitTime),
            true,
            populateMetaFields,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code bulkInsert}, once for each value supplied by {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testAutoCommitOnBulkInsert(boolean populateMetaFields) throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.bulkInsert(records, commitTime),
            false,
            populateMetaFields,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Runs the shared {@code testAutoCommit} scenario with a write function that calls
     * {@code bulkInsertPreppedRecords} with an empty partitioner option; both boolean
     * arguments are {@code true}.
     */
    @Test
    public void testAutoCommitOnBulkInsertPrepped() throws Exception {
        testAutoCommit(
            (client, records, commitTime) -> client.bulkInsertPreppedRecords(records, commitTime, Option.empty()),
            true,
            true,
            HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Bulk-inserts 200 records with auto-commit on and a pre-commit validator configured to
     * expect 200 rows, then asserts that the commit exists.
     */
    @Test
    public void testPreCommitValidatorsOnInsert() throws Exception {
        final int numRecords = 200;
        HoodieWriteConfig config = getConfigBuilder()
            .withAutoCommit(true)
            .withPreCommitValidatorConfig(createPreCommitValidatorConfig(numRecords))
            .build();
        try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
            HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> bulkInsertFn =
                (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
            String newCommitTime = client.createNewInstantTime();
            insertFirstBatch(config, client, newCommitTime, "000", numRecords, bulkInsertFn,
                false, false, numRecords, HoodieTestUtils.INSTANT_GENERATOR);
            org.junit.jupiter.api.Assertions.assertTrue(testTable.commitExists(newCommitTime));
        }
    }

    /**
     * Bulk-inserts 200 records while the pre-commit validator expects 500 rows. The write must
     * fail with a {@link HoodieInsertException} caused by a {@link HoodieValidationException},
     * and no commit may exist afterwards.
     */
    @Test
    public void testPreCommitValidationFailureOnInsert() throws Exception {
        final int numRecords = 200;
        HoodieWriteConfig config = getConfigBuilder()
            .withPreCommitValidatorConfig(createPreCommitValidatorConfig(500))
            .build();
        String newCommitTime = this.metaClient.createNewInstantTime();
        try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
            HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> bulkInsertFn =
                (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
            insertFirstBatch(config, client, newCommitTime, "000", numRecords, bulkInsertFn,
                false, false, numRecords, HoodieTestUtils.INSTANT_GENERATOR);
            org.junit.jupiter.api.Assertions.fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
        } catch (HoodieInsertException e) {
            // Only a validation failure is expected; any other insert failure is rethrown.
            boolean causedByValidation = e.getCause() instanceof HoodieValidationException;
            if (!causedByValidation) {
                throw e;
            }
        }
        org.junit.jupiter.api.Assertions.assertFalse(testTable.commitExists(newCommitTime));
    }

    /**
     * First write: 200 records against a validator expecting 500 rows, with the failed-writes
     * cleaning policy set to NEVER; it must fail validation and leave an inflight (not completed)
     * commit. Second write: 300 records against a validator expecting 300 rows; it must commit
     * while the first instant is still inflight.
     */
    @Test
    public void testPreCommitValidationWithMultipleInflights() throws Exception {
        final int failingBatchSize = 200;
        HoodieWriteConfig failingConfig = getConfigBuilder()
            .withCleanConfig(createCleanConfig(HoodieFailedWritesCleaningPolicy.NEVER, true))
            .withPreCommitValidatorConfig(createPreCommitValidatorConfig(500))
            .build();
        String instant1 = getHoodieWriteClient(failingConfig).createNewInstantTime();
        try {
            insertWithConfig(failingConfig, failingBatchSize, instant1);
            org.junit.jupiter.api.Assertions.fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
        } catch (HoodieInsertException e) {
            // Only a validation failure is expected; any other insert failure is rethrown.
            boolean causedByValidation = e.getCause() instanceof HoodieValidationException;
            if (!causedByValidation) {
                throw e;
            }
        }
        org.junit.jupiter.api.Assertions.assertFalse(testTable.commitExists(instant1));
        org.junit.jupiter.api.Assertions.assertTrue(testTable.inflightCommitExists(instant1));

        final int passingBatchSize = 300;
        HoodieWriteConfig passingConfig = getConfigBuilder()
            .withCleanConfig(createCleanConfig(HoodieFailedWritesCleaningPolicy.NEVER, true))
            .withPreCommitValidatorConfig(createPreCommitValidatorConfig(passingBatchSize))
            .build();
        String instant2 = getHoodieWriteClient(passingConfig).createNewInstantTime();
        insertWithConfig(passingConfig, passingBatchSize, instant2);
        org.junit.jupiter.api.Assertions.assertTrue(testTable.inflightCommitExists(instant1));
        org.junit.jupiter.api.Assertions.assertTrue(testTable.commitExists(instant2));
    }

    /**
     * Opens a write client for {@code config}, bulk-inserts {@code numRecords} records at
     * {@code instant} through {@code insertFirstBatch}, and closes the client.
     *
     * @param config write config used to create the client
     * @param numRecords number of records to insert and to expect
     * @param instant instant time of the write
     * @throws Exception propagated from {@code insertFirstBatch}
     */
    private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception {
        try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
            HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> bulkInsertFn =
                (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
            insertFirstBatch(config, client, instant, "000", numRecords, bulkInsertFn,
                false, false, numRecords, HoodieTestUtils.INSTANT_GENERATOR);
        }
    }

    /**
     * Runs the shared {@code testDeduplication} scenario with a write function that inserts the
     * records as an RDD and collects the result; the last argument is {@code false}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationOnInsert(boolean populateMetaFields) throws Exception {
        testDeduplication((client, records, commitTime) -> {
            Object inputRdd = this.list2Rdd.apply((List) records);
            JavaRDD statusRdd = (JavaRDD) client.insert(inputRdd, commitTime);
            return this.rdd2List.apply(statusRdd);
        }, populateMetaFields, false);
    }

    /**
     * Runs the shared {@code testDeduplication} scenario with a write function that inserts the
     * records as an RDD and collects the result; the last argument is {@code true}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception {
        testDeduplication((client, records, commitTime) -> {
            Object inputRdd = this.list2Rdd.apply((List) records);
            JavaRDD statusRdd = (JavaRDD) client.insert(inputRdd, commitTime);
            return this.rdd2List.apply(statusRdd);
        }, populateMetaFields, true);
    }

    /**
     * Runs the shared {@code testDeduplication} scenario with a write function that bulk-inserts
     * the records as an RDD and collects the result; the last argument is {@code false}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testDeduplicationOnBulkInsert(boolean populateMetaFields) throws Exception {
        testDeduplication((client, records, commitTime) -> {
            Object inputRdd = this.list2Rdd.apply((List) records);
            JavaRDD statusRdd = (JavaRDD) client.bulkInsert(inputRdd, commitTime);
            return this.rdd2List.apply(statusRdd);
        }, populateMetaFields, false);
    }

    /**
     * Runs the shared {@code testDeduplication} scenario with a write function that upserts the
     * records as an RDD and collects the result; the trailing arguments are {@code true, false}.
     */
    @Test
    public void testDeduplicationOnUpsert() throws Exception {
        testDeduplication((client, records, commitTime) -> {
            Object inputRdd = this.list2Rdd.apply((List) records);
            JavaRDD statusRdd = (JavaRDD) client.upsert(inputRdd, commitTime);
            return this.rdd2List.apply(statusRdd);
        }, true, false);
    }

    /**
     * Runs the shared {@code testUpsertsInternal} scenario with a write function that calls
     * {@code upsert} and the Spark upgrade/downgrade helper, once for each value supplied by
     * {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testUpserts(boolean populateMetaFields) throws Exception {
        SupportsUpgradeDowngrade upgradeDowngradeHelper = SparkUpgradeDowngradeHelper.getInstance();
        testUpsertsInternal(
            (client, records, commitTime) -> client.upsert(records, commitTime),
            populateMetaFields,
            false,
            upgradeDowngradeHelper);
    }

    /**
     * Runs the shared {@code testUpsertsInternal} scenario with a write function that calls
     * {@code upsertPreppedRecords} and the Spark upgrade/downgrade helper; both boolean
     * arguments are {@code true}.
     */
    @Test
    public void testUpsertsPrepped() throws Exception {
        SupportsUpgradeDowngrade upgradeDowngradeHelper = SparkUpgradeDowngradeHelper.getInstance();
        testUpsertsInternal(
            (client, records, commitTime) -> client.upsertPreppedRecords(records, commitTime),
            true,
            true,
            upgradeDowngradeHelper);
    }

    /**
     * Looks up a partition path and base file path from the latest commit metadata, then runs
     * a one-element Spark job whose body is the synthetic lambda
     * {@code lambda$testMergeHandle$6447fa78$1}. That lambda is defined outside this view, so
     * what it verifies cannot be stated from here.
     *
     * @param config write config used to obtain the table and passed on to the lambda
     * @throws IOException declared by this method; the throwing call is not identifiable from this view
     */
    protected void testMergeHandle(HoodieWriteConfig config) throws IOException {
        // Passed to the lambda in the original source presumably; unused in this decompiled body.
        String instantTime = "007";
        // Local meta client built from the base path; it shadows the inherited metaClient field.
        HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient(this.jsc, this.basePath);
        HoodieSparkTable table = this.getHoodieTable(metaClient, config);
        // Pair of (partition path, base file path).
        Pair partitionAndBaseFilePaths = this.getPartitionAndBaseFilePathsFromLatestCommitMetadata(metaClient);
        String partitionPath = (String)partitionAndBaseFilePaths.getLeft();
        String baseFilePath = (String)partitionAndBaseFilePaths.getRight();
        // Single-element RDD: the mapped function runs once as a Spark task; collect() triggers it.
        this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestHoodieClientOnCopyOnWriteStorage.lambda$testMergeHandle$6447fa78$1(baseFilePath, config, (HoodieTable)table, partitionPath, arg_0)).collect();
    }

    /**
     * Writes an insert batch ("001"), an update batch ("004") and a delete batch ("005"), then
     * creates a savepoint at "004" with archive-beyond-savepoint enabled and asserts that
     * restoring to that savepoint throws {@link IllegalArgumentException}.
     */
    @Test
    public void testRestoreWithSavepointBeyondArchival() throws Exception {
        HoodieWriteConfig config = getConfigBuilder().withRollbackUsingMarkers(true).build();
        HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
            .withRollbackUsingMarkers(true)
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build())
            .withProps((Map) config.getProps())
            .build();
        HoodieTableMetaClient.newTableBuilder()
            .fromMetaClient(this.metaClient)
            .setPopulateMetaFields(config.populateMetaFields())
            .initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());
        SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);

        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn = SparkRDDWriteClient::insert;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn = SparkRDDWriteClient::upsert;

        final String initCommitTime = "000";
        final String insertCommitTime = "001";
        final String betweenCommitTime = "002";
        final String updateCommitTime = "004";
        final String deleteCommitTime = "005";

        // Batch 1: insert 200 records.
        insertFirstBatch(hoodieWriteConfig, client, insertCommitTime, initCommitTime, 200, insertFn,
            false, true, 200, config.populateMetaFields(), HoodieTestUtils.INSTANT_GENERATOR);

        // Batch 2: update 100 records.
        Option<List<String>> commitTimesBetween = Option.of(Arrays.asList(betweenCommitTime));
        updateBatch(hoodieWriteConfig, client, updateCommitTime, insertCommitTime, commitTimesBetween, initCommitTime,
            100, upsertFn, false, true, 100, 200, 2, config.populateMetaFields(), HoodieTestUtils.INSTANT_GENERATOR);

        // Batch 3: delete 50 records.
        deleteBatch(hoodieWriteConfig, client, deleteCommitTime, updateCommitTime, initCommitTime, 50,
            false, true, 0, 150, config.populateMetaFields(), HoodieTestUtils.TIMELINE_FACTORY, HoodieTestUtils.INSTANT_GENERATOR);

        HoodieWriteConfig newConfig = getConfigBuilder()
            .withProps((Map) config.getProps())
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build())
            .build();
        SparkRDDWriteClient savepointClient = getHoodieWriteClient(newConfig);
        savepointClient.savepoint("004", "user1", "comment1");
        org.junit.jupiter.api.Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> savepointClient.restoreToSavepoint("004"),
            "Restore should not be supported when " + HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key() + " is enabled");
    }

    /**
     * Runs the shared {@code testHoodieConcatHandle} scenario with {@code false} as the second
     * argument, once for each value supplied by {@code populateMetaFieldsParams}.
     */
    @ParameterizedTest
    @MethodSource("populateMetaFieldsParams")
    public void testInsertsWithHoodieConcatHandle(boolean populateMetaFields) throws Exception {
        InstantGenerator instantGenerator = HoodieTestUtils.INSTANT_GENERATOR;
        testHoodieConcatHandle(populateMetaFields, false, instantGenerator);
    }

    /** Runs the shared {@code testHoodieConcatHandle} scenario with both boolean arguments {@code true}. */
    @Test
    public void testInsertsPreppedWithHoodieConcatHandle() throws Exception {
        InstantGenerator instantGenerator = HoodieTestUtils.INSTANT_GENERATOR;
        testHoodieConcatHandle(true, true, instantGenerator);
    }

    /** Runs the shared {@code testHoodieConcatHandleOnDupInserts} scenario with {@code false} as the first argument. */
    @Test
    public void testInsertsWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        InstantGenerator instantGenerator = HoodieTestUtils.INSTANT_GENERATOR;
        testHoodieConcatHandleOnDupInserts(false, instantGenerator);
    }

    /** Runs the shared {@code testHoodieConcatHandleOnDupInserts} scenario with {@code true} as the first argument. */
    @Test
    public void testInsertsPreppedWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        InstantGenerator instantGenerator = HoodieTestUtils.INSTANT_GENERATOR;
        testHoodieConcatHandleOnDupInserts(true, instantGenerator);
    }

    /**
     * Bulk-inserts 100 generated records (10 slices) at commit "001" using an
     * {@link RDDCustomColumnsSortPartitioner} on the {@code rider} column and asserts that the
     * collected write statuses contain no errors.
     */
    @Test
    public void testBulkInsertWithCustomPartitioner() {
        HoodieWriteConfig config = getConfigBuilder().withRollbackUsingMarkers(true).build();
        try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
            final String commitTime = "001";
            client.startCommitWithTime(commitTime);

            Integer recordCount = 100;
            List<HoodieRecord> inserts = this.dataGen.generateInserts(commitTime, recordCount);
            JavaRDD<HoodieRecord> insertRecordsRdd = this.jsc.parallelize(inserts, 10);

            String[] sortColumns = {"rider"};
            RDDCustomColumnsSortPartitioner partitioner =
                new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config);
            List<WriteStatus> statuses = client.bulkInsert(insertRecordsRdd, commitTime, Option.of(partitioner)).collect();
            Assertions.assertNoWriteErrors(statuses);
        }
    }

    /**
     * Checks that a new commit cannot be started while a restore is not completed.
     *
     * <ol>
     *   <li>Inserts a batch at "001", savepoints it, restores to it, then renames the completed
     *       restore instant file to a {@code .backup} name so the restore no longer appears
     *       completed on the timeline.</li>
     *   <li>With a fresh client, asserts that starting commit "002" throws
     *       {@link IllegalArgumentException}.</li>
     *   <li>Renames the restore file back and asserts that a new insert at "003" succeeds.</li>
     * </ol>
     *
     * Each phase uses its own client in a try-with-resources statement so the client is closed
     * even when an assertion fails.
     *
     * @throws IOException if renaming the restore instant file or inserting records fails
     */
    @Test
    public void testPendingRestore() throws IOException {
        HoodieWriteConfig config = this.getConfigBuilder()
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
            .build();
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn = SparkRDDWriteClient::insert;
        StoragePath completeRestoreFile = null;
        StoragePath backupCompletedRestoreFile = null;

        // Phase 1: create a completed restore, then hide its completed instant file.
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config)) {
            this.insertBatchRecords(client, "001", 100, 3, 2, insertFn);
            client.savepoint("001", "user1", "comment1");
            client.restoreToInstant("001", false);
            HoodieInstant restoreCompleted = (HoodieInstant) this.metaClient.reloadActiveTimeline()
                .getRestoreTimeline().filterCompletedInstants().getInstants().get(0);
            String restoreFilePath = config.getBasePath() + "/" + ".hoodie" + "/" + "timeline" + "/"
                + HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(restoreCompleted);
            completeRestoreFile = new StoragePath(restoreFilePath);
            backupCompletedRestoreFile = new StoragePath(restoreFilePath + ".backup");
            this.metaClient.getStorage().rename(completeRestoreFile, backupCompletedRestoreFile);
        }

        // Phase 2: with the completed restore file hidden, starting a commit must be rejected.
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config)) {
            org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> client.startCommitWithTime("002"));
        }

        // Phase 3: put the completed restore file back; new writes must work again.
        this.metaClient.getStorage().rename(backupCompletedRestoreFile, completeRestoreFile);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config)) {
            this.insertBatchRecords(client, "003", 100, 3, 2, insertFn);
        }
    }

    /**
     * Runs the shared {@code testDeletes} scenario with a second-batch generator that turns the
     * first 50 records of the first batch into deletes and re-sends records 50..99 unchanged.
     * The result list is created once per outer call and appended to on every call of the
     * returned generator.
     */
    @Test
    public void testDeletes() throws Exception {
        HoodieWriterClientTestHarness.Function3<HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>, String, Integer, List<HoodieRecord>> secondBatchGenFn =
            (instantTime, numRecordsInThisCommit, recordsInFirstBatch) -> {
                List<HoodieRecord> secondBatch = new ArrayList<>();
                return (time, numRecords) -> {
                    List<HoodieRecord> toDelete = recordsInFirstBatch.subList(0, 50);
                    List<HoodieRecord> toUpdate = recordsInFirstBatch.subList(50, 100);
                    secondBatch.addAll(this.dataGen.generateDeletesFromExistingRecords(toDelete));
                    secondBatch.addAll(toUpdate);
                    return secondBatch;
                };
            };
        super.testDeletes(secondBatchGenFn, 100, 50, 150);
    }

    /**
     * Delegates to the parent harness scenario of the same name, passing
     * {@code HoodieTestUtils.INSTANT_GENERATOR} as its argument.
     */
    @Test
    public void testDeletesForInsertsInSameBatch() throws Exception {
        super.testDeletesForInsertsInSameBatch(HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Starts a commit at {@code commitTime}, writes {@code recordNum} freshly generated insert
     * records through {@code writeFn}, and checks the resulting write statuses.
     *
     * @param client           write client used to start the commit and handed to {@code writeFn}
     * @param commitTime       instant time used for the commit and for record generation
     * @param recordNum        number of insert records to generate
     * @param expectStatusSize expected number of write statuses produced by the write
     * @param numSlices        number of slices used when parallelizing the generated records
     * @param writeFn          write operation to apply, called as (client, records, commit time)
     * @return pair of (write status RDD, generated insert records)
     * @throws IOException if one is propagated from the calls made here
     */
    private Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> insertBatchRecords(SparkRDDWriteClient client, String commitTime, Integer recordNum, int expectStatusSize, int numSlices, HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws IOException {
        client.startCommitWithTime(commitTime);
        List<HoodieRecord> inserts = this.dataGen.generateInserts(commitTime, recordNum);
        JavaRDD<HoodieRecord> insertRecordsRDD = this.jsc.parallelize(inserts, numSlices);
        JavaRDD<WriteStatus> statuses = writeFn.apply(client, insertRecordsRDD, commitTime);
        // Collect once and reuse the list for both checks instead of triggering a second
        // Spark action (count()) on the same RDD.
        List<WriteStatus> collectedStatuses = statuses.collect();
        Assertions.assertNoWriteErrors(collectedStatuses);
        org.junit.jupiter.api.Assertions.assertEquals(expectStatusSize, collectedStatuses.size(), "check expect status size.");
        return Pair.of(statuses, inserts);
    }

    /**
     * Checks the reject-update clustering strategy: after a clustering instant has been requested
     * over the file slices of the first commit, an upsert carrying updates to first-commit records
     * is expected to fail with {@code HoodieUpsertException}, while single-record inserts keep
     * succeeding and the partition stays at three file groups.
     */
    @Test
    public void testUpdateRejectForClustering() throws IOException {
        String testPartitionPath = "2016/09/26";
        this.dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});

        // Async clustering enabled, with SparkRejectUpdateStrategy as the updates strategy.
        Properties clusteringProps = new Properties();
        clusteringProps.setProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), "true");
        clusteringProps.setProperty(HoodieClusteringConfig.UPDATES_STRATEGY.key(), SparkRejectUpdateStrategy.class.getName());
        String tripSchemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, tripSchemaJson, this.dataGen.getEstimatedFileSizeInBytes(150), true, clusteringProps);
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);

        // Commit 001: 600 upserted records, asserted to end up in two file groups.
        Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> firstBatch = this.insertBatchRecords(client, "001", 600, 2, 1, SparkRDDWriteClient::upsert);
        List<HoodieRecord> firstBatchRecords = firstBatch.getValue();
        List groupIdsAfterFirstBatch = table.getFileSystemView().getAllFileGroups(testPartitionPath).map(group -> group.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(2, groupIdsAfterFirstBatch.size());

        // Instant 002: requested cluster instant created from all file slices seen so far.
        List<List> slicesPerGroup = table.getFileSystemView().getAllFileGroups(testPartitionPath).map(group -> group.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
        List[] fileSlices = slicesPerGroup.toArray(new List[0]);
        this.createRequestedClusterInstant(this.metaClient, "002", fileSlices);

        // Commit 003: one more record; the partition is asserted to have three file groups afterwards.
        this.insertBatchRecords(client, "003", 1, 1, 1, SparkRDDWriteClient::upsert);
        List groupIdsAfterSecondBatch = table.getFileSystemView().getAllFileGroups(testPartitionPath).map(group -> group.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(3, groupIdsAfterSecondBatch.size());

        // Commit 004: updates to first-batch records are expected to be rejected.
        String commitTime4 = "004";
        client.startCommitWithTime(commitTime4);
        ArrayList rejectedUpdates = new ArrayList(this.dataGen.generateUpdates(commitTime4, firstBatchRecords));
        String assertMsg = String.format("Not allowed to update the clustering files in partition: %s For pending clustering operations, we are not going to support update for now.", testPartitionPath);
        org.junit.jupiter.api.Assertions.assertThrows(HoodieUpsertException.class, () -> client.upsert(this.jsc.parallelize(rejectedUpdates, 1), commitTime4).collect(), assertMsg);

        // Commit 005: a single insert must be written to the file group that appeared after commit 001.
        JavaRDD<WriteStatus> lastStatuses = this.insertBatchRecords(client, "005", 1, 1, 1, SparkRDDWriteClient::upsert).getKey();
        groupIdsAfterSecondBatch.removeAll(groupIdsAfterFirstBatch);
        org.junit.jupiter.api.Assertions.assertEquals(groupIdsAfterSecondBatch.get(0), lastStatuses.collect().get(0).getFileId());
        List finalGroupIds = table.getFileSystemView().getAllFileGroups(testPartitionPath).map(group -> group.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(3, finalGroupIds.size());
    }

    /**
     * Exercises small-file handling on the upsert path across three commits in one partition:
     * <ol>
     *   <li>commit 001 writes 100 inserts into a single file;</li>
     *   <li>commit 002 upserts 40 new records plus updates to the first batch and is asserted to
     *       expand that same file to 140 records, all stamped with commit 002;</li>
     *   <li>commit 003 upserts 200 new records plus updates to the commit-002 inserts and is
     *       asserted to produce two files, with update/insert totals matching the inputs.</li>
     * </ol>
     */
    @Test
    public void testSmallInsertHandlingForUpserts() throws Exception {
        final String testPartitionPath = "2016/09/26";
        final int insertSplitLimit = 100;
        String tripSchemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        // The size estimate is deliberately taken from the data generator that is in place
        // BEFORE it gets replaced on the next line; keep this ordering.
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(insertSplitLimit, tripSchemaJson, this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        FileFormatUtils fileUtils = TestHoodieClientOnCopyOnWriteStorage.getFileUtilsInstance(this.metaClient);

        // ---- Commit 001: 100 inserts, one file expected.
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List inserts1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(100));
        Set<String> keys1 = Transformations.recordsToRecordKeySet(inserts1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors(statuses);
        org.junit.jupiter.api.Assertions.assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
        String file1 = statuses.get(0).getFileId();
        org.junit.jupiter.api.Assertions.assertEquals(100, fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, statuses.get(0).getStat().getPath())).size(), "file should contain 100 records");

        // ---- Commit 002: 40 inserts + updates of the first batch, same file expected to grow.
        String commitTime2 = "002";
        client.startCommitWithTime(commitTime2);
        List inserts2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(40));
        Set<String> keys2 = Transformations.recordsToRecordKeySet(inserts2);
        List insertsAndUpdates2 = new ArrayList(inserts2);
        insertsAndUpdates2.addAll(this.dataGen.generateUpdates(commitTime2, inserts1));
        JavaRDD insertAndUpdatesRDD2 = this.jsc.parallelize(insertsAndUpdates2, 1);
        statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
        Assertions.assertNoWriteErrors(statuses);
        org.junit.jupiter.api.Assertions.assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
        org.junit.jupiter.api.Assertions.assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
        org.junit.jupiter.api.Assertions.assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
        StoragePath newFile = new StoragePath(this.basePath, statuses.get(0).getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals(140, fileUtils.readRowKeys(this.storage, newFile).size(), "file should contain 140 records");
        List<GenericRecord> commit2Records = fileUtils.readAvroRecords(this.storage, newFile);
        for (GenericRecord record : commit2Records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2");
            org.junit.jupiter.api.Assertions.assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey), "key expected to be part of commit2");
        }

        // ---- Commit 003: 200 inserts + updates of the commit-002 inserts, two files expected.
        String commitTime3 = "003";
        client.startCommitWithTime(commitTime3);
        List insertsAndUpdates3 = this.dataGen.generateInserts(commitTime3, Integer.valueOf(200));
        Set<String> keys3 = Transformations.recordsToRecordKeySet(insertsAndUpdates3);
        List updates3 = this.dataGen.generateUpdates(commitTime3, inserts2);
        insertsAndUpdates3.addAll(updates3);
        JavaRDD insertAndUpdatesRDD3 = this.jsc.parallelize(insertsAndUpdates3, 1);
        statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
        Assertions.assertNoWriteErrors(statuses);
        org.junit.jupiter.api.Assertions.assertEquals(2, statuses.size(), "2 files needs to be committed.");

        HoodieTableMetaClient metadata = this.createMetaClient();
        HoodieSparkTable table = this.getHoodieTable(metadata, config);
        TableFileSystemView.BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
        List<HoodieBaseFile> files = fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
        int numTotalInsertsInCommit3 = 0;
        int numTotalUpdatesInCommit3 = 0;
        for (HoodieBaseFile file : files) {
            if (file.getFileName().contains(file1)) {
                // The pre-existing file: count commit-003 rows, split into updates (keys from
                // commit 002) and inserts (everything else).
                org.junit.jupiter.api.Assertions.assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded");
                List<GenericRecord> expandedFileRecords = fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath()));
                for (GenericRecord record : expandedFileRecords) {
                    String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                    String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
                    if (!recordCommitTime.equals(commitTime3)) {
                        continue;
                    }
                    // remove() reports whether the key was present, so each commit-002 key is
                    // counted as an update at most once.
                    if (keys2.remove(recordKey)) {
                        ++numTotalUpdatesInCommit3;
                    } else {
                        ++numTotalInsertsInCommit3;
                    }
                }
                org.junit.jupiter.api.Assertions.assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly");
            } else {
                // The newly created file: every row must be a commit-003 insert.
                org.junit.jupiter.api.Assertions.assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3");
                List<GenericRecord> newFileRecords = fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath()));
                for (GenericRecord record : newFileRecords) {
                    String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                    org.junit.jupiter.api.Assertions.assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit3");
                    org.junit.jupiter.api.Assertions.assertTrue(keys3.contains(recordKey), "key expected to be part of commit3");
                }
                numTotalInsertsInCommit3 += newFileRecords.size();
            }
        }
        // Expected value first, observed value second, so a failure reports them the right way round.
        org.junit.jupiter.api.Assertions.assertEquals(inserts2.size(), numTotalUpdatesInCommit3, "Total updates in commit3 must add up");
        org.junit.jupiter.api.Assertions.assertEquals(keys3.size(), numTotalInsertsInCommit3, "Total inserts in commit3 must add up");
    }

    /**
     * Exercises small-file handling on the insert path, once per value supplied by
     * {@code smallInsertHandlingParams}: 100 inserts create one file, 40 more are asserted to
     * expand that file to 140 records, and 200 more are asserted to yield two files holding
     * 340 records in total, all at commit 003.
     *
     * @param mergeAllowDuplicateInserts forwarded to {@code getSmallInsertWriteConfig}
     */
    @ParameterizedTest
    @MethodSource(value={"smallInsertHandlingParams"})
    public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts);
        this.dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        FileFormatUtils fileUtils = TestHoodieClientOnCopyOnWriteStorage.getFileUtilsInstance(this.metaClient);

        // ---- Commit 001: 100 inserts into a single file.
        String commitTime1 = "001";
        Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> firstBatch = this.insertBatchRecords(client, commitTime1, 100, 1, 1, SparkRDDWriteClient::insert);
        List<WriteStatus> statuses = firstBatch.getLeft().collect();
        Set<String> keys1 = Transformations.recordsToRecordKeySet(firstBatch.getRight());
        Assertions.assertPartitionMetadata((String)this.basePath, new String[]{testPartitionPath}, (HoodieStorage)this.storage);
        WriteStatus firstStatus = statuses.get(0);
        String file1 = firstStatus.getFileId();
        StoragePath firstFile = new StoragePath(this.basePath, firstStatus.getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals(100, fileUtils.readRowKeys(this.storage, firstFile).size(), "file should contain 100 records");

        // ---- Commit 002: 40 inserts, expected to be packed into the existing file.
        String commitTime2 = "002";
        Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> secondBatch = this.insertBatchRecords(client, commitTime2, 40, 1, 1, SparkRDDWriteClient::insert);
        Set<String> keys2 = Transformations.recordsToRecordKeySet(secondBatch.getRight());
        statuses = secondBatch.getLeft().collect();
        WriteStatus expandedStatus = statuses.get(0);
        org.junit.jupiter.api.Assertions.assertEquals(file1, expandedStatus.getFileId(), "Existing file should be expanded");
        org.junit.jupiter.api.Assertions.assertEquals(commitTime1, expandedStatus.getStat().getPrevCommit(), "Existing file should be expanded");
        StoragePath newFile = new StoragePath(this.basePath, expandedStatus.getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals(140, fileUtils.readRowKeys(this.storage, newFile).size(), "file should contain 140 records");
        List<GenericRecord> expandedRecords = fileUtils.readAvroRecords(this.storage, newFile);
        for (GenericRecord record : expandedRecords) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
            boolean fromKnownCommit = commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime);
            org.junit.jupiter.api.Assertions.assertTrue(fromKnownCommit, "Record expected to be part of commit 1 or commit2");
            boolean fromKnownBatch = keys2.contains(recordKey) || keys1.contains(recordKey);
            org.junit.jupiter.api.Assertions.assertTrue(fromKnownBatch, "key expected to be part of commit 1 or commit2");
        }

        // ---- Commit 003: 200 inserts, two write statuses expected.
        String commitTime3 = "003";
        Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> thirdBatch = this.insertBatchRecords(client, commitTime3, 200, 2, 1, SparkRDDWriteClient::insert);
        statuses = thirdBatch.getLeft().collect();
        int rowsInFirstStatusFile = fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, statuses.get(0).getStat().getPath())).size();
        int rowsInSecondStatusFile = fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, statuses.get(1).getStat().getPath())).size();
        org.junit.jupiter.api.Assertions.assertEquals(340, rowsInFirstStatusFile + rowsInSecondStatusFile, "file should contain 340 records");

        // Cross-check through the base-file view of a freshly created meta client.
        HoodieTableMetaClient metaClient = this.createMetaClient();
        HoodieSparkTable table = this.getHoodieTable(metaClient, config);
        List<HoodieBaseFile> files = table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(2, files.size(), "Total of 2 valid data files");
        int totalInserts = 0;
        for (HoodieBaseFile file : files) {
            org.junit.jupiter.api.Assertions.assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
            totalInserts += fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath())).size();
        }
        org.junit.jupiter.api.Assertions.assertEquals(340, totalInserts, "Total number of records must add up");
    }

    /**
     * Alternates deletes and updates against a single small file via the {@code testDeletes} /
     * {@code testUpdates} helpers, and checks that deleting keys that were never written yields
     * no write statuses and leaves the dataset at 150 records.
     */
    @Test
    public void testDeletesWithDeleteApi() throws Exception {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        String tripSchemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        // The size estimate comes from the data generator in place before it is replaced below.
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(insertSplitLimit, tripSchemaJson, this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);

        // ---- Commit 001: 100 records upserted into one file.
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List<HoodieRecord> inserts1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(100));
        Set<String> keys1 = Transformations.recordsToRecordKeySet(inserts1);
        ArrayList<String> keysSoFar = new ArrayList<String>(keys1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List<WriteStatus> insertStatuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors(insertStatuses);
        org.junit.jupiter.api.Assertions.assertEquals(1, insertStatuses.size(), "Just 1 file needs to be added.");
        WriteStatus firstStatus = insertStatuses.get(0);
        String file1 = firstStatus.getFileId();
        StoragePath firstFile = new StoragePath(this.basePath, firstStatus.getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals(100, TestHoodieClientOnCopyOnWriteStorage.getFileUtilsInstance(this.metaClient).readRowKeys(this.storage, firstFile).size(), "file should contain 100 records");

        // ---- Commits 002-005: delete / update rounds through the helpers. The trailing numbers
        // follow 100 -> 80 -> 120 -> 110 -> 150 (presumably the expected record totals).
        this.testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
        Pair<Set<String>, List<HoodieRecord>> updateBatch2 = this.testUpdates("003", client, 40, 120);
        keysSoFar.addAll(updateBatch2.getLeft());
        this.testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar);
        Pair<Set<String>, List<HoodieRecord>> updateBatch3 = this.testUpdates("005", client, 40, 150);
        keysSoFar.addAll(updateBatch3.getLeft());

        // ---- Commit 006: delete keys taken from records that were generated but never written.
        String commitTime6 = "006";
        client.startCommitWithTime(commitTime6);
        List dummyInserts3 = this.dataGen.generateInserts(commitTime6, Integer.valueOf(20));
        List hoodieKeysToDelete3 = Transformations.randomSelectAsHoodieKeys(dummyInserts3, 20);
        JavaRDD deleteKeys3 = this.jsc.parallelize(hoodieKeysToDelete3, 1);
        List<WriteStatus> deleteStatuses = client.delete(deleteKeys3, commitTime6).collect();
        Assertions.assertNoWriteErrors(deleteStatuses);
        org.junit.jupiter.api.Assertions.assertEquals(0, deleteStatuses.size(), "Just 0 write status for delete.");
        this.assertTheEntireDatasetHasAllRecordsStill(150);

        // ---- Commit 007: one more delete round on the last update batch (150 -> 140).
        this.testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
    }

    /**
     * Runs the shared insert-and-clustering scenario with the config from
     * {@code createClusteringBuilder(true, 1)}, using {@code SqlQueryEqualityPreCommitValidator}
     * together with {@code COUNT_SQL_QUERY_FOR_VALIDATION}.
     *
     * @param populateMetaFields forwarded to {@code testInsertAndClustering}
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testSimpleClustering(boolean populateMetaFields) throws Exception {
        String validatorClassName = SqlQueryEqualityPreCommitValidator.class.getName();
        this.testInsertAndClustering(
            TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build(),
            populateMetaFields,
            true,
            false,
            validatorClassName,
            COUNT_SQL_QUERY_FOR_VALIDATION,
            "");
    }

    /**
     * Delegates to the harness overload of the same name, handing it a client created by
     * {@code createBrokenClusteringClient} with a "CLUSTERING FAILURE" {@code HoodieException},
     * the config from {@code createClusteringBuilder(true, 2)}, and the list/RDD converters.
     */
    @Test
    public void testAndValidateClusteringOutputFiles() throws IOException {
        Throwable clusteringFailure = new HoodieException("CLUSTERING FAILURE");
        this.testAndValidateClusteringOutputFiles(
            this.createBrokenClusteringClient(clusteringFailure),
            TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 2).build(),
            this.list2Rdd,
            this.rdd2List);
    }

    /**
     * After the shared insert-and-clustering scenario, writes a batch twice under the same instant
     * time with auto-commit disabled. The timeline is asserted to hold two inflight/requested
     * instants after the first write and one after the second write is committed.
     */
    @Test
    public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
        String validatorClassName = SqlQueryEqualityPreCommitValidator.class.getName();
        this.testInsertAndClustering(
            TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build(),
            true,
            false,
            false,
            validatorClassName,
            COUNT_SQL_QUERY_FOR_VALIDATION,
            "");

        // Auto-commit is off, so the first write below stays uncommitted.
        SparkRDDWriteClient client = this.getHoodieWriteClient(this.getConfigBuilder().withAutoCommit(false).build());
        String commitTime1 = client.createNewInstantTime();
        this.insertBatchRecords(client, commitTime1, 200, 1, 2, SparkRDDWriteClient::upsert);

        HoodieTableMetaClient metaClient = this.createMetaClient();
        int pendingBeforeCommit = metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants();
        org.junit.jupiter.api.Assertions.assertEquals(2, pendingBeforeCommit);

        // Same instant time again; this time the result is committed explicitly.
        JavaRDD<WriteStatus> statuses = this.insertBatchRecords(client, commitTime1, 200, 1, 2, SparkRDDWriteClient::upsert).getLeft();
        client.commit(commitTime1, statuses);

        metaClient.reloadActiveTimeline();
        int pendingAfterCommit = metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants();
        org.junit.jupiter.api.Assertions.assertEquals(1, pendingAfterCommit);
    }

    /**
     * Delegates to the harness overload of the same name with a clustering config built from
     * {@code createClusteringBuilder(false, 1)} (async max commits 1) and a client created by
     * {@code createBrokenClusteringClient} with a "CLUSTERING FAILURE" {@code HoodieException}.
     *
     * @param scheduleInlineClustering value passed to {@code withScheduleInlineClustering}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(false, 1)
            .withAsyncClusteringMaxCommits(1)
            .withScheduleInlineClustering(Boolean.valueOf(scheduleInlineClustering))
            .build();
        Throwable clusteringFailure = new HoodieException("CLUSTERING FAILURE");
        this.testInlineScheduleClustering(
            this.createBrokenClusteringClient(clusteringFailure),
            clusteringConfig,
            this.list2Rdd,
            this.rdd2List);
    }

    /**
     * Runs the shared insert-and-clustering scenario with a sort column configured:
     * {@code _hoodie_record_key} when meta fields are populated, {@code _row_key} otherwise.
     *
     * @param populateMetaFields selects the sort column and is forwarded to the scenario
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
        String sortColumn;
        if (populateMetaFields) {
            sortColumn = "_hoodie_record_key";
        } else {
            sortColumn = "_row_key";
        }
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1)
            .withClusteringSortColumns(sortColumn)
            .build();
        String validatorClassName = SqlQueryEqualityPreCommitValidator.class.getName();
        this.testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, validatorClassName, COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    /**
     * Runs the shared insert-and-clustering scenario sorted on {@code begin_lat,begin_lon} with
     * the single-file sort plan and execution strategies.
     *
     * @param populateMetaFields forwarded to {@code testInsertAndClustering}
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields) throws Exception {
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1)
            .withClusteringSortColumns("begin_lat,begin_lon")
            .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
            .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
            .build();
        String validatorClassName = SqlQueryEqualityPreCommitValidator.class.getName();
        this.testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, validatorClassName, COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    /**
     * Rolls back a pending clustering instant, then deletes the rollback commit, re-creates the
     * requested cluster commit, and attempts clustering again. The last instant on the rollback
     * timeline is asserted to carry the same requested time as the original rollback.
     */
    @Test
    public void testPendingClusteringRollback() throws Exception {
        boolean populateMetaFields = true;

        // Leave a clustering plan pending and pick up its instant.
        List<HoodieRecord> allRecords = this.testInsertAndClustering(TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build(), populateMetaFields, false);
        HoodieTableMetaClient metaClient = this.createMetaClient();
        List pendingClusteringPlans = TestHoodieClientOnCopyOnWriteStorage.getAndAssertPendingClusteringPlans(true, metaClient);
        HoodieInstant pendingClusteringInstant = (HoodieInstant)((Pair)pendingClusteringPlans.get(0)).getLeft();
        String pendingClusteringTime = pendingClusteringInstant.requestedTime();

        // A write of all records plus 200 new inserts is expected to be rejected while clustering is pending.
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig config = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator();
        String commitTime = client.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateInserts(commitTime, Integer.valueOf(200)));
        org.junit.jupiter.api.Assertions.assertThrows(HoodieUpsertException.class, () -> this.writeAndVerifyBatch((BaseHoodieWriteClient)client, allRecords, commitTime, populateMetaFields));

        // Roll the pending clustering back; no pending plan may remain.
        client.rollback(pendingClusteringTime);
        metaClient.reloadActiveTimeline();
        org.junit.jupiter.api.Assertions.assertEquals(0L, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
        HoodieInstant rollbackInstant = (HoodieInstant)metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        String rollbackTime = rollbackInstant.requestedTime();

        // Delete the rollback commit and put a requested cluster commit back at the same instant time.
        FileCreateUtilsLegacy.deleteRollbackCommit(metaClient.getBasePath().toString(), rollbackTime);
        metaClient.reloadActiveTimeline();
        HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, pendingClusteringTime, "1");
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
            .setClusteringPlan(clusteringPlan)
            .setOperationType(WriteOperationType.CLUSTER.name())
            .build();
        FileCreateUtilsLegacy.createRequestedClusterCommit(metaClient.getBasePath().toString(), pendingClusteringTime, requestedReplaceMetadata);

        try {
            client.cluster(pendingClusteringTime, false);
        }
        catch (Exception ignored) {
            // Deliberately ignored: the check below only inspects the rollback timeline,
            // regardless of whether this clustering attempt succeeds.
        }

        metaClient.reloadActiveTimeline();
        HoodieInstant newRollbackInstant = (HoodieInstant)metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        org.junit.jupiter.api.Assertions.assertEquals(rollbackTime, newRollbackInstant.requestedTime());
    }

    /**
     * Checks what happens to an inflight clustering plan when the configured update strategy is
     * {@code SparkAllowUpdateStrategy} and a batch of generated updates is written afterwards.
     *
     * <p>After the write, zero pending clustering plans are expected when the write config reports
     * {@code isRollbackPendingClustering()} as true, and exactly one otherwise.
     *
     * @param rollbackPendingClustering value handed to {@code withRollbackPendingClustering}
     * @throws Exception if any of the delegated helpers fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPendingClustering) throws Exception {
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1)
            .withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
            .withRollbackPendingClustering(Boolean.valueOf(rollbackPendingClustering))
            .build();
        List<HoodieRecord> allRecords = this.testInsertAndClustering(clusteringConfig, true, false);
        HoodieTableMetaClient metaClient = this.createMetaClient();
        List pendingClusteringPlans = TestHoodieClientOnCopyOnWriteStorage.getAndAssertPendingClusteringPlans((boolean)true, (HoodieTableMetaClient)metaClient);
        HoodieInstant pendingClusteringInstant = (HoodieInstant)((Pair)pendingClusteringPlans.get(0)).getLeft();
        // Expected value goes first so a failure reports "expected INFLIGHT but was <state>".
        org.junit.jupiter.api.Assertions.assertEquals((Object)HoodieInstant.State.INFLIGHT, (Object)pendingClusteringInstant.getState());

        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        cfgBuilder.withClusteringConfig(clusteringConfig);
        cfgBuilder.withProperties(TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen());
        HoodieWriteConfig config = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);

        String commitTime = client.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateUpdates(commitTime, Integer.valueOf(200)));
        this.writeAndVerifyBatch((BaseHoodieWriteClient)client, allRecords, commitTime, true);

        metaClient.reloadActiveTimeline();
        pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).collect(Collectors.toList());
        int expectedPendingPlans = config.isRollbackPendingClustering() ? 0 : 1;
        org.junit.jupiter.api.Assertions.assertEquals(expectedPendingPlans, pendingClusteringPlans.size());
    }

    /**
     * Runs insert-and-clustering with inline clustering enabled and {@code FailingPreCommitValidator}
     * configured, and expects a {@code HoodieValidationException}.
     *
     * @throws Exception declared for signature compatibility
     */
    @Test
    public void testClusteringWithFailingValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1)
            .withClusteringSortColumns("_hoodie_record_key")
            .withInlineClustering(Boolean.valueOf(true))
            .build();
        // assertThrows replaces the try/fail/empty-catch pattern and matches the other tests in this class.
        org.junit.jupiter.api.Assertions.assertThrows(
            HoodieValidationException.class,
            () -> this.testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""),
            "expected pre-commit clustering validation to fail");
    }

    /**
     * Configures {@code SqlQueryEqualityPreCommitValidator} with an empty SQL query and expects the
     * insert-and-clustering flow to throw a {@code HoodieValidationException}.
     *
     * @throws Exception declared for signature compatibility
     */
    @Test
    public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
        // The clustering config is built inside the lambda, as it was inside the original try block.
        org.junit.jupiter.api.Assertions.assertThrows(
            HoodieValidationException.class,
            () -> this.testInsertAndClustering(TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build(), false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""),
            "expected pre-commit clustering validation to fail because sql query is not configured");
    }

    /**
     * Runs insert-and-clustering with {@code SqlQuerySingleResultPreCommitValidator} and the
     * single-result query {@code select count(*) from <TABLE_NAME>#400}; the test passes when no
     * exception is thrown.
     *
     * @throws Exception if the delegated flow fails
     */
    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build();
        String validatorClass = SqlQuerySingleResultPreCommitValidator.class.getName();
        String equalityQuery = "";
        String singleResultQuery = "select count(*) from <TABLE_NAME>#400";
        this.testInsertAndClustering(clusteringConfig, false, true, false, validatorClass, equalityQuery, singleResultQuery);
    }

    /**
     * Configures {@code SqlQuerySingleResultPreCommitValidator} with the query
     * {@code select count(*) from <TABLE_NAME>#802} and expects a {@code HoodieValidationException}.
     *
     * @throws Exception declared for signature compatibility
     */
    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
        // The clustering config is built inside the lambda, as it was inside the original try block.
        org.junit.jupiter.api.Assertions.assertThrows(
            HoodieValidationException.class,
            () -> this.testInsertAndClustering(TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build(), false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), "", "select count(*) from <TABLE_NAME>#802"),
            "expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
    }

    /**
     * Convenience overload that delegates to the seven-argument variant with
     * {@code assertSameFileIds = false} and empty validator class and query strings.
     *
     * @param clusteringConfig clustering configuration forwarded unchanged
     * @param populateMetaFields forwarded unchanged
     * @param completeClustering forwarded unchanged
     * @return whatever the seven-argument overload returns
     * @throws Exception if the delegated flow fails
     */
    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
        boolean assertSameFileIds = false;
        String validatorClasses = "";
        String equalityQuery = "";
        String singleResultQuery = "";
        return this.testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, equalityQuery, singleResultQuery);
    }

    /**
     * Inserts two batches through a client created by {@code createBrokenClusteringClient} (built
     * around a {@code HoodieException("CLUSTERING FAILURE")}), then runs {@code testClustering}
     * with the supplied settings.
     *
     * @param clusteringConfig clustering configuration forwarded to {@code testClustering}
     * @param populateMetaFields forwarded to both helpers
     * @param completeClustering forwarded to {@code testClustering}
     * @param assertSameFileIds forwarded to {@code testClustering}
     * @param validatorClasses forwarded to {@code testClustering}
     * @param sqlQueryForEqualityValidation forwarded to {@code testClustering}
     * @param sqlQueryForSingleResultValidation forwarded to {@code testClustering}
     * @return the left element of the left element of the pair returned by {@code testInsertTwoBatches}
     * @throws Exception if either helper fails
     */
    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
        Throwable clusteringFailure = new HoodieException("CLUSTERING FAILURE");
        Pair insertedBatches = this.testInsertTwoBatches(populateMetaFields, this.createBrokenClusteringClient(clusteringFailure));
        this.testClustering(
            clusteringConfig,
            populateMetaFields,
            completeClustering,
            assertSameFileIds,
            validatorClasses,
            sqlQueryForEqualityValidation,
            sqlQueryForSingleResultValidation,
            insertedBatches,
            this.clusteringMetadataRdd2List,
            this.createKeyGenerator);
        Pair recordBatches = (Pair)insertedBatches.getLeft();
        return (List)recordBatches.getLeft();
    }

    /**
     * Delegates to {@code testFailWritesOnInlineTableServiceThrowable} with a broken clustering
     * client built around a {@code HoodieException}, passing {@code shouldFail} for both boolean
     * arguments.
     *
     * @param shouldFail value used for the first two arguments of the delegated helper
     * @throws IOException if the delegated helper fails with an I/O error
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException {
        String failureMessage = "CLUSTERING FAILURE";
        Throwable clusteringFailure = new HoodieException("CLUSTERING FAILURE");
        this.testFailWritesOnInlineTableServiceThrowable(shouldFail, shouldFail, this.createBrokenClusteringClient(clusteringFailure), failureMessage);
    }

    /**
     * Delegates to {@code testFailWritesOnInlineTableServiceThrowable} with a broken clustering
     * client built around an {@code OutOfMemoryError}, using {@code false} and {@code true} as the
     * two boolean arguments.
     *
     * @throws IOException if the delegated helper fails with an I/O error
     */
    @Test
    public void testFailWritesOnInlineTableServiceErrors() throws IOException {
        String failureMessage = "CLUSTERING FAILURE";
        OutOfMemoryError clusteringError = new OutOfMemoryError("CLUSTERING FAILURE");
        this.testFailWritesOnInlineTableServiceThrowable(false, true, this.createBrokenClusteringClient(clusteringError), failureMessage);
    }

    /**
     * Runs {@code testFailWritesOnInlineTableServiceThrowable} three times against broken cleaning
     * clients: twice with a {@code HoodieException} (first boolean {@code true}, then
     * {@code false}) and once with an {@code OutOfMemoryError}.
     *
     * @throws IOException if a delegated call fails with an I/O error
     */
    @Test
    public void testFailWritesOnInlineCleanExceptions() throws IOException {
        String failureMessage = "CLEANING FAILURE";

        // A fresh throwable and a fresh client are created for every run.
        Throwable firstFailure = new HoodieException("CLEANING FAILURE");
        this.testFailWritesOnInlineTableServiceThrowable(true, true, this.createBrokenCleaningClient(firstFailure), failureMessage);

        Throwable secondFailure = new HoodieException("CLEANING FAILURE");
        this.testFailWritesOnInlineTableServiceThrowable(false, true, this.createBrokenCleaningClient(secondFailure), failureMessage);

        OutOfMemoryError cleaningError = new OutOfMemoryError("CLEANING FAILURE");
        this.testFailWritesOnInlineTableServiceThrowable(true, true, this.createBrokenCleaningClient(cleaningError), failureMessage);
    }

    /**
     * Insert-overwrite scenario where the second batch (3000 records) is larger than the first
     * (1000 records).
     *
     * @param populateMetaFields forwarded to {@code verifyInsertOverwritePartitionHandling}
     * @throws Exception if the verification helper fails
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testInsertOverwritePartitionHandlingWithMoreRecords(boolean populateMetaFields) throws Exception {
        int firstBatchSize = 1000;
        int secondBatchSize = 3000;
        this.verifyInsertOverwritePartitionHandling(firstBatchSize, secondBatchSize, populateMetaFields);
    }

    /**
     * Insert-overwrite scenario where the second batch (1000 records) is smaller than the first
     * (3000 records).
     *
     * @throws Exception if the verification helper fails
     */
    @Test
    public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Exception {
        int firstBatchSize = 3000;
        int secondBatchSize = 1000;
        this.verifyInsertOverwritePartitionHandling(firstBatchSize, secondBatchSize, true);
    }

    /**
     * Insert-overwrite scenario where both batches contain 3000 records.
     *
     * @throws Exception if the verification helper fails
     */
    @Test
    public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
        int batchSize = 3000;
        this.verifyInsertOverwritePartitionHandling(batchSize, batchSize, true);
    }

    /**
     * Writes one batch of inserts into a single partition, then replaces it with a second batch via
     * {@code insertOverwrite}, and verifies the outcome.
     *
     * <p>Checks performed: the second write has no write errors, the file ids reported as replaced
     * for the partition equal the file ids produced by the first batch, and
     * {@code verifyRecordsWritten} accepts the second batch.
     *
     * @param batch1RecordsCount number of inserts generated for commit {@code "001"}
     * @param batch2RecordsCount number of inserts generated for commit {@code "002"}
     * @param populateMetaFields forwarded to the config, write and verification helpers
     * @throws Exception if any delegated helper fails
     */
    private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount, boolean populateMetaFields) throws Exception {
        final String testPartitionPath = "americas";
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        // The config is built before dataGen is replaced below, so the size estimate comes from
        // the generator that was active on entry.
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(
            2000,
            schema,
            this.dataGen.getEstimatedFileSizeInBytes(150),
            populateMetaFields,
            TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen((boolean)populateMetaFields));
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});

        // First batch: a regular write whose file ids are expected to be replaced later.
        String commit1 = "001";
        List<WriteStatus> statuses = this.writeAndVerifyBatch((BaseHoodieWriteClient)client, this.dataGen.generateInserts(commit1, Integer.valueOf(batch1RecordsCount)), commit1, populateMetaFields);
        Set<String> batch1Buckets = this.getFileIdsFromWriteStatus(statuses);

        // Second batch: insert-overwrite under a replacecommit.
        String commitTime2 = "002";
        client.startCommitWithTime(commitTime2, "replacecommit");
        List<HoodieRecord> inserts2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(batch2RecordsCount));
        List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
        JavaRDD insertAndUpdatesRDD2 = this.jsc.parallelize(insertsAndUpdates2, 2);
        HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
        statuses = writeResult.getWriteStatuses().collect();
        Assertions.assertNoWriteErrors(statuses);

        Set<String> replacedFileIds = new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath));
        org.junit.jupiter.api.Assertions.assertEquals(batch1Buckets, replacedFileIds);
        this.verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses, config, HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)config.getProps()));
    }

    /**
     * Collects the distinct file ids reported by the given write statuses.
     *
     * @param statuses write statuses to inspect
     * @return a set holding {@code getFileId()} of every status
     */
    private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
        Set<String> fileIds = new HashSet<>();
        for (WriteStatus status : statuses) {
            fileIds.add(status.getFileId());
        }
        return fileIds;
    }

    /**
     * Delete-partition scenario with 1000 records in the first partition and 3000 in each of the
     * other two.
     *
     * @param populateMetaFields forwarded to {@code verifyDeletePartitionsHandling}
     * @throws Exception if the verification helper fails
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition(boolean populateMetaFields) throws Exception {
        int firstPartitionRecords = 1000;
        int otherPartitionRecords = 3000;
        this.verifyDeletePartitionsHandling(firstPartitionRecords, otherPartitionRecords, otherPartitionRecords, populateMetaFields);
    }

    /**
     * Delete-partition scenario with 3000 records in each of the three partitions.
     *
     * @throws Exception if the verification helper fails
     */
    @Test
    public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
        int recordsPerPartition = 3000;
        this.verifyDeletePartitionsHandling(recordsPerPartition, recordsPerPartition, recordsPerPartition, true);
    }

    /**
     * Delete-partition scenario with 3000 records in the first partition and 1000 in each of the
     * other two.
     *
     * @throws Exception if the verification helper fails
     */
    @Test
    public void verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
        int firstPartitionRecords = 3000;
        int otherPartitionRecords = 1000;
        this.verifyDeletePartitionsHandling(firstPartitionRecords, otherPartitionRecords, otherPartitionRecords, true);
    }

    /**
     * Starts a commit, upserts freshly generated inserts for one partition, and verifies the write.
     *
     * @param client write client used for the commit
     * @param recordsCount number of records to generate
     * @param commitTime1 instant time of the commit
     * @param partitionPath partition the generated records belong to
     * @return the file ids reported by the resulting write statuses
     * @throws IOException if record generation or verification fails with an I/O error
     */
    private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) throws IOException {
        client.startCommitWithTime(commitTime1);
        List<HoodieRecord> partitionInserts = this.dataGen.generateInsertsForPartition(commitTime1, Integer.valueOf(recordsCount), partitionPath);
        JavaRDD insertsRdd = this.jsc.parallelize(partitionInserts, 2);
        List<WriteStatus> writeStatuses = client.upsert(insertsRdd, commitTime1).collect();
        Assertions.assertNoWriteErrors(writeStatuses);

        Set<String> fileIds = this.getFileIdsFromWriteStatus(writeStatuses);
        this.verifyRecordsWritten(
            commitTime1,
            true,
            partitionInserts,
            writeStatuses,
            client.getConfig(),
            HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)client.getConfig().getProps()));
        return fileIds;
    }

    /**
     * Starts a {@code replacecommit} at the given instant and deletes the listed partitions.
     *
     * @param client write client used for the commit
     * @param commitTime instant time of the replace commit
     * @param deletePartitionPath partitions to delete
     * @return the union of all file ids reported as replaced, across every partition in the result
     */
    private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
        client.startCommitWithTime(commitTime, "replacecommit");
        HoodieWriteResult deleteResult = client.deletePartitions(deletePartitionPath, commitTime);
        // Flatten the per-partition file id lists into one set.
        return deleteResult.getPartitionToReplaceFileIds()
            .values()
            .stream()
            .flatMap(List::stream)
            .collect(Collectors.toSet());
    }

    /**
     * Inserts records into three partitions (commits {@code "001"}-{@code "003"}), deletes the
     * first partition (commit {@code "004"}) and then the other two (commit {@code "005"}),
     * checking replaced file ids and latest base files after each delete.
     *
     * @param batch1RecordsCount records written to partition {@code 2016/03/15}
     * @param batch2RecordsCount records written to partition {@code 2015/03/16}
     * @param batch3RecordsCount records written to partition {@code 2015/03/17}
     * @param populateMetaFields forwarded to the write config helpers
     * @throws Exception if any delegated helper fails
     */
    private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount, boolean populateMetaFields) throws Exception {
        final String firstPartition = "2016/03/15";
        final String secondPartition = "2015/03/16";
        final String thirdPartition = "2015/03/17";
        final String pathPattern = "%s/%s/*";
        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        // The config is built before dataGen is replaced below.
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(
            2000,
            schema,
            this.dataGen.getEstimatedFileSizeInBytes(150),
            populateMetaFields,
            TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen((boolean)populateMetaFields));
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator();

        // One insert commit per partition.
        Set<String> batch1Buckets = this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, "001", firstPartition);
        Set<String> batch2Buckets = this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, "002", secondPartition);
        Set<String> batch3Buckets = this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, "003", thirdPartition);

        // Delete the first partition only.
        Set<String> replacedByFirstDelete = this.deletePartitionWithCommit(client, "004", Arrays.asList(firstPartition));
        org.junit.jupiter.api.Assertions.assertEquals(batch1Buckets, replacedByFirstDelete);

        List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format(pathPattern, this.basePath, firstPartition));
        org.junit.jupiter.api.Assertions.assertEquals(0, baseFiles.size());
        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format(pathPattern, this.basePath, secondPartition));
        org.junit.jupiter.api.Assertions.assertTrue(baseFiles.size() > 0);
        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format(pathPattern, this.basePath, thirdPartition));
        org.junit.jupiter.api.Assertions.assertTrue(baseFiles.size() > 0);

        // Delete the remaining two partitions in a single commit.
        Set<String> replacedBySecondDelete = this.deletePartitionWithCommit(client, "005", Arrays.asList(secondPartition, thirdPartition));
        Set<String> expectedFileId = new HashSet<String>(batch2Buckets);
        expectedFileId.addAll(batch3Buckets);
        org.junit.jupiter.api.Assertions.assertEquals(expectedFileId, replacedBySecondDelete);

        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(
            this.basePath,
            this.storage,
            String.format(pathPattern, this.basePath, firstPartition),
            String.format(pathPattern, this.basePath, secondPartition),
            String.format(pathPattern, this.basePath, thirdPartition));
        org.junit.jupiter.api.Assertions.assertEquals(0, baseFiles.size());
    }

    /**
     * Upserts a batch made of newly generated inserts plus updates generated for those same
     * inserts, then checks the total record count of the dataset.
     *
     * @param instantTime instant time of the commit
     * @param client write client used for the upsert
     * @param sizeToInsertAndUpdate number of inserts to generate
     * @param expectedRecords record count the dataset must contain afterwards
     * @return pair of (record keys of the generated inserts, the generated inserts)
     * @throws IOException if record generation fails with an I/O error
     */
    private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client, int sizeToInsertAndUpdate, int expectedRecords) throws IOException {
        client.startCommitWithTime(instantTime);

        List<HoodieRecord> newInserts = this.dataGen.generateInserts(instantTime, Integer.valueOf(sizeToInsertAndUpdate));
        Set<String> insertedKeys = Transformations.recordsToRecordKeySet(newInserts);

        // Inserts first, followed by updates generated for those same inserts.
        List<HoodieRecord> batch = new ArrayList<>(newInserts);
        batch.addAll(this.dataGen.generateUpdates(instantTime, newInserts));

        JavaRDD batchRdd = this.jsc.parallelize(batch, 1);
        List writeStatuses = client.upsert(batchRdd, instantTime).collect();
        Assertions.assertNoWriteErrors(writeStatuses);
        this.assertTheEntireDatasetHasAllRecordsStill(expectedRecords);

        return Pair.of(insertedKeys, newInserts);
    }

    /**
     * Deletes a random selection of previously written records and verifies the resulting file.
     *
     * <p>Checks performed: no write errors, exactly one write status whose file id equals
     * {@code existingFile}, the dataset and the written file both hold {@code expectedRecords}
     * records, every record key in the file is contained in {@code keys}, and none of them is one
     * of the deleted record keys.
     *
     * @param client write client used for the delete
     * @param previousRecords records to pick the keys to delete from
     * @param sizeToDelete number of keys to delete
     * @param existingFile file id expected to receive the delete
     * @param instantTime instant time of the commit
     * @param expectedRecords record count expected after the delete
     * @param keys record keys allowed to appear in the written file
     */
    private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete, String existingFile, String instantTime, int expectedRecords, List<String> keys) {
        client.startCommitWithTime(instantTime);
        List<org.apache.hudi.common.model.HoodieKey> hoodieKeysToDelete = Transformations.randomSelectAsHoodieKeys(previousRecords, (int)sizeToDelete);
        // Records in the file expose their key as a String, so compare against the deleted record
        // keys. Checking the String against the key objects themselves could never match.
        Set<String> deletedRecordKeys = hoodieKeysToDelete.stream()
            .map(org.apache.hudi.common.model.HoodieKey::getRecordKey)
            .collect(Collectors.toSet());

        JavaRDD deleteKeys = this.jsc.parallelize(hoodieKeysToDelete, 1);
        List<WriteStatus> statuses = client.delete(deleteKeys, instantTime).collect();
        Assertions.assertNoWriteErrors(statuses);
        org.junit.jupiter.api.Assertions.assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
        org.junit.jupiter.api.Assertions.assertEquals((Object)existingFile, (Object)statuses.get(0).getFileId(), "Existing file should be expanded");
        this.assertTheEntireDatasetHasAllRecordsStill(expectedRecords);

        StoragePath newFile = new StoragePath(this.basePath, statuses.get(0).getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals(
            expectedRecords,
            TestHoodieClientOnCopyOnWriteStorage.getFileUtilsInstance(this.metaClient).readRowKeys(this.storage, newFile).size(),
            "file should contain " + expectedRecords + " records");

        List<GenericRecord> records = TestHoodieClientOnCopyOnWriteStorage.getFileUtilsInstance(this.metaClient).readAvroRecords(this.storage, newFile);
        for (GenericRecord record : records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
            org.junit.jupiter.api.Assertions.assertFalse(deletedRecordKeys.contains(recordKey), "Key deleted");
        }
    }

    /**
     * Reads every partition known to the current data generator and asserts the total record count.
     *
     * @param expectedRecords record count the dataset must contain
     * @return the glob paths ({@code <basePath>/<partition>/*}) that were read
     */
    protected String[] assertTheEntireDatasetHasAllRecordsStill(int expectedRecords) {
        String[] fullPartitionPaths = Arrays.stream(this.dataGen.getPartitionPaths())
            .map(partitionPath -> String.format("%s/%s/*", this.basePath, partitionPath))
            .toArray(String[]::new);
        long actualRecords = HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count();
        org.junit.jupiter.api.Assertions.assertEquals((long)expectedRecords, actualRecords, "Must contain " + expectedRecords + " records");
        return fullPartitionPaths;
    }

    /**
     * Delegates to the three-argument {@code testDeletesWithoutInserts} overload, supplying this
     * class's {@code list2Rdd} and {@code rdd2List} members.
     *
     * @param populateMetaFields forwarded unchanged
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testDeletesWithoutInserts(boolean populateMetaFields) {
        testDeletesWithoutInserts(populateMetaFields, list2Rdd, rdd2List);
    }

    /**
     * Delegates to the one-argument {@code testCommitWritesRelativePaths} overload, supplying
     * this class's {@code list2Rdd} member.
     *
     * @throws Exception if the delegated test fails
     */
    @Test
    public void testCommitWritesRelativePaths() throws Exception {
        testCommitWritesRelativePaths(list2Rdd);
    }

    /**
     * Delegates to the two-argument {@code testMetadataStatsOnCommit} overload, supplying this
     * class's {@code list2Rdd} member.
     *
     * @param populateMetaFields forwarded unchanged
     * @throws Exception if the delegated test fails
     */
    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testMetadataStatsOnCommit(boolean populateMetaFields) throws Exception {
        testMetadataStatsOnCommit(populateMetaFields, list2Rdd);
    }

    /**
     * Delegates to the five-argument {@code testConsistencyCheckDuringFinalize} overload using
     * this class's engine context, table getter and RDD conversion members.
     *
     * @param enableOptimisticConsistencyGuard forwarded unchanged
     * @throws Exception if the delegated test fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
        HoodieEngineContext engineContext = this.context;
        testConsistencyCheckDuringFinalize(engineContext, enableOptimisticConsistencyGuard, getHoodieTable, list2Rdd, rdd2List);
    }

    /**
     * Delegates to the seven-argument {@code testRollbackAfterConsistencyCheckFailureUsingFileList}
     * overload using this class's engine context, table getter and RDD conversion members.
     *
     * @param rollbackUsingMarkers forwarded unchanged
     * @param enableOptimisticConsistencyGuard forwarded unchanged
     * @param populateMetaFields forwarded unchanged
     * @throws Exception if the delegated test fails
     */
    private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard, boolean populateMetaFields) throws Exception {
        HoodieEngineContext engineContext = this.context;
        testRollbackAfterConsistencyCheckFailureUsingFileList(
            engineContext,
            rollbackUsingMarkers,
            enableOptimisticConsistencyGuard,
            populateMetaFields,
            getHoodieTable,
            list2Rdd,
            rdd2List);
    }

    /**
     * Runs the rollback-after-consistency-check-failure scenario with
     * {@code rollbackUsingMarkers = false} and {@code populateMetaFields = true}.
     *
     * @param enableOptimisticConsistencyGuard forwarded unchanged
     * @throws Exception if the delegated test fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception {
        boolean rollbackUsingMarkers = false;
        boolean populateMetaFields = true;
        testRollbackAfterConsistencyCheckFailureUsingFileList(rollbackUsingMarkers, enableOptimisticConsistencyGuard, populateMetaFields);
    }

    /**
     * Runs the rollback-after-consistency-check-failure scenario with
     * {@code rollbackUsingMarkers = true}.
     *
     * @param enableOptimisticConsistencyGuard forwarded unchanged
     * @param populateMetCols forwarded as the {@code populateMetaFields} argument
     * @throws Exception if the delegated test fails
     */
    @ParameterizedTest
    @MethodSource(value={"rollbackAfterConsistencyCheckFailureParams"})
    public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard, boolean populateMetCols) throws Exception {
        boolean rollbackUsingMarkers = true;
        testRollbackAfterConsistencyCheckFailureUsingFileList(rollbackUsingMarkers, enableOptimisticConsistencyGuard, populateMetCols);
    }

    /**
     * Runs the superclass {@code testRollbackFailedCommits} scenario with its boolean argument
     * set to {@code true}.
     *
     * @throws Exception if the superclass test fails
     */
    @Test
    public void testRollbackFailedCommits() throws Exception {
        final boolean flag = true;
        super.testRollbackFailedCommits(flag);
    }

    /**
     * Runs the superclass {@code testRollbackFailedCommitsToggleCleaningPolicy} scenario with its
     * boolean argument set to {@code true}.
     *
     * @throws Exception if the superclass test fails
     */
    @Test
    public void testRollbackFailedCommitsToggleCleaningPolicy() throws Exception {
        final boolean flag = true;
        super.testRollbackFailedCommitsToggleCleaningPolicy(flag);
    }

    /**
     * Runs the superclass {@code testParallelInsertAndCleanPreviousFailedCommits} scenario with
     * its boolean argument set to {@code true}.
     *
     * @throws Exception if the superclass test fails
     */
    @Test
    public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
        final boolean flag = true;
        super.testParallelInsertAndCleanPreviousFailedCommits(flag);
    }

    /**
     * Verifies that several write operations can share one instant when auto-commit is off and
     * {@code withAllowMultiWriteOnSameInstant(true)} is configured.
     *
     * <p>The first instant bulk-inserts 200 records and is committed explicitly. The second instant
     * performs a bulk insert and an upsert, commits the union of both results, and the dataset is
     * then expected to hold 400 records.
     *
     * @throws IOException if record generation fails with an I/O error
     */
    @Test
    public void testMultiOperationsPerCommit() throws IOException {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false).withAllowMultiWriteOnSameInstant(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        HoodieWriteConfig cfg = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);

        // First instant: a single bulk insert, committed explicitly.
        String firstInstantTime = "0000";
        client.startCommitWithTime(firstInstantTime);
        int numRecords = 200;
        JavaRDD writeRecords = this.jsc.parallelize(this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD result = client.bulkInsert(writeRecords, firstInstantTime);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(firstInstantTime, (Object)result), "Commit should succeed");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(firstInstantTime), "After explicit commit, commit file should be created");
        String[] fullPartitionPaths = this.assertTheEntireDatasetHasAllRecordsStill(numRecords);

        // Second instant: a bulk insert and an upsert committed together.
        String nextInstantTime = "0001";
        client.startCommitWithTime(nextInstantTime);
        JavaRDD updateRecords = this.jsc.parallelize(this.dataGen.generateUpdates(nextInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD insertRecords = this.jsc.parallelize(this.dataGen.generateInserts(nextInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD inserts = client.bulkInsert(insertRecords, nextInstantTime);
        JavaRDD upserts = client.upsert(updateRecords, nextInstantTime);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(nextInstantTime, (Object)inserts.union(upserts)), "Commit should succeed");
        // Check the commit that was just made, not the first one again.
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(nextInstantTime), "After explicit commit, commit file should be created");

        int totalRecords = 2 * numRecords;
        org.junit.jupiter.api.Assertions.assertEquals((long)totalRecords, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), "Must contain " + totalRecords + " records");
    }

    /**
     * Schedules and runs clustering after two {@code writeBatch} calls (an insert followed by an
     * upsert of unique updates) under optimistic concurrency control with
     * {@code PreferWriterConflictResolutionStrategy}.
     *
     * <p>{@code cluster(...)} is expected to throw {@code HoodieClusteringException}. After the
     * clustering instant is rolled back, the reloaded active timeline must contain three instants:
     * the last one a {@code rollback} and the second one an inflight {@code commit} at the second
     * write's instant time.
     *
     * @throws Exception if any write or timeline operation fails unexpectedly
     */
    @Test
    public void testClusteringCommitInPresenceOfInflightCommit() throws Exception {
        Properties writerProps = TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties();
        writerProps.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        ConflictResolutionStrategy preferWriter = new PreferWriterConflictResolutionStrategy();
        HoodieLockConfig lockConfig = TestHoodieClientOnCopyOnWriteStorage.createLockConfig(preferWriter);
        HoodieCleanConfig cleanConfig = TestHoodieClientOnCopyOnWriteStorage.createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
        HoodieWriteConfig insertWriteConfig = this.getConfigBuilder()
            .withAutoCommit(false)
            .withCleanConfig(cleanConfig)
            .withLockConfig(lockConfig)
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withProperties(writerProps)
            .build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(insertWriteConfig);

        int numRecords = 200;
        String firstCommit = client.createNewInstantTime();
        String partitionStr = "2016/03/15";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});

        // First write: inserts.
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> insertGenerator = dataGenerator::generateInserts;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn = SparkRDDWriteClient::insert;
        this.writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000", numRecords, insertGenerator, insertFn, true, numRecords, numRecords, 1, true, HoodieTestUtils.INSTANT_GENERATOR);

        // Second write: upsert of unique updates.
        String inflightCommit = client.createNewInstantTime();
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> updateGenerator = dataGenerator::generateUniqueUpdates;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn = SparkRDDWriteClient::upsert;
        this.writeBatch(client, inflightCommit, firstCommit, Option.of(Arrays.asList("000")), "000", 100, updateGenerator, upsertFn, false, 0, 200, 2, false, HoodieTestUtils.INSTANT_GENERATOR);

        HoodieWriteConfig clusteringWriteConfig = this.getConfigBuilder()
            .withAutoCommit(false)
            .withCleanConfig(cleanConfig)
            .withClusteringConfig(TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build())
            .withPreCommitValidatorConfig(TestHoodieClientOnCopyOnWriteStorage.createPreCommitValidatorConfig(200))
            .withLockConfig(lockConfig)
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withProperties(writerProps)
            .build();
        // The clustering instant time is taken from the first client before the clustering client is created.
        String clusteringCommitTime = client.createNewInstantTime();
        SparkRDDWriteClient clusteringWriteClient = this.getHoodieWriteClient(clusteringWriteConfig);
        clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
        org.junit.jupiter.api.Assertions.assertThrows(HoodieClusteringException.class, () -> clusteringWriteClient.cluster(clusteringCommitTime, true));
        clusteringWriteClient.rollback(clusteringCommitTime);

        List<HoodieInstant> instants = this.metaClient.reloadActiveTimeline().getInstants();
        org.junit.jupiter.api.Assertions.assertEquals(3, instants.size());
        org.junit.jupiter.api.Assertions.assertEquals((Object)"rollback", (Object)instants.get(2).getAction());
        HoodieInstant expectedInflight = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, "commit", inflightCommit);
        org.junit.jupiter.api.Assertions.assertEquals((Object)expectedInflight, (Object)instants.get(1));
    }

    /**
     * Runs two {@code writeBatch} calls (an insert followed by an upsert of unique updates), then
     * schedules and runs clustering through a second client configured with
     * {@code SimpleConcurrentFileWritesConflictResolutionStrategy}.
     *
     * <p>Committing the second write's result afterwards is expected to throw
     * {@code HoodieWriteConflictException}.
     *
     * @throws Exception if any write or clustering operation fails unexpectedly
     */
    @Test
    public void testIngestionCommitInPresenceOfCompletedClusteringCommit() throws Exception {
        Properties writerProps = TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties();
        writerProps.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        HoodieCleanConfig cleanConfig = TestHoodieClientOnCopyOnWriteStorage.createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
        ConflictResolutionStrategy preferWriter = new PreferWriterConflictResolutionStrategy();
        HoodieWriteConfig insertWriteConfig = this.getConfigBuilder()
            .withAutoCommit(false)
            .withCleanConfig(cleanConfig)
            .withLockConfig(TestHoodieClientOnCopyOnWriteStorage.createLockConfig(preferWriter))
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withProperties(writerProps)
            .build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(insertWriteConfig);

        int numRecords = 200;
        String firstCommit = client.createNewInstantTime();
        String partitionStr = "2016/03/15";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});

        // First write: inserts.
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> insertGenerator = dataGenerator::generateInserts;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn = SparkRDDWriteClient::insert;
        this.writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000", numRecords, insertGenerator, insertFn, true, numRecords, numRecords, 1, true, HoodieTestUtils.INSTANT_GENERATOR);

        // Second write: upsert of unique updates; its result is committed at the end of the test.
        String inflightCommit = client.createNewInstantTime();
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> updateGenerator = dataGenerator::generateUniqueUpdates;
        HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn = SparkRDDWriteClient::upsert;
        JavaRDD<WriteStatus> ingestionResult = this.writeBatch(client, inflightCommit, firstCommit, Option.of(Arrays.asList("000")), "000", 100, updateGenerator, upsertFn, false, 0, 200, 2, false, HoodieTestUtils.INSTANT_GENERATOR);

        ConflictResolutionStrategy simpleStrategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
        HoodieWriteConfig clusteringWriteConfig = this.getConfigBuilder()
            .withAutoCommit(false)
            .withCleanConfig(cleanConfig)
            .withClusteringConfig(TestHoodieClientOnCopyOnWriteStorage.createClusteringBuilder(true, 1).build())
            .withPreCommitValidatorConfig(TestHoodieClientOnCopyOnWriteStorage.createPreCommitValidatorConfig(200))
            .withLockConfig(TestHoodieClientOnCopyOnWriteStorage.createLockConfig(simpleStrategy))
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withProperties(writerProps)
            .build();
        SparkRDDWriteClient clusteringWriteClient = this.getHoodieWriteClient(clusteringWriteConfig);
        String clusteringCommitTime = clusteringWriteClient.createNewInstantTime();
        clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
        clusteringWriteClient.cluster(clusteringCommitTime, true);

        org.junit.jupiter.api.Assertions.assertThrows(HoodieWriteConflictException.class, () -> client.commit(inflightCommit, (Object)ingestionResult));
    }

    /**
     * Builds a clustering plan over the given file slices and saves it to the active
     * timeline as a REQUESTED {@code "clustering"} instant.
     *
     * <p>The plan uses the default value of
     * {@code HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME}, the class-level
     * {@code STRATEGY_PARAMS}, and an empty extra-metadata map.
     *
     * @param metaClient  meta client whose active timeline receives the pending clustering instant
     * @param clusterTime instant time used for the new clustering instant
     * @param fileSlices  array of file-slice lists handed to {@code ClusteringUtils.createClusteringPlan}
     * @return the requested clustering instant that was saved
     * @throws IOException declared by this method; which call raises it is not visible from this block
     */
    protected HoodieInstant createRequestedClusterInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException {
        String executionStrategy = (String)HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.defaultValue();
        HoodieClusteringPlan plan = ClusteringUtils.createClusteringPlan(executionStrategy, STRATEGY_PARAMS, (List[])fileSlices, Collections.emptyMap());
        HoodieInstant requestedInstant = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "clustering", clusterTime);
        HoodieRequestedReplaceMetadata replaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
            .setClusteringPlan(plan)
            .setOperationType(WriteOperationType.CLUSTER.name())
            .build();
        metaClient.getActiveTimeline().saveToPendingClusterCommit(requestedInstant, replaceMetadata);
        return requestedInstant;
    }

    /**
     * Creates a fresh {@link Properties} object whose only entry sets
     * {@code hoodie.datasource.write.row.writer.enable} to {@code "false"}.
     *
     * @return a new, mutable properties instance; callers may add further entries
     */
    private static Properties getDisabledRowWriterProperties() {
        Properties rowWriterOff = new Properties();
        rowWriterOff.setProperty("hoodie.datasource.write.row.writer.enable", Boolean.toString(false));
        return rowWriterOff;
    }

    /**
     * Decompiled body of a lambda belonging to {@code testMergeHandle}; the synthetic
     * name is kept verbatim because the caller is outside this block.
     *
     * <p>The method runs {@code performMergeDataValidationCheck} twice against a
     * {@code WriteStatus} whose stat reports zero writes:
     * <ol>
     *   <li>with the supplied {@code config} at instant {@code "007"}; any exception
     *       propagates to the caller;</li>
     *   <li>after setting {@code hoodie.merge.data.validation.enabled=true} on
     *       {@code config}'s properties, with a config rebuilt from those properties
     *       at instant {@code "006"}; a {@code HoodieCorruptedDataException} is
     *       required here, otherwise the test is failed.</li>
     * </ol>
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method, so the original source may have wrapped the
     * first phase in a catch that is not reproduced here. The decompiled form
     * (assignment to a try-with-resources variable, use of {@code handle} outside its
     * scope) was not valid Java and is replaced by explicit try/finally blocks.
     *
     * @param baseFilePath  path used to construct the {@code HoodieBaseFile}
     * @param config        write config for the first check; its properties are mutated for the second
     * @param table         table handed to both merge handles
     * @param partitionPath partition path handed to both merge handles
     * @param e             lambda argument; not read by this body
     * @return always {@code true} when both phases behave as expected
     * @throws Exception if the first phase fails, or the second phase fails with anything
     *                   other than {@code HoodieCorruptedDataException}
     */
    private static /* synthetic */ Boolean lambda$testMergeHandle$6447fa78$1(String baseFilePath, HoodieWriteConfig config, HoodieTable table, String partitionPath, Integer e) throws Exception {
        HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath);

        // Phase 1: validation check with the caller's config.
        HoodieMergeHandle handle = null;
        try {
            handle = new HoodieMergeHandle(config, "007", table, new HashMap(), partitionPath, FSUtils.getFileId((String)baseFile.getFileName()), baseFile, (TaskContextSupplier)new SparkTaskContextSupplier(), config.populateMetaFields() ? Option.empty() : Option.of((Object)((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)config.getProps()))));
            WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
            writeStatus.setStat(new HoodieWriteStat());
            writeStatus.getStat().setNumWrites(0L);
            handle.performMergeDataValidationCheck(writeStatus);
        }
        finally {
            // Mirrors the null-guarded close of the decompiled try-with-resources.
            if (handle != null) {
                handle.close();
            }
        }

        // Phase 2: same check with merge data validation switched on; it must throw.
        handle = null;
        try {
            String newInstantTime = "006";
            config.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
            HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps((Map)config.getProps()).build();
            handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap(), partitionPath, FSUtils.getFileId((String)baseFile.getFileName()), baseFile, (TaskContextSupplier)new SparkTaskContextSupplier(), config.populateMetaFields() ? Option.empty() : Option.of((Object)((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)config.getProps()))));
            WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
            writeStatus.setStat(new HoodieWriteStat());
            writeStatus.getStat().setNumWrites(0L);
            handle.performMergeDataValidationCheck(writeStatus);
            org.junit.jupiter.api.Assertions.fail((String)"The above line should have thrown an exception");
        }
        catch (HoodieCorruptedDataException expected) {
            // Expected outcome: the validation check rejected the data.
        }
        finally {
            if (handle != null) {
                try {
                    handle.close();
                }
                catch (Exception ignored) {
                    // Best-effort close: a failure while closing must not mask the test outcome.
                }
            }
        }
        return true;
    }

    /**
     * Write client whose {@code autoCleanOnCommit()} always fails by throwing the
     * {@link Throwable} supplied at construction time.
     *
     * <p>NOTE(review): presumably {@code autoCleanOnCommit()} overrides a hook of the
     * parent write client; the parent declaration is not visible here, so confirm
     * before adding {@code @Override}.
     */
    public static class WriteClientBrokenClean<T extends HoodieRecordPayload> extends WriteClientBrokenBase<T> {
        /**
         * @param context      engine context forwarded to the base class
         * @param clientConfig write config forwarded to the base class
         * @param throwable    failure to raise from {@link #autoCleanOnCommit()}
         */
        public WriteClientBrokenClean(HoodieEngineContext context, HoodieWriteConfig clientConfig, Throwable throwable) {
            super(context, clientConfig, throwable);
        }

        /**
         * Throws the stored failure: as-is when it is an {@link Error}, otherwise cast
         * to {@code HoodieException} (any other type fails the cast).
         */
        protected void autoCleanOnCommit() {
            Throwable failure = this.throwable;
            if (!(failure instanceof Error)) {
                throw (HoodieException)failure;
            }
            throw (Error)failure;
        }
    }

    /**
     * Write client whose {@code runTableServicesInlineInternal(...)} throws the
     * {@link Throwable} supplied at construction time whenever inline clustering is
     * enabled in the client config, and does nothing otherwise.
     *
     * <p>NOTE(review): presumably this method overrides a hook of the parent write
     * client; the parent declaration is not visible here, so confirm before adding
     * {@code @Override}.
     */
    public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends WriteClientBrokenBase<T> {
        /**
         * @param context      engine context forwarded to the base class
         * @param clientConfig write config forwarded to the base class
         * @param throwable    failure to raise when inline clustering is enabled
         */
        public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig, Throwable throwable) {
            super(context, clientConfig, throwable);
        }

        /**
         * Ignores all three arguments. When {@code config.inlineClusteringEnabled()} is
         * true, throws the stored failure: as-is when it is an {@link Error}, otherwise
         * cast to {@code HoodieException}.
         */
        protected void runTableServicesInlineInternal(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
            if (!this.config.inlineClusteringEnabled()) {
                return;
            }
            Throwable failure = this.throwable;
            if (failure instanceof Error) {
                throw (Error)failure;
            }
            throw (HoodieException)failure;
        }
    }

    /**
     * Common base for the deliberately failing write clients: a
     * {@code SparkRDDWriteClient} that also keeps the {@link Throwable} passed to its
     * constructor so subclasses can read it.
     */
    public static class WriteClientBrokenBase<T extends HoodieRecordPayload> extends SparkRDDWriteClient<T> {
        /** Failure supplied at construction time; assigned once and exposed to subclasses. */
        protected final Throwable throwable;

        /**
         * @param context      engine context forwarded to {@code SparkRDDWriteClient}
         * @param clientConfig write config forwarded to {@code SparkRDDWriteClient}
         * @param throwable    failure stored in {@link #throwable}
         */
        public WriteClientBrokenBase(HoodieEngineContext context, HoodieWriteConfig clientConfig, Throwable throwable) {
            super(context, clientConfig);
            this.throwable = throwable;
        }
    }

    /**
     * Pre-commit validator whose {@code validateRecordsBeforeAndAfter(...)}
     * unconditionally throws a {@code HoodieValidationException}.
     */
    public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
        /**
         * @param table   table forwarded to {@code SparkPreCommitValidator}
         * @param context engine context forwarded to {@code SparkPreCommitValidator}
         * @param config  write config forwarded to {@code SparkPreCommitValidator}
         */
        public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
            super(table, context, config);
        }

        /**
         * Always fails; none of the arguments are inspected.
         *
         * @throws HoodieValidationException on every call, with message {@code "simulate failure"}
         */
        protected void validateRecordsBeforeAndAfter(Dataset<Row> before, Dataset<Row> after, Set<String> partitionsAffected) {
            String reason = "simulate failure";
            throw new HoodieValidationException(reason);
        }
    }
}

