/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.functional;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.ClusteringTestUtils;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

@Tag(value="functional")
public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
    private static final String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
    private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>(){
        {
            this.put("sortColumn", "record_key");
        }
    };
    private HoodieTestTable testTable;
    private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from <TABLE_NAME>";

    private static Stream<Arguments> smallInsertHandlingParams() {
        return Arrays.stream(new Boolean[][]{{true}, {false}}).map(Arguments::of);
    }

    private static Stream<Arguments> populateMetaFieldsParams() {
        return Arrays.stream(new Boolean[][]{{true}, {false}}).map(Arguments::of);
    }

    private static Stream<Arguments> rollbackFailedCommitsParams() {
        return Stream.of(Arguments.of((Object[])new Object[]{HoodieFailedWritesCleaningPolicy.LAZY, true}), Arguments.of((Object[])new Object[]{HoodieFailedWritesCleaningPolicy.NEVER, true}), Arguments.of((Object[])new Object[]{HoodieFailedWritesCleaningPolicy.NEVER, false}));
    }

    private static Stream<Arguments> rollbackAfterConsistencyCheckFailureParams() {
        return Stream.of(Arguments.of((Object[])new Object[]{true, true}), Arguments.of((Object[])new Object[]{true, false}), Arguments.of((Object[])new Object[]{false, true}), Arguments.of((Object[])new Object[]{false, false}));
    }

    @BeforeEach
    public void setUpTestTable() {
        this.testTable = HoodieSparkWriteableTestTable.of((HoodieTableMetaClient)this.metaClient);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testAutoCommitOnInsert(boolean populateMetaFields) throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), false, populateMetaFields);
    }

    @Test
    public void testAutoCommitOnInsertPrepped() throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insertPreppedRecords), true, true);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testAutoCommitOnUpsert(boolean populateMetaFields) throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, populateMetaFields);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testAutoCommitOnUpsertPrepped(boolean populateMetaFields) throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsertPreppedRecords), true, populateMetaFields);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testAutoCommitOnBulkInsert(boolean populateMetaFields) throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, populateMetaFields);
    }

    @Test
    public void testAutoCommitOnBulkInsertPrepped() throws Exception {
        this.testAutoCommit((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)(writeClient, recordRDD, instantTime) -> writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty())), true, true);
    }

    private void testAutoCommit(HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped, boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());){
            String prevCommitTime = "000";
            String newCommitTime = "001";
            int numRecords = 200;
            JavaRDD<WriteStatus> result = this.insertFirstBatch(cfgBuilder.build(), client, newCommitTime, prevCommitTime, numRecords, writeFn, isPrepped, false, numRecords);
            org.junit.jupiter.api.Assertions.assertFalse((boolean)this.testTable.commitExists(newCommitTime), (String)"If Autocommit is false, then commit should not be made automatically");
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(newCommitTime, result), (String)"Commit should succeed");
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(newCommitTime), (String)"After explicit commit, commit file should be created");
        }
    }

    @Test
    public void testPreCommitValidatorsOnInsert() throws Exception {
        int numRecords = 200;
        HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#" + numRecords).build();
        HoodieWriteConfig config = this.getConfigBuilder().withAutoCommit(true).withPreCommitValidatorConfig(validatorConfig).build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config);){
            HoodieWriterClientTestHarness.Function3 writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
            String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
            JavaRDD<WriteStatus> result = this.insertFirstBatch(config, client, newCommitTime, "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)writeFn, false, false, numRecords);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(newCommitTime));
        }
    }

    @Test
    public void testPreCommitValidationFailureOnInsert() throws Exception {
        String newCommitTime;
        block14: {
            int numRecords = 200;
            HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#500").build();
            HoodieWriteConfig config = this.getConfigBuilder().withPreCommitValidatorConfig(validatorConfig).build();
            newCommitTime = HoodieActiveTimeline.createNewInstantTime();
            try (SparkRDDWriteClient client = this.getHoodieWriteClient(config);){
                HoodieWriterClientTestHarness.Function3 writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
                JavaRDD<WriteStatus> result = this.insertFirstBatch(config, client, newCommitTime, "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)writeFn, false, false, numRecords);
                org.junit.jupiter.api.Assertions.fail((String)"Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
            }
            catch (HoodieInsertException e) {
                if (e.getCause() instanceof HoodieValidationException) break block14;
                throw e;
            }
        }
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.testTable.commitExists(newCommitTime));
    }

    @Test
    public void testPreCommitValidationWithMultipleInflights() throws Exception {
        String instant1;
        HoodieWriteConfig config;
        HoodiePreCommitValidatorConfig validatorConfig;
        int numRecords;
        block2: {
            numRecords = 200;
            validatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#500").build();
            config = this.getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build()).withPreCommitValidatorConfig(validatorConfig).build();
            instant1 = HoodieActiveTimeline.createNewInstantTime();
            try {
                this.insertWithConfig(config, numRecords, instant1);
                org.junit.jupiter.api.Assertions.fail((String)"Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
            }
            catch (HoodieInsertException e) {
                if (e.getCause() instanceof HoodieValidationException) break block2;
                throw e;
            }
        }
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.testTable.commitExists(instant1));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.inflightCommitExists(instant1));
        numRecords = 300;
        validatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#" + numRecords).build();
        config = this.getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build()).withPreCommitValidatorConfig(validatorConfig).build();
        String instant2 = HoodieActiveTimeline.createNewInstantTime();
        this.insertWithConfig(config, numRecords, instant2);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.inflightCommitExists(instant1));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instant2));
    }

    private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception {
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config);){
            HoodieWriterClientTestHarness.Function3 writeFn = (writeClient, recordRDD, instantTime) -> writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
            JavaRDD<WriteStatus> javaRDD = this.insertFirstBatch(config, client, instant, "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)writeFn, false, false, numRecords);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testDeduplicationOnInsert(boolean populateMetaFields) throws Exception {
        this.testDeduplication((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), populateMetaFields);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception {
        this.testDeduplicationKeepOperation((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), populateMetaFields);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testDeduplicationOnBulkInsert(boolean populateMetaFields) throws Exception {
        this.testDeduplication((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), populateMetaFields);
    }

    @Test
    public void testDeduplicationOnUpsert() throws Exception {
        this.testDeduplication((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), true);
    }

    private void testDeduplication(HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
        String newCommitTime = "001";
        String recordKey = UUID.randomUUID().toString();
        HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
        HoodieAvroRecord recordOne = new HoodieAvroRecord(keyOne, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyOne, newCommitTime));
        HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
        HoodieAvroRecord recordTwo = new HoodieAvroRecord(keyTwo, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyTwo, newCommitTime));
        HoodieAvroRecord recordThree = new HoodieAvroRecord(keyTwo, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyTwo, newCommitTime));
        HoodieJavaRDD records = HoodieJavaRDD.of((JavaRDD)this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
        HoodieWriteConfig.Builder configBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).combineInput(true, true);
        this.addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
        HoodieWriteConfig writeConfig = configBuilder.build();
        HoodieIndex index = (HoodieIndex)Mockito.mock(HoodieIndex.class);
        Mockito.when((Object)index.isGlobal()).thenReturn((Object)true);
        int dedupParallelism = records.getNumPartitions() + 2;
        HoodieData dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords((HoodieData)records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), (HoodieRecordMerger)HoodiePreCombineAvroRecordMerger.INSTANCE);
        List dedupedRecs = dedupedRecsRdd.collectAsList();
        org.junit.jupiter.api.Assertions.assertEquals((int)dedupParallelism, (int)dedupedRecsRdd.getNumPartitions());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)dedupedRecs.size());
        org.junit.jupiter.api.Assertions.assertEquals((Object)((HoodieRecord)dedupedRecs.get(0)).getPartitionPath(), (Object)recordThree.getPartitionPath());
        TestHoodieClientOnCopyOnWriteStorage.assertNodupesWithinPartition((List)dedupedRecs);
        index = (HoodieIndex)Mockito.mock(HoodieIndex.class);
        Mockito.when((Object)index.isGlobal()).thenReturn((Object)false);
        dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords((HoodieData)records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), (HoodieRecordMerger)HoodiePreCombineAvroRecordMerger.INSTANCE);
        dedupedRecs = dedupedRecsRdd.collectAsList();
        org.junit.jupiter.api.Assertions.assertEquals((int)dedupParallelism, (int)dedupedRecsRdd.getNumPartitions());
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)dedupedRecs.size());
        TestHoodieClientOnCopyOnWriteStorage.assertNodupesWithinPartition((List)dedupedRecs);
        JavaRDD recordList = this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(writeConfig);){
            client.startCommitWithTime(newCommitTime);
            List statuses = ((JavaRDD)writeFn.apply((Object)client, (Object)recordList, (Object)newCommitTime)).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)statuses.size());
            this.assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream).collect(Collectors.toList()));
        }
    }

    private void testDeduplicationKeepOperation(HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
        String newCommitTime = "001";
        String recordKey = UUID.randomUUID().toString();
        HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
        HoodieAvroRecord recordOne = new HoodieAvroRecord(keyOne, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyOne, newCommitTime), HoodieOperation.INSERT);
        HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
        HoodieAvroRecord recordTwo = new HoodieAvroRecord(keyTwo, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.INSERT);
        HoodieAvroRecord recordThree = new HoodieAvroRecord(keyTwo, (HoodieRecordPayload)this.dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.UPDATE_AFTER);
        HoodieJavaRDD records = HoodieJavaRDD.of((JavaRDD)this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
        HoodieWriteConfig.Builder configBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAllowOperationMetadataField(true).combineInput(true, true);
        this.addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
        HoodieWriteConfig writeConfig = configBuilder.build();
        HoodieIndex index = (HoodieIndex)Mockito.mock(HoodieIndex.class);
        Mockito.when((Object)index.isGlobal()).thenReturn((Object)true);
        int dedupParallelism = records.getNumPartitions() + 100;
        HoodieData dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords((HoodieData)records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), (HoodieRecordMerger)HoodiePreCombineAvroRecordMerger.INSTANCE);
        List dedupedRecs = dedupedRecsRdd.collectAsList();
        org.junit.jupiter.api.Assertions.assertEquals((Object)((HoodieRecord)dedupedRecs.get(0)).getOperation(), (Object)recordThree.getOperation());
        JavaRDD recordList = this.jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(writeConfig);){
            client.startCommitWithTime(newCommitTime);
            List statuses = ((JavaRDD)writeFn.apply((Object)client, (Object)recordList, (Object)newCommitTime)).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)statuses.size());
            this.assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream).collect(Collectors.toList()));
        }
    }

    void assertNoDuplicatesInPartition(List<HoodieRecordDelegate> recordDelegates) {
        HashMap partitionToKeys = new HashMap();
        for (HoodieRecordDelegate r : recordDelegates) {
            String recordKey = r.getRecordKey();
            String partitionPath = r.getPartitionPath();
            if (!partitionToKeys.containsKey(partitionPath)) {
                partitionToKeys.put(partitionPath, new HashSet());
            }
            org.junit.jupiter.api.Assertions.assertFalse((boolean)((Set)partitionToKeys.get(partitionPath)).contains(recordKey), (String)("key " + recordKey + " is duplicate within partition " + partitionPath));
            ((Set)partitionToKeys.get(partitionPath)).add(recordKey);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testUpserts(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withRollbackUsingMarkers(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        this.testUpsertsInternal(cfgBuilder.build(), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false);
    }

    @Test
    public void testUpsertsPrepped() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withRollbackUsingMarkers(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        this.testUpsertsInternal(cfgBuilder.build(), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsertPreppedRecords), true);
    }

    private void testUpsertsInternal(HoodieWriteConfig config, HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped) throws Exception {
        HoodieWriteConfig hoodieWriteConfig = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true).withProps((Map)config.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue()).build();
        HoodieTableMetaClient.withPropertyBuilder().fromMetaClient(this.metaClient).setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0).setPopulateMetaFields(config.populateMetaFields()).initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());
        SparkRDDWriteClient client = this.getHoodieWriteClient(hoodieWriteConfig);
        String newCommitTime = "001";
        String initCommitTime = "000";
        int numRecords = 200;
        this.insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), isPrepped, true, numRecords, config.populateMetaFields());
        String prevCommitTime = newCommitTime;
        newCommitTime = "004";
        numRecords = 100;
        String commitTimeBetweenPrevAndNew = "002";
        this.updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, (Option<List<String>>)Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, writeFn, isPrepped, true, numRecords, 200, 2, config.populateMetaFields());
        prevCommitTime = newCommitTime;
        newCommitTime = "005";
        numRecords = 50;
        this.deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, initCommitTime, numRecords, isPrepped, true, 0, 150, config.populateMetaFields());
        HoodieWriteConfig newConfig = this.getConfigBuilder().withProps((Map)config.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).build();
        client = this.getHoodieWriteClient(newConfig);
        client.savepoint("004", "user1", "comment1");
        client.restoreToInstant("004", config.isMetadataTableEnabled());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
        client.deleteSavepoint("004");
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.metaClient.reloadActiveTimeline().getSavePointTimeline().containsInstant("004"));
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        org.junit.jupiter.api.Assertions.assertEquals((long)200L, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)"Must contain 200 records");
        prevCommitTime = newCommitTime;
        newCommitTime = "006";
        numRecords = 50;
        this.deleteBatch(newConfig, client, newCommitTime, prevCommitTime, initCommitTime, numRecords, isPrepped, true, 0, 150);
        HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(this.metaClient, false);
        List instants = activeTimeline.getCommitAndReplaceTimeline().getInstants();
        org.junit.jupiter.api.Assertions.assertEquals((int)5, (int)instants.size());
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "001"), instants.get(0));
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "004"), instants.get(1));
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(HoodieInstant.State.REQUESTED, "commit", "006"), instants.get(2));
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(HoodieInstant.State.INFLIGHT, "commit", "006"), instants.get(3));
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "006"), instants.get(4));
        HoodieWriteConfig cfg = hoodieWriteConfig;
        String instantTime = "007";
        HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient(this.jsc, this.basePath);
        String basePathStr = this.basePath;
        HoodieSparkTable table = this.getHoodieTable(metaClient, cfg);
        String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestHoodieClientOnCopyOnWriteStorage.lambda$testUpsertsInternal$57b26ee6$1(metaClient, extension, basePathStr, cfg, (HoodieTable)table, config, arg_0)).collect();
    }

    @Test
    public void testRestoreWithSavepointBeyondArchival() throws Exception {
        HoodieWriteConfig config = this.getConfigBuilder().withRollbackUsingMarkers(true).build();
        HoodieWriteConfig hoodieWriteConfig = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).withRollbackUsingMarkers(true).withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build()).withProps((Map)config.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue()).build();
        HoodieTableMetaClient.withPropertyBuilder().fromMetaClient(this.metaClient).setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0).setPopulateMetaFields(config.populateMetaFields()).initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());
        SparkRDDWriteClient client = this.getHoodieWriteClient(hoodieWriteConfig);
        String newCommitTime = "001";
        String initCommitTime = "000";
        int numRecords = 200;
        this.insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), false, true, numRecords, config.populateMetaFields());
        String prevCommitTime = newCommitTime;
        newCommitTime = "004";
        numRecords = 100;
        String commitTimeBetweenPrevAndNew = "002";
        this.updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, (Option<List<String>>)Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, true, numRecords, 200, 2, config.populateMetaFields());
        prevCommitTime = newCommitTime;
        newCommitTime = "005";
        numRecords = 50;
        this.deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, initCommitTime, numRecords, false, true, 0, 150, config.populateMetaFields());
        HoodieWriteConfig newConfig = this.getConfigBuilder().withProps((Map)config.getProps()).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build()).build();
        client = this.getHoodieWriteClient(newConfig);
        client.savepoint("004", "user1", "comment1");
        SparkRDDWriteClient finalClient = client;
        org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> finalClient.restoreToSavepoint("004"), (String)("Restore should not be supported when " + HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key() + " is enabled"));
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testInsertsWithHoodieConcatHandle(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder();
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        this.testHoodieConcatHandle(cfgBuilder.build(), false);
    }

    @Test
    public void testInsertsPreppedWithHoodieConcatHandle() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder();
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        this.testHoodieConcatHandle(cfgBuilder.build(), true);
    }

    private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) throws Exception {
        HoodieWriteConfig hoodieWriteConfig = this.getConfigBuilder().withProps((Map)config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue()).build();
        HoodieTableMetaClient.withPropertyBuilder().fromMetaClient(this.metaClient).setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0).initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());
        SparkRDDWriteClient client = this.getHoodieWriteClient(hoodieWriteConfig);
        String newCommitTime = "001";
        String initCommitTime = "000";
        int numRecords = 200;
        this.insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), isPrepped, true, numRecords, config.populateMetaFields());
        String prevCommitTime = newCommitTime;
        newCommitTime = "004";
        numRecords = 100;
        String commitTimeBetweenPrevAndNew = "002";
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordGenFunction = this.generateWrapRecordsFn(isPrepped, hoodieWriteConfig, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateUniqueUpdates(arg_0, arg_1)));
        this.writeBatch(client, newCommitTime, prevCommitTime, (Option<List<String>>)Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, recordGenFunction, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), true, numRecords, 300, 2, false, config.populateMetaFields());
    }

    @Test
    public void testInsertsWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder();
        this.testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), false);
    }

    @Test
    public void testInsertsPreppedWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder();
        this.testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), true);
    }

    private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig config, boolean isPrepped) throws Exception {
        HoodieWriteConfig hoodieWriteConfig = this.getConfigBuilder().withProps((Map)config.getProps()).withMergeAllowDuplicateOnInserts(true).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(hoodieWriteConfig);
        String initCommitTime = "000";
        String newCommitTime = "001";
        int firstInsertRecords = 50;
        this.insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, firstInsertRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), isPrepped, true, firstInsertRecords, config.populateMetaFields());
        String prevCommitTime = newCommitTime;
        newCommitTime = "004";
        int secondInsertRecords = 100;
        List<String> commitTimesBetweenPrevAndNew = Arrays.asList("002", "003");
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordGenFunction = this.generateWrapRecordsFn(isPrepped, hoodieWriteConfig, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateUpdates(arg_0, arg_1)));
        this.writeBatch(client, newCommitTime, prevCommitTime, (Option<List<String>>)Option.of(commitTimesBetweenPrevAndNew), initCommitTime, secondInsertRecords, recordGenFunction, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), true, secondInsertRecords, firstInsertRecords + secondInsertRecords, 2, false, config.populateMetaFields());
    }

    @Test
    public void testBulkInsertWithCustomPartitioner() {
        HoodieWriteConfig config = this.getConfigBuilder().withRollbackUsingMarkers(true).build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config);){
            String commitTime1 = "001";
            client.startCommitWithTime("001");
            List inserts1 = this.dataGen.generateInserts("001", Integer.valueOf(100));
            JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 10);
            RDDCustomColumnsSortPartitioner partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
            List statuses = client.bulkInsert(insertRecordsRDD1, "001", Option.of((Object)partitioner)).collect();
            Assertions.assertNoWriteErrors((List)statuses);
        }
    }

    @Test
    public void testPendingRestore() throws IOException {
        List statuses;
        HoodieWriteConfig config = this.getConfigBuilder().withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        StoragePath completeRestoreFile = null;
        StoragePath backupCompletedRestoreFile = null;
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(config);){
            String commitTime1 = "001";
            client.startCommitWithTime("001");
            List inserts1 = this.dataGen.generateInserts("001", Integer.valueOf(100));
            JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 2);
            statuses = client.insert(insertRecordsRDD1, "001").collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.savepoint("001", "user1", "comment1");
            client.restoreToInstant("001", false);
            HoodieInstant restoreCompleted = (HoodieInstant)this.metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().getInstants().get(0);
            completeRestoreFile = new StoragePath(config.getBasePath() + "/" + ".hoodie" + "/" + restoreCompleted.getTimestamp() + "." + "restore");
            backupCompletedRestoreFile = new StoragePath(config.getBasePath() + "/" + ".hoodie" + "/" + restoreCompleted.getTimestamp() + "." + "restore" + ".backup");
            this.metaClient.getStorage().rename(completeRestoreFile, backupCompletedRestoreFile);
        }
        client = this.getHoodieWriteClient(config);
        var5_5 = null;
        try {
            String commitTime2 = "002";
            org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> client.startCommitWithTime("002"));
        }
        catch (Throwable commitTime2) {
            var5_5 = commitTime2;
            throw commitTime2;
        }
        finally {
            if (client != null) {
                if (var5_5 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable commitTime2) {
                        var5_5.addSuppressed(commitTime2);
                    }
                } else {
                    client.close();
                }
            }
        }
        this.metaClient.getStorage().rename(backupCompletedRestoreFile, completeRestoreFile);
        client = this.getHoodieWriteClient(config);
        var5_5 = null;
        try {
            String commitTime3 = "003";
            client.startCommitWithTime("003");
            List inserts3 = this.dataGen.generateInserts("003", Integer.valueOf(100));
            JavaRDD insertRecordsRDD3 = this.jsc.parallelize(inserts3, 2);
            statuses = client.insert(insertRecordsRDD3, "003").collect();
            Assertions.assertNoWriteErrors((List)statuses);
        }
        catch (Throwable throwable) {
            var5_5 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var5_5 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var5_5.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
    }

    @Test
    public void testDeletes() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());
        String initCommitTime = "000";
        String newCommitTime = "001";
        ArrayList recordsInFirstBatch = new ArrayList();
        HoodieWriterClientTestHarness.Function2 recordGenFunction = (instantTime, numRecordsInThisCommit) -> {
            List fewRecordsForInsert = this.dataGen.generateInserts(instantTime, Integer.valueOf(200));
            List fewRecordsForDelete = this.dataGen.generateDeletes(instantTime, Integer.valueOf(100));
            recordsInFirstBatch.addAll(fewRecordsForInsert);
            recordsInFirstBatch.addAll(fewRecordsForDelete);
            return recordsInFirstBatch;
        };
        this.writeBatch(client, newCommitTime, initCommitTime, (Option<List<String>>)Option.empty(), initCommitTime, -1, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)recordGenFunction, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), true, 200, 200, 1, false, true);
        String prevCommitTime = newCommitTime;
        newCommitTime = "004";
        ArrayList recordsInSecondBatch = new ArrayList();
        recordGenFunction = (instantTime, numRecordsInThisCommit) -> {
            List fewRecordsForDelete = recordsInFirstBatch.subList(0, 50);
            List fewRecordsForUpdate = recordsInFirstBatch.subList(50, 100);
            recordsInSecondBatch.addAll(this.dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
            recordsInSecondBatch.addAll(fewRecordsForUpdate);
            return recordsInSecondBatch;
        };
        this.writeBatch(client, newCommitTime, prevCommitTime, (Option<List<String>>)Option.empty(), initCommitTime, 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)recordGenFunction, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), true, 50, 150, 2, false, true);
    }

    @Test
    public void testDeletesForInsertsInSameBatch() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());
        String initCommitTime = "000";
        String newCommitTime = "001";
        ArrayList recordsInFirstBatch = new ArrayList();
        HoodieWriterClientTestHarness.Function2 recordGenFunction = (instantTime, numRecordsInThisCommit) -> {
            List fewRecordsForInsert = this.dataGen.generateInserts(instantTime, Integer.valueOf(200));
            List fewRecordsForDelete = fewRecordsForInsert.subList(40, 90);
            recordsInFirstBatch.addAll(fewRecordsForInsert);
            recordsInFirstBatch.addAll(this.dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
            return recordsInFirstBatch;
        };
        this.writeBatch(client, newCommitTime, initCommitTime, (Option<List<String>>)Option.empty(), initCommitTime, -1, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)recordGenFunction, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), true, 150, 150, 1, false, true);
    }

    private void assertPartitionPathRecordKeys(List<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
        Dataset<Row> rows = this.getAllRows(fullPartitionPaths);
        List<Pair<String, String>> actualPartitionPathRecKeyPairs = this.getActualPartitionPathAndRecordKeys(rows);
        this.assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
    }

    private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<Row> rows) {
        ArrayList<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<Pair<String, String>>();
        for (Row row : rows.collectAsList()) {
            actualPartitionPathRecKeyPairs.add((Pair<String, String>)Pair.of((Object)row.getAs("_hoodie_partition_path"), (Object)row.getAs("_row_key")));
        }
        return actualPartitionPathRecKeyPairs;
    }

    private Dataset<Row> getAllRows(String[] fullPartitionPaths) {
        return HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths);
    }

    private String getFullPartitionPath(String relativePartitionPath) {
        return this.getFullPartitionPaths(relativePartitionPath)[0];
    }

    private String[] getFullPartitionPaths(String ... relativePartitionPaths) {
        String[] fullPartitionPaths = new String[relativePartitionPaths.length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, relativePartitionPaths[i]);
        }
        return fullPartitionPaths;
    }

    private void assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> expectedPartitionPathRecKeyPairs, List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
        org.junit.jupiter.api.Assertions.assertEquals((int)expectedPartitionPathRecKeyPairs.size(), (int)actualPartitionPathRecKeyPairs.size());
        for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)expectedPartitionPathRecKeyPairs.contains(entry));
        }
        for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)actualPartitionPathRecKeyPairs.contains(entry));
        }
    }

    private Pair<List<WriteStatus>, List<HoodieRecord>> insertBatchRecords(SparkRDDWriteClient client, String commitTime, Integer recordNum, int expectStatueSize) {
        client.startCommitWithTime(commitTime);
        List inserts1 = this.dataGen.generateInserts(commitTime, recordNum);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)expectStatueSize, (int)statuses.size(), (String)"check expect statue size.");
        return Pair.of((Object)statuses, (Object)inserts1);
    }

    @Test
    public void testUpdateRejectForClustering() throws IOException {
        String testPartitionPath = "2016/09/26";
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        Properties props = new Properties();
        props.setProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), "true");
        props.setProperty(HoodieClusteringConfig.UPDATES_STRATEGY.key(), SparkRejectUpdateStrategy.class.getName());
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", this.dataGen.getEstimatedFileSizeInBytes(150), true, props);
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        String commitTime1 = "001";
        Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult = this.insertBatchRecords(client, commitTime1, 600, 2);
        List inserts1 = (List)upsertResult.getValue();
        List fileGroupIds1 = table.getFileSystemView().getAllFileGroups("2016/09/26").map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)fileGroupIds1.size());
        String commitTime2 = "002";
        List<List> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups("2016/09/26").map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
        List[] fileSlices = firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
        this.createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
        String commitTime3 = "003";
        this.insertBatchRecords(client, commitTime3, 1, 1);
        List fileGroupIds2 = table.getFileSystemView().getAllFileGroups("2016/09/26").map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)3, (int)fileGroupIds2.size());
        String commitTime4 = "004";
        client.startCommitWithTime(commitTime4);
        ArrayList insertsAndUpdates3 = new ArrayList();
        insertsAndUpdates3.addAll(this.dataGen.generateUpdates(commitTime4, inserts1));
        String assertMsg = String.format("Not allowed to update the clustering files in partition: %s For pending clustering operations, we are not going to support update for now.", "2016/09/26");
        org.junit.jupiter.api.Assertions.assertThrows(HoodieUpsertException.class, () -> client.upsert(this.jsc.parallelize(insertsAndUpdates3, 1), commitTime4).collect(), (String)assertMsg);
        String commitTime5 = "005";
        List statuses = (List)this.insertBatchRecords(client, commitTime5, 1, 1).getKey();
        fileGroupIds2.removeAll(fileGroupIds1);
        org.junit.jupiter.api.Assertions.assertEquals(fileGroupIds2.get(0), (Object)((WriteStatus)statuses.get(0)).getFileId());
        List firstInsertFileGroupIds4 = table.getFileSystemView().getAllFileGroups("2016/09/26").map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)3, (int)firstInsertFileGroupIds4.size());
    }

    @Test
    public void testSmallInsertHandlingForUpserts() throws Exception {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", (long)this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        FileFormatUtils fileUtils = HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat());
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List inserts1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(100));
        Set keys1 = Transformations.recordsToRecordKeySet((List)inserts1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be added.");
        String file1 = ((WriteStatus)statuses.get(0)).getFileId();
        org.junit.jupiter.api.Assertions.assertEquals((int)100, (int)fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath())).size(), (String)"file should contain 100 records");
        String commitTime2 = "002";
        client.startCommitWithTime(commitTime2);
        List inserts2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(40));
        Set keys2 = Transformations.recordsToRecordKeySet((List)inserts2);
        ArrayList insertsAndUpdates2 = new ArrayList();
        insertsAndUpdates2.addAll(inserts2);
        insertsAndUpdates2.addAll(this.dataGen.generateUpdates(commitTime2, inserts1));
        JavaRDD insertAndUpdatesRDD2 = this.jsc.parallelize(insertsAndUpdates2, 1);
        statuses = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be updated.");
        org.junit.jupiter.api.Assertions.assertEquals((Object)file1, (Object)((WriteStatus)statuses.get(0)).getFileId(), (String)"Existing file should be expanded");
        org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime1, (Object)((WriteStatus)statuses.get(0)).getStat().getPrevCommit(), (String)"Existing file should be expanded");
        StoragePath newFile = new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals((int)140, (int)fileUtils.readRowKeys(this.storage, newFile).size(), (String)"file should contain 140 records");
        List records = fileUtils.readAvroRecords(this.storage, newFile);
        for (GenericRecord record : records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime2, (Object)record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), (String)"only expect commit2");
            org.junit.jupiter.api.Assertions.assertTrue((keys2.contains(recordKey) || keys1.contains(recordKey) ? 1 : 0) != 0, (String)"key expected to be part of commit2");
        }
        String commitTime3 = "003";
        client.startCommitWithTime(commitTime3);
        List insertsAndUpdates3 = this.dataGen.generateInserts(commitTime3, Integer.valueOf(200));
        Set keys3 = Transformations.recordsToRecordKeySet((List)insertsAndUpdates3);
        List updates3 = this.dataGen.generateUpdates(commitTime3, inserts2);
        insertsAndUpdates3.addAll(updates3);
        JavaRDD insertAndUpdatesRDD3 = this.jsc.parallelize(insertsAndUpdates3, 1);
        statuses = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)statuses.size(), (String)"2 files needs to be committed.");
        HoodieTableMetaClient metadata = this.createMetaClient(this.basePath);
        HoodieSparkTable table = this.getHoodieTable(metadata, config);
        TableFileSystemView.BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
        List files = fileSystemView.getLatestBaseFilesBeforeOrOn("2016/09/26", commitTime3).collect(Collectors.toList());
        int numTotalInsertsInCommit3 = 0;
        int numTotalUpdatesInCommit3 = 0;
        for (HoodieBaseFile file : files) {
            String recordKey;
            if (file.getFileName().contains(file1)) {
                org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime3, (Object)file.getCommitTime(), (String)"Existing file should be expanded");
                records = fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath()));
                for (GenericRecord record : records) {
                    recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                    String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
                    if (!recordCommitTime.equals(commitTime3)) continue;
                    if (keys2.contains(recordKey)) {
                        keys2.remove(recordKey);
                        ++numTotalUpdatesInCommit3;
                        continue;
                    }
                    ++numTotalInsertsInCommit3;
                }
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)keys2.size(), (String)"All keys added in commit 2 must be updated in commit3 correctly");
                continue;
            }
            org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime3, (Object)file.getCommitTime(), (String)"New file must be written for commit 3");
            records = fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath()));
            for (GenericRecord record : records) {
                recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime3, (Object)record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), (String)"only expect commit3");
                org.junit.jupiter.api.Assertions.assertTrue((boolean)keys3.contains(recordKey), (String)"key expected to be part of commit3");
            }
            numTotalInsertsInCommit3 += records.size();
        }
        org.junit.jupiter.api.Assertions.assertEquals((int)numTotalUpdatesInCommit3, (int)inserts2.size(), (String)"Total updates in commit3 must add up");
        org.junit.jupiter.api.Assertions.assertEquals((int)numTotalInsertsInCommit3, (int)keys3.size(), (String)"Total inserts in commit3 must add up");
    }

    @ParameterizedTest
    @MethodSource(value={"smallInsertHandlingParams"})
    public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, false, mergeAllowDuplicateInserts);
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        FileFormatUtils fileUtils = HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat());
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List inserts1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(100));
        Set keys1 = Transformations.recordsToRecordKeySet((List)inserts1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        TestHoodieClientOnCopyOnWriteStorage.assertPartitionMetadata((String)this.basePath, (String[])new String[]{"2016/09/26"}, (HoodieStorage)this.storage);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be added.");
        String file1 = ((WriteStatus)statuses.get(0)).getFileId();
        org.junit.jupiter.api.Assertions.assertEquals((int)100, (int)fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath())).size(), (String)"file should contain 100 records");
        String commitTime2 = "002";
        client.startCommitWithTime(commitTime2);
        List inserts2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(40));
        Set keys2 = Transformations.recordsToRecordKeySet((List)inserts2);
        JavaRDD insertRecordsRDD2 = this.jsc.parallelize(inserts2, 1);
        statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be updated.");
        org.junit.jupiter.api.Assertions.assertEquals((Object)file1, (Object)((WriteStatus)statuses.get(0)).getFileId(), (String)"Existing file should be expanded");
        org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime1, (Object)((WriteStatus)statuses.get(0)).getStat().getPrevCommit(), (String)"Existing file should be expanded");
        StoragePath newFile = new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals((int)140, (int)fileUtils.readRowKeys(this.storage, newFile).size(), (String)"file should contain 140 records");
        List records = fileUtils.readAvroRecords(this.storage, newFile);
        for (GenericRecord record : records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertTrue((commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime) ? 1 : 0) != 0, (String)"Record expected to be part of commit 1 or commit2");
            org.junit.jupiter.api.Assertions.assertTrue((keys2.contains(recordKey) || keys1.contains(recordKey) ? 1 : 0) != 0, (String)"key expected to be part of commit 1 or commit2");
        }
        String commitTime3 = "003";
        client.startCommitWithTime(commitTime3);
        List inserts3 = this.dataGen.generateInserts(commitTime3, Integer.valueOf(200));
        JavaRDD insertRecordsRDD3 = this.jsc.parallelize(inserts3, 1);
        statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)statuses.size(), (String)"2 files needs to be committed.");
        org.junit.jupiter.api.Assertions.assertEquals((int)340, (int)(fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath())).size() + fileUtils.readRowKeys(this.storage, new StoragePath(this.basePath, ((WriteStatus)statuses.get(1)).getStat().getPath())).size()), (String)"file should contain 340 records");
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        HoodieSparkTable table = this.getHoodieTable(metaClient, config);
        List files = table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn("2016/09/26", commitTime3).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)files.size(), (String)"Total of 2 valid data files");
        int totalInserts = 0;
        for (HoodieBaseFile file : files) {
            org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime3, (Object)file.getCommitTime(), (String)"All files must be at commit 3");
            totalInserts += fileUtils.readAvroRecords(this.storage, new StoragePath(file.getPath())).size();
        }
        org.junit.jupiter.api.Assertions.assertEquals((int)totalInserts, (int)(inserts1.size() + inserts2.size() + inserts3.size()), (String)"Total number of records must add up");
    }

    @Test
    public void testDeletesWithDeleteApi() throws Exception {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", (long)this.dataGen.getEstimatedFileSizeInBytes(150));
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List inserts1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(100));
        Set keys1 = Transformations.recordsToRecordKeySet((List)inserts1);
        ArrayList<String> keysSoFar = new ArrayList<String>(keys1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 1);
        List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be added.");
        String file1 = ((WriteStatus)statuses.get(0)).getFileId();
        org.junit.jupiter.api.Assertions.assertEquals((int)100, (int)HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat()).readRowKeys(this.storage, new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath())).size(), (String)"file should contain 100 records");
        this.testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
        Pair<Set<String>, List<HoodieRecord>> updateBatch2 = this.testUpdates("003", client, 40, 120);
        keysSoFar.addAll((Collection)updateBatch2.getLeft());
        this.testDeletes(client, (List)updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar);
        Pair<Set<String>, List<HoodieRecord>> updateBatch3 = this.testUpdates("005", client, 40, 150);
        keysSoFar.addAll((Collection)updateBatch3.getLeft());
        String commitTime6 = "006";
        client.startCommitWithTime(commitTime6);
        List dummyInserts3 = this.dataGen.generateInserts(commitTime6, Integer.valueOf(20));
        List hoodieKeysToDelete3 = Transformations.randomSelectAsHoodieKeys((List)dummyInserts3, (int)20);
        JavaRDD deleteKeys3 = this.jsc.parallelize(hoodieKeysToDelete3, 1);
        statuses = client.delete(deleteKeys3, commitTime6).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)statuses.size(), (String)"Just 0 write status for delete.");
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        org.junit.jupiter.api.Assertions.assertEquals((long)150L, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)"Must contain 150 records");
        this.testDeletes(client, (List)updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testSimpleClustering(boolean populateMetaFields) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        this.testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    @Test
    public void testAndValidateClusteringOutputFiles() throws IOException {
        String partitionPath = "2015/03/16";
        this.testInsertTwoBatches(true, partitionPath);
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false).withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(Boolean.valueOf(true)).withInlineClusteringNumCommits(2).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());){
            int numRecords = 200;
            String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
            List records1 = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(numRecords));
            client.startCommitWithTime(newCommitTime);
            JavaRDD insertRecordsRDD1 = this.jsc.parallelize(records1, 2);
            JavaRDD statuses = client.insert(insertRecordsRDD1, newCommitTime);
            client.commit(newCommitTime, (Object)statuses);
            List statusList = statuses.collect();
            Assertions.assertNoWriteErrors((List)statusList);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            HoodieInstant replaceCommitInstant = (HoodieInstant)this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
            HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata)HoodieReplaceCommitMetadata.fromBytes((byte[])((byte[])this.metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get()), HoodieReplaceCommitMetadata.class);
            ArrayList filesFromReplaceCommit = new ArrayList();
            replaceCommitMetadata.getPartitionToWriteStats().forEach((k, v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
            List pathInfoList = this.storage.listDirectEntries(new StoragePath(this.basePath + "/" + partitionPath));
            List clusteredFiles = pathInfoList.stream().filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp())).map(pathInfo -> partitionPath + "/" + pathInfo.getPath().getName()).collect(Collectors.toList());
            org.junit.jupiter.api.Assertions.assertEquals(clusteredFiles, filesFromReplaceCommit);
        }
    }

    @Test
    public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        this.testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());
        String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
        List records1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(200));
        client.startCommitWithTime(commitTime1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(records1, 2);
        JavaRDD statuses = client.upsert(insertRecordsRDD1, commitTime1);
        List statusList = statuses.collect();
        Assertions.assertNoWriteErrors((List)statusList);
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
        records1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(200));
        client.startCommitWithTime(commitTime1);
        insertRecordsRDD1 = this.jsc.parallelize(records1, 2);
        statuses = client.upsert(insertRecordsRDD1, commitTime1);
        statusList = statuses.collect();
        Assertions.assertNoWriteErrors((List)statusList);
        client.commit(commitTime1, (Object)statuses);
        metaClient.reloadActiveTimeline();
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
        this.testInsertTwoBatches(true);
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(false)).withScheduleInlineClustering(Boolean.valueOf(scheduleInlineClustering)).build();
        HoodieWriteConfig config = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false).withClusteringConfig(clusteringConfig).withProps((Map)TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen()).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2015/03/16"});
        String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
        List records1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(200));
        client.startCommitWithTime(commitTime1);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(records1, 2);
        JavaRDD statuses = client.upsert(insertRecordsRDD1, commitTime1);
        List statusList = statuses.collect();
        Assertions.assertNoWriteErrors((List)statusList);
        client.commit(commitTime1, (Object)statuses);
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        List pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).collect(Collectors.toList());
        if (scheduleInlineClustering) {
            org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)pendingClusteringPlans.size());
        } else {
            org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)pendingClusteringPlans.size());
        }
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key").withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        this.testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringSortColumns("begin_lat,begin_lon").withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName()).withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName()).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        this.testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
    }

    @Test
    public void testPendingClusteringRollback() throws Exception {
        boolean populateMetaFields = true;
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        List<HoodieRecord> allRecords = this.testInsertAndClustering(clusteringConfig, populateMetaFields, false);
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        List pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)pendingClusteringPlans.size());
        HoodieInstant pendingClusteringInstant = (HoodieInstant)((Pair)pendingClusteringPlans.get(0)).getLeft();
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig config = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator();
        String commitTime = HoodieActiveTimeline.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateInserts(commitTime, Integer.valueOf(200)));
        org.junit.jupiter.api.Assertions.assertThrows(HoodieUpsertException.class, () -> this.writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields));
        client.rollback(pendingClusteringInstant.getTimestamp());
        metaClient.reloadActiveTimeline();
        org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).count());
        HoodieInstant rollbackInstant = (HoodieInstant)metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        FileCreateUtils.deleteRollbackCommit((String)metaClient.getBasePath(), (String)rollbackInstant.getTimestamp());
        metaClient.reloadActiveTimeline();
        HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, pendingClusteringInstant.getTimestamp(), "1");
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder().setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
        FileCreateUtils.createRequestedReplaceCommit((String)metaClient.getBasePath(), (String)pendingClusteringInstant.getTimestamp(), (Option)Option.of((Object)requestedReplaceMetadata));
        try {
            client.cluster(pendingClusteringInstant.getTimestamp(), false);
        }
        catch (Exception exception) {
            // empty catch block
        }
        metaClient.reloadActiveTimeline();
        HoodieInstant newRollbackInstant = (HoodieInstant)metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
        org.junit.jupiter.api.Assertions.assertEquals((Object)rollbackInstant.getTimestamp(), (Object)newRollbackInstant.getTimestamp());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPendingClustering) throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").withRollbackPendingClustering(Boolean.valueOf(rollbackPendingClustering)).withInlineClustering(Boolean.valueOf(true)).withInlineClusteringNumCommits(1).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        List<HoodieRecord> allRecords = this.testInsertAndClustering(clusteringConfig, true, false);
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        List pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)pendingClusteringPlans.size());
        HoodieInstant pendingClusteringInstant = (HoodieInstant)((Pair)pendingClusteringPlans.get(0)).getLeft();
        org.junit.jupiter.api.Assertions.assertEquals((Object)pendingClusteringInstant.getState(), (Object)HoodieInstant.State.INFLIGHT);
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        cfgBuilder.withClusteringConfig(clusteringConfig);
        HoodieWriteConfig config = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        String commitTime = HoodieActiveTimeline.createNewInstantTime();
        allRecords.addAll(this.dataGen.generateUpdates(commitTime, Integer.valueOf(200)));
        this.writeAndVerifyBatch(client, allRecords, commitTime, true);
        metaClient.reloadActiveTimeline();
        pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans((HoodieTableMetaClient)metaClient).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)(config.isRollbackPendingClustering() ? 0 : 1), (int)pendingClusteringPlans.size());
    }

    @Test
    public void testClusteringWithFailingValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringSortColumns("_hoodie_record_key").withInlineClustering(Boolean.valueOf(true)).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        try {
            this.testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
            org.junit.jupiter.api.Assertions.fail((String)"expected pre-commit clustering validation to fail");
        }
        catch (HoodieValidationException hoodieValidationException) {
            // empty catch block
        }
    }

    @Test
    public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        try {
            this.testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
            org.junit.jupiter.api.Assertions.fail((String)"expected pre-commit clustering validation to fail because sql query is not configured");
        }
        catch (HoodieValidationException hoodieValidationException) {
            // empty catch block
        }
    }

    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        this.testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), "", "select count(*) from <TABLE_NAME>#400");
    }

    @Test
    public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(Boolean.valueOf(true)).fromProperties(TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties()).build();
        try {
            this.testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), "", "select count(*) from <TABLE_NAME>#802");
            org.junit.jupiter.api.Assertions.fail((String)"expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
        }
        catch (HoodieValidationException hoodieValidationException) {
            // empty catch block
        }
    }

    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
        return this.testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, false, "", "", "");
    }

    private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
        Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = this.testInsertTwoBatches(populateMetaFields);
        this.testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
        return (List)((Pair)allRecords.getLeft()).getLeft();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException {
        try {
            Properties properties = new Properties();
            properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(shouldFail));
            properties.setProperty("hoodie.auto.commit", "false");
            properties.setProperty("hoodie.clustering.inline.max.commits", "1");
            properties.setProperty("hoodie.clustering.inline", "true");
            this.testInsertTwoBatches(true, "2015/03/16", properties, true);
            org.junit.jupiter.api.Assertions.assertFalse((boolean)shouldFail);
        }
        catch (HoodieException e) {
            org.junit.jupiter.api.Assertions.assertEquals((Object)CLUSTERING_FAILURE, (Object)e.getMessage());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)shouldFail);
        }
    }

    private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
        return this.testInsertTwoBatches(populateMetaFields, "2015/03/16");
    }

    private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
        return this.testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false);
    }

    private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props, boolean failInlineClustering) throws IOException {
        WriteClientBrokenClustering client;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(2000, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", 10L, false, populateMetaFields, populateMetaFields ? props : TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen());
        if (failInlineClustering) {
            if (null != this.writeClient) {
                this.writeClient.close();
                this.writeClient = null;
            }
            client = new WriteClientBrokenClustering((HoodieEngineContext)this.context, config);
        } else {
            client = this.getHoodieWriteClient(config);
        }
        this.dataGen = new HoodieTestDataGenerator(new String[]{partitionPath});
        String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
        List records1 = this.dataGen.generateInserts(commitTime1, Integer.valueOf(200));
        List<WriteStatus> statuses1 = this.writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, failInlineClustering);
        Set<HoodieFileGroupId> fileIds1 = this.getFileGroupIdsFromWriteStatus(statuses1);
        String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
        List records2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(200));
        List<WriteStatus> statuses2 = this.writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
        client.close();
        Set<HoodieFileGroupId> fileIds2 = this.getFileGroupIdsFromWriteStatus(statuses2);
        HashSet<HoodieFileGroupId> fileIdsUnion = new HashSet<HoodieFileGroupId>(fileIds1);
        fileIdsUnion.addAll(fileIds2);
        HashSet<HoodieFileGroupId> fileIdIntersection = new HashSet<HoodieFileGroupId>(fileIds1);
        fileIdIntersection.retainAll(fileIds2);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)fileIdIntersection.size());
        return Pair.of((Object)Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2)), fileIdsUnion);
    }

    private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords) throws IOException {
        HoodieWriteConfig config = this.getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false).withClusteringConfig(clusteringConfig).withProps((Map)TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen()).build();
        HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = this.performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, (Pair<List<HoodieRecord>, List<String>>)((Pair)allRecords.getLeft()));
        if (assertSameFileIds) {
            Set replacedFileIds = ((List)clusterMetadata.getWriteStats().get()).stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
            Set insertedFileIds = (Set)allRecords.getRight();
            org.junit.jupiter.api.Assertions.assertEquals((Object)insertedFileIds, replacedFileIds);
        }
        if (completeClustering) {
            String clusteringCommitTime = ((HoodieInstant)this.metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().getReverseOrderedInstants().findFirst().get()).getTimestamp();
            this.verifyRecordsWritten(clusteringCommitTime, populateMetaFields, (List)((Pair)allRecords.getLeft()).getLeft(), ((JavaRDD)clusterMetadata.getWriteStatuses()).collect(), config);
        }
    }

    private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, Pair<List<HoodieRecord>, List<String>> allRecords) throws IOException {
        HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(StringUtils.nullToEmpty((String)validatorClasses)).withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation).withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation).build();
        HoodieWriteConfig config = this.getConfigBuilder().withAutoCommit(false).withPreCommitValidatorConfig(validatorConfig).withProps((Map)(populateMetaFields ? new Properties() : TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen())).withClusteringConfig(clusteringConfig).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
        HoodieWriteMetadata clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
        if (config.populateMetaFields()) {
            this.verifyRecordsWrittenWithPreservedMetadata(new HashSet<String>((Collection)allRecords.getRight()), (List)allRecords.getLeft(), ((JavaRDD)clusterMetadata.getWriteStatuses()).collect());
        } else {
            this.verifyRecordsWritten(clusteringCommitTime, populateMetaFields, (List)allRecords.getLeft(), ((JavaRDD)clusterMetadata.getWriteStatuses()).collect(), config);
        }
        HashSet replacedFileIds = new HashSet();
        clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles -> ((List)partitionFiles.getValue()).stream().forEach(file -> replacedFileIds.add(new HoodieFileGroupId((String)partitionFiles.getKey(), file))));
        return clusterMetadata;
    }

    private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> statuses) {
        return statuses.stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testInsertOverwritePartitionHandlingWithMoreRecords(boolean populateMetaFields) throws Exception {
        this.verifyInsertOverwritePartitionHandling(1000, 3000, populateMetaFields);
    }

    @Test
    public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Exception {
        this.verifyInsertOverwritePartitionHandling(3000, 1000, true);
    }

    @Test
    public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
        this.verifyInsertOverwritePartitionHandling(3000, 3000, true);
    }

    private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount, boolean populateMetaFields) throws Exception {
        String testPartitionPath = "americas";
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(2000, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", this.dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? new Properties() : TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen());
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator(new String[]{"americas"});
        String commit1 = "001";
        List statuses = this.writeAndVerifyBatch(client, this.dataGen.generateInserts(commit1, Integer.valueOf(batch1RecordsCount)), commit1, populateMetaFields);
        Set<String> batch1Buckets = this.getFileIdsFromWriteStatus(statuses);
        String commitTime2 = "002";
        client.startCommitWithTime(commitTime2, "replacecommit");
        List inserts2 = this.dataGen.generateInserts(commitTime2, Integer.valueOf(batch2RecordsCount));
        ArrayList insertsAndUpdates2 = new ArrayList();
        insertsAndUpdates2.addAll(inserts2);
        JavaRDD insertAndUpdatesRDD2 = this.jsc.parallelize(insertsAndUpdates2, 2);
        HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
        statuses = writeResult.getWriteStatuses().collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals(batch1Buckets, new HashSet((Collection)writeResult.getPartitionToReplaceFileIds().get("americas")));
        this.verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses, config);
    }

    private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
        return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition(boolean populateMetaFields) throws Exception {
        this.verifyDeletePartitionsHandling(1000, 3000, 3000, populateMetaFields);
    }

    @Test
    public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
        this.verifyDeletePartitionsHandling(3000, 3000, 3000, true);
    }

    @Test
    public void verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
        this.verifyDeletePartitionsHandling(3000, 1000, 1000, true);
    }

    private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) throws IOException {
        client.startCommitWithTime(commitTime1);
        List inserts1 = this.dataGen.generateInsertsForPartition(commitTime1, Integer.valueOf(recordsCount), partitionPath);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts1, 2);
        List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
        this.verifyRecordsWritten(commitTime1, true, inserts1, statuses, client.getConfig());
        return batchBuckets;
    }

    private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
        client.startCommitWithTime(commitTime, "replacecommit");
        HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
        Set<String> deletePartitionReplaceFileIds = writeResult.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> ((List)entry.getValue()).stream()).collect(Collectors.toSet());
        return deletePartitionReplaceFileIds;
    }

    private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount, boolean populateMetaFields) throws Exception {
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(2000, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", this.dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? new Properties() : TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen());
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        this.dataGen = new HoodieTestDataGenerator();
        String commitTime1 = "001";
        Set<String> batch1Buckets = this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, "2016/03/15");
        String commitTime2 = "002";
        Set<String> batch2Buckets = this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, "2015/03/16");
        String commitTime3 = "003";
        Set<String> batch3Buckets = this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, "2015/03/17");
        String commitTime4 = "004";
        Set<String> deletePartitionReplaceFileIds1 = this.deletePartitionWithCommit(client, commitTime4, Arrays.asList("2016/03/15"));
        org.junit.jupiter.api.Assertions.assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
        List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format("%s/%s/*", this.basePath, "2016/03/15"));
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)baseFiles.size());
        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format("%s/%s/*", this.basePath, "2015/03/16"));
        org.junit.jupiter.api.Assertions.assertTrue((baseFiles.size() > 0 ? 1 : 0) != 0);
        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format("%s/%s/*", this.basePath, "2015/03/17"));
        org.junit.jupiter.api.Assertions.assertTrue((baseFiles.size() > 0 ? 1 : 0) != 0);
        String commitTime5 = "005";
        Set<String> deletePartitionReplaceFileIds2 = this.deletePartitionWithCommit(client, commitTime5, Arrays.asList("2015/03/16", "2015/03/17"));
        HashSet<String> expectedFileId = new HashSet<String>();
        expectedFileId.addAll(batch2Buckets);
        expectedFileId.addAll(batch3Buckets);
        org.junit.jupiter.api.Assertions.assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
        baseFiles = HoodieClientTestUtils.getLatestBaseFiles(this.basePath, this.storage, String.format("%s/%s/*", this.basePath, "2016/03/15"), String.format("%s/%s/*", this.basePath, "2015/03/16"), String.format("%s/%s/*", this.basePath, "2015/03/17"));
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)baseFiles.size());
    }

    private void verifyRecordsWritten(String commitTime, boolean populateMetadataField, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, HoodieWriteConfig config) throws IOException {
        ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
        Set<String> expectedKeys = this.verifyRecordKeys(expectedRecords, allStatus, records);
        if (config.populateMetaFields()) {
            for (GenericRecord record : records) {
                String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                org.junit.jupiter.api.Assertions.assertEquals((Object)commitTime, (Object)record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
                org.junit.jupiter.api.Assertions.assertTrue((boolean)expectedKeys.contains(recordKey));
            }
        } else {
            KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)new TypedProperties((Properties)config.getProps()));
            for (GenericRecord record : records) {
                String recordKey = keyGenerator.getKey(record).getRecordKey();
                if (!populateMetadataField) {
                    org.junit.jupiter.api.Assertions.assertNull((Object)record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
                }
                org.junit.jupiter.api.Assertions.assertTrue((boolean)expectedKeys.contains(recordKey));
            }
        }
    }

    @NotNull
    private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) {
        for (WriteStatus status : allStatus) {
            StoragePath filePath = new StoragePath(this.basePath, status.getStat().getPath());
            records.addAll(HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat()).readAvroRecords(this.storage, filePath));
        }
        Set expectedKeys = Transformations.recordsToRecordKeySet(expectedRecords);
        org.junit.jupiter.api.Assertions.assertEquals((int)records.size(), (int)expectedKeys.size());
        return expectedKeys;
    }

    private void verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
        ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
        Set<String> expectedKeys = this.verifyRecordKeys(expectedRecords, allStatus, records);
        Map<String, List<GenericRecord>> recordsByCommitTime = records.stream().collect(Collectors.groupingBy(r -> r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)commitTimes.containsAll(recordsByCommitTime.keySet()));
        Set expectedFileIds = allStatus.stream().map(WriteStatus::getFileId).collect(Collectors.toSet());
        for (GenericRecord record : records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)expectedKeys.contains(recordKey));
            String fileName = record.get(HoodieRecord.FILENAME_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)expectedFileIds.contains(FSUtils.getFileId((String)fileName)));
        }
    }

    private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
        return this.writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false);
    }

    private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException {
        client.startCommitWithTime(commitTime);
        JavaRDD insertRecordsRDD1 = this.jsc.parallelize(inserts, 2);
        JavaRDD statusRDD = client.upsert(insertRecordsRDD1, commitTime);
        if (autoCommitOff) {
            client.commit(commitTime, (Object)statusRDD);
        }
        List statuses = statusRDD.collect();
        Assertions.assertNoWriteErrors((List)statuses);
        this.verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());
        return statuses;
    }

    private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client, int sizeToInsertAndUpdate, int expectedTotalRecords) throws IOException {
        client.startCommitWithTime(instantTime);
        List inserts = this.dataGen.generateInserts(instantTime, Integer.valueOf(sizeToInsertAndUpdate));
        Set keys = Transformations.recordsToRecordKeySet((List)inserts);
        ArrayList insertsAndUpdates = new ArrayList();
        insertsAndUpdates.addAll(inserts);
        insertsAndUpdates.addAll(this.dataGen.generateUpdates(instantTime, inserts));
        JavaRDD insertAndUpdatesRDD = this.jsc.parallelize(insertsAndUpdates, 1);
        List statuses = client.upsert(insertAndUpdatesRDD, instantTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        org.junit.jupiter.api.Assertions.assertEquals((long)expectedTotalRecords, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)("Must contain " + expectedTotalRecords + " records"));
        return Pair.of((Object)keys, (Object)inserts);
    }

    private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete, String existingFile, String instantTime, int expectedRecords, List<String> keys) {
        client.startCommitWithTime(instantTime);
        List hoodieKeysToDelete = Transformations.randomSelectAsHoodieKeys(previousRecords, (int)sizeToDelete);
        JavaRDD deleteKeys = this.jsc.parallelize(hoodieKeysToDelete, 1);
        List statuses = client.delete(deleteKeys, instantTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Just 1 file needs to be added.");
        org.junit.jupiter.api.Assertions.assertEquals((Object)existingFile, (Object)((WriteStatus)statuses.get(0)).getFileId(), (String)"Existing file should be expanded");
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        org.junit.jupiter.api.Assertions.assertEquals((long)expectedRecords, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)("Must contain " + expectedRecords + " records"));
        StoragePath newFile = new StoragePath(this.basePath, ((WriteStatus)statuses.get(0)).getStat().getPath());
        org.junit.jupiter.api.Assertions.assertEquals((int)expectedRecords, (int)HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat()).readRowKeys(this.storage, newFile).size(), (String)"file should contain 110 records");
        List records = HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat()).readAvroRecords(this.storage, newFile);
        for (GenericRecord record : records) {
            String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)keys.contains(recordKey), (String)("key expected to be part of " + instantTime));
            org.junit.jupiter.api.Assertions.assertFalse((boolean)hoodieKeysToDelete.contains(recordKey), (String)"Key deleted");
        }
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testDeletesWithoutInserts(boolean populateMetaFields) {
        String testPartitionPath = "2016/09/26";
        int insertSplitLimit = 100;
        HoodieWriteConfig config = this.getSmallInsertWriteConfig(100, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", this.dataGen.getEstimatedFileSizeInBytes(150), populateMetaFields, populateMetaFields ? new Properties() : TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen());
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        SparkRDDWriteClient client = this.getHoodieWriteClient(config);
        String commitTime1 = "001";
        client.startCommitWithTime(commitTime1);
        List dummyInserts = this.dataGen.generateInserts(commitTime1, Integer.valueOf(20));
        List hoodieKeysToDelete = Transformations.randomSelectAsHoodieKeys((List)dummyInserts, (int)20);
        JavaRDD deleteKeys = this.jsc.parallelize(hoodieKeysToDelete, 1);
        client.delete(deleteKeys, commitTime1).collect();
    }

    @Test
    public void testCommitWritesRelativePaths() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfgBuilder.build());){
            HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfgBuilder.build(), (HoodieEngineContext)this.context, (HoodieTableMetaClient)metaClient);
            String instantTime = "000";
            client.startCommitWithTime(instantTime);
            List records = this.dataGen.generateInserts(instantTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc.parallelize(records, 1);
            JavaRDD result = client.bulkInsert(writeRecords, instantTime);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime, (Object)result), (String)"Commit should succeed");
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime), (String)"After explicit commit, commit file should be created");
            String actionType = metaClient.getCommitActionType();
            HoodieInstant commitInstant = new HoodieInstant(false, actionType, instantTime);
            HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])commitTimeline.getInstantDetails(commitInstant).get()), HoodieCommitMetadata.class);
            String basePath = table.getMetaClient().getBasePath();
            Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(new StoragePath(basePath)).values();
            try (InputStream inputStream = this.storage.open(this.testTable.getCommitFilePath(instantTime));){
                String everything = FileIOUtils.readAsUTFString((InputStream)inputStream);
                HoodieCommitMetadata metadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromJsonString((String)everything, HoodieCommitMetadata.class);
                HashMap paths = metadata.getFileIdAndFullPaths(new StoragePath(basePath));
                for (String pathName : paths.values()) {
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)commitPathNames.contains(pathName));
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"populateMetaFieldsParams"})
    public void testMetadataStatsOnCommit(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig cfg = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        String instantTime0 = "000";
        client.startCommitWithTime(instantTime0);
        List records0 = this.dataGen.generateInserts(instantTime0, Integer.valueOf(200));
        JavaRDD writeRecords0 = this.jsc.parallelize(records0, 1);
        JavaRDD result0 = client.bulkInsert(writeRecords0, instantTime0);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime0, (Object)result0), (String)"Commit should succeed");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime0), (String)"After explicit commit, commit file should be created");
        try (InputStream inputStream = this.storage.open(this.testTable.getCommitFilePath(instantTime0));){
            String everything = FileIOUtils.readAsUTFString((InputStream)inputStream);
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromJsonString((String)everything, HoodieCommitMetadata.class);
            int inserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (HoodieWriteStat stat : (List)pstat.getValue()) {
                    inserts = (int)((long)inserts + stat.getNumInserts());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
        }
        String instantTime1 = "001";
        client.startCommitWithTime(instantTime1);
        List records1 = this.dataGen.generateUpdates(instantTime1, records0);
        JavaRDD writeRecords1 = this.jsc.parallelize(records1, 1);
        JavaRDD result1 = client.upsert(writeRecords1, instantTime1);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime1, (Object)result1), (String)"Commit should succeed");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime1), (String)"After explicit commit, commit file should be created");
        InputStream inputStream = this.storage.open(this.testTable.getCommitFilePath(instantTime1));
        Object object = null;
        try {
            String everything = FileIOUtils.readAsUTFString((InputStream)inputStream);
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromJsonString((String)everything, HoodieCommitMetadata.class);
            int inserts = 0;
            int upserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (HoodieWriteStat stat : (List)pstat.getValue()) {
                    inserts = (int)((long)inserts + stat.getNumInserts());
                    upserts = (int)((long)upserts + stat.getNumUpdateWrites());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)inserts);
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)upserts);
        }
        catch (Throwable throwable) {
            object = throwable;
            throw throwable;
        }
        finally {
            if (inputStream != null) {
                if (object != null) {
                    try {
                        inputStream.close();
                    }
                    catch (Throwable throwable) {
                        ((Throwable)object).addSuppressed(throwable);
                    }
                } else {
                    inputStream.close();
                }
            }
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        String instantTime = "000";
        HoodieWriteConfig cfg = this.getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        Pair<StoragePath, JavaRDD<WriteStatus>> result = this.testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
        metaClient.getStorage().deleteFile((StoragePath)result.getKey());
        if (!enableOptimisticConsistencyGuard) {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime, result.getRight()), (String)"Commit should succeed");
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime), (String)"After explicit commit, commit file should be created");
            org.junit.jupiter.api.Assertions.assertFalse((boolean)metaClient.getStorage().exists(new StoragePath(metaClient.getMarkerFolderPath(instantTime))));
        } else {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime), (String)"After explicit commit, commit file should be created");
            org.junit.jupiter.api.Assertions.assertFalse((boolean)metaClient.getStorage().exists(new StoragePath(metaClient.getMarkerFolderPath(instantTime))));
        }
    }

    private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard, boolean populateMetaFields) throws Exception {
        String instantTime = "00000000000010";
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        Properties properties = new Properties();
        if (!populateMetaFields) {
            properties = TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen();
        }
        HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? this.getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() : this.getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).withOptimisticConsistencyGuardSleepTimeMs(1L).build()).withProperties(properties).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        this.testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
        if (!enableOptimisticConsistencyGuard) {
            client.rollback(instantTime);
            org.junit.jupiter.api.Assertions.assertFalse((boolean)this.testTable.commitExists(instantTime), (String)"After explicit rollback, commit file should not be present");
            org.junit.jupiter.api.Assertions.assertFalse((boolean)metaClient.getStorage().exists(new StoragePath(metaClient.getMarkerFolderPath(instantTime))));
        } else {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(instantTime), (String)"With optimistic CG, first commit should succeed. commit file should be present");
            org.junit.jupiter.api.Assertions.assertFalse((boolean)metaClient.getStorage().exists(new StoragePath(metaClient.getMarkerFolderPath(instantTime))));
            client.rollback(instantTime);
            org.junit.jupiter.api.Assertions.assertFalse((boolean)this.testTable.commitExists(instantTime), (String)"After explicit rollback, commit file should not be present");
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception {
        this.testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard, true);
    }

    @ParameterizedTest
    @MethodSource(value={"rollbackAfterConsistencyCheckFailureParams"})
    public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard, boolean populateMetCols) throws Exception {
        this.testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard, populateMetCols);
    }

    @Test
    public void testRollbackFailedCommits() throws Exception {
        HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.NEVER;
        boolean populateMetaFields = true;
        HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath);
        SparkRDDWriteClient client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "100", "100", (Option<List<String>>)Option.of(Arrays.asList("100")), "100", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, true);
        this.writeBatch(client, "200", "100", (Option<List<String>>)Option.of(Arrays.asList("200")), "100", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "300", "200", (Option<List<String>>)Option.of(Arrays.asList("300")), "300", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        this.dataGen = new HoodieTestDataGenerator();
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "400", "300", (Option<List<String>>)Option.of(Arrays.asList("400")), "400", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, true);
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 0 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().filterInflights().countInstants() == 2 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2 ? 1 : 0) != 0);
        boolean conditionMet = false;
        while (!conditionMet) {
            conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
            Thread.sleep(2000L);
        }
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "500", "400", (Option<List<String>>)Option.of(Arrays.asList("500")), "500", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, true);
        client.clean();
        client.close();
        HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
        if (cleaningPolicy.isLazy()) {
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 2 ? 1 : 0) != 0);
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"clean"})).countInstants() == 0 ? 1 : 0) != 0);
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3 ? 1 : 0) != 0);
        } else if (cleaningPolicy.isNever()) {
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 2 ? 1 : 0) != 0);
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"clean"})).countInstants() == 0 ? 1 : 0) != 0);
            org.junit.jupiter.api.Assertions.assertTrue((timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3 ? 1 : 0) != 0);
        }
    }

    @Test
    public void testRollbackFailedCommitsToggleCleaningPolicy() throws Exception {
        boolean populateMetaFields = true;
        HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath);
        HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        SparkRDDWriteClient client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "100", "100", (Option<List<String>>)Option.of(Arrays.asList("100")), "100", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, true);
        this.writeBatch(client, "200", "100", (Option<List<String>>)Option.of(Arrays.asList("200")), "200", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "300", "200", (Option<List<String>>)Option.of(Arrays.asList("300")), "300", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "400", "300", (Option<List<String>>)Option.of(Arrays.asList("400")), "400", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        boolean conditionMet = false;
        while (!conditionMet) {
            conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
            Thread.sleep(2000L);
        }
        client.clean();
        HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline().reload();
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 3 ? 1 : 0) != 0);
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "500", "400", (Option<List<String>>)Option.of(Arrays.asList("300")), "300", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        this.writeBatch(client, "600", "500", (Option<List<String>>)Option.of(Arrays.asList("400")), "400", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 300, 0, false);
        client.close();
        cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
        client.startCommit();
        client.close();
        timeline = this.metaClient.getActiveTimeline().reload();
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 3 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1 ? 1 : 0) != 0);
    }

    @Test
    public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
        HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        ExecutorService service = Executors.newFixedThreadPool(2);
        HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath);
        SparkRDDWriteClient client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true));
        this.writeBatch(client, "100", "100", (Option<List<String>>)Option.of(Arrays.asList("100")), "100", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 100, 0, true);
        this.writeBatch(client, "200", "100", (Option<List<String>>)Option.of(Arrays.asList("200")), "200", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 100, 0, false);
        client.close();
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true));
        this.writeBatch(client, "300", "200", (Option<List<String>>)Option.of(Arrays.asList("300")), "300", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 100, 0, false);
        client.close();
        this.dataGen = new HoodieTestDataGenerator();
        Future<JavaRDD> commit3 = service.submit(() -> this.writeBatch(new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true)), "400", "300", (Option<List<String>>)Option.of(Arrays.asList("400")), "300", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 100, 0, true));
        commit3.get();
        HoodieTableMetaClient metaClient = this.createMetaClient(this.basePath);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 0 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().filterInflights().countInstants() == 2 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2 ? 1 : 0) != 0);
        client = new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true));
        boolean conditionMet = false;
        while (!conditionMet) {
            conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
            Thread.sleep(2000L);
        }
        Future<JavaRDD> commit4 = service.submit(() -> this.writeBatch(new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true)), "500", "400", (Option<List<String>>)Option.of(Arrays.asList("500")), "500", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)this.dataGen).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, 100, 100, 0, true));
        Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient((HoodieEngineContext)this.context, this.getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
        commit4.get();
        clean1.get();
        HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"rollback"})).countInstants() == 2 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"clean"})).countInstants() == 0 ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertTrue((timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3 ? 1 : 0) != 0);
        client.close();
    }

    private Pair<StoragePath, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard) throws Exception {
        String partitionPath;
        HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? this.getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() : this.getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).withOptimisticConsistencyGuardSleepTimeMs(1L).build()).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        client.startCommitWithTime(instantTime);
        JavaRDD writeRecords = this.jsc.parallelize(this.dataGen.generateInserts(instantTime, Integer.valueOf(200)), 1);
        JavaRDD result = client.bulkInsert(writeRecords, instantTime);
        result.collect();
        String markerFolderPath = metaClient.getMarkerFolderPath(instantTime);
        if (cfg.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
            String markerName = (String)MarkerUtils.readTimelineServerBasedMarkersFromFileSystem((String)markerFolderPath, (HoodieStorage)this.storage, (HoodieEngineContext)this.context, (int)1).values().stream().flatMap(Collection::stream).findFirst().get();
            partitionPath = new Path(markerFolderPath, markerName).getParent().toString();
        } else {
            partitionPath = (String)this.storage.globEntries(new StoragePath(String.format("%s/*/*/*/*", markerFolderPath)), (StoragePathFilter & Serializable)path -> path.toString().contains(".marker")).stream().limit(1L).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
        }
        Option markerFilePath = WriteMarkersFactory.get((MarkerType)cfg.getMarkersType(), (HoodieTable)this.getHoodieTable(metaClient, cfg), (String)instantTime).create(partitionPath, FSUtils.makeBaseFileName((String)instantTime, (String)"1-0-1", (String)UUID.randomUUID().toString(), (String)BASE_FILE_EXTENSION), IOType.MERGE);
        if (!enableOptimisticConsistencyGuard) {
            Exception e = (Exception)org.junit.jupiter.api.Assertions.assertThrows(HoodieCommitException.class, () -> client.commit(instantTime, (Object)result), (String)"Commit should fail due to consistency check");
            org.junit.jupiter.api.Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieIOException));
        } else {
            client.commit(instantTime, (Object)result);
        }
        return Pair.of((Object)markerFilePath.get(), (Object)result);
    }

    @Test
    public void testMultiOperationsPerCommit() throws IOException {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder().withAutoCommit(false).withAllowMultiWriteOnSameInstant(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        HoodieWriteConfig cfg = cfgBuilder.build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        String firstInstantTime = "0000";
        client.startCommitWithTime(firstInstantTime);
        int numRecords = 200;
        JavaRDD writeRecords = this.jsc.parallelize(this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD result = client.bulkInsert(writeRecords, firstInstantTime);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(firstInstantTime, (Object)result), (String)"Commit should succeed");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(firstInstantTime), (String)"After explicit commit, commit file should be created");
        String[] fullPartitionPaths = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < fullPartitionPaths.length; ++i) {
            fullPartitionPaths[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        org.junit.jupiter.api.Assertions.assertEquals((long)numRecords, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)("Must contain " + numRecords + " records"));
        String nextInstantTime = "0001";
        client.startCommitWithTime(nextInstantTime);
        JavaRDD updateRecords = this.jsc.parallelize(this.dataGen.generateUpdates(nextInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD insertRecords = this.jsc.parallelize(this.dataGen.generateInserts(nextInstantTime, Integer.valueOf(numRecords)), 1);
        JavaRDD inserts = client.bulkInsert(insertRecords, nextInstantTime);
        JavaRDD upserts = client.upsert(updateRecords, nextInstantTime);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(nextInstantTime, (Object)inserts.union(upserts)), (String)"Commit should succeed");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.testTable.commitExists(firstInstantTime), (String)"After explicit commit, commit file should be created");
        int totalRecords = 2 * numRecords;
        org.junit.jupiter.api.Assertions.assertEquals((long)totalRecords, (long)HoodieClientTestUtils.read(this.jsc, this.basePath, this.sqlContext, this.storage, fullPartitionPaths).count(), (String)("Must contain " + totalRecords + " records"));
    }

    @Test
    public void testClusteringCommitInPresenceOfInflightCommit() throws Exception {
        Properties properties = TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).withConflictResolutionStrategy((ConflictResolutionStrategy)new PreferWriterConflictResolutionStrategy()).build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(Boolean.valueOf(false)).build();
        HoodieWriteConfig insertWriteConfig = this.getConfigBuilder().withAutoCommit(false).withCleanConfig(cleanConfig).withLockConfig(lockConfig).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withProperties(properties).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(insertWriteConfig);
        int numRecords = 200;
        String firstCommit = HoodieActiveTimeline.createNewInstantTime();
        String partitionStr = "2016/03/15";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
        this.writeBatch(client, firstCommit, "000", (Option<List<String>>)Option.of(Arrays.asList("000")), "000", numRecords, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGenerator).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), true, numRecords, numRecords, 1, true);
        String inflightCommit = HoodieActiveTimeline.createNewInstantTime();
        this.writeBatch(client, inflightCommit, firstCommit, (Option<List<String>>)Option.of(Arrays.asList("000")), "000", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGenerator).generateUniqueUpdates(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, 0, 200, 2, false);
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
        HoodiePreCommitValidatorConfig preCommitValidatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(StringUtils.nullToEmpty((String)SqlQuerySingleResultPreCommitValidator.class.getName())).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#200").build();
        HoodieWriteConfig clusteringWriteConfig = this.getConfigBuilder().withAutoCommit(false).withCleanConfig(cleanConfig).withClusteringConfig(clusteringConfig).withPreCommitValidatorConfig(preCommitValidatorConfig).withLockConfig(lockConfig).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withProperties(properties).build();
        String clusteringCommitTime = HoodieActiveTimeline.createNewInstantTime();
        SparkRDDWriteClient clusteringWriteClient = this.getHoodieWriteClient(clusteringWriteConfig);
        clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
        org.junit.jupiter.api.Assertions.assertThrows(HoodieClusteringException.class, () -> clusteringWriteClient.cluster(clusteringCommitTime, true));
        clusteringWriteClient.rollback(clusteringCommitTime);
        List instants = this.metaClient.reloadActiveTimeline().getInstants();
        org.junit.jupiter.api.Assertions.assertEquals((int)3, (int)instants.size());
        org.junit.jupiter.api.Assertions.assertEquals((Object)"rollback", (Object)((HoodieInstant)instants.get(2)).getAction());
        org.junit.jupiter.api.Assertions.assertEquals((Object)new HoodieInstant(true, "commit", inflightCommit), instants.get(1));
    }

    @Test
    public void testIngestionCommitInPresenceOfCompletedClusteringCommit() throws Exception {
        Properties properties = TestHoodieClientOnCopyOnWriteStorage.getDisabledRowWriterProperties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).withConflictResolutionStrategy((ConflictResolutionStrategy)new PreferWriterConflictResolutionStrategy()).build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(Boolean.valueOf(false)).build();
        HoodieWriteConfig insertWriteConfig = this.getConfigBuilder().withAutoCommit(false).withCleanConfig(cleanConfig).withLockConfig(lockConfig).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withProperties(properties).build();
        SparkRDDWriteClient client = this.getHoodieWriteClient(insertWriteConfig);
        int numRecords = 200;
        String firstCommit = HoodieActiveTimeline.createNewInstantTime();
        String partitionStr = "2016/03/15";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
        this.writeBatch(client, firstCommit, "000", (Option<List<String>>)Option.of(Arrays.asList("000")), "000", numRecords, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGenerator).generateInserts(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), true, numRecords, numRecords, 1, true);
        String inflightCommit = HoodieActiveTimeline.createNewInstantTime();
        JavaRDD<WriteStatus> ingestionResult = this.writeBatch(client, inflightCommit, firstCommit, (Option<List<String>>)Option.of(Arrays.asList("000")), "000", 100, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGenerator).generateUniqueUpdates(arg_0, arg_1)), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, 0, 200, 2, false);
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
        HoodieLockConfig clusteringLockConfig = HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).withConflictResolutionStrategy((ConflictResolutionStrategy)new SimpleConcurrentFileWritesConflictResolutionStrategy()).build();
        HoodiePreCommitValidatorConfig preCommitValidatorConfig = HoodiePreCommitValidatorConfig.newBuilder().withPreCommitValidator(StringUtils.nullToEmpty((String)SqlQuerySingleResultPreCommitValidator.class.getName())).withPrecommitValidatorSingleResultSqlQueries("select count(*) from <TABLE_NAME>#200").build();
        HoodieWriteConfig clusteringWriteConfig = this.getConfigBuilder().withAutoCommit(false).withCleanConfig(cleanConfig).withClusteringConfig(clusteringConfig).withPreCommitValidatorConfig(preCommitValidatorConfig).withLockConfig(clusteringLockConfig).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withProperties(properties).build();
        String clusteringCommitTime = HoodieActiveTimeline.createNewInstantTime();
        SparkRDDWriteClient clusteringWriteClient = this.getHoodieWriteClient(clusteringWriteConfig);
        clusteringWriteClient.scheduleClusteringAtInstant(clusteringCommitTime, Option.empty());
        clusteringWriteClient.cluster(clusteringCommitTime, true);
        org.junit.jupiter.api.Assertions.assertThrows(HoodieWriteConflictException.class, () -> client.commit(inflightCommit, (Object)ingestionResult));
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
        return this.getSmallInsertWriteConfig(insertSplitSize, false);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
        return this.getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) {
        return this.getSmallInsertWriteConfig(insertSplitSize, useNullSchema, (long)this.dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
        return this.getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) {
        String schemaStr = useNullSchema ? HoodieTestDataGenerator.NULL_SCHEMA : "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        return this.getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
        return this.getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
        return this.getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, true, new Properties());
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean populateMetaFields, Properties props) {
        return this.getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, populateMetaFields, props);
    }

    private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts, boolean populateMetaFields, Properties props) {
        HoodieWriteConfig.Builder builder = this.getConfigBuilder(schemaStr);
        if (!populateMetaFields) {
            builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
        }
        return builder.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize).insertSplitSize(insertSplitSize).build()).withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize((long)this.dataGen.getEstimatedFileSizeInBytes(200)).parquetMaxFileSize((long)this.dataGen.getEstimatedFileSizeInBytes(200)).build()).withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts).withProps((Map)props).build();
    }

    protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException {
        HoodieClusteringPlan clusteringPlan = ClusteringUtils.createClusteringPlan((String)((String)HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.defaultValue()), STRATEGY_PARAMS, (List[])fileSlices, Collections.emptyMap());
        HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "replacecommit", clusterTime);
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder().setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
        metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata((HoodieRequestedReplaceMetadata)requestedReplaceMetadata));
        return clusteringInstant;
    }

    private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) {
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        properties.setProperty("hoodie.write.lock.client.wait_time_ms_between_retry", "3000");
        properties.setProperty("hoodie.write.lock.client.num_retries", "20");
        if (!populateMetaFields) {
            TestHoodieClientOnCopyOnWriteStorage.getPropertiesForKeyGen((boolean)populateMetaFields).entrySet().forEach(kv -> properties.put(kv.getKey(), kv.getValue()));
        }
        return this.getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy).withAutoClean(Boolean.valueOf(false)).build()).withTimelineLayoutVersion(1).withHeartbeatIntervalInMs(Integer.valueOf(3000)).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withAutoCommit(false).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withProperties(properties).build();
    }

    private static Properties getDisabledRowWriterProperties() {
        Properties properties = new Properties();
        properties.setProperty("hoodie.datasource.write.row.writer.enable", String.valueOf(false));
        return properties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static /* synthetic */ Boolean lambda$testUpsertsInternal$57b26ee6$1(HoodieTableMetaClient metaClient, String extension, String basePathStr, HoodieWriteConfig cfg, HoodieTable table, HoodieWriteConfig config, Integer e) throws Exception {
        HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])metaClient.getActiveTimeline().getInstantDetails((HoodieInstant)metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get()), HoodieCommitMetadata.class);
        String filePath = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny().map(ee -> ee.getPath()).orElse(null);
        String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny().map(ee -> ee.getPartitionPath()).orElse(null);
        Path baseFilePath = new Path(basePathStr, filePath);
        HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());
        try (HoodieMergeHandle handle = null;){
            handle = new HoodieMergeHandle(cfg, "007", table, new HashMap(), partitionPath, FSUtils.getFileId((String)baseFilePath.getName()), baseFile, (TaskContextSupplier)new SparkTaskContextSupplier(), config.populateMetaFields() ? Option.empty() : Option.of((Object)((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)new TypedProperties((Properties)config.getProps())))));
            WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
            writeStatus.setStat(new HoodieWriteStat());
            writeStatus.getStat().setNumWrites(0L);
            handle.performMergeDataValidationCheck(writeStatus);
        }
        handle = null;
        try {
            String newInstantTime = "006";
            cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
            HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps((Map)cfg.getProps()).build();
            handle = new HoodieMergeHandle(cfg2, "006", table, new HashMap(), partitionPath, FSUtils.getFileId((String)baseFilePath.getName()), baseFile, (TaskContextSupplier)new SparkTaskContextSupplier(), config.populateMetaFields() ? Option.empty() : Option.of((Object)((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)new TypedProperties((Properties)config.getProps())))));
            WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
            writeStatus.setStat(new HoodieWriteStat());
            writeStatus.getStat().setNumWrites(0L);
            handle.performMergeDataValidationCheck(writeStatus);
            org.junit.jupiter.api.Assertions.fail((String)"The above line should have thrown an exception");
        }
        catch (HoodieCorruptedDataException hoodieCorruptedDataException) {
        }
        finally {
            if (handle != null) {
                try {
                    handle.close();
                }
                catch (Exception exception) {}
            }
        }
        return true;
    }

    public static class WriteClientBrokenClustering<T extends HoodieRecordPayload>
    extends SparkRDDWriteClient<T> {
        public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
            super(context, clientConfig);
        }

        protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
            if (this.config.inlineClusteringEnabled()) {
                throw new HoodieException(TestHoodieClientOnCopyOnWriteStorage.CLUSTERING_FAILURE);
            }
        }
    }

    public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>>
    extends SparkPreCommitValidator<T, I, K, O> {
        public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
            super(table, context, config);
        }

        protected void validateRecordsBeforeAndAfter(Dataset<Row> before, Dataset<Row> after, Set<String> partitionsAffected) {
            throw new HoodieValidationException("simulate failure");
        }
    }
}

