/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.DefaultSparkRecordMerger;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.JavaTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.HoodieMetadataTableValidator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.MockConfigurationHotUpdateStrategy;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.streamer.StreamSync;
import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestHoodieDeltaStreamer
extends HoodieDeltaStreamerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);

    /**
     * Configures the Spark-native record merger when the test runs with SPARK records.
     *
     * <p>For {@link HoodieRecord.HoodieRecordType#SPARK} this appends the merger, log-block format,
     * merge-mode and merge-strategy-id settings to {@code hoodieConfig} as {@code key=value}
     * strings, and also copies them into {@code hudiOpts}. For any other record type it does nothing.
     *
     * @param type         record representation under test
     * @param hoodieConfig mutable list of {@code key=value} overrides that receives the settings
     */
    private void addRecordMerger(HoodieRecord.HoodieRecordType type, List<String> hoodieConfig) {
        if (type != HoodieRecord.HoodieRecordType.SPARK) {
            return;
        }
        Map<String, String> opts = new HashMap<String, String>();
        opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
        opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
        opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5");
        // Typed entry iteration (the decompiled original used a raw Map.Entry).
        for (Map.Entry<String, String> entry : opts.entrySet()) {
            hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
        }
        this.hudiOpts.putAll(opts);
    }

    /**
     * Convenience overload that builds the continuous-mode streamer with the INSERT operation.
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecord.HoodieRecordType recordType) throws IOException {
        WriteOperationType defaultOperation = WriteOperationType.INSERT;
        return initialHoodieDeltaStreamer(tableBasePath, totalRecords, asyncCluster, recordType, defaultOperation);
    }

    /**
     * Convenience overload that builds the continuous-mode streamer with no extra custom configs.
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecord.HoodieRecordType recordType, WriteOperationType writeOperationType) throws IOException {
        Set<String> noExtraConfigs = Collections.emptySet();
        return initialHoodieDeltaStreamer(tableBasePath, totalRecords, asyncCluster, recordType, writeOperationType, noExtraConfigs);
    }

    /**
     * Builds a continuous-mode COPY_ON_WRITE streamer for {@code tableBasePath}.
     *
     * <p>Configs are appended in this order: record-merger settings (SPARK only), table-service
     * configs derived from {@code totalRecords}/{@code asyncCluster}, multi-writer configs, and
     * finally {@code customConfigs}.
     *
     * @param customConfigs extra {@code key=value} overrides appended last
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecord.HoodieRecordType recordType, WriteOperationType writeOperationType, Set<String> customConfigs) throws IOException {
        HoodieDeltaStreamer.Config streamerConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, writeOperationType);
        addRecordMerger(recordType, streamerConfig.configs);
        streamerConfig.continuousMode = true;
        streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
        streamerConfig.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
        streamerConfig.configs.addAll(getAllMultiWriterConfigs());
        streamerConfig.configs.addAll(customConfigs);
        return new HoodieDeltaStreamer(streamerConfig, jsc);
    }

    /**
     * Convenience overload that leaves {@code retryLastFailedClusteringJob} unset ({@code null}).
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, HoodieRecord.HoodieRecordType recordType) {
        Boolean retryLastFailed = null;
        return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailed, recordType);
    }

    /**
     * Convenience overload using AVRO records and no retry flag.
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
        HoodieRecord.HoodieRecordType avro = HoodieRecord.HoodieRecordType.AVRO;
        return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, null, avro);
    }

    /**
     * Builds a {@link HoodieClusteringJob} for {@code tableBasePath}, adding record-merger settings
     * (SPARK only) and then the multi-writer configs to the clustering config.
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob, HoodieRecord.HoodieRecordType recordType) {
        HoodieClusteringJob.Config clusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
        addRecordMerger(recordType, clusteringConfig.configs);
        clusteringConfig.configs.addAll(getAllMultiWriterConfigs());
        return new HoodieClusteringJob(jsc, clusteringConfig);
    }

    /**
     * Increments the inherited {@code testNum} counter after every test.
     */
    @AfterEach
    public void perTestAfterEach() {
        testNum++;
    }

    /**
     * Checks a few values loaded from {@code test-source.properties} under {@code basePath}.
     */
    @Test
    public void testProps() {
        StoragePath propsFile = new StoragePath(basePath + "/" + "test-source.properties");
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), propsFile).getProps();
        Assertions.assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
        Assertions.assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
        Assertions.assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator", props.getString("hoodie.datasource.write.keygenerator.class"));
    }

    /**
     * Returns a fresh streamer config with only the required fields set
     * (target base path, COPY_ON_WRITE table type, target table name).
     */
    private static HoodieStreamer.Config getBaseConfig() {
        HoodieStreamer.Config config = new HoodieStreamer.Config();
        config.targetTableName = "test";
        config.tableType = "COPY_ON_WRITE";
        config.targetBasePath = "s3://mybucket/blah";
        return config;
    }

    /**
     * Cartesian product of (table type, useUserProvidedSchema, record type) for
     * {@code testSchemaEvolution}. The order is all AVRO cases first, then all SPARK cases.
     * Within each record type, COW comes before MOR, and {@code true} before {@code false}.
     */
    private static Stream<Arguments> schemaEvolArgs() {
        HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO, HoodieRecord.HoodieRecordType.SPARK};
        boolean[] schemaFlags = {true, false};
        List<Arguments> combinations = new ArrayList<Arguments>();
        for (HoodieRecord.HoodieRecordType recordType : recordTypes) {
            String[] tableTypes = {DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()};
            for (String tableType : tableTypes) {
                for (boolean useUserProvidedSchema : schemaFlags) {
                    combinations.add(Arguments.of(tableType, useUserProvidedSchema, recordType));
                }
            }
        }
        return combinations.stream();
    }

    /**
     * Supplies (command-line args, expected parsed config) pairs for {@code testValidCommandLineArgs}.
     */
    private static Stream<Arguments> provideValidCliArgs() {
        String hiveTableConf = "hoodie.datasource.hive_sync.table=test_table";
        String recordKeyConf = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";

        HoodieStreamer.Config base = getBaseConfig();

        HoodieStreamer.Config withFileFormat = getBaseConfig();
        withFileFormat.baseFileFormat = "PARQUET";

        HoodieStreamer.Config withSourceLimit = getBaseConfig();
        withSourceLimit.sourceLimit = 500L;

        HoodieStreamer.Config withHiveSync = getBaseConfig();
        withHiveSync.enableHiveSync = true;

        HoodieStreamer.Config withHiveTable = getBaseConfig();
        withHiveTable.configs = Arrays.asList(hiveTableConf);

        HoodieStreamer.Config withRecordKey = getBaseConfig();
        withRecordKey.configs = Arrays.asList(recordKeyConf);

        HoodieStreamer.Config withBothConfs = getBaseConfig();
        withBothConfs.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        HoodieStreamer.Config everything = getBaseConfig();
        everything.baseFileFormat = "PARQUET";
        everything.sourceLimit = 500L;
        everything.enableHiveSync = true;
        everything.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        // Deliberately uses a different flag order than the other cases.
        String[] allArgs = {"--target-base-path", "s3://mybucket/blah", "--source-limit", "500", "--table-type", "COPY_ON_WRITE", "--target-table", "test", "--base-file-format", "PARQUET", "--enable-hive-sync", "--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf};

        return Stream.of(
            Arguments.of(withRequiredCliArgs(), base),
            Arguments.of(withRequiredCliArgs("--base-file-format", "PARQUET"), withFileFormat),
            Arguments.of(withRequiredCliArgs("--source-limit", "500"), withSourceLimit),
            Arguments.of(withRequiredCliArgs("--enable-hive-sync"), withHiveSync),
            Arguments.of(withRequiredCliArgs("--hoodie-conf", hiveTableConf), withHiveTable),
            Arguments.of(withRequiredCliArgs("--hoodie-conf", recordKeyConf), withRecordKey),
            Arguments.of(withRequiredCliArgs("--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf), withBothConfs),
            Arguments.of(allArgs, everything));
    }

    /**
     * Returns the required CLI flags matching {@link #getBaseConfig()}, followed by {@code extraArgs}.
     */
    private static String[] withRequiredCliArgs(String... extraArgs) {
        String[] required = {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test"};
        String[] combined = Arrays.copyOf(required, required.length + extraArgs.length);
        System.arraycopy(extraArgs, 0, combined, required.length, extraArgs.length);
        return combined;
    }

    /**
     * Parses {@code args} and compares the resulting config against {@code expected}.
     */
    @ParameterizedTest
    @MethodSource("provideValidCliArgs")
    public void testValidCommandLineArgs(String[] args, HoodieStreamer.Config expected) {
        HoodieStreamer.Config parsed = HoodieDeltaStreamer.getConfig(args);
        Assertions.assertEquals(expected, parsed);
    }

    /**
     * Verifies that {@code KafkaConnectHdfsProvider} derives the initial checkpoint from a
     * Kafka-Connect-style parquet file under {@code bootstrapPath}. The file is named
     * {@code kafka_topic1+0+100+200.parquet}, and the expected checkpoint is {@code kafka_topic1,0:200}.
     */
    @Test
    public void testKafkaConnectCheckpointProvider() throws IOException {
        String tableBasePath = basePath + "/test_table";
        String bootstrapPath = basePath + "/kafka_topic1";
        String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
        String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
        String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + "test-source.properties")).getProps();
        props.setProperty("hoodie.streamer.checkpoint.provider.path", bootstrapPath);
        cfg.initialCheckpointProvider = checkpointProviderClass;
        fs.mkdirs(new Path(bootstrapPath));
        fs.mkdirs(new Path(partitionPath));
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveParquetToDFS(UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
        // Pass the properties as Option<TypedProperties>; the decompiled (Object) cast produced
        // an Option<Object>, which does not match the constructor's parameter type.
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, fs, jsc.hadoopConfiguration(), Option.ofNullable(props));
        Assertions.assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
    }

    /**
     * Syncing with {@code test-invalid.properties} is expected to fail with a {@link HoodieException}
     * whose message contains "Unable to load class".
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() {
        HoodieException thrown = Assertions.assertThrows(HoodieException.class, () -> {
            String tableBasePath = basePath + "/test_table_invalid_key_gen";
            new HoodieDeltaStreamer(
                HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), "test-invalid.properties", false),
                jsc).sync();
        }, "Should error out when setting the key generator class property to an invalid value");
        LOG.warn("Expected error during getting the key generator", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Unable to load class"));
    }

    /**
     * Supplies (properties file, expected inferred key generator class) pairs.
     */
    private static Stream<Arguments> provideInferKeyGenArgs() {
        Arguments complexKeys = Arguments.of("test-infer-complex-keygen.properties", ComplexKeyGenerator.class.getName());
        Arguments nonPartitioned = Arguments.of("test-infer-nonpartitioned-keygen.properties", NonpartitionedKeyGenerator.class.getName());
        return Stream.of(complexKeys, nonPartitioned);
    }

    /**
     * Syncs a table using {@code propsFilename} and checks the key generator class stored in the table
     * config, the row count (1000), and that the last commit uses the v2 checkpoint key.
     * The table directory is named after the part of {@code propsFilename} before the first '.'.
     */
    @ParameterizedTest
    @MethodSource(value={"provideInferKeyGenArgs"})
    public void testInferKeyGenerator(String propsFilename, String expectedKeyGeneratorClassName) throws Exception {
        String[] splitNames = propsFilename.split("\\.");
        String tableBasePath = basePath + "/" + splitNames[0];
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), propsFilename, false), jsc);
        deltaStreamer.sync();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(HoodieTestUtils.getDefaultStorageConf()).setBasePath(tableBasePath).build();
        Assertions.assertEquals(expectedKeyGeneratorClassName, metaClient.getTableConfig().getKeyGeneratorClassName());
        // Typed Dataset<Row> instead of the decompiled raw Dataset.
        Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
        Assertions.assertEquals(1000L, res.count());
        assertUseV2Checkpoint(metaClient);
    }

    /**
     * Reloads the active timeline and asserts that the commit metadata of its last instant contains
     * the {@code streamer.checkpoint.key.v2} extra-metadata key and not {@code deltastreamer.checkpoint.key}.
     *
     * @param metaClient meta client of the table to inspect; its active timeline is reloaded
     */
    private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
        metaClient.reloadActiveTimeline();
        Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().lastInstant();
        // Fail with a clear message instead of an unchecked get() on an empty timeline.
        Assertions.assertTrue(lastInstant.isPresent(), "Expected at least one instant on the active timeline");
        Option<HoodieCommitMetadata> metadata = HoodieClientTestUtils.getCommitMetadataForInstant(metaClient, lastInstant.get());
        Assertions.assertFalse(metadata.isEmpty(), "Expected commit metadata for instant " + lastInstant.get());
        Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
        Assertions.assertTrue(extraMetadata.containsKey("streamer.checkpoint.key.v2"));
        Assertions.assertFalse(extraMetadata.containsKey("deltastreamer.checkpoint.key"));
    }

    /**
     * Syncing into an existing directory that is not a Hudi table is expected to throw
     * {@link TableNotFoundException}.
     */
    @Test
    public void testTableCreation() throws Exception {
        TableNotFoundException thrown = Assertions.assertThrows(TableNotFoundException.class, () -> {
            String notATablePath = basePath + "/not_a_table";
            fs.mkdirs(new Path(notATablePath));
            new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(notATablePath, WriteOperationType.BULK_INSERT), jsc).sync();
        }, "Should error out when pointed out at a dir thats not a table");
        LOG.debug("Expected error during table creation", thrown);
    }

    /**
     * Checks that the hive-style-partitioning and URL-encode-partitioning flags passed as write configs
     * are persisted in the created table's config.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testTableCreationContainsHiveStylePartitioningEnable(boolean configFlag) throws Exception {
        String tablePath = basePath + "/url_encode_and_hive_style_partitioning_enable_" + configFlag;
        String flagValue = String.valueOf(configFlag);
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        cfg.configs.add(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key() + "=" + flagValue);
        cfg.configs.add(HoodieTableConfig.URL_ENCODE_PARTITIONING.key() + "=" + flagValue);
        new HoodieDeltaStreamer(cfg, jsc).getIngestionService().ingestOnce();
        HoodieTableConfig tableConfig = HoodieTestUtils.createMetaClient((HoodieEngineContext) context, tablePath).getTableConfig();
        Assertions.assertEquals(configFlag, Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()));
        Assertions.assertEquals(configFlag, Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()));
    }

    /**
     * With {@link CustomKeyGenerator}, checks the persisted partition fields:
     * table version SIX stores {@code partition_path}, while EIGHT stores {@code partition_path:simple}.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableVersion.class, names={"SIX", "EIGHT"})
    public void testPartitionKeyFieldsBasedOnVersion(HoodieTableVersion version) throws IOException {
        String tablePath = basePath + "/partition_key_fields_meta_client" + version.versionCode();
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + version.versionCode());
        cfg.configs.add(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() + "=" + CustomKeyGenerator.class.getName());
        cfg.configs.add("hoodie.datasource.write.partitionpath.field=partition_path:simple");
        new HoodieDeltaStreamer(cfg, jsc).getIngestionService().ingestOnce();
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieEngineContext) context, tablePath);
        String expectedPartitionFields;
        if (version == HoodieTableVersion.SIX) {
            expectedPartitionFields = "partition_path";
        } else {
            expectedPartitionFields = "partition_path:simple";
        }
        Assertions.assertEquals(expectedPartitionFields, metaClient.getTableConfig().getString(HoodieTableConfig.PARTITION_FIELDS));
    }

    /**
     * Bulk-inserts and upserts into a table, exports it as parquet partitioned by {@code rider}, and
     * bootstraps a new Hudi table from that export.
     *
     * <p>The test then checks the bootstrapped table's record count, distinct record keys, and schema
     * (meta columns plus the source columns), and finally deletes all three paths.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        this.addRecordMerger(recordType, cfg.configs);
        this.syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        // A second sync with sourceLimit=0 must not change the record count or commit count.
        cfg.sourceLimit = 0L;
        this.syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        this.syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
        List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1950L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

        // Export the table as plain parquet and bootstrap a new table from it.
        String bootstrapSourcePath = basePath + "/src_bootstrapped";
        Dataset<Row> sourceDf = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
        sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);
        String newDatasetBasePath = basePath + "/test_dataset_bootstrapped";
        cfg.runBootstrap = true;
        cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
        cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
        cfg.configs.add(String.format("hoodie.datasource.write.keygenerator.class=%s", SimpleKeyGenerator.class.getName()));
        cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
        cfg.configs.add("hoodie.bootstrap.parallelism=5");
        cfg.configs.add(String.format("%s=false", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
        cfg.targetBasePath = newDatasetBasePath;
        new HoodieDeltaStreamer(cfg, jsc).sync();

        Dataset<Row> res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
        LOG.info("Schema :");
        res.printSchema();
        this.assertRecordCount(1950L, newDatasetBasePath, sqlContext);
        // createOrReplaceTempView replaces the deprecated registerTempTable (same semantics).
        res.createOrReplaceTempView("bootstrapped");
        Assertions.assertEquals(1950L, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
        sqlContext.sql("select * from bootstrapped").show();
        StructField[] fields = res.schema().fields();
        List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
        List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
        Assertions.assertEquals(expectedFieldNames.size(), fields.length);
        Assertions.assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
        Assertions.assertTrue(fieldNames.containsAll(expectedFieldNames));
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
    }

    /**
     * Changing the record key field of an existing table is expected to fail with {@link HoodieException}
     * and leave the table unchanged. A fresh config without that change can still upsert.
     */
    @Test
    public void testModifiedTableConfigs() throws Exception {
        String tableBasePath = basePath + "/test_table_modified_configs";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        cfg.sourceLimit = 0L;
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // Attempt an upsert with a different record key field; the sync must throw.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
        Assertions.assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1));
        long totalAfterRejectedSync = countsPerCommit(tableBasePath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum();
        Assertions.assertEquals(1000L, totalAfterRejectedSync);

        // A fresh config without the record-key override upserts successfully.
        HoodieDeltaStreamer.Config freshCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        freshCfg.sourceLimit = 2000L;
        freshCfg.operation = WriteOperationType.UPSERT;
        syncAndAssertRecordCount(freshCfg, 1950, tableBasePath, "00001", 2);
        long totalAfterUpsert = countsPerCommit(tableBasePath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum();
        Assertions.assertEquals(1950L, totalAfterUpsert);
    }

    /**
     * Runs one sync with {@code cfg}, then asserts the table's record count, distance count, and
     * commit metadata ({@code metadata}, {@code totalCommits}).
     */
    private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();
        int expectedCount = expected.intValue();
        assertRecordCount(expectedCount, tableBasePath, sqlContext);
        assertDistanceCount(expectedCount, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(metadata, tableBasePath, totalCommits);
    }

    /**
     * Exercises schema evolution across three delta-streamer syncs on the same table.
     * <ol>
     *   <li>INSERT with {@code source.avsc} as both source and target schema; expects 1000 records.</li>
     *   <li>UPSERT through {@code TripsWithEvolvedOptionalFieldTransformer} with target schema
     *       {@code source_evolved.avsc}; expects 1450 records, 950 of them with
     *       {@code evoluted_optional_union_field} set.</li>
     *   <li>UPSERT with the evolved target schema file when {@code useUserProvidedSchema} is true, or
     *       with {@code TestFileBasedSchemaProviderNullTargetSchema} and no target schema file
     *       otherwise; expects 1900 records.</li>
     * </ol>
     * The final table schema must equal {@code source_evolved.avsc}.
     *
     * <p>Cleanup runs in a {@code finally} block: deleting the table, rewriting the shared properties
     * file and restoring the static {@code defaultSchemaProviderClassName}. A failing
     * parameterization therefore cannot leave the test-only schema provider configured for later tests.
     */
    @ParameterizedTest
    @MethodSource(value={"schemaEvolArgs"})
    public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema;
        defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
        try {
            // Sync 1: initial insert with the original schema.
            this.addRecordMerger(recordType, cfg.configs);
            cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
            cfg.recordMergeStrategyId = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
            cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
            cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(cfg, jsc).sync();
            TestHoodieDeltaStreamer.assertUseV2Checkpoint(HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath));
            this.assertRecordCount(1000L, tableBasePath, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);

            // Sync 2: upsert that writes the evolved optional field with the evolved target schema.
            cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            this.addRecordMerger(recordType, cfg.configs);
            cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(cfg, jsc).sync();
            TestHoodieDeltaStreamer.assertUseV2Checkpoint(HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath));
            this.assertRecordCount(1450L, tableBasePath, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
            List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
            Assertions.assertEquals(1450L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
            sqlContext.read().format("org.apache.hudi").load(tableBasePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
            Assertions.assertEquals(950L, recordCount);

            // Sync 3: upsert with either the user-provided evolved schema or no target schema file.
            if (!useUserProvidedSchema) {
                defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName();
            }
            cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            this.addRecordMerger(recordType, cfg.configs);
            cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            if (useUserProvidedSchema) {
                cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            }
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(cfg, jsc).sync();
            this.assertRecordCount(1900L, tableBasePath, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00002", tableBasePath, 3);
            counts = this.countsPerCommit(tableBasePath, sqlContext);
            Assertions.assertEquals(1900L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

            // The table schema must have evolved to source_evolved.avsc.
            TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath));
            Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
            Assertions.assertNotNull(tableSchema);
            Schema expectedSchema;
            // Close the schema file stream; previously it was opened and never closed.
            try (InputStream schemaStream = fs.open(new Path(basePath + "/source_evolved.avsc"))) {
                expectedSchema = new Schema.Parser().parse(schemaStream);
            }
            Assertions.assertEquals((Object)expectedSchema, (Object)tableSchema);
        } finally {
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
            UtilitiesTestBase.Helpers.deleteFileFromDfs(HadoopFSUtils.getFs((String)cfg.targetBasePath, (Configuration)jsc.hadoopConfiguration()), basePath + "/" + "test-source.properties");
            TestHoodieDeltaStreamer.writeCommonPropsToFile(storage, basePath);
            defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        }
    }

    /** (record type, write table version) pairs for the COPY_ON_WRITE continuous-mode test. */
    private static Stream<Arguments> continuousModeArgs() {
        Arguments avroOnEight = Arguments.of("AVRO", "EIGHT");
        Arguments sparkOnEight = Arguments.of("SPARK", "EIGHT");
        Arguments avroOnSix = Arguments.of("AVRO", "SIX");
        return Stream.of(avroOnEight, sparkOnEight, avroOnSix);
    }

    /** (record type, write table version) pairs for the MERGE_ON_READ continuous-mode test (AVRO only). */
    private static Stream<Arguments> continuousModeMorArgs() {
        Arguments avroOnEight = Arguments.of("AVRO", "EIGHT");
        Arguments avroOnSix = Arguments.of("AVRO", "SIX");
        return Stream.of(avroOnEight, avroOnSix);
    }

    /** Continuous-mode upserts into a COPY_ON_WRITE table under {@code basePath/continuous_cow}. */
    @Timeout(value=600L)
    @ParameterizedTest
    @MethodSource(value={"continuousModeArgs"})
    public void testUpsertsCOWContinuousMode(HoodieRecord.HoodieRecordType recordType, String writeTableVersion) throws Exception {
        String tempDir = "continuous_cow";
        testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, tempDir, recordType, writeTableVersion);
    }

    /**
     * Runs a single non-continuous UPSERT sync into a COPY_ON_WRITE table with in-memory metrics enabled.
     * Checks that the table uses the V2 checkpoint, holds 1000 records, and that metrics for the
     * table are no longer initialized once the sync returns.
     */
    @Test
    public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
        String tableBasePath = basePath + "/non_continuous_cow";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.continuousMode = false;
        String metricsOn = String.format("%s=%s", HoodieMetricsConfig.TURN_METRICS_ON.key(), "true");
        String inMemoryReporter = String.format("%s=%s", HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name());
        cfg.configs.add(metricsOn);
        cfg.configs.add(inMemoryReporter);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();

        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)storage, tableBasePath);
        TestHoodieDeltaStreamer.assertUseV2Checkpoint(metaClient);
        assertRecordCount(1000L, tableBasePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Continuous-mode upserts into a MERGE_ON_READ table that stop through
     * {@code NoNewDataTerminationStrategy} rather than an explicit shutdown.
     *
     * <p>Uses a dedicated directory. Previously this MOR test wrote to {@code "continuous_cow"},
     * the same directory as the COPY_ON_WRITE continuous test. A table left behind by one test
     * (for example after a failure) could then be picked up by the other with the wrong table type.
     */
    @Timeout(value=600L)
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO"})
    public void testUpsertsMORContinuousModeShutdownGracefully(HoodieRecord.HoodieRecordType recordType) throws Exception {
        this.testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor_shutdown_gracefully", true, recordType, "EIGHT");
    }

    /** Continuous-mode upserts into a MERGE_ON_READ table under {@code basePath/continuous_mor}. */
    @Timeout(value=600L)
    @ParameterizedTest
    @MethodSource(value={"continuousModeMorArgs"})
    public void testUpsertsMORContinuousMode(HoodieRecord.HoodieRecordType recordType, String writeTableVersion) throws Exception {
        String tempDir = "continuous_mor";
        testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, tempDir, recordType, writeTableVersion);
    }

    /**
     * Runs a single non-continuous UPSERT sync into a MERGE_ON_READ table with in-memory metrics enabled.
     * Checks that the table uses the V2 checkpoint, holds 1000 records, and that metrics for the
     * table are no longer initialized once the sync returns.
     */
    @Test
    public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
        String tableBasePath = basePath + "/non_continuous_mor";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.continuousMode = false;
        String metricsOn = String.format("%s=%s", HoodieMetricsConfig.TURN_METRICS_ON.key(), "true");
        String inMemoryReporter = String.format("%s=%s", HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name());
        cfg.configs.add(metricsOn);
        cfg.configs.add(inMemoryReporter);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();

        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)storage, tableBasePath);
        TestHoodieDeltaStreamer.assertUseV2Checkpoint(metaClient);
        assertRecordCount(1000L, tableBasePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /** Continuous-mode upsert scenario without the graceful-shutdown termination strategy. */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecord.HoodieRecordType recordType, String writeTableVersion) throws Exception {
        final boolean shutdownGracefully = false;
        testUpsertsContinuousMode(tableType, tempDir, shutdownGracefully, recordType, writeTableVersion);
    }

    /**
     * Drives a continuous-mode UPSERT delta streamer into {@code basePath/tempDir} until its commits
     * and record counts are reached.
     * <ul>
     *   <li>MERGE_ON_READ needs at least 5 delta commits and 2 compaction commits.</li>
     *   <li>Other table types need at least 5 commits as counted by
     *       {@code assertAtleastNCompactionCommits}.</li>
     *   <li>The table must then hold 3000 records.</li>
     * </ul>
     * Finally, checks that the table version matches {@code writeTableVersion}.
     *
     * @param testShutdownGracefully when true, the streamer is configured with
     *        {@code NoNewDataTerminationStrategy}, and {@code TestDataSource.returnEmptyBatch} is set
     *        once the condition holds so that the streamer stops on its own
     * @param writeTableVersion {@code HoodieTableVersion} name; {@code "SIX"} additionally sets
     *        {@code WRITE_TABLE_VERSION} explicitly
     *
     * <p>The table directory is deleted and {@code TestDataSource.returnEmptyBatch} is reset in a
     * {@code finally} block. Parameterized callers reuse the same {@code tempDir}, so a failed run
     * must not leave a table (or a static "empty batch" flag) behind for the next run.
     */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, HoodieRecord.HoodieRecordType recordType, String writeTableVersion) throws Exception {
        String tableBasePath = basePath + "/" + tempDir;
        int totalRecords = 3000;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        this.addRecordMerger(recordType, cfg.configs);
        cfg.continuousMode = true;
        if (testShutdownGracefully) {
            cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
        }
        cfg.tableType = tableType.name();
        cfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        if (HoodieTableVersion.SIX.name().equals(writeTableVersion)) {
            cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_TABLE_VERSION.key(), HoodieTableVersion.SIX.versionCode()));
        }
        try {
            HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
            TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, r -> {
                // Enum constants are singletons: compare by identity.
                if (tableType == HoodieTableType.MERGE_ON_READ) {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath);
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath);
                } else {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath);
                }
                this.assertRecordCount(totalRecords, tableBasePath, sqlContext);
                this.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
                if (testShutdownGracefully) {
                    TestDataSource.returnEmptyBatch = true;
                }
                return true;
            });
            HoodieTableMetaClient hudiTblMetaClient = HoodieTableMetaClient.builder().setBasePath(cfg.targetBasePath).setConf(context.getStorageConf()).build();
            Assertions.assertEquals((Object)HoodieTableVersion.valueOf((String)writeTableVersion), (Object)hudiTblMetaClient.getTableConfig().getTableVersion());
        } finally {
            if (testShutdownGracefully) {
                // This test is the one that flipped the static flag; restore it for later tests.
                TestDataSource.returnEmptyBatch = false;
            }
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        }
    }

    /** Convenience overload that runs the streamer under the default job id {@code "single_ds_job"}. */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
        final String defaultJobId = "single_ds_job";
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, condition, defaultJobId);
    }

    /**
     * Runs {@code ds.sync()} on a background single-thread executor, waits (via
     * {@code waitTillCondition}, with a 360 second argument) until {@code condition} holds, and then
     * stops the streamer.
     *
     * <p>If {@code cfg} names a post-write termination strategy, the streamer is expected to shut
     * itself down and {@link #awaitDeltaStreamerShutdown} polls for that. Otherwise
     * {@code ds.shutdownGracefully()} is requested and the sync future is joined, which rethrows
     * any failure from the background sync.
     *
     * <p>The executor is shut down in a {@code finally} block, so it is released even when the
     * condition wait or the shutdown step throws.
     *
     * @param cfg   streamer config, may be {@code null} (treated as "no termination strategy")
     * @param jobId label used only in the failure log message
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String jobId) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> dsFuture = executor.submit(() -> {
                try {
                    ds.sync();
                }
                catch (Exception ex) {
                    // Log the cause too; previously only the message text was logged here.
                    LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + jobId, ex);
                    throw new RuntimeException(ex.getMessage(), ex);
                }
            });
            HoodieDeltaStreamerTestBase.TestHelpers.waitTillCondition(condition, dsFuture, 360L);
            boolean selfTerminating = cfg != null
                    && cfg.postWriteTerminationStrategyClass != null
                    && !cfg.postWriteTerminationStrategyClass.isEmpty();
            if (selfTerminating) {
                TestHoodieDeltaStreamer.awaitDeltaStreamerShutdown(ds);
            } else {
                ds.shutdownGracefully();
                dsFuture.get();
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Polls the streamer's ingestion service every 500 ms in two phases:
     * <ol>
     *   <li>until a shutdown has been requested;</li>
     *   <li>until the service reports it is shut down.</li>
     * </ol>
     * Both phases share one budget of 120000 ms. The test fails once the accumulated wait exceeds it.
     */
    static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException {
        final long pollMs = 500L;
        final int budgetMs = 120000;
        int waitedMs = 0;

        boolean requested;
        do {
            requested = ds.getIngestionService().isShutdownRequested();
            Thread.sleep(pollMs);
            waitedMs += 500;
            if (waitedMs > budgetMs) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!requested);

        boolean finished;
        do {
            finished = ds.getIngestionService().isShutdown();
            Thread.sleep(pollMs);
            waitedMs += 500;
            if (waitedMs > budgetMs) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!finished);
    }

    /**
     * Convenience overload for callers that have no streamer config. With a {@code null} config the
     * runner always stops the streamer through {@code shutdownGracefully()}.
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
        HoodieDeltaStreamer.Config noConfig = null;
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, noConfig, condition);
    }

    /**
     * Continuous-mode MERGE_ON_READ upserts with the table-service configs
     * {@code getTableServicesConfigs(totalRecords, "false", "true", "2", "", "")} and the row writer
     * disabled. The run ends once the table has at least two commits and at least one replace commit.
     */
    @ParameterizedTest
    @CsvSource(value={"AVRO", "SPARK"})
    public void testInlineClustering(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final int totalRecords = 3000;
        String tableBasePath = basePath + "/inlineClustering";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        addRecordMerger(recordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        List<String> tableServiceConfigs = TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "true", "2", "", "");
        cfg.configs.addAll(tableServiceConfigs);
        cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, cfg, ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Tests that a retry completes an inflight clustering plan.
     *
     * <p>A clustering plan is scheduled and moved to inflight without being executed. A second sync
     * then runs with {@code retryLastPendingInlineClusteringJob = true}. The latest completed replace
     * instant must be that same clustering instant.
     */
    @Test
    public void testDeltaSyncWithPendingClustering() throws Exception {
        String tableBasePath = basePath + "/inlineClusteringPending";
        int totalRecords = 2000;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);

        // Schedule a clustering plan only (running mode "schedule"), then move it to inflight.
        HoodieClusteringJob clusteringJob = this.initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
        clusteringJob.cluster(0);
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
        List hoodieClusteringInstants = meta.getActiveTimeline().filterPendingClusteringTimeline().getInstants();
        // Fail with a descriptive message rather than an IndexOutOfBoundsException if nothing was scheduled.
        Assertions.assertFalse(hoodieClusteringInstants.isEmpty(), "Expected a pending clustering instant after scheduling clustering");
        HoodieInstant clusteringRequest = (HoodieInstant)hoodieClusteringInstants.get(0);
        meta.getActiveTimeline().transitionClusterRequestedToInflight(clusteringRequest, Option.empty());

        // Second sync must pick up and complete the pending clustering instant.
        cfg.configs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
        cfg.retryLastPendingInlineClusteringJob = true;
        HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
        ds2.sync();
        String completeClusteringTimeStamp = ((HoodieInstant)meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get()).requestedTime();
        Assertions.assertEquals((Object)clusteringRequest.requestedTime(), (Object)completeClusteringTimeStamp);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        // Remove the table like the sibling tests do.
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Tests a MERGE_ON_READ table whose completed compaction commit file has been removed.
     * <ol>
     *   <li>Two parquet-sourced syncs run with inline compaction after 2 delta commits.</li>
     *   <li>The file of the latest completed commit/replace instant (presumably the inline
     *       compaction commit) is deleted from the timeline directory.</li>
     *   <li>A fresh streamer with {@code retryLastPendingInlineCompactionJob = false} ingests a
     *       third batch.</li>
     * </ol>
     * Expects at least 3 delta commits and exactly one rollback instant on the active timeline.
     */
    @Test
    public void testDeltaSyncWithPendingCompaction() throws Exception {
        // Keep source data and the table under basePath, like every other path in this class.
        // Without the "/" these directories were created next to basePath rather than inside it
        // (when basePath has no trailing slash).
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 100;
        HoodieTestDataGenerator dataGenerator = TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        TypedProperties extraProps = new TypedProperties();
        extraProps.setProperty("hoodie.compact.inline", "true");
        extraProps.setProperty("hoodie.compact.inline.max.delta.commits", "2");
        extraProps.setProperty("hoodie.datasource.write.table.type", "MERGE_ON_READ");
        extraProps.setProperty("hoodie.datasource.compaction.async.enable", "false");
        this.prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config deltaCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
        deltaCfg.retryLastPendingInlineCompactionJob = false;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
        deltaStreamer.sync();
        this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
        TestHoodieDeltaStreamer.prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null, dataGenerator, "001");
        deltaStreamer.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);

        // Remove the completed commit file of the latest commit/replace instant.
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
        HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
        HoodieInstant commitInstant = (HoodieInstant)timeline.lastInstant().get();
        String commitFileName = tableBasePath + "/.hoodie/timeline/" + HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(commitInstant);
        // Check the delete actually happened. Otherwise the rollback assertion below fails with no
        // hint that the file was never removed.
        Assertions.assertTrue(fs.delete(new Path(commitFileName), false), "Expected to delete completed commit file " + commitFileName);

        TestHoodieDeltaStreamer.prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null, dataGenerator, "002");
        deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
        deltaStreamer.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath);
        meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
        timeline = meta.getActiveTimeline().getRollbackTimeline();
        Assertions.assertEquals(1, timeline.getInstants().size());
    }

    /**
     * Verifies that once a replace commit has left the active timeline, the files it replaced have
     * been deleted by the cleaner.
     *
     * <p>Phase 1 runs a continuous-mode INSERT into a COPY_ON_WRITE table until at least two replace
     * commits exist. It uses the table-service configs from
     * {@code getTableServicesConfigs(totalRecords, "false", "true", "2", "", "")} and a small-file limit of 0.
     * It then reads the replace commit at {@code nthFromLastInstant(1)} of the completed replace
     * timeline and collects the file ids it replaced. Finally it collects every file path in that
     * partition whose path contains one of those ids.
     *
     * <p>Phase 2 runs one non-continuous sync with a new config list:
     * <ul>
     *   <li>KEEP_LATEST_COMMITS cleaning that retains 1 commit;</li>
     *   <li>archival min/max commits of 4/5.</li>
     * </ul>
     * It asserts that the recorded replace instant is no longer on the completed replace timeline
     * (presumably archived) and that none of the collected file paths still exist.
     *
     * @param asyncClean value for {@code HoodieCleanConfig.ASYNC_CLEAN} in phase 2. When true, phase 2
     *                   also enables OPTIMISTIC_CONCURRENCY_CONTROL, LAZY failed-writes cleaning and
     *                   {@code InProcessLockProvider}.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
        String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
        int totalRecords = 3000;
        // Phase 1: continuous ingestion until replace commits appear.
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        this.addRecordMerger(HoodieRecord.HoodieRecordType.AVRO, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
        cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
        cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
        cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
            return true;
        });
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(6, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
        // Record which replace commit (and which of its replaced files) phase 2 must clean up.
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
        HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
        Option firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
        Assertions.assertTrue((boolean)firstReplaceHoodieInstant.isPresent());
        HoodieReplaceCommitMetadata firstReplaceMetadata = replacedTimeline.readReplaceCommitMetadata((HoodieInstant)firstReplaceHoodieInstant.get());
        Map partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
        String partitionName = null;
        List replacedFileIDs = null;
        // Each iteration overwrites the previous one, so only the last partition entry is kept.
        // NOTE(review): if the replace commit spans several partitions, only one partition is checked.
        for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
            partitionName = String.valueOf(entry.getKey());
            replacedFileIDs = (List)entry.getValue();
        }
        Assertions.assertNotNull(partitionName);
        Assertions.assertNotNull(replacedFileIDs);
        ArrayList<String> replacedFilePaths = new ArrayList<String>();
        StoragePath partitionPath = new StoragePath(meta.getBasePath(), partitionName);
        List hoodieFiles = meta.getStorage().listFiles(partitionPath);
        for (StoragePathInfo pathInfo : hoodieFiles) {
            String file = pathInfo.getPath().toUri().toString();
            // A file counts as replaced when its full path contains one of the replaced file ids.
            for (Object replacedFileID : replacedFileIDs) {
                if (!file.contains(String.valueOf(replacedFileID))) continue;
                replacedFilePaths.add(file);
            }
        }
        Assertions.assertFalse((boolean)replacedFilePaths.isEmpty());
        // Phase 2: aggressive cleaning + archival in a single non-continuous sync.
        List<String> configs = TestHoodieDeltaStreamer.getTableServicesConfigs(1, "true", "true", "6", "", "");
        configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
        configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
        configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
        configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5"));
        configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean));
        configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
        if (asyncClean.booleanValue()) {
            configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
            configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name()));
            configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
        }
        // cfg.configs is replaced wholesale below, so the record merger must be added to the new list as well.
        this.addRecordMerger(HoodieRecord.HoodieRecordType.AVRO, configs);
        cfg.configs = configs;
        cfg.continuousMode = false;
        ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        // The recorded replace instant must be gone from the completed replace timeline...
        long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> ((HoodieInstant)firstReplaceHoodieInstant.get()).equals(instant)).count();
        Assertions.assertEquals((long)0L, (long)count);
        // ...and every file it replaced must have been deleted from storage.
        for (String replacedFilePath : replacedFilePaths) {
            Assertions.assertFalse((boolean)meta.getStorage().exists(new StoragePath(replacedFilePath)));
        }
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Verifies how {@code StreamSync} releases resources when the next write runs into a pending
     * clustering plan.
     *
     * <p>After an initial sync, a clustering plan is scheduled but not executed. A second sync then
     * runs through {@code TestReleaseResourcesStreamSync} with
     * {@code retryLastPendingInlineClusteringJob = !testFailureCase}.
     * <ul>
     *   <li>Failure case: {@code syncOnce()} must throw {@link HoodieException}, the latest commit
     *       instant must be incomplete, and resources must have been released exactly once, for that
     *       instant.</li>
     *   <li>Success case: the sync must complete and no resources are released.</li>
     * </ul>
     * In both cases the heartbeat for the latest instant must report as expired.
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testReleaseResources(boolean testFailureCase) throws Exception {
        String tableBasePath = basePath + "/inlineClusteringPending_" + testFailureCase;
        int totalRecords = 1000;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        ds.shutdownGracefully();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        // Leave exactly one pending (scheduled, not executed) clustering plan on the timeline.
        HoodieClusteringJob clusteringJob = this.initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
        clusteringJob.cluster(0);
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(context.getStorageConf()).setBasePath(tableBasePath).build();
        Assertions.assertEquals(1, tableMetaClient.getActiveTimeline().filterPendingClusteringTimeline().getInstants().size());
        cfg.configs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
        cfg.retryLastPendingInlineClusteringJob = !testFailureCase;
        TypedProperties properties = HoodieStreamer.combineProperties((HoodieStreamer.Config)cfg, (Option)Option.empty(), (Configuration)jsc.hadoopConfiguration());
        SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)cfg.schemaProviderClassName, (TypedProperties)properties, (JavaSparkContext)jsc), (TypedProperties)properties, (JavaSparkContext)jsc, (List)cfg.transformerClassNames);
        try (TestReleaseResourcesStreamSync streamSync = new TestReleaseResourcesStreamSync(cfg, sparkSession, schemaProvider, properties, jsc, fs, jsc.hadoopConfiguration(), client -> true)) {
            Assertions.assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
            try {
                streamSync.syncOnce();
                if (testFailureCase) {
                    Assertions.fail("Should not reach here when there is conflict w/ pending clustering and when retryLastPendingInlineClusteringJob is set to false");
                }
            } catch (HoodieException e) {
                // Expected in the failure case. In the success case, report it together with its cause.
                if (!testFailureCase) {
                    Assertions.fail("Should not reach here when retryLastPendingInlineClusteringJob is set to true", e);
                }
            }
            tableMetaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)tableMetaClient);
            Option failedInstant = tableMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
            Assertions.assertTrue(failedInstant.isPresent());
            HoodieInstant lastInstant = (HoodieInstant)failedInstant.get();
            // The latest commit is complete exactly when the sync was expected to succeed.
            Assertions.assertEquals(!testFailureCase, lastInstant.isCompleted(), "Unexpected completion state for instant " + lastInstant.requestedTime());
            if (testFailureCase) {
                Assertions.assertEquals(1, streamSync.releaseResourcesCalledSet.size());
                Assertions.assertTrue(streamSync.releaseResourcesCalledSet.contains(lastInstant.requestedTime()));
            } else {
                Assertions.assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
            }
            // NOTE(review): the heartbeat client is built with the suite-level basePath, not tableBasePath.
            // If heartbeat files live under the table's base path, this check may pass trivially. Confirm.
            HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(tableMetaClient.getStorage(), basePath, Long.valueOf(((Integer)HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue()).intValue()), (Integer)HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
            try {
                Assertions.assertTrue(heartbeatClient.isHeartbeatExpired(lastInstant.requestedTime()));
            } finally {
                // Close even when the assertion above fails.
                heartbeatClient.close();
            }
        }
    }

    /**
     * Builds the configs for multi-writer tests:
     * <ul>
     *   <li>{@code InProcessLockProvider} as the lock provider;</li>
     *   <li>a 3000 ms lock wait time;</li>
     *   <li>optimistic concurrency control;</li>
     *   <li>LAZY failed-writes cleaning.</li>
     * </ul>
     *
     * @return a new mutable list, so callers may append further configs
     */
    private List<String> getAllMultiWriterConfigs() {
        String lockProvider = String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getCanonicalName());
        String lockWaitTime = String.format("%s=%s", "hoodie.write.lock.wait_time_ms", "3000");
        String concurrencyMode = String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
        String failedWritesPolicy = String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name());
        return new ArrayList<String>(Arrays.asList(lockProvider, lockWaitTime, concurrencyMode, failedWritesPolicy));
    }

    /** Clustering-job config with no running mode and the job's default retry-last-failed setting. */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule) {
        String noRunningMode = null;
        Boolean keepDefaultRetry = null;
        return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, noRunningMode, keepDefaultRetry);
    }

    /**
     * Builds a {@code HoodieClusteringJob.Config} for the given table that reads
     * {@code clusteringjob.properties} from the suite-level base path and disables the row writer.
     *
     * @param basePath                     table base path (shadows the suite-level field, hence the
     *                                     qualified {@code UtilitiesTestBase.basePath} below)
     * @param retryLastFailedClusteringJob applied only when non-null; otherwise the config's own
     *                                     default is kept
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule, String runningMode, Boolean retryLastFailedClusteringJob) {
        HoodieClusteringJob.Config clusteringCfg = new HoodieClusteringJob.Config();
        clusteringCfg.basePath = basePath;
        clusteringCfg.propsFilePath = UtilitiesTestBase.basePath + "/clusteringjob.properties";
        clusteringCfg.clusteringInstantTime = clusteringInstantTime;
        clusteringCfg.runSchedule = runSchedule;
        clusteringCfg.runningMode = runningMode;
        if (retryLastFailedClusteringJob != null) {
            clusteringCfg.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
        }
        clusteringCfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        return clusteringCfg;
    }

    /** Indexer config with no extra {@code key=value} overrides. */
    private HoodieIndexer.Config buildIndexerConfig(String basePath, String tableName, String indexInstantTime, String runningMode, String indexTypes) {
        List<String> noExtraConfigs = Collections.emptyList();
        return buildIndexerConfig(basePath, tableName, indexInstantTime, runningMode, indexTypes, noExtraConfigs);
    }

    /**
     * Builds a {@code HoodieIndexer.Config} that reads {@code indexer.properties} from the
     * suite-level base path.
     *
     * @param configs extra {@code key=value} overrides; stored by reference, not copied
     */
    private HoodieIndexer.Config buildIndexerConfig(String basePath, String tableName, String indexInstantTime, String runningMode, String indexTypes, List<String> configs) {
        HoodieIndexer.Config cfg = new HoodieIndexer.Config();
        cfg.basePath = basePath;
        cfg.propsFilePath = UtilitiesTestBase.basePath + "/indexer.properties";
        cfg.tableName = tableName;
        cfg.indexTypes = indexTypes;
        cfg.indexInstantTime = indexInstantTime;
        cfg.runningMode = runningMode;
        cfg.configs = configs;
        return cfg;
    }

    /**
     * Runs a continuous delta streamer with column stats enabled.
     *
     * <p>Once at least two commits exist, a COLUMN_STATS index build is scheduled. If an instant is
     * returned, the index is executed and a completed index commit is asserted. A scheduling
     * exception makes the condition return {@code false} so it is retried. An empty schedule result
     * is only logged.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testHoodieIndexer(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/asyncindexer";
        String columnStatsOn = HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() + "=true";
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 1000, "false", recordType, WriteOperationType.INSERT, Collections.singleton(columnStatsOn));
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
            Option scheduledInstant;
            try {
                scheduledInstant = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, null, "schedule", "COLUMN_STATS")).doSchedule();
            }
            catch (Exception e) {
                LOG.info("Schedule indexing failed", (Throwable)e);
                return false;
            }
            if (!scheduledInstant.isPresent()) {
                LOG.warn("Metadata indexing failed");
                return true;
            }
            HoodieDeltaStreamerTestBase.TestHelpers.assertPendingIndexCommit(tableBasePath);
            String indexInstantTime = (String)scheduledInstant.get();
            LOG.info("Schedule indexing success, now build index with instant time " + indexInstantTime);
            HoodieIndexer executor = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, (String)scheduledInstant.get(), "execute", "COLUMN_STATS"));
            executor.start(0);
            LOG.info("Metadata indexing success");
            HoodieDeltaStreamerTestBase.TestHelpers.assertCompletedIndexCommit(tableBasePath);
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Schedules a RECORD_INDEX build while DeltaStreamer is upserting, and executes it once a data
     * commit is pending.
     *
     * <p>After execution, no inflight or requested commit may remain on the timeline before the
     * completion time of the index instant. The record index is then validated with the metadata
     * table validator.
     *
     * <p>{@code @Test} is required for JUnit to discover the method and report it as skipped
     * (HUDI-8951). {@code @Disabled} on its own leaves the method undiscovered.
     */
    @Disabled(value="HUDI-8951")
    @Test
    public void testHoodieIndexerExecutionAfterCommit() throws Exception {
        String tableBasePath = basePath + "/asyncindexer_commit";
        HashSet<String> customConfigs = new HashSet<String>();
        customConfigs.add(HoodieIndexConfig.INDEX_TYPE.key() + "=GLOBAL_SIMPLE");
        customConfigs.add(HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT");
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 100, "false", HoodieRecord.HoodieRecordType.AVRO, WriteOperationType.UPSERT, customConfigs);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
            try {
                HoodieIndexer scheduleIndexingJob = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, null, "schedule", "RECORD_INDEX", Arrays.asList(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() + "=true", HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT")));
                Option<String> scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
                HoodieDeltaStreamerTestBase.TestHelpers.assertPendingIndexCommit(tableBasePath);
                LOG.info("Schedule indexing success, now build index with instant time " + scheduleIndexInstantTime.get());
                // Wait until ingestion has a commit in flight so the index build overlaps a pending write.
                HoodieDeltaStreamerTestBase.TestHelpers.waitFor(() -> {
                    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)storage.getConf(), (String)tableBasePath);
                    HoodieTimeline pendingCommitsTimeline = metaClient.getCommitsTimeline().filterInflightsAndRequested();
                    return !pendingCommitsTimeline.empty();
                });
                HoodieIndexer runIndexingJob = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, scheduleIndexInstantTime.get(), "execute", "RECORD_INDEX", Arrays.asList(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() + "=true", HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT")));
                runIndexingJob.start(0);
                LOG.info("Metadata indexing success");
                HoodieDeltaStreamerTestBase.TestHelpers.assertCompletedIndexCommit(tableBasePath);
                HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)storage.getConf(), (String)tableBasePath);
                String indexCompletedTime = metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline().firstInstant().get().getCompletionTime();
                // No data commit that started before the index completed may still be inflight/requested.
                Assertions.assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().findInstantsBefore(indexCompletedTime).empty());
            }
            catch (Exception e) {
                Assertions.fail("Indexing job should not have failed", e);
            }
            return true;
        });
        TestHoodieDeltaStreamer.validateRecordIndex(tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Runs {@link HoodieMetadataTableValidator} against the given table. It checks latest file
     * slices, all file groups, and record-index content and count.
     *
     * <p>Asserts that the run succeeds, reports no validation failure, and collected no throwables.
     *
     * @param tableBasePath base path of the table whose metadata table is validated
     */
    private static void validateRecordIndex(String tableBasePath) {
        HoodieMetadataTableValidator.Config validatorConfig = new HoodieMetadataTableValidator.Config();
        validatorConfig.basePath = tableBasePath;
        validatorConfig.validateLatestFileSlices = true;
        validatorConfig.validateAllFileGroups = true;
        validatorConfig.validateRecordIndexContent = true;
        validatorConfig.validateRecordIndexCount = true;

        HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, validatorConfig);
        boolean runSucceeded = validator.run();
        Assertions.assertTrue(runSucceeded);
        boolean sawFailure = validator.hasValidationFailure();
        Assertions.assertFalse(sawFailure);
        boolean noThrowables = validator.getThrowables().isEmpty();
        Assertions.assertTrue(noThrowables);
    }

    /**
     * Schedules a RECORD_INDEX build, waits until async clustering has a pending plan, then executes
     * the index build with a 20s catch-up timeout.
     *
     * <p>Execution is expected to fail, and its exception chain must mention "Index catchup failed".
     * {@code assertThrows} is used so that a build that unexpectedly succeeds is reported as such.
     * A {@code catch (Throwable)} around {@code fail()} would swallow that assertion and report a
     * misleading "should have timed out" message instead.
     *
     * @param recordType record representation used by the streamer
     */
    @Disabled(value="HUDI-8951")
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testHoodieIndexerExecutionAfterClustering(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/asyncindexer_cluster";
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 1000, "true", recordType, WriteOperationType.UPSERT, new HashSet<String>(Arrays.asList(HoodieIndexConfig.INDEX_TYPE.key() + "=GLOBAL_SIMPLE", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key() + "=1", HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT")));
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
            try {
                HoodieIndexer scheduleIndexingJob = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, null, "schedule", "RECORD_INDEX"));
                Option<String> scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
                HoodieDeltaStreamerTestBase.TestHelpers.assertPendingIndexCommit(tableBasePath);
                String indexInstantTime = scheduleIndexInstantTime.get();
                LOG.info("Schedule indexing success, now build index with instant time " + indexInstantTime);
                // Wait for a pending clustering plan; the index catch-up is expected to stall on it.
                HoodieDeltaStreamerTestBase.TestHelpers.waitFor(() -> {
                    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
                    return metaClient.getActiveTimeline().getFirstPendingClusterInstant().isPresent();
                });
                Throwable indexingFailure = Assertions.assertThrows(Throwable.class, () -> {
                    HoodieIndexer runIndexingJob = new HoodieIndexer(jsc, this.buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, indexInstantTime, "execute", "RECORD_INDEX", Arrays.asList(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() + "=true", HoodieMetadataConfig.METADATA_INDEX_CHECK_TIMEOUT_SECONDS.key() + "=20")));
                    runIndexingJob.start(0);
                }, "Indexing should fail with catchup failure");
                boolean res = JavaTestUtils.checkNestedExceptionContains(indexingFailure, "Index catchup failed");
                Assertions.assertTrue(res, "Indexing catchup task should have timed out");
                LOG.info("Metadata indexing timed out");
            }
            catch (Exception e) {
                Assertions.fail("Indexing job should not have failed", e);
            }
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Lets DeltaStreamer reach two commits, then schedules a clustering plan via
     * {@link HoodieClusteringJob} and executes it in a separate job. Expects at least one replace
     * commit afterwards.
     *
     * @param shouldPassInClusteringInstantTime when {@code true}, the execute job receives the
     *                                          scheduled instant explicitly; otherwise it receives
     *                                          {@code null}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
        String tableBasePath = basePath + "/asyncClusteringJob";
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 3000, "false", HoodieRecord.HoodieRecordType.AVRO);
        CountDownLatch twoCommitsSeen = new CountDownLatch(1);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
            twoCommitsSeen.countDown();
            return true;
        });
        if (!twoCommitsSeen.await(2L, TimeUnit.MINUTES)) {
            Assertions.fail("Deltastreamer should have completed 2 commits.");
            return;
        }

        Option<String> plannedInstant = Option.empty();
        try {
            HoodieClusteringJob scheduler = this.initialHoodieClusteringJob(tableBasePath, null, true, null);
            plannedInstant = scheduler.doSchedule();
        } catch (Exception e) {
            LOG.warn("Schedule clustering failed", (Throwable) e);
            Assertions.fail("Schedule clustering failed", e);
        }
        if (!plannedInstant.isPresent()) {
            LOG.warn("Clustering execution failed");
            Assertions.fail("Clustering execution failed");
            return;
        }

        String instantTime = plannedInstant.get();
        LOG.info("Schedule clustering success, now cluster with instant time " + instantTime);
        String instantForExecution = shouldPassInClusteringInstantTime ? instantTime : null;
        HoodieClusteringJob.Config executeConfig = this.buildHoodieClusteringUtilConfig(tableBasePath, instantForExecution, false);
        HoodieClusteringJob executor = new HoodieClusteringJob(jsc, executeConfig);
        executor.cluster(executeConfig.retry);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        LOG.info("Cluster success");
    }

    /**
     * Spark-record-type variant of the async clustering service test, disabled pending HUDI-6753.
     *
     * <p>{@code @Test} is required for JUnit to discover the method and report it as skipped.
     * {@code @Disabled} on its own leaves the method undiscovered.
     */
    @Disabled(value="HUDI-6753")
    @Test
    public void testAsyncClusteringServiceSparkRecordType() throws Exception {
        this.testAsyncClusteringService(HoodieRecord.HoodieRecordType.SPARK);
    }

    /** Runs the async clustering service scenario with Avro records. */
    @Test
    public void testAsyncClusteringServiceAvroRecordType() throws Exception {
        HoodieRecord.HoodieRecordType avroRecords = HoodieRecord.HoodieRecordType.AVRO;
        testAsyncClusteringService(avroRecords);
    }

    /**
     * Continuous-mode INSERT into a COPY_ON_WRITE table with async clustering enabled. The row
     * writer is off and duplicates on insert are disallowed.
     *
     * <p>Waits for at least one replace commit. It then requires at least four commits, at least one
     * replace commit, and a distinct record count of {@code totalRecords}, and finally deletes the
     * table.
     *
     * @param recordType record representation used to configure the record merger
     */
    private void testAsyncClusteringService(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final int totalRecords = 2000;
        final String tableBasePath = basePath + "/asyncClustering";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        addRecordMerger(recordType, cfg.configs);
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.continuousMode = true;
        List<String> streamerConfigs = cfg.configs;
        streamerConfigs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "", "", "true", "3"));
        streamerConfigs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
        streamerConfigs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, cfg, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
            return true;
        });

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /** Runs the clustering/ingestion conflict scenario with Avro records, bounded by a 600s timeout. */
    @Timeout(value=600L)
    @Test
    public void testAsyncClusteringServiceWithConflictsAvro() throws Exception {
        HoodieRecord.HoodieRecordType avroRecords = HoodieRecord.HoodieRecordType.AVRO;
        testAsyncClusteringServiceWithConflicts(avroRecords);
    }

    /**
     * Continuous-mode UPSERT into a COPY_ON_WRITE table with async clustering enabled. The row
     * writer is off.
     *
     * <p>The runner waits for at least one commit after a rollback. The test name suggests that
     * rollback comes from clustering conflicting with ingestion (not verified here). Afterwards it
     * requires at least one replace commit and at least three commits, then deletes the table.
     *
     * @param recordType record representation for the merger; its name also suffixes the table path
     */
    private void testAsyncClusteringServiceWithConflicts(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final int totalRecords = 2000;
        final String tableBasePath = basePath + "/asyncClusteringWithConflicts_" + recordType.name();
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        addRecordMerger(recordType, cfg.configs);
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.continuousMode = true;
        List<String> streamerConfigs = cfg.configs;
        streamerConfigs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "", "", "true", "2"));
        streamerConfigs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, cfg, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommitsAfterRollback(1, 1, tableBasePath);
            return true;
        });

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(3, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Continuous-mode INSERT into a MERGE_ON_READ table with async clustering enabled.
     *
     * <p>Waits for at least two compaction commits and one replace commit. It then requires at
     * least four commits and a distinct record count of {@code totalRecords}, and checks that the
     * metadata table under {@code .hoodie/metadata/} has no rollback instants.
     */
    @Timeout(value=600L)
    @Test
    public void testAsyncClusteringServiceWithCompaction() throws Exception {
        final int totalRecords = 2000;
        final String tableBasePath = basePath + "/asyncClusteringCompaction";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        addRecordMerger(HoodieRecord.HoodieRecordType.AVRO, cfg.configs);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.continuousMode = true;
        List<String> streamerConfigs = cfg.configs;
        streamerConfigs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "", "", "true", "3"));
        streamerConfigs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, cfg, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath);
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
            return true;
        });

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);

        String metadataTableBasePath = cfg.targetBasePath + "/.hoodie/metadata/";
        HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                .setBasePath(metadataTableBasePath)
                .setConf(context.getStorageConf())
                .build();
        boolean mdtHasNoRollbacks = mdtMetaClient.reloadActiveTimeline().getRollbackTimeline().getInstants().isEmpty();
        Assertions.assertTrue(mdtHasNoRollbacks);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Checks how a {@code scheduleAndExecute} clustering job treats a clustering plan that was
     * left inflight.
     *
     * <p>Steps:
     * <ol>
     *   <li>Ingest one batch.</li>
     *   <li>Schedule clustering through a separate {@link HoodieClusteringJob}.</li>
     *   <li>Ingest again.</li>
     *   <li>Move the pending clustering instant from requested to inflight, leaving it in the
     *       state an unfinished clustering run would.</li>
     *   <li>Run a {@code scheduleAndExecute} job.</li>
     * </ol>
     * With {@code retryLastFailedClusteringJob}, the latest completed replace commit must be that
     * same instant; without it, a different instant must have completed.
     *
     * @param retryLastFailedClusteringJob whether the clustering job should retry the last failed plan
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
        String tableBasePath = basePath + "/asyncClustering3";
        int totalRecords = 3000;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        this.addRecordMerger(HoodieRecord.HoodieRecordType.AVRO, cfg.configs);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(TestHoodieDeltaStreamer.getTableServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
        cfg.configs.addAll(this.getAllMultiWriterConfigs());
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        HoodieClusteringJob schedule = this.initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
        schedule.cluster(0);
        HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
        ds2.sync();
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath);
        List<HoodieInstant> pendingClusteringInstants = meta.getActiveTimeline().filterPendingClusteringTimeline().getInstants();
        // Fail with a clear message instead of an IndexOutOfBoundsException when no plan was scheduled.
        Assertions.assertFalse(pendingClusteringInstants.isEmpty(), "Expected a pending clustering instant after scheduling");
        HoodieInstant clusteringRequest = pendingClusteringInstants.get(0);
        // Leave the plan inflight without completing it, as an interrupted clustering run would.
        meta.getActiveTimeline().transitionClusterRequestedToInflight(clusteringRequest, Option.empty());
        HoodieClusteringJob scheduleAndExecute = this.initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob, HoodieRecord.HoodieRecordType.AVRO);
        scheduleAndExecute.cluster(0);
        String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().requestedTime();
        if (retryLastFailedClusteringJob) {
            Assertions.assertEquals(clusteringRequest.requestedTime(), completeClusteringTimeStamp);
        } else {
            Assertions.assertFalse(clusteringRequest.requestedTime().equalsIgnoreCase(completeClusteringTimeStamp));
        }
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Runs a {@link HoodieClusteringJob} in the given mode while DeltaStreamer keeps bulk-inserting.
     * Once at least two commits exist, it checks the timeline state expected for that mode.
     *
     * <p>Expectations per mode:
     * <ul>
     *   <li>{@code scheduleAndExecute}: at least two replace commits.</li>
     *   <li>{@code schedule}: at least two cluster requests and no replace commits.</li>
     *   <li>{@code execute}: no replace commits. Here a non-zero result or an exception is
     *       tolerated, because nothing was scheduled to execute.</li>
     * </ul>
     * In the other modes, a failed attempt makes the runner condition return {@code false}.
     *
     * @param runningMode clustering job mode, compared case-insensitively
     */
    @ParameterizedTest
    @ValueSource(strings={"execute", "schedule", "scheduleAndExecute"})
    public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
        String tableBasePath = basePath + "/asyncClustering2";
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 3000, "false", HoodieRecord.HoodieRecordType.AVRO, WriteOperationType.BULK_INSERT);
        HoodieClusteringJob scheduleClusteringJob = this.initialHoodieClusteringJob(tableBasePath, null, true, runningMode, HoodieRecord.HoodieRecordType.AVRO);
        // Normalize once with Locale.ROOT so mode matching does not depend on the JVM default locale.
        String normalizedMode = runningMode.toLowerCase(Locale.ROOT);
        boolean executeOnly = normalizedMode.equals("execute");
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, r -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
            try {
                int result = scheduleClusteringJob.cluster(0);
                if (result == 0) {
                    LOG.info("Cluster success");
                } else {
                    LOG.warn("Cluster failed");
                    if (!executeOnly) {
                        return false;
                    }
                }
            }
            catch (Exception e) {
                LOG.warn("ScheduleAndExecute clustering failed", e);
                if (!executeOnly) {
                    return false;
                }
            }
            switch (normalizedMode) {
                case "scheduleandexecute": {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
                    return true;
                }
                case "schedule": {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNClusterRequests(2, tableBasePath);
                    HoodieDeltaStreamerTestBase.TestHelpers.assertNoReplaceCommits(tableBasePath);
                    return true;
                }
                case "execute": {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertNoReplaceCommits(tableBasePath);
                    return true;
                }
                default:
                    throw new IllegalStateException("Unexpected value: " + runningMode);
            }
        });
        if (normalizedMode.equals("scheduleandexecute")) {
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        }
    }

    /** Multi-batch row-writer bulk insert with neither a schema provider nor a transformer. */
    @Test
    public void testBulkInsertRowWriterNoSchemaProviderNoTransformer() throws Exception {
        List<String> noTransformers = null;
        testBulkInsertRowWriterMultiBatches(false, noTransformers);
    }

    /** Multi-batch row-writer bulk insert without a schema provider, applying {@code TripsWithDistanceTransformer}. */
    @Test
    public void testBulkInsertRowWriterWithoutSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testBulkInsertRowWriterMultiBatches(false, transformers);
    }

    /** Multi-batch row-writer bulk insert with a schema provider and no transformer. */
    @Test
    public void testBulkInsertRowWriterWithSchemaProviderAndNoTransformer() throws Exception {
        List<String> noTransformers = null;
        testBulkInsertRowWriterMultiBatches(true, noTransformers);
    }

    /** Multi-batch row-writer bulk insert with a schema provider and {@code TripsWithDistanceTransformer}. */
    @Test
    public void testBulkInsertRowWriterWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testBulkInsertRowWriterMultiBatches(true, transformers);
    }

    /** Multi-batch row-writer bulk insert that first exercises an empty batch from the source. */
    @Test
    public void testBulkInsertRowWriterForEmptyBatch() throws Exception {
        boolean useSchemaProvider = false;
        boolean includeEmptyBatch = true;
        testBulkInsertRowWriterMultiBatches(useSchemaProvider, null, includeEmptyBatch);
    }

    /**
     * Convenience overload: runs the multi-batch row-writer scenario without the empty-batch step.
     *
     * @param useSchemaProvider     whether to configure a schema provider for the source
     * @param transformerClassNames transformers to apply; {@code null} or empty for none
     */
    private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        final boolean includeEmptyBatch = false;
        testBulkInsertRowWriterMultiBatches(Boolean.valueOf(useSchemaProvider), transformerClassNames, includeEmptyBatch);
    }

    /**
     * Drives several BULK_INSERT batches through the row-writer path from a Parquet DFS source.
     *
     * <p>Steps:
     * <ol>
     *   <li>Write {@code 1.parquet}, sync once, and expect 100 records.</li>
     *   <li>If {@code testEmptyBatch} is set (the source class is then
     *       {@link TestParquetDFSSourceEmptyBatch}), rewrite the source properties with a final
     *       argument of {@code "0"} (presumably selecting the empty batch; confirm against
     *       {@code prepareParquetDFSSource}) and sync again. The record count must stay at 100,
     *       the table schema must not be the Avro {@code null} schema, and the latest two schemas
     *       are compared.</li>
     *   <li>Write batches {@code 2..4}, syncing with one streamer and expecting 100 more records
     *       per batch. Commit metadata is validated after batches 2 and 4.</li>
     *   <li>Assert the V2 checkpoint format.</li>
     * </ol>
     * {@code testNum} is bumped only on success, so each run uses fresh directories.
     *
     * @param useSchemaProvider     whether to configure a schema provider for the source
     * @param transformerClassNames transformers to apply; {@code null} or empty for none
     * @param testEmptyBatch        whether to exercise the empty-batch source first
     */
    private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 100;
        boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), transformerClassNames, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null);
        cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
        // Everything after construction runs under try/finally, so the current streamer is shut
        // down even if the very first sync or assertion fails.
        try {
            deltaStreamer.sync();
            this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
            deltaStreamer.shutdownGracefully();
            if (testEmptyBatch) {
                this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
                TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
                deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
                deltaStreamer.sync();
                this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
                HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tableBasePath);
                TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
                // Compare Schema with Schema: comparing against the schema's toString() can never be
                // equal, which would make this check pass unconditionally.
                Assertions.assertNotEquals(Schema.create(Schema.Type.NULL), tableSchemaResolver.getTableAvroSchema());
                this.compareLatestTwoSchemas(metaClient);
                this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");
                deltaStreamer.shutdownGracefully();
            }
            int recordsSoFar = 100;
            deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
            for (int i = 2; i < 5; ++i) {
                TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(i) + ".parquet", false, null, null);
                deltaStreamer.sync();
                this.assertRecordCount(recordsSoFar + (i - 1) * 100, tableBasePath, sqlContext);
                if (i != 2 && i != 4) continue;
                HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tableBasePath);
                metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> this.assertValidSchemaAndOperationTypeInCommitMetadata((HoodieInstant)entry, metaClient, WriteOperationType.BULK_INSERT));
            }
            TestHoodieDeltaStreamer.assertUseV2Checkpoint(HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tableBasePath));
        }
        finally {
            deltaStreamer.shutdownGracefully();
        }
        ++testNum;
    }

    /** Continuous-mode row-writer bulk insert with async clustering enabled via the table-services configs. */
    @Test
    public void testBulkInsertRowWriterContinuousModeWithAsyncClustering() throws Exception {
        List<String> asyncClusteringConfigs = TestHoodieDeltaStreamer.getTableServicesConfigs(2000, "false", "", "", "true", "3");
        testBulkInsertRowWriterContinuousMode(false, null, false, asyncClusteringConfigs, false);
    }

    /** Continuous-mode row-writer bulk insert with inline clustering enabled via the table-services configs. */
    @Test
    public void testBulkInsertRowWriterContinuousModeWithInlineClustering() throws Exception {
        List<String> inlineClusteringConfigs = TestHoodieDeltaStreamer.getTableServicesConfigs(2000, "false", "true", "3", "false", "");
        testBulkInsertRowWriterContinuousMode(false, null, false, inlineClusteringConfigs, false);
    }

    /**
     * Continuous-mode row-writer bulk insert with inline clustering over data whose dates are
     * generated ambiguous ({@code makeDatesAmbiguous = true}).
     *
     * <p>All Parquet/Avro datetime and INT96 rebase modes, for both read and write, are switched to
     * {@code LEGACY} first. NOTE(review): these session confs are not reset here, so they stay in
     * effect for later tests sharing {@code sparkSession}.
     */
    @Test
    public void testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDates() throws Exception {
        String[] legacyRebaseKeys = {
            "spark.sql.parquet.datetimeRebaseModeInWrite",
            "spark.sql.avro.datetimeRebaseModeInWrite",
            "spark.sql.parquet.int96RebaseModeInWrite",
            "spark.sql.parquet.datetimeRebaseModeInRead",
            "spark.sql.avro.datetimeRebaseModeInRead",
            "spark.sql.parquet.int96RebaseModeInRead"
        };
        for (String confKey : legacyRebaseKeys) {
            sparkSession.sqlContext().setConf(confKey, "LEGACY");
        }
        List<String> inlineClusteringConfigs = TestHoodieDeltaStreamer.getTableServicesConfigs(2000, "false", "true", "3", "false", "");
        testBulkInsertRowWriterContinuousMode(false, null, false, inlineClusteringConfigs, true);
    }

    /**
     * Runs BULK_INSERT with the row writer in continuous mode. A background task keeps writing
     * Parquet files ({@code 2.parquet} .. {@code 99.parquet}, 100 records each, every 2s) while
     * the streamer runs.
     *
     * <p>The runner waits for at least one replace commit. The test then requires at least four
     * commits and one replace commit. In {@code finally} it:
     * <ul>
     *   <li>shuts down the streamer,</li>
     *   <li>cancels the generator with interruption,</li>
     *   <li>deletes the table, and</li>
     *   <li>shuts down the executor.</li>
     * </ul>
     * {@code testNum} is bumped only on success.
     *
     * @param useSchemaProvider     whether to configure a schema provider for the source
     * @param transformerClassNames transformers to apply; {@code null} or empty for none
     * @param testEmptyBatch        whether to use {@link TestParquetDFSSourceEmptyBatch} as the source
     * @param customConfigs         extra {@code key=value} configs (e.g. clustering settings)
     * @param makeDatesAmbiguous    forwarded to {@code prepareParquetDFSFiles} for every batch
     */
    private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 100;
        boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null, makeDatesAmbiguous);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> inputGenerationFuture = executor.submit(() -> {
            try {
                for (int counter = 2; counter < 100; ++counter) {
                    LOG.info("Generating data for batch " + counter);
                    TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
                    Thread.sleep(2000L);
                }
            }
            catch (InterruptedException ie) {
                // Expected stop path: the finally block below cancels this task with interruption.
                // Restore the flag and exit quietly rather than reporting a failure.
                Thread.currentThread().interrupt();
            }
            catch (Exception ex) {
                // Pass the exception itself so the message and stack trace are logged. With a
                // parameterized logger, a non-Throwable extra argument without a {} placeholder is dropped.
                LOG.warn("Input data generation failed", ex);
                throw new RuntimeException(ex.getMessage(), ex);
            }
        });
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), transformerClassNames, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null);
        cfg.continuousMode = true;
        cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
        cfg.configs.addAll(customConfigs);
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        try {
            TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, r -> {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
                return true;
            });
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tableBasePath);
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
        }
        finally {
            ds.shutdownGracefully();
            inputGenerationFuture.cancel(true);
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
            executor.shutdown();
        }
        ++testNum;
    }

    @Test
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
        HoodieRecord.HoodieRecordType recordType = HoodieRecord.HoodieRecordType.AVRO;
        String tableBasePath = basePath + "/" + recordType.toString() + "/test_table2";
        String downstreamTableBasePath = basePath + "/" + recordType.toString() + "/test_downstream_table2";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", true);
        this.addRecordMerger(recordType, cfg.configs);
        cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, tableBasePath, sqlContext);
        this.assertDistanceCount(1000L, tableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(1000L, tableBasePath, sqlContext);
        HoodieInstant lastInstantForUpstreamTable = HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        HoodieDeltaStreamer.Config downstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
        this.addRecordMerger(recordType, downstreamCfg.configs);
        new HoodieDeltaStreamer(downstreamCfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCount(1000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(1000L, downstreamTableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(), downstreamTableBasePath, 1);
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, tableBasePath, sqlContext);
        this.assertDistanceCount(1000L, tableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        HoodieDeltaStreamer.Config downstreamCfg1 = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
        new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
        this.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCount(1000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(1000L, downstreamTableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(), downstreamTableBasePath, 1);
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1950L, tableBasePath, sqlContext);
        this.assertDistanceCount(1950L, tableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(1950L, tableBasePath, sqlContext);
        lastInstantForUpstreamTable = HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
        List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals((long)1950L, (long)counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        downstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT, false, null);
        this.addRecordMerger(recordType, downstreamCfg.configs);
        downstreamCfg.sourceLimit = 2000L;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        this.assertRecordCount(2000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCount(2000L, downstreamTableBasePath, sqlContext);
        this.assertDistanceCountWithExactValue(2000L, downstreamTableBasePath, sqlContext);
        HoodieInstant finalInstant = HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(), downstreamTableBasePath, 2);
        counts = this.countsPerCommit(downstreamTableBasePath, sqlContext);
        Assertions.assertEquals((long)2000L, (long)counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HiveSyncConfig hiveSyncConfig = TestHoodieDeltaStreamer.getHiveSyncConfig(tableBasePath, "hive_trips");
        hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, "year,month,day");
        hiveSyncConfig.setHadoopConf((Configuration)hiveTestService.getHiveConf());
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getStorageConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
        HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient);
        String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
        Assertions.assertTrue((boolean)hiveClient.tableExists(tableName), (String)("Table " + tableName + " should exist"));
        Assertions.assertEquals((int)3, (int)hiveClient.getAllPartitions(tableName).size(), (String)"Table partitions should match the number of partitions we wrote");
        Assertions.assertEquals((Object)lastInstantForUpstreamTable.requestedTime(), (Object)hiveClient.getLastCommitTimeSynced(tableName).get(), (String)"The last commit that was synced should be updated in the TBLPROPERTIES");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, downstreamTableBasePath);
    }

    /**
     * Verifies that a streamer configured with a SQL transformer but no schema provider fails on sync
     * with a {@link HoodieException} whose message asks for a valid schema provider class.
     */
    @Test
    public void testNullSchemaProvider() {
        final String targetPath = basePath + "/test_table";
        final List<String> transformers = Collections.singletonList(SqlQueryBasedTransformer.class.getName());
        final HoodieDeltaStreamer.Config streamerCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
            targetPath, WriteOperationType.BULK_INSERT, transformers, "test-source.properties", true, false, false, null, null);
        HoodieException thrown = Assertions.assertThrows(
            HoodieException.class,
            () -> new HoodieDeltaStreamer(streamerCfg, jsc, fs, (Configuration) hiveServer.getHiveConf()).sync(),
            "Should error out when schema provider is not provided");
        LOG.debug("Expected error during reading data from source ", (Throwable) thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Please provide a valid schema provider class!"));
    }

    /**
     * Bulk-inserts into a MERGE_ON_READ table, then constructs a second streamer configured with
     * {@link DummyAvroPayload} and verifies the table config now reports that payload class.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        String dataSetBasePath = basePath + "/test_dataset_mor_payload_class_update";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, "MERGE_ON_READ");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration) hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, dataSetBasePath, sqlContext);
        cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // No sync here: the assertion below relies on construction alone updating the stored payload class.
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration) hiveServer.getHiveConf());
        HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient((JavaSparkContext) jsc, dataSetBasePath, false);
        // Expected value first (JUnit convention) so a failure reports expected/actual correctly.
        Assertions.assertEquals(DummyAvroPayload.class.getName(), metaClient.getTableConfig().getPayloadClass());
    }

    /**
     * Bulk-inserts into a MERGE_ON_READ table configured with {@link PartialUpdateAvroPayload} and verifies
     * the table config records that payload class.
     */
    @Test
    public void testPartialPayloadClass() throws Exception {
        String dataSetBasePath = basePath + "/test_dataset_mor";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration) hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, dataSetBasePath, sqlContext);
        HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient((JavaSparkContext) jsc, dataSetBasePath, false);
        // Expected value first (JUnit convention) so a failure reports expected/actual correctly.
        Assertions.assertEquals(PartialUpdateAvroPayload.class.getName(), metaClient.getTableConfig().getPayloadClass());
    }

    /**
     * Bulk-inserts into a COPY_ON_WRITE table and checks that hoodie.properties contains the payload class,
     * merge mode and merge strategy id keys. It then constructs a streamer configured with
     * {@link DummyAvroPayload} and checks that hoodie.properties now stores that payload class.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        String dataSetBasePath = basePath + "/test_dataset_cow";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, null);
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration) hiveServer.getHiveConf()).sync();
        this.assertRecordCount(1000L, dataSetBasePath, sqlContext);
        String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
        // Named distinctly so it does not shadow the shared `fs` field used above.
        FileSystem tableFs = HadoopFSUtils.getFs(cfg.targetBasePath, (Configuration) jsc.hadoopConfiguration());
        Properties props = new Properties();
        try (FSDataInputStream inputStream = tableFs.open(new Path(metaPath))) {
            props.load(inputStream);
        }
        Assertions.assertTrue(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
        Assertions.assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_MODE.key()));
        Assertions.assertTrue(props.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key()));
        cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), null);
        // No sync here: the assertion below relies on construction alone updating hoodie.properties.
        new HoodieDeltaStreamer(cfg, jsc, tableFs, (Configuration) hiveServer.getHiveConf());
        props = new Properties();
        // Re-read the properties file; try-with-resources closes the stream even if load fails.
        try (FSDataInputStream inputStream = tableFs.open(new Path(metaPath))) {
            props.load(inputStream);
        }
        Assertions.assertEquals(DummyAvroPayload.class.getName(), props.get(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
    }

    /**
     * Supplies every combination of record type (AVRO, SPARK), table type (MERGE_ON_READ, COPY_ON_WRITE)
     * and source ordering field ("" or "timestamp"), with the record type varying slowest.
     */
    private static Stream<Arguments> getArgumentsForFilterDupesWithPrecombineTest() {
        HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO, HoodieRecord.HoodieRecordType.SPARK};
        String[] tableTypes = {"MERGE_ON_READ", "COPY_ON_WRITE"};
        String[] orderingFields = {"", "timestamp"};
        List<Arguments> combinations = new ArrayList<>();
        for (HoodieRecord.HoodieRecordType recordType : recordTypes) {
            for (String tableType : tableTypes) {
                for (String orderingField : orderingFields) {
                    combinations.add(Arguments.of(recordType, tableType, orderingField));
                }
            }
        }
        return combinations.stream();
    }

    /**
     * Bulk-inserts 1000 records with duplicate filtering enabled and the given ordering field, then runs an
     * INSERT sync with a 2000-record source limit and expects 2000 records across two commits.
     */
    @ParameterizedTest
    @MethodSource("getArgumentsForFilterDupesWithPrecombineTest")
    public void testFilterDupesWithPrecombine(HoodieRecord.HoodieRecordType recordType, String tableType, String sourceOrderingField) throws Exception {
        final String dupesTablePath = basePath + "/test_dupes_tables_with_precombine";
        HoodieDeltaStreamer.Config streamerConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(dupesTablePath, WriteOperationType.BULK_INSERT);
        streamerConfig.tableType = tableType;
        streamerConfig.filterDupes = true;
        streamerConfig.sourceOrderingField = sourceOrderingField;
        this.addRecordMerger(recordType, streamerConfig.configs);

        // First commit goes through the HoodieStreamer entry point directly.
        HoodieStreamer initialRun = new HoodieStreamer((HoodieStreamer.Config) streamerConfig, jsc);
        initialRun.sync();
        this.assertRecordCount(1000L, dupesTablePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", dupesTablePath, 1);

        // Second commit: de-duplicating INSERT with a larger source limit.
        this.runStreamSync(streamerConfig, true, 2000, WriteOperationType.INSERT);
        this.assertRecordCount(2000L, dupesTablePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", dupesTablePath, 2);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, dupesTablePath);
    }

    /**
     * Exercises duplicate filtering:
     * <ul>
     *   <li>a bulk insert followed by a de-duplicating INSERT yields 1000 records per commit;</li>
     *   <li>a drop-all UPSERT produces a newer commit with no partition write stats;</li>
     *   <li>enabling {@code filterDupes} together with UPSERT is rejected with an {@link IllegalArgumentException}.</li>
     * </ul>
     */
    @Test
    public void testFilterDupes() throws Exception {
        String tableBasePath = basePath + "/test_dupes_table";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        this.runStreamSync(cfg, true, 2000, WriteOperationType.INSERT);
        this.assertRecordCount(2000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
        List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1000L, counts.get(0).getLong(1));
        Assertions.assertEquals(1000L, counts.get(1).getLong(1));

        HoodieTableMetaClient mClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext) jsc, tableBasePath);
        HoodieInstant lastFinished = (HoodieInstant) mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        HoodieDeltaStreamer.Config cfg2 = HoodieDeltaStreamerTestBase.TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        this.addRecordMerger(HoodieRecord.HoodieRecordType.AVRO, cfg2.configs);
        this.runStreamSync(cfg2, false, 2000, WriteOperationType.UPSERT);
        mClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext) jsc, tableBasePath);
        HoodieInstant newLastFinished = (HoodieInstant) mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assertions.assertTrue(InstantComparison.compareTimestamps(newLastFinished.requestedTime(), (BiPredicate) InstantComparison.GREATER_THAN, lastFinished.requestedTime()));
        HoodieCommitMetadata commitMetadata = mClient.getActiveTimeline().readCommitMetadata(newLastFinished);
        LOG.info("New Commit Metadata=" + commitMetadata);
        Assertions.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());

        // filterDupes + UPSERT must be rejected. assertThrows fails the test if no exception is raised;
        // the previous try/catch passed silently in that case.
        cfg2.filterDupes = true;
        cfg2.operation = WriteOperationType.UPSERT;
        IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class,
            () -> new HoodieDeltaStreamer(cfg2, jsc).sync());
        Assertions.assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Runs one sync with a freshly constructed streamer after applying the given settings to {@code cfg}.
     * Note that {@code cfg} is mutated in place.
     *
     * @param cfg             streamer config to update and run
     * @param filterDupes     value for {@code cfg.filterDupes}
     * @param numberOfRecords value for {@code cfg.sourceLimit}
     * @param operationType   value for {@code cfg.operation}
     */
    private void runStreamSync(HoodieDeltaStreamer.Config cfg, boolean filterDupes, int numberOfRecords, WriteOperationType operationType) throws Exception {
        cfg.operation = operationType;
        cfg.sourceLimit = numberOfRecords;
        cfg.filterDupes = filterDupes;
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();
    }

    /**
     * Fetches one batch from {@link DistributedTestDataSource} and checks that it holds 1000 records.
     * When source-RDD persistence is enabled, re-caching the returned RDD must fail because a storage
     * level has already been assigned; otherwise caching is allowed.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testDistributedTestDataSource(boolean persistSourceRdd) {
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), "1000");
        sourceProps.setProperty(SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.key(), "1");
        sourceProps.setProperty(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(), "true");
        sourceProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), String.valueOf(persistSourceRdd));
        DistributedTestDataSource source = new DistributedTestDataSource(sourceProps, jsc, sparkSession, null);
        InputBatch fetched = source.fetchNext(Option.empty(), 10000000L);
        JavaRDD records = (JavaRDD) fetched.getBatch().get();
        if (!persistSourceRdd) {
            records.cache();
        } else {
            UnsupportedOperationException rejected = Assertions.assertThrows(UnsupportedOperationException.class, () -> records.cache());
            Assertions.assertTrue(rejected.getMessage().contains("Cannot change storage level of an RDD after it was already assigned a level"));
        }
        Assertions.assertEquals(1000L, records.count());
    }

    /**
     * Delegates to the four-argument overload using two Kafka partitions.
     */
    private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
        final int defaultNumPartitions = 2;
        prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, defaultNumPartitions);
    }

    /**
     * Optionally creates {@code topicName} with {@code numPartitions} partitions. It then generates
     * {@code numRecords} insert records against the "tripUberRec" schema, JSON-serializes them with
     * {@code jsonifyRecordsByPartitions}, and sends them to the topic.
     */
    private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
        if (createTopic) {
            try {
                this.testUtils.createTopic(topicName, numPartitions);
            } catch (TopicExistsException ignored) {
                // Deliberately ignored: an already-existing topic is acceptable and is simply reused.
            }
        }
        String tripUberSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        this.testUtils.sendMessages(topicName, UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
            generator.generateInsertsAsPerSchema("000", Integer.valueOf(numRecords), tripUberSchema), numPartitions));
    }

    /**
     * Runs the parquet DFS source scenario without the intermediate empty-batch step.
     */
    private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        final boolean withEmptyBatch = false;
        testParquetDFSSource(useSchemaProvider, transformerClassNames, withEmptyBatch);
    }

    /**
     * Ingests parquet files from DFS with INSERT and verifies record counts.
     *
     * <p>When {@code testEmptyBatch} is set, an extra sync through {@link TestParquetDFSSourceEmptyBatch} is run.
     * That sync must leave the record count unchanged, must not set the table schema to the Avro NULL schema,
     * and must keep the commit schema identical to the previous commit's. Finally every completed commit is
     * checked for schema metadata and the INSERT operation type. The shared {@code testNum} counter is
     * incremented on success.
     */
    private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 10;
        boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), transformerClassNames, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
        deltaStreamer.sync();
        this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
        deltaStreamer.shutdownGracefully();
        if (testEmptyBatch) {
            TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
            this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
            HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
            deltaStreamer1.sync();
            // The empty batch must not add records.
            this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
            HoodieTableMetaClient emptyBatchMetaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext) jsc, tableBasePath);
            TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(emptyBatchMetaClient);
            // Compare Schema with Schema. The previous version compared against the NULL schema's String form,
            // which can never equal a Schema object, so the assertion could not fail.
            Assertions.assertNotEquals(Schema.create(Schema.Type.NULL), tableSchemaResolver.getTableAvroSchema());
            this.compareLatestTwoSchemas(emptyBatchMetaClient);
            this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");
            deltaStreamer1.shutdownGracefully();
        }
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
        deltaStreamer.sync();
        this.assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
        HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext) jsc, tableBasePath);
        metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> this.assertValidSchemaAndOperationTypeInCommitMetadata((HoodieInstant) entry, metaClient, WriteOperationType.INSERT));
        ++testNum;
        deltaStreamer.shutdownGracefully();
    }

    /**
     * Asserts that the commit metadata of {@code instant} carries a non-empty "schema" entry and the
     * expected write operation type.
     *
     * @throws HoodieException if the commit metadata cannot be read (the underlying IOException is kept as the cause)
     */
    private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient, WriteOperationType operationType) {
        try {
            HoodieCommitMetadata commitMetadata = metaClient.getActiveTimeline().readCommitMetadata(instant);
            Assertions.assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata("schema")));
            Assertions.assertEquals(operationType, commitMetadata.getOperationType());
        } catch (IOException ioException) {
            // Preserve the original failure so the root cause shows up in the test report.
            throw new HoodieException("Failed to parse commit metadata for " + instant.toString(), ioException);
        }
    }

    /**
     * Asserts that the two most recent completed write instants recorded the same "schema" entry in their
     * commit metadata.
     *
     * @throws IOException if commit metadata cannot be read
     */
    private void compareLatestTwoSchemas(HoodieTableMetaClient metaClient) throws IOException {
        List<HoodieInstant> completedInstants = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants();
        int numCompleted = completedInstants.size();
        Assertions.assertTrue(numCompleted >= 2, "Expected at least two completed write instants but found " + numCompleted);
        HoodieTimeline activeTimeline = (HoodieTimeline) metaClient.getActiveTimeline();
        // Index from the end so this compares the latest two instants, as the name promises,
        // rather than the first two.
        HoodieCommitMetadata previousMetadata = TimelineUtils.getCommitMetadata(completedInstants.get(numCompleted - 2), activeTimeline);
        HoodieCommitMetadata latestMetadata = TimelineUtils.getCommitMetadata(completedInstants.get(numCompleted - 1), activeTimeline);
        Assertions.assertEquals(previousMetadata.getMetadata("schema"), latestMetadata.getMetadata("schema"));
    }

    /**
     * Writes five ORC records under ORC_SOURCE_ROOT and saves matching source properties to DFS. It then
     * INSERTs them with {@link ORCDFSSource} and expects 5 records. The shared {@code testNum} counter is
     * incremented on success.
     */
    private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        TestHoodieDeltaStreamer.prepareORCDFSFiles(5, ORC_SOURCE_ROOT);
        TypedProperties orcProps = new TypedProperties();
        orcProps.setProperty("include", "base.properties");
        orcProps.setProperty("hoodie.embed.timeline.server", "false");
        orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        if (useSchemaProvider) {
            orcProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
            if (transformerClassNames != null) {
                orcProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
            }
        }
        orcProps.setProperty("hoodie.streamer.source.dfs.root", ORC_SOURCE_ROOT);
        UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, storage, basePath + "/" + "test-orc-dfs-source.properties");
        String tableBasePath = basePath + "/test_orc_source_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(), transformerClassNames, "test-orc-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
        try {
            deltaStreamer.sync();
            this.assertRecordCount(5L, tableBasePath, sqlContext);
        } finally {
            // Release the streamer like the parquet scenario does, even when an assertion fails.
            deltaStreamer.shutdownGracefully();
        }
        ++testNum;
    }

    /**
     * Saves Kafka JSON source properties with no extra properties and Kafka offset appending disabled.
     */
    private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
        final Map<String, String> noExtraProps = null;
        final boolean appendKafkaOffsets = false;
        prepareJsonKafkaDFSSource(propsFileName, autoResetValue, topicName, noExtraProps, appendKafkaOffsets);
    }

    /**
     * Builds the Kafka JSON source properties (common props, topic, checkpoint type, uber schema files,
     * auto.offset.reset) and saves them to {@code basePath/propsFileName} on DFS.
     *
     * @param extraProps       optional overrides applied after the defaults; may be null
     * @param shouldAddOffsets value for the Kafka append-offsets option; set last, so it wins over {@code extraProps}
     */
    private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName, Map<String, String> extraProps, boolean shouldAddOffsets) throws IOException {
        TypedProperties kafkaProps = new TypedProperties();
        TestHoodieDeltaStreamer.populateAllCommonProps(kafkaProps, basePath, this.testUtils.brokerAddress());
        kafkaProps.setProperty("include", "base.properties");
        kafkaProps.setProperty("hoodie.embed.timeline.server", "false");
        kafkaProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        kafkaProps.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
        kafkaProps.setProperty("hoodie.streamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
        kafkaProps.setProperty("hoodie.streamer.source.kafka.topic", topicName);
        kafkaProps.setProperty("hoodie.streamer.source.kafka.checkpoint.type", this.kafkaCheckpointType);
        kafkaProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
        kafkaProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
        kafkaProps.setProperty("auto.offset.reset", autoResetValue);
        if (extraProps != null) {
            // Iterating an empty map is a no-op, so only the null case needs a guard.
            extraProps.forEach((key, value) -> kafkaProps.setProperty(key, value));
        }
        kafkaProps.setProperty(HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
        String propsPath = basePath + "/" + propsFileName;
        UtilitiesTestBase.Helpers.savePropsToDFS(kafkaProps, storage, propsPath);
    }

    /**
     * Ingests 10 parquet records, then switches the same table to a JSON Kafka source. With
     * {@code autoResetToLatest} the 5 pre-existing Kafka records are skipped, otherwise they are ingested.
     * A further 20 records published afterwards must always be picked up. The shared {@code testNum}
     * counter is incremented on success.
     */
    private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfsToKafka" + testNum;
        int parquetRecords = 10;
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, "1.parquet", true, "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}", HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        this.prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "driver");
        String tableBasePath = basePath + "/test_dfs_to_kafka" + testNum;
        HoodieDeltaStreamer parquetStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        parquetStreamer.sync();
        this.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
        parquetStreamer.shutdownGracefully();

        topicName = "topic" + testNum;
        this.prepareJsonKafkaDFSFiles(5, true, topicName);
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", autoResetToLatest ? "latest" : "earliest", topicName);
        HoodieDeltaStreamer kafkaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        try {
            kafkaStreamer.sync();
            // With "latest" the 5 records published before the first Kafka sync are skipped.
            int totalExpectedRecords = parquetRecords + (autoResetToLatest ? 0 : 5);
            this.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
            this.prepareJsonKafkaDFSFiles(20, false, topicName);
            kafkaStreamer.sync();
            this.assertRecordCount(totalExpectedRecords + 20, tableBasePath, sqlContext);
        } finally {
            // The parquet streamer is shut down above; release the Kafka one as well.
            kafkaStreamer.shutdownGracefully();
        }
        ++testNum;
    }

    /**
     * UPSERTs 5 JSON records from Kafka, publishes 10 more and re-syncs with the same streamer,
     * expecting 15 records in total.
     */
    @Test
    public void testJsonKafkaDFSSource() throws Exception {
        topicName = "topic" + testNum;
        this.prepareJsonKafkaDFSFiles(5, true, topicName);
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName);
        String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        try {
            deltaStreamer.sync();
            this.assertRecordCount(5L, tableBasePath, sqlContext);
            int initialRecords = 5;
            int additionalRecords = 10;
            this.prepareJsonKafkaDFSFiles(additionalRecords, false, topicName);
            deltaStreamer.sync();
            this.assertRecordCount(initialRecords + additionalRecords, tableBasePath, sqlContext);
        } finally {
            // Release the streamer even when an assertion fails.
            deltaStreamer.shutdownGracefully();
        }
    }

    /**
     * Ingests 30 JSON records spread over 2 Kafka partitions with offset appending enabled. It then checks
     * the Kafka meta columns:
     * <ul>
     *   <li>each partition contributes 15 rows;</li>
     *   <li>every source timestamp lies within the test's wall-clock window;</li>
     *   <li>each (offset, partition) pair appears exactly once.</li>
     * </ul>
     */
    @Test
    public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
        topicName = "topic" + testNum;
        int numRecords = 30;
        int numPartitions = 2;
        int recsPerPartition = numRecords / numPartitions;
        long beforeTime = Instant.now().toEpochMilli();
        this.prepareJsonKafkaDFSFiles(numRecords, true, topicName, numPartitions);
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName, null, true);
        String tableBasePath = basePath + "/test_json_kafka_offsets_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        try {
            deltaStreamer.sync();
        } finally {
            deltaStreamer.shutdownGracefully();
        }
        sqlContext.clearCache();
        Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
        Assertions.assertEquals((long) numRecords, ds.count());
        for (int i = 0; i < numPartitions; ++i) {
            Assertions.assertEquals((long) recsPerPartition, ds.filter("_hoodie_kafka_source_partition=" + i).count());
        }
        long afterTime = Instant.now().toEpochMilli();
        // Inclusive bounds: timestamps are compared at millisecond granularity, so a record stamped in the
        // same millisecond as beforeTime/afterTime must still count. Strict bounds made this flaky.
        Assertions.assertEquals((long) numRecords, ds.filter("_hoodie_kafka_source_timestamp>=" + beforeTime).filter("_hoodie_kafka_source_timestamp<=" + afterTime).count());
        // Resolve the offset column on the already-loaded dataset (presumably fails if the column is missing)
        // instead of reloading the table just for this.
        ds.col("_hoodie_kafka_source_offset");
        for (int i = 0; i < recsPerPartition; ++i) {
            for (int j = 0; j < numPartitions; ++j) {
                Assertions.assertEquals(1L, ds.filter("_hoodie_kafka_source_offset=" + i).filter("_hoodie_kafka_source_partition=" + j).count());
            }
        }
    }

    /**
     * Uses the "timestamp" Kafka checkpoint type. Each run passes the current time as the last config
     * argument to {@code makeConfig}. Five records are ingested, five more are published, and a fresh
     * streamer is expected to bring the table to 10 records.
     */
    @Test
    public void testKafkaTimestampType() throws Exception {
        topicName = "topic" + testNum;
        this.kafkaCheckpointType = "timestamp";
        this.prepareJsonKafkaDFSFiles(5, true, topicName);
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName);
        String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
        HoodieDeltaStreamer firstRun = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
        try {
            firstRun.sync();
            this.assertRecordCount(5L, tableBasePath, sqlContext);
        } finally {
            firstRun.shutdownGracefully();
        }
        this.prepareJsonKafkaDFSFiles(5, false, topicName);
        // The config (and its timestamp argument) is built after the second batch is published, as before.
        HoodieDeltaStreamer secondRun = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
        try {
            secondRun.sync();
            this.assertRecordCount(10L, tableBasePath, sqlContext);
        } finally {
            secondRun.shutdownGracefully();
        }
    }

    /**
     * Verifies checkpoint bookkeeping when two streamers (a parquet DFS source and a JSON Kafka
     * source) write to the same table.
     *
     * <p>Each commit's {@code deltastreamer.checkpoint.key} extra metadata is parsed as a JSON map
     * keyed by {@code "parquet"} and {@code "kafka"}. The assertions check that:
     * <ul>
     *   <li>every commit carries the checkpoint of the writer that produced it;</li>
     *   <li>the other writer's checkpoint is carried over unchanged;</li>
     *   <li>the parquet checkpoint increases between the two parquet commits.</li>
     * </ul>
     */
    @Disabled(value="HUDI-6609")
    @Test
    public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
        int parquetRecords = 100;
        // Source schema of the parquet files; shared by the initial write and the later updates.
        String uberSourceSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator dataGenerator = TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, "1.parquet", true, uberSourceSchema, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        this.prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "driver");
        String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
        HoodieDeltaStreamer.Config parquetCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
        parquetCfg.configs = new ArrayList<>();
        HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
        parquetDs.sync();
        this.assertRecordCount(100L, tableBasePath, sqlContext);

        // Second writer: Kafka source against the same table.
        topicName = "topic" + testNum;
        this.prepareJsonKafkaDFSFiles(20, true, topicName);
        HashMap<String, String> kafkaExtraProps = new HashMap<>();
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName, kafkaExtraProps, false);
        HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, Integer.MAX_VALUE, false, null, null, "timestamp", null), jsc);
        kafkaDs.sync();
        int totalExpectedRecords = parquetRecords + 20;
        this.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);

        // Second parquet round with a fresh streamer instance; release the first instance before replacing it.
        TestHoodieDeltaStreamer.prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, "1.parquet", true, uberSourceSchema, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA, dataGenerator, "001");
        parquetDs.shutdownGracefully();
        parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
        parquetDs.sync();
        this.assertRecordCount(parquetRecords * 2 + 20, tableBasePath, sqlContext);

        HoodieTableMetaClient metaClient = HoodieTestUtils.init((StorageConfiguration)HadoopFSUtils.getStorageConf((Configuration)jsc.hadoopConfiguration()), (String)tableBasePath);
        List<HoodieInstant> instants = metaClient.getCommitsTimeline().getInstants();
        ObjectMapper objectMapper = new ObjectMapper();

        // Commit 0: first parquet write.
        Map<?, ?> checkpointVals = readStreamerCheckpointMap(metaClient, instants.get(0), objectMapper);
        String parquetFirstCheckpoint = (String)checkpointVals.get("parquet");
        Assertions.assertNotNull(parquetFirstCheckpoint);

        // Commit 1: Kafka write; the parquet checkpoint must be carried over unchanged.
        checkpointVals = readStreamerCheckpointMap(metaClient, instants.get(1), objectMapper);
        String kafkaCheckpoint = (String)checkpointVals.get("kafka");
        Assertions.assertNotNull(kafkaCheckpoint);
        Assertions.assertEquals(parquetFirstCheckpoint, checkpointVals.get("parquet"));

        // Commit 2: second parquet write; the Kafka checkpoint must be carried over unchanged.
        checkpointVals = readStreamerCheckpointMap(metaClient, instants.get(2), objectMapper);
        String parquetSecondCheckpoint = (String)checkpointVals.get("parquet");
        Assertions.assertNotNull(parquetSecondCheckpoint);
        Assertions.assertEquals(kafkaCheckpoint, checkpointVals.get("kafka"));
        Assertions.assertTrue(Long.parseLong(parquetSecondCheckpoint) > Long.parseLong(parquetFirstCheckpoint),
            () -> "Parquet checkpoint did not advance: " + parquetFirstCheckpoint + " -> " + parquetSecondCheckpoint);

        parquetDs.shutdownGracefully();
        kafkaDs.shutdownGracefully();
    }

    /**
     * Parses the JSON checkpoint map stored under {@code deltastreamer.checkpoint.key} in the
     * commit metadata of {@code instant}.
     */
    private static Map<?, ?> readStreamerCheckpointMap(HoodieTableMetaClient metaClient, HoodieInstant instant, ObjectMapper objectMapper) throws Exception {
        HoodieCommitMetadata commitMetadata = metaClient.getCommitsTimeline().readCommitMetadata(instant);
        return objectMapper.readValue((String)commitMetadata.getExtraMetadata().get("deltastreamer.checkpoint.key"), Map.class);
    }

    /** Parquet-to-Kafka source transition using the "earliest" auto-reset variant (flag {@code false}). */
    @Test
    public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
        final boolean latestVariant = false;
        testDeltaStreamerTransitionFromParquetToKafkaSource(latestVariant);
    }

    /** Parquet-to-Kafka source transition using the "latest" auto-reset variant (flag {@code true}). */
    @Test
    public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
        final boolean latestVariant = true;
        testDeltaStreamerTransitionFromParquetToKafkaSource(latestVariant);
    }

    /** Parquet DFS ingestion with neither a schema provider nor any transformer. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        testParquetDFSSource(useSchemaProvider, null);
    }

    /** Parquet DFS ingestion without schema provider or transformer, exercising the empty-batch path. */
    @Test
    public void testParquetDFSSourceForEmptyBatch() throws Exception {
        final boolean useSchemaProvider = false;
        final boolean emptyBatch = true;
        testParquetDFSSource(useSchemaProvider, null, emptyBatch);
    }

    /**
     * Ingests ten parquet records, then re-runs the same config with {@code NullValueSchemaProvider}
     * and {@code TestParquetDFSSourceEmptyBatch}. Expects the following:
     * <ul>
     *   <li>the record count is unchanged;</li>
     *   <li>a new instant is added to the timeline;</li>
     *   <li>the table schema at that instant is not the Avro {@code NULL} type.</li>
     * </ul>
     */
    @Test
    public void testEmptyBatchWithNullSchemaValue() throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        final int initialRecords = 10;
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(initialRecords, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
        String tablePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);

        // Round 1: regular parquet ingestion.
        HoodieDeltaStreamer initialRun = new HoodieDeltaStreamer(cfg, jsc);
        initialRun.sync();
        assertRecordCount(initialRecords, tablePath, sqlContext);
        HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tablePath);
        HoodieInstant instantAfterFirstRun = (HoodieInstant)metaClient.getActiveTimeline().lastInstant().get();
        initialRun.shutdownGracefully();

        // Round 2: new files exist, but the source and schema provider are swapped (mutating the same config).
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
        cfg.schemaProviderClassName = NullValueSchemaProvider.class.getName();
        cfg.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
        HoodieDeltaStreamer emptyRun = new HoodieDeltaStreamer(cfg, jsc);
        emptyRun.sync();
        assertRecordCount(initialRecords, tablePath, sqlContext);

        TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
        HoodieInstant instantAfterEmptyRun = (HoodieInstant)metaClient.reloadActiveTimeline().lastInstant().get();
        Schema schemaAtEmptyRun = resolver.getTableAvroSchema(instantAfterEmptyRun, true);
        Assertions.assertNotEquals((Object)instantAfterFirstRun, (Object)instantAfterEmptyRun);
        Assertions.assertNotEquals((Object)schemaAtEmptyRun, (Object)Schema.create((Schema.Type)Schema.Type.NULL));
        emptyRun.shutdownGracefully();
    }

    /**
     * Starts a MERGE_ON_READ table whose first batch comes from {@code TestParquetDFSSourceEmptyBatch}
     * with {@code NullValueSchemaProvider} (expecting zero records). It then switches the same config
     * to a plain {@code ParquetDFSSource} without a schema provider and expects the ten records of
     * {@code 2.parquet} to be ingested.
     */
    @Test
    public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        final int expectedRecords = 10;
        final String propsFile = "test-parquet-dfs-source.properties";
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", propsFile, PARQUET_SOURCE_ROOT, false, "partition_path", "0");
        String tablePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), Collections.singletonList(TestIdentityTransformer.class.getName()), propsFile, false, false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);

        // Round 1: empty batch with the null-valued schema provider.
        cfg.schemaProviderClassName = NullValueSchemaProvider.class.getName();
        cfg.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
        HoodieDeltaStreamer emptyRound = new HoodieDeltaStreamer(cfg, jsc);
        emptyRound.sync();
        emptyRound.shutdownGracefully();
        assertRecordCount(0L, tablePath, sqlContext);

        // Round 2: regular parquet source, no schema provider.
        cfg.schemaProviderClassName = null;
        cfg.sourceClassName = ParquetDFSSource.class.getName();
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(expectedRecords, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
        HoodieDeltaStreamer dataRound = new HoodieDeltaStreamer(cfg, jsc);
        dataRound.sync();
        dataRound.shutdownGracefully();
        assertRecordCount(expectedRecords, tablePath, sqlContext);
    }

    /** Restart after hoodie.properties is deleted from a table whose first run produced no valid commit. */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
        final boolean simulateInitFailure = true;
        testDeltaStreamerRestartAfterMissingHoodieProps(simulateInitFailure);
    }

    /** Restart after hoodie.properties is deleted from a table that already has a valid commit. */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodiePropsAfterValidCommit() throws Exception {
        final boolean simulateInitFailure = false;
        testDeltaStreamerRestartAfterMissingHoodieProps(simulateInitFailure);
    }

    /**
     * Runs a streamer against a fresh parquet-backed table, deletes {@code .hoodie/hoodie.properties}
     * and checks how a newly constructed streamer reacts.
     *
     * @param testInitFailure when {@code true}, the first run uses {@code TestParquetDFSSourceEmptyBatch}.
     *     Every timeline file whose name contains "commit" or "inflight" is also deleted, and a new
     *     {@code ParquetDFSSource} streamer is expected to ingest all {@code parquetRecordsCount}
     *     records. When {@code false}, the first run uses {@code ParquetDFSSource}, and constructing
     *     a new streamer is expected to throw {@link HoodieIOException}.
     */
    private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean testInitFailure) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 10;
        boolean hasTransformer = false;
        boolean useSchemaProvider = false;
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        String firstSourceClass = testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName();
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, firstSourceClass, null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
        deltaStreamer.sync();
        // Release the first streamer before its table metadata is tampered with below.
        deltaStreamer.shutdownGracefully();
        if (testInitFailure) {
            // Remove every commit/inflight timeline file, presumably to mimic an initialization that never completed.
            FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + "/.hoodie/timeline/"));
            Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains("commit") || entry.getPath().getName().contains("inflight")).forEach(entry -> {
                try {
                    // delete(Path, true) is the non-deprecated equivalent of the single-argument delete(Path).
                    fs.delete(entry.getPath(), true);
                }
                catch (IOException e) {
                    LOG.warn("Failed to delete " + entry.getPath().toString(), (Throwable)e);
                }
            });
        }
        fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"), true);
        if (testInitFailure) {
            HoodieDeltaStreamer restarted = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
            restarted.sync();
            this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
            restarted.shutdownGracefully();
        } else {
            Assertions.assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
        }
        ++testNum;
    }

    /** Parquet DFS ingestion without a schema provider but with {@code TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(false, transformers);
    }

    /** Parquet DFS ingestion with a schema provider and no transformer. */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = true;
        testParquetDFSSource(useSchemaProvider, null);
    }

    /** Parquet DFS ingestion with a schema provider and {@code TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(true, transformers);
    }

    /** ORC DFS ingestion with neither schema provider nor transformer; currently disabled (HUDI-8081). */
    @Disabled(value="HUDI-8081")
    @Test
    public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        testORCDFSSource(useSchemaProvider, null);
    }

    /** ORC DFS ingestion with schema provider and {@code TripsWithDistanceTransformer}; disabled (HUDI-8081). */
    @Disabled(value="HUDI-8081")
    @Test
    public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testORCDFSSource(true, transformers);
    }

    /**
     * Writes {@code test-csv-dfs-source.properties} under {@code basePath}. Also writes a
     * three-record {@code 1.csv} under {@code basePath/csvFiles}.
     *
     * <p>When there is a header or a schema provider, the record key and partition path are
     * {@code _row_key}/{@code partition_path}. Otherwise the positional names {@code _c1}/{@code _c2}
     * are used. A separator other than ',' is written to {@code hoodie.streamer.csv.sep}, with tab
     * written as the escaped string {@code \t}.
     */
    private void prepareCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        final String csvRoot = basePath + "/csvFiles";
        final boolean namedColumns = hasHeader || useSchemaProvider;
        TypedProperties csvProps = new TypedProperties();
        csvProps.setProperty("include", "base.properties");
        csvProps.setProperty("hoodie.datasource.write.recordkey.field", namedColumns ? "_row_key" : "_c1");
        csvProps.setProperty("hoodie.datasource.write.partitionpath.field", namedColumns ? "partition_path" : "_c2");
        if (useSchemaProvider) {
            csvProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
        }
        if (useSchemaProvider && hasTransformer) {
            csvProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
        }
        csvProps.setProperty("hoodie.streamer.source.dfs.root", csvRoot);
        switch (sep) {
            case ',':
                // ',' leaves the separator property unset.
                break;
            case '\t':
                csvProps.setProperty("hoodie.streamer.csv.sep", "\\t");
                break;
            default:
                csvProps.setProperty("hoodie.streamer.csv.sep", Character.toString(sep));
                break;
        }
        if (hasHeader) {
            csvProps.setProperty("hoodie.streamer.csv.header", Boolean.toString(hasHeader));
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, storage, basePath + "/" + "test-csv-dfs-source.properties");
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveCsvToDFS(hasHeader, sep, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(3), true)), fs, csvRoot + "/1.csv");
    }

    /**
     * Prepares the CSV source and runs one INSERT sync into {@code test_csv_table<testNum>}. It then
     * expects three records and advances {@code testNum}. Without a header or schema provider, the
     * positional column {@code _c0} is used as the ordering field instead of {@code timestamp}.
     */
    private void testCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        boolean hasTransformer = transformerClassNames != null;
        prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, hasTransformer);
        String tablePath = basePath + "/test_csv_table" + testNum;
        String orderingField;
        if (hasHeader || useSchemaProvider) {
            orderingField = "timestamp";
        } else {
            orderingField = "_c0";
        }
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(), transformerClassNames, "test-csv-dfs-source.properties", false, useSchemaProvider, 1000, false, null, null, orderingField, null);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(3L, tablePath, sqlContext);
        testNum++;
    }

    /** CSV with header, comma separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final char separator = ',';
        testCsvDFSSource(true, separator, false, null);
    }

    /** CSV with header, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
        final char separator = '\t';
        testCsvDFSSource(true, separator, false, null);
    }

    /** CSV with header, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
        final char separator = '\t';
        testCsvDFSSource(true, separator, true, null);
    }

    /** CSV with header, tab separator, no schema provider, {@code TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', false, transformers);
    }

    /** CSV with header, tab separator, schema provider, {@code TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', true, transformers);
    }

    /** Headerless CSV, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        testCsvDFSSource(hasHeader, '\t', false, null);
    }

    /** Headerless CSV, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        testCsvDFSSource(hasHeader, '\t', true, null);
    }

    /**
     * Without a header or schema provider, the CSV columns only have positional names
     * ({@code _c0}, {@code _c1}, ...). The transformer's reference to {@code begin_lat} must therefore
     * fail with an {@code AnalysisException}. Three message variants are accepted, presumably to
     * cover different Spark versions.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
        AnalysisException e = Assertions.assertThrows(AnalysisException.class, () -> this.testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName())), "Should error out when doing the transformation.");
        LOG.debug("Expected error during transformation", (Throwable)e);
        // String.valueOf turns a null message into "null", so a mismatch fails the assertion instead of throwing NPE.
        String message = String.valueOf(e.getMessage());
        boolean recognizedMessage = message.contains("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `begin_lat` cannot be resolved. Did you mean one of the following?")
            || message.contains("Column 'begin_lat' does not exist. Did you mean one of the following?")
            || message.contains("cannot resolve 'begin_lat' given input columns:");
        Assertions.assertTrue(recognizedMessage, () -> "Unexpected transformation error message: " + message);
    }

    /** Headerless CSV, tab separator, schema provider, {@code TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(false, '\t', true, transformers);
    }

    /**
     * Writes {@code test-sql-source-source.properties}, which sets the query
     * {@code select * from test_sql_table}. Then creates the first 1000-record parquet file backing
     * that temp view.
     */
    private void prepareSqlSource() throws IOException {
        // NOTE(review): there is no "/" between basePath and the directory name. testSqlSourceSource
        // builds the same string, so any fix must change both places together.
        String sqlFilesRoot = basePath + "sqlSourceFiles";
        TypedProperties props = new TypedProperties();
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.embed.timeline.server", "false");
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
        UtilitiesTestBase.Helpers.savePropsToDFS(props, storage, basePath + "/" + "test-sql-source-source.properties");
        generateSqlSourceTestTable(sqlFilesRoot, "1", "1000", 1000, new HoodieTestDataGenerator());
    }

    /**
     * Writes {@code n} generated insert records for {@code instantTime} as parquet to
     * {@code dfsRoot/filename}. Then re-registers the parquet data under {@code dfsRoot} as the temp
     * view {@code test_sql_table}.
     */
    private void generateSqlSourceTestTable(String dfsRoot, String filename, String instantTime, int n, HoodieTestDataGenerator dataGenerator) throws IOException {
        Path target = new Path(dfsRoot, filename);
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, Integer.valueOf(n), false)),
            target);
        sparkSession.read()
            .parquet(dfsRoot)
            .createOrReplaceTempView("test_sql_table");
    }

    /**
     * Ingests from {@code SqlSource} twice on one streamer instance. Expects 1000 records after the
     * first sync, then 2000 after a second parquet file is added to the backing view.
     */
    @Test
    public void testSqlSourceSource() throws Exception {
        prepareSqlSource();
        String tablePath = basePath + "/test_sql_source_table" + testNum++;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, SqlSource.class.getName(), Collections.emptyList(), "test-sql-source-source.properties", false, false, 2000, false, null, null, "timestamp", null, true);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();
        assertRecordCount(1000L, tablePath, sqlContext);

        // Second batch: must target the same directory string prepareSqlSource used.
        String sqlFilesRoot = basePath + "sqlSourceFiles";
        generateSqlSourceTestTable(sqlFilesRoot, "2", "1000", 1000, new HoodieTestDataGenerator());
        streamer.sync();
        assertRecordCount(2000L, tablePath, sqlContext);
    }

    /**
     * Runs the streamer in continuous mode against an in-memory H2 table with incremental pull on
     * column {@code id}. The source limit (100) is smaller than the table (1000 rows). The runner
     * condition expects at least ceil(1000 / 100) commits (via
     * {@code assertAtleastNCompactionCommits}) and all 1000 records in the target table.
     */
    @Test
    public void testJdbcSourceIncrementalFetchInContinuousMode() {
        try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
            TypedProperties props = new TypedProperties();
            props.setProperty("hoodie.streamer.jdbc.url", "jdbc:h2:mem:test_mem");
            props.setProperty("hoodie.streamer.jdbc.driver.class", "org.h2.Driver");
            props.setProperty("hoodie.streamer.jdbc.user", "test");
            props.setProperty("hoodie.streamer.jdbc.password", "jdbc");
            props.setProperty("hoodie.streamer.jdbc.table.name", "triprec");
            props.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
            props.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
            props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
            UtilitiesTestBase.Helpers.savePropsToDFS(props, storage, basePath + "/test-jdbc-source.properties");
            int numRecords = 1000;
            int sourceLimit = 100;
            String tableBasePath = basePath + "/triprec";
            HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(), null, "test-jdbc-source.properties", false, false, sourceLimit, false, null, null, "timestamp", null);
            cfg.continuousMode = true;
            JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);
            HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
            TestHoodieDeltaStreamer.deltaStreamerTestRunner(deltaStreamer, cfg, r -> {
                // Number of source-limited batches needed to drain the table (ceiling division).
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + (numRecords % sourceLimit == 0 ? 0 : 1), tableBasePath);
                this.assertRecordCount(numRecords, tableBasePath, sqlContext);
                return true;
            });
        }
        catch (Exception e) {
            // Keep the cause so the failure report shows the original stack trace, not just its message.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Builds an upstream table, then pulls it incrementally into a downstream table one instant at a
     * time. After nine more upstream commits, it widens the window to ten instants and enables
     * {@code INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN}. Finally it expects the downstream record count
     * to match the upstream table.
     */
    @Test
    public void testHoodieIncrFallback() throws Exception {
        String tableBasePath = basePath + "/incr_test_table";
        String downstreamTableBasePath = basePath + "/incr_test_downstream_table";
        String singleInstantConfig = "hoodie.streamer.source.hoodieincr.num_instants=1";
        this.insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
        HoodieDeltaStreamer.Config downstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
        // Guard before the first add: the config list may be null when no extra configs were supplied.
        if (downstreamCfg.configs == null) {
            downstreamCfg.configs = new ArrayList<>();
        }
        downstreamCfg.configs.add(singleInstantConfig);
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        this.insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
        this.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);

        // Swap the single-instant limit (removed by value, not by position) for a ten-instant window
        // with fallback to a full table scan enabled.
        downstreamCfg.configs.remove(singleInstantConfig);
        downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key() + "=true");
        downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=10");
        downstreamCfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath).count();
        long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath).count();
        Assertions.assertEquals(baseTableRecords, downStreamTableRecords);
    }

    /**
     * Runs {@code count} sequential sync rounds against {@code tableBasePath} using
     * {@code SqlQueryBasedTransformer} and {@code test-source.properties}. The config sets clean
     * retention to 2 commits, archival keep.min/max to 4/5, and enables insert generation in the test
     * source.
     */
    private void insertInTable(String tableBasePath, int count, WriteOperationType operationType) throws Exception {
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, operationType, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false);
        if (cfg.configs == null) {
            cfg.configs = new ArrayList<>();
        }
        Collections.addAll(cfg.configs,
            "hoodie.clean.commits.retained=2",
            "hoodie.keep.min.commits=4",
            "hoodie.keep.max.commits=5",
            "hoodie.test.source.generate.inserts=true");
        for (int round = 0; round < count; round++) {
            new HoodieDeltaStreamer(cfg, jsc).sync();
        }
    }

    /** INSERT_OVERWRITE scenario, run for both AVRO and SPARK record types. */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testInsertOverwrite(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tablePath = basePath + "/insert_overwrite";
        testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE, recordType);
    }

    /** INSERT_OVERWRITE_TABLE scenario, run for both AVRO and SPARK record types. */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testInsertOverwriteTable(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tablePath = basePath + "/insert_overwrite_table";
        testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE_TABLE, recordType);
    }

    /**
     * Inserts five parquet records, then runs a DELETE_PARTITION pass with
     * {@code TestSpecificPartitionTransformer} on the same table. Verifies that partition
     * {@code 2016/03/15} has file IDs before the pass and none afterwards, and that queries no longer
     * match it.
     */
    @Test
    public void testDeletePartitions() throws Exception {
        String deletedPartition = "2016/03/15";
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(5, PARQUET_SOURCE_ROOT);
        this.prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path");
        // Use a path separator so the table lives inside basePath, like every other table in this class.
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), jsc);
        deltaStreamer.sync();
        Assertions.assertFalse(this.getAllFileIDsInTable(tableBasePath, Option.of(deletedPartition)).isEmpty());
        this.assertRecordCount(5L, tableBasePath, sqlContext);
        deltaStreamer.shutdownGracefully();
        ++testNum;

        // Second pass on the same table: delete the partition.
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(5, PARQUET_SOURCE_ROOT);
        this.prepareParquetDFSSource(false, false);
        deltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(), Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), jsc);
        deltaStreamer.sync();
        this.assertNoPartitionMatch(tableBasePath, sqlContext, deletedPartition);
        Assertions.assertTrue(this.getAllFileIDsInTable(tableBasePath, Option.of(deletedPartition)).isEmpty());
        deltaStreamer.shutdownGracefully();
    }

    /**
     * Adds four secret-looking properties (ssl/sasl/auth keys) to the base test properties. Then
     * checks that none of their values appear in {@code toSortedTruncatedString} and that the
     * {@code SENSITIVE_INFO_MASKED} marker does.
     */
    @Test
    public void testToSortedTruncatedStringSecretsMasked() {
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + "test-source.properties")).getProps();
        String[][] secretEntries = {
            {"ssl.trustore.location", "SSL SECRET KEY"},
            {"sasl.jaas.config", "SASL SECRET KEY"},
            {"auth.credentials", "AUTH CREDENTIALS"},
            {"auth.user.info", "AUTH USER INFO"}
        };
        for (String[] entry : secretEntries) {
            props.put(entry[0], entry[1]);
        }
        String rendered = HoodieDeltaStreamer.toSortedTruncatedString(props);
        for (String[] entry : secretEntries) {
            Assertions.assertFalse(rendered.contains(entry[1]));
        }
        Assertions.assertTrue(rendered.contains("SENSITIVE_INFO_MASKED"));
    }

    /**
     * Shared scenario for the overwrite operations. It runs three phases on one config:
     * <ol>
     *   <li>Bulk-inserts 1000 records.</li>
     *   <li>Runs {@code operationType} with a source limit of 0. INSERT_OVERWRITE must leave 1000
     *       records; INSERT_OVERWRITE_TABLE must leave no file slices or file IDs.</li>
     *   <li>Runs again with a limit of 1000, expecting 950 records. Finally deletes the table
     *       directory.</li>
     * </ol>
     */
    void testDeltaStreamerWithSpecifiedOperation(String tableBasePath, WriteOperationType operationType, HoodieRecord.HoodieRecordType recordType) throws Exception {
        // Phase 1: bulk insert.
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        addRecordMerger(recordType, cfg.configs);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(1000L, tableBasePath, sqlContext);
        assertDistanceCount(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.<String>empty());

        // Phase 2: requested operation with an empty source limit.
        cfg.operation = operationType;
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, jsc).sync();
        boolean overwritePartitions = operationType == WriteOperationType.INSERT_OVERWRITE;
        boolean overwriteTable = operationType == WriteOperationType.INSERT_OVERWRITE_TABLE;
        if (overwritePartitions) {
            assertRecordCount(1000L, tableBasePath, sqlContext);
            assertDistanceCount(1000L, tableBasePath, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        } else if (overwriteTable) {
            HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tableBasePath);
            HoodieTimeline completedCommits = (HoodieTimeline)metaClient.getCommitsAndCompactionTimeline();
            HoodieTableFileSystemView view = HoodieTableFileSystemView.fileListingBasedFileSystemView((HoodieEngineContext)context, (HoodieTableMetaClient)metaClient, completedCommits);
            long rootSliceCount = view.getLatestFileSlices("").count();
            Assertions.assertEquals(0L, rootSliceCount);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
            Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.<String>empty());
            Assertions.assertTrue(afterFileIDs.isEmpty());
        }

        // Phase 3: requested operation with a 1000-record source limit.
        cfg.sourceLimit = 1000L;
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(950L, tableBasePath, sqlContext);
        assertDistanceCount(950L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Adds commits with {@code deltastreamer.checkpoint.key} = "abc" and then "def", followed by a
     * commit from {@code addClusterCommitToTimeline} with empty extra metadata. After each step it
     * checks which checkpoint {@code StreamerCheckpointUtils} reports as the latest valid one. The
     * last step expects "def" to still be reported.
     */
    @Test
    public void testFetchingCheckpointFromPreviousCommits() throws IOException {
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(basePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
        TypedProperties properties = new TypedProperties();
        properties.setProperty("hoodie.datasource.write.recordkey.field", "key");
        properties.setProperty("hoodie.datasource.write.partitionpath.field", "pp");
        // The instance is never used afterwards; it is still constructed (before the table name is
        // added to the properties) in case construction has side effects. NOTE(review): confirm it is needed.
        new DummyStreamSync(cfg, sparkSession, null, properties, jsc, fs, jsc.hadoopConfiguration(), null);
        properties.put(HoodieTableConfig.NAME.key(), "sample_tbl");
        HoodieTableMetaClient metaClient = HoodieTestUtils.init((StorageConfiguration)HadoopFSUtils.getStorageConf((Configuration)jsc.hadoopConfiguration()), (String)basePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE, (Properties)properties);
        HashMap<String, String> extraMetadata = new HashMap<>();

        extraMetadata.put("deltastreamer.checkpoint.key", "abc");
        TestHoodieDeltaStreamer.addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        // JUnit's assertEquals takes (expected, actual); the original had them reversed.
        Assertions.assertEquals("abc", latestValidStreamerCheckpoint(metaClient));

        extraMetadata.put("deltastreamer.checkpoint.key", "def");
        TestHoodieDeltaStreamer.addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", latestValidStreamerCheckpoint(metaClient));

        // A commit with empty extra metadata must not hide the previous checkpoint.
        TestHoodieDeltaStreamer.addClusterCommitToTimeline(metaClient, Collections.emptyMap());
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", latestValidStreamerCheckpoint(metaClient));
    }

    /**
     * Returns the {@code deltastreamer.checkpoint.key} metadata of the latest commit on the table's
     * commits timeline that {@code StreamerCheckpointUtils} considers to carry valid checkpoint info.
     */
    private static String latestValidStreamerCheckpoint(HoodieTableMetaClient metaClient) throws IOException {
        HoodieCommitMetadata latest = (HoodieCommitMetadata)StreamerCheckpointUtils.getLatestCommitMetadataWithValidCheckpointInfo((HoodieTimeline)metaClient.getActiveTimeline().getCommitsTimeline()).get();
        return latest.getMetadata("deltastreamer.checkpoint.key");
    }

    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testDropPartitionColumns(HoodieRecord.HoodieRecordType recordType) throws Exception {
        // With DROP_PARTITION_COLUMNS enabled, the schema read back from the written data files must not
        // contain the `partition_path` column.
        String tableBasePath = basePath + "/test_drop_partition_columns" + testNum++;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        this.addRecordMerger(recordType, cfg.configs);
        cfg.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)tableBasePath));
        Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
        Assertions.assertNotNull(tableSchema);
        // Schema#getField returns null for an absent field, so no raw List of field names is needed.
        Assertions.assertNull(tableSchema.getField("partition_path"));
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    @Test
    public void testForceEmptyMetaSync() throws Exception {
        // With sourceLimit 0 and forceEmptyMetaSync enabled, a sync that writes no records must still create
        // the table in Hive.
        String tableBasePath = basePath + "/test_force_empty_meta_sync";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        cfg.sourceLimit = 0L;
        cfg.allowCommitOnNoCheckpointChange = true;
        cfg.enableMetaSync = true;
        cfg.forceEmptyMetaSync = true;
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        this.assertRecordCount(0L, tableBasePath, sqlContext);
        HiveSyncConfig hiveSyncConfig = TestHoodieDeltaStreamer.getHiveSyncConfig(tableBasePath, "hive_trips");
        hiveSyncConfig.setHadoopConf((Configuration)hiveServer.getHiveConf());
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getStorageConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
        // Close the sync client (and its metastore connection) once the check is done; it was previously leaked.
        try (HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient)) {
            String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
            Assertions.assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
        }
    }

    @Test
    public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
        // Writes a COPY_ON_WRITE table, flips hoodie.table.type to MERGE_ON_READ in hoodie.properties, then checks
        // that two further upsert syncs resume from the checkpoint and accumulate the expected record counts.
        String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(storage.getConf().newInstance()).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
        Properties hoodieProps = new Properties();
        // Close the properties stream after loading; it was previously left open.
        try (InputStream propsIn = fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties"))) {
            hoodieProps.load(propsIn);
        }
        LOG.info("old props: {}", (Object)hoodieProps);
        hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
        LOG.info("new props: {}", (Object)hoodieProps);
        StoragePath metaPathDir = new StoragePath(metaClient.getBasePath(), ".hoodie");
        HoodieTableConfig.create((HoodieStorage)metaClient.getStorage(), (StoragePath)metaPathDir, (Properties)hoodieProps);
        cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1450L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
        List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1450L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1900L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00002", tableBasePath, 3);
        counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1900L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    @Test
    public void testResumeCheckpointAfterChangingMOR2COW() throws Exception {
        // Writes a MERGE_ON_READ table (with inline compaction after one delta commit), flips hoodie.table.type to
        // COPY_ON_WRITE in hoodie.properties, then checks that further upsert syncs resume from the checkpoint.
        String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_mor_to_cow";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath);
        cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.add("hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy");
        cfg.configs.add("hoodie.compact.inline.max.delta.commits=1");
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1450L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(null, tableBasePath, 3);
        List<Row> counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1450L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(3, tableBasePath);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(storage.getConf().newInstance()).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
        Properties hoodieProps = new Properties();
        // Close the properties stream after loading; it was previously left open.
        try (InputStream propsIn = fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties"))) {
            hoodieProps.load(propsIn);
        }
        // Parameterized logging, matching the COW->MOR variant of this test.
        LOG.info("old props: {}", (Object)hoodieProps);
        hoodieProps.put("hoodie.table.type", HoodieTableType.COPY_ON_WRITE.name());
        LOG.info("new props: {}", (Object)hoodieProps);
        StoragePath metaPathDir = new StoragePath(metaClient.getBasePath(), ".hoodie");
        HoodieTableConfig.create((HoodieStorage)metaClient.getStorage(), (StoragePath)metaPathDir, (Properties)hoodieProps);
        cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(1900L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00002", tableBasePath, 4);
        counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1900L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tableBasePath);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        this.assertRecordCount(2350L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00003", tableBasePath, 5);
        counts = this.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(2350L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(5, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    @Test
    public void testAutoGenerateRecordKeys() throws Exception {
        // Ingests a parquet file, checks the table config has no record-key fields, then ingests a second file
        // and checks the record count accumulates.
        boolean useSchemaProvider = false;
        // No transformers in this test; typed rather than raw so it matches makeConfig's parameter.
        List<String> transformerClassNames = null;
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        try {
            int parquetRecordsCount = 100;
            boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
            TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
            this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "", true);
            String tableBasePath = basePath + "/test_parquet_table" + testNum;
            HoodieDeltaStreamer.Config config = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), transformerClassNames, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null);
            HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
            deltaStreamer.sync();
            this.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
            Assertions.assertFalse(metaClient.getTableConfig().getRecordKeyFields().isPresent());
            TestHoodieDeltaStreamer.prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
            deltaStreamer.sync();
            this.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
        } finally {
            // Advance the shared counter even when an assertion fails, so later tests get fresh directories.
            ++testNum;
        }
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testConfigurationHotUpdate(HoodieTableType tableType) throws Exception {
        // Runs the streamer in continuous mode with MockConfigurationHotUpdateStrategy and checks that, after at
        // least two commits, the live upsert parallelism is greater than the value configured at start-up.
        final long upsertParallelism = 200L;
        final HoodieRecord.HoodieRecordType recordType = HoodieRecord.HoodieRecordType.AVRO;
        final String tableBasePath = basePath + String.format("/configurationHotUpdate_%s_%s", tableType.name(), recordType.name());

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        this.addRecordMerger(recordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = tableType.name();
        cfg.configHotUpdateStrategyClass = MockConfigurationHotUpdateStrategy.class.getName();
        cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), upsertParallelism));

        final HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath);
            HoodieStreamer.StreamSyncService syncService = (HoodieStreamer.StreamSyncService)ds.getIngestionService();
            long liveParallelism = syncService.getProps().getLong(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key());
            Assertions.assertTrue(liveParallelism > upsertParallelism);
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    @Test
    public void testBulkInsertWithUserDefinedPartitioner() throws Exception {
        // Bulk-inserts using RDDCustomColumnsSortPartitioner on `weight`, then reads each latest base file back and
        // asserts that its `weight` values appear in ascending order.
        String tableBasePath = basePath + "/test_table_bulk_insert";
        String sortColumn = "weight";
        TypedProperties bulkInsertProps = new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + "test-source.properties")).getProps();
        bulkInsertProps.setProperty("hoodie.bulkinsert.shuffle.parallelism", "1");
        bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.class", "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
        bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", sortColumn);
        String bulkInsertPropsFileName = "bulk_insert_override.properties";
        UtilitiesTestBase.Helpers.savePropsToDFS(bulkInsertProps, storage, basePath + "/" + bulkInsertPropsFileName);
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), bulkInsertPropsFileName, false);
        this.syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
        List<String> partitions = FSUtils.getAllPartitionPaths((HoodieEngineContext)new HoodieLocalEngineContext(metaClient.getStorageConf()), (HoodieStorage)metaClient.getStorage(), (StoragePath)metaClient.getBasePath(), (boolean)false);
        StorageConfiguration hadoopConf = metaClient.getStorageConf();
        HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
        HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView((HoodieEngineContext)engContext, (HoodieTableMetaClient)metaClient, (HoodieTimeline)metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
        List<String> baseFiles = partitions.stream()
            .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(BaseFile::getPath))
            .collect(Collectors.toList());
        // Exactly one latest base file is expected per partition.
        Assertions.assertEquals(partitions.size(), baseFiles.size());
        for (String filePath : baseFiles) {
            // Real try-with-resources: closes both iterator and reader, and lets assertion failures propagate.
            // The previous hand-expanded finally block ended in `continue`, which discarded any pending exception.
            try (HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(HoodieTestUtils.getStorage((String)filePath), new StoragePath(filePath));
                 ClosableIterator<?> iterator = parquetReader.getRecordIterator()) {
                List<Float> actualSortColumnValues = new ArrayList<Float>();
                while (iterator.hasNext()) {
                    IndexedRecord indexedRecord = (IndexedRecord)((HoodieRecord<?>)iterator.next()).getData();
                    Schema.Field sortField = indexedRecord.getSchema().getField(sortColumn);
                    if (sortField != null) {
                        actualSortColumnValues.add((Float)indexedRecord.get(sortField.pos()));
                    }
                }
                List<Float> expectedSortColumnValues = new ArrayList<Float>(actualSortColumnValues);
                Collections.sort(expectedSortColumnValues);
                Assertions.assertEquals(expectedSortColumnValues, actualSortColumnValues);
            }
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={false, true})
    void testBulkInsertSkewedSortColumns(boolean suffixRecordKey) throws Exception {
        // Bulk-inserts into a non-partitioned table sorted on `trip_type` and checks the number of base files:
        // outputParallelism files when BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS is on, otherwise columnCardinality
        // files (presumably the number of distinct trip_type values in the generated data -- TODO confirm).
        String tableBasePath = basePath + "/test_table_bulk_insert_skewed_sort_columns_" + suffixRecordKey;
        int outputParallelism = 100;
        int columnCardinality = 2;
        String sortColumn = "trip_type";
        TypedProperties bulkInsertProps = new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + "test-source.properties")).getProps();
        bulkInsertProps.setProperty(HoodieWriteConfig.BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS.key(), String.valueOf(suffixRecordKey));
        bulkInsertProps.setProperty("hoodie.bulkinsert.shuffle.parallelism", String.valueOf(outputParallelism));
        bulkInsertProps.setProperty("hoodie.datasource.write.partitionpath.field", "");
        bulkInsertProps.setProperty("hoodie.datasource.write.keygenerator.class", NonpartitionedKeyGenerator.class.getName());
        bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.class", "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
        bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", sortColumn);
        String bulkInsertPropsFileName = "bulk_insert_override.properties";
        UtilitiesTestBase.Helpers.savePropsToDFS(bulkInsertProps, storage, basePath + "/" + bulkInsertPropsFileName);
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), bulkInsertPropsFileName, false);
        this.syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
        StorageConfiguration hadoopConf = metaClient.getStorageConf();
        HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView((HoodieEngineContext)engContext, (HoodieTableMetaClient)metaClient, (HoodieMetadataConfig)HoodieMetadataConfig.newBuilder().enable(false).build());
        List<String> baseFiles;
        try {
            // The table is non-partitioned, so all base files live under the "" partition path.
            baseFiles = fsView.getLatestBaseFiles("").map(BaseFile::getPath).collect(Collectors.toList());
        } finally {
            fsView.close();
        }
        int expectedFileCount = suffixRecordKey ? outputParallelism : columnCardinality;
        Assertions.assertEquals(expectedFileCount, baseFiles.size());
    }

    @ParameterizedTest
    @MethodSource(value={"generateErrorTablePersistSourceRddArgs"})
    void testErrorTableSourcePersist(WriteOperationType writeOperationType, boolean persistSourceRdd) throws Exception {
        // Reads one batch from the source and checks that the source RDD is cached exactly when
        // ERROR_TABLE_PERSIST_SOURCE_RDD is enabled, then runs one ingestion round and checks the record count.
        final String partitionPathField = "hoodie.datasource.write.partitionpath.field";
        String tableBasePath = basePath + "/test_table_error_table" + persistSourceRdd + writeOperationType;
        TypedProperties tableProps = new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + "test-source.properties")).getProps();
        tableProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), String.valueOf(persistSourceRdd));
        switch (writeOperationType) {
            case BULK_INSERT:
                tableProps.setProperty(partitionPathField, "");
                tableProps.setProperty("hoodie.datasource.write.keygenerator.class", NonpartitionedKeyGenerator.class.getName());
                tableProps.setProperty("hoodie.bulkinsert.sort.mode", BulkInsertSortMode.GLOBAL_SORT.name());
                break;
            case UPSERT:
                tableProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
                tableProps.setProperty(partitionPathField, "partition_path");
                break;
            case INSERT:
                tableProps.setProperty(partitionPathField, "partition_path");
                break;
            default:
                throw new UnsupportedOperationException("Invalid write operationType " + writeOperationType);
        }
        String tablePropsFileName = "table_specific.properties";
        UtilitiesTestBase.Helpers.savePropsToDFS(tableProps, storage, basePath + "/" + tablePropsFileName);
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, writeOperationType, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), tablePropsFileName, false);

        HoodieStreamer streamer = new HoodieStreamer((HoodieStreamer.Config)cfg, jsc);
        HoodieStreamer.StreamSyncService syncService = (HoodieStreamer.StreamSyncService)streamer.getIngestionService();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(HoodieTestUtils.getDefaultStorageConf()).setBasePath(tableBasePath).build();
        InputBatch batch = (InputBatch)syncService.getStreamSync().readFromSource("00000", metaClient).getLeft();
        JavaRDD<?> sourceRdd = (JavaRDD<?>)batch.getBatch().get();
        Assertions.assertEquals(1000L, sourceRdd.count());
        boolean sourceCached = sourceRdd.toDebugString().contains("CachedPartitions");
        Assertions.assertEquals(persistSourceRdd, sourceCached);

        syncService.close();
        syncService.ingestOnce();
        this.assertRecordCount(950L, tableBasePath, sqlContext);
    }

    /**
     * Collects the file IDs of the latest base files in a table, optionally restricted to one partition.
     *
     * @param tableBasePath base path of the table to inspect
     * @param partition     partition path to restrict the lookup to, or empty to scan all partitions
     * @return the distinct file IDs of the latest base files on the commits-and-compaction timeline
     */
    private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
        HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient((JavaSparkContext)jsc, (String)tableBasePath);
        HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView((HoodieEngineContext)context, (HoodieTableMetaClient)metaClient, (HoodieTimeline)metaClient.getCommitsAndCompactionTimeline());
        try {
            // Typed stream: with a raw Stream, collect() returns Object and the return statement does not compile.
            Stream<HoodieBaseFile> baseFileStream = partition.isPresent()
                ? fsView.getLatestBaseFiles(partition.get())
                : fsView.getLatestBaseFiles();
            // collect() is terminal, so the stream is fully consumed before the view is closed below.
            return baseFileStream.map(HoodieBaseFile::getFileId).collect(Collectors.toSet());
        } finally {
            fsView.close();
        }
    }

    /**
     * Argument source for the ORC DFS source test: a (boolean, transformer class names) pair without a
     * transformer, and one with {@link TripsWithDistanceTransformer}.
     */
    private static Stream<Arguments> testORCDFSSource() {
        Arguments withoutTransformer = Arguments.arguments(false, null);
        Arguments withDistanceTransformer = Arguments.arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
        return Stream.of(withoutTransformer, withDistanceTransformer);
    }

    /**
     * Argument source for {@code testErrorTableSourcePersist}: every combination of BULK_INSERT, INSERT and
     * UPSERT with persistSourceRdd = false/true, in that order.
     */
    private static Stream<Arguments> generateErrorTablePersistSourceRddArgs() {
        return Stream.of(WriteOperationType.BULK_INSERT, WriteOperationType.INSERT, WriteOperationType.UPSERT)
            .flatMap(operation -> Stream.of(false, true).map(persist -> Arguments.of(operation, persist)));
    }

    /**
     * Schema provider that always reports a {@code null} source schema.
     */
    public static class NullValueSchemaProvider
    extends SchemaProvider {
        public NullValueSchemaProvider(TypedProperties properties) {
            super(properties);
        }

        public NullValueSchemaProvider(TypedProperties properties, JavaSparkContext sparkContext) {
            super(properties, sparkContext);
        }

        /** Always {@code null}. */
        public Schema getSourceSchema() {
            Schema noSchema = null;
            return noSchema;
        }
    }

    /**
     * File-based schema provider whose target schema is always {@code null}; the source schema is inherited.
     */
    public static class TestFileBasedSchemaProviderNullTargetSchema
    extends FilebasedSchemaProvider {
        public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties properties, JavaSparkContext sparkContext) {
            super(properties, sparkContext);
        }

        /** Always {@code null}. */
        public Schema getTargetSchema() {
            Schema noSchema = null;
            return noSchema;
        }
    }

    /**
     * Adds an {@code evoluted_optional_union_field} column populated from the {@code rider} column.
     */
    public static class TripsWithEvolvedOptionalFieldTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext sparkContext, SparkSession session, Dataset<Row> input, TypedProperties props) {
            Column riderColumn = functions.col("rider");
            return input.withColumn("evoluted_optional_union_field", riderColumn);
        }
    }

    /**
     * Keeps only rows whose {@code partition_path} is {@code 2016/03/15}.
     */
    public static class TestSpecificPartitionTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext sparkContext, SparkSession session, Dataset<Row> input, TypedProperties props) {
            return input.filter("partition_path == '2016/03/15'");
        }
    }

    /**
     * No-op transformer: returns its input dataset unchanged.
     */
    public static class TestIdentityTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext sparkContext, SparkSession session, Dataset<Row> input, TypedProperties props) {
            Dataset<Row> unchanged = input;
            return unchanged;
        }
    }

    /**
     * Replaces the input with an empty dataset that keeps the input's schema.
     *
     * <p>Returns {@code Dataset<Row>} rather than the raw {@code Dataset} it declared before, matching the other
     * transformers in this file.
     */
    public static class DropAllTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            System.out.println("DropAllTransformer called !!");
            return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
        }
    }

    /**
     * Payload that adds nothing to {@link OverwriteWithLatestAvroPayload}; it only forwards to the superclass
     * constructor.
     */
    public static class DummyAvroPayload
    extends OverwriteWithLatestAvroPayload {
        // NOTE(review): the raw (GenericRecord, Comparable) signature mirrors the superclass constructor. Keep it in
        // case the payload is instantiated reflectively by that signature.
        public DummyAvroPayload(GenericRecord record, Comparable orderingValue) {
            super(record, orderingValue);
        }
    }

    /**
     * Key generator that behaves exactly like {@link SimpleKeyGenerator}; it only forwards its properties.
     */
    public static class TestGenerator
    extends SimpleKeyGenerator {
        public TestGenerator(TypedProperties keyGenProps) {
            super(keyGenProps);
        }
    }

    /**
     * Registers {@code distance_udf} ({@link DistanceUDF}) and adds a {@code haversine_distance} column computed
     * from the trip's begin/end coordinates.
     */
    public static class TripsWithDistanceTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            rowDataset.sqlContext().udf().register("distance_udf", (UDF4)new DistanceUDF(), DataTypes.DoubleType);
            // Argument order follows DistanceUDF#call(lat1, lat2, lon1, lon2). The last argument used to pass
            // end_lat a second time instead of end_lon.
            return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf",
                functions.col("begin_lat"), functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lon")));
        }
    }

    /**
     * Stand-in "distance" UDF: ignores its four coordinates and returns the next value from
     * {@code HoodieDeltaStreamerTestBase.RANDOM.nextDouble()}.
     */
    public static class DistanceUDF
    implements UDF4<Double, Double, Double, Double, Double> {
        public Double call(Double beginLat, Double endLat, Double beginLon, Double endLon) {
            double pseudoDistance = HoodieDeltaStreamerTestBase.RANDOM.nextDouble();
            return pseudoDistance;
        }
    }

    /**
     * {@link DeltaSync} that records every instant time passed to {@code releaseResources}, after delegating to
     * the superclass implementation.
     */
    class TestReleaseResourcesStreamSync
    extends DeltaSync {
        // Instant times for which releaseResources completed its super call.
        private final Set<String> releaseResourcesCalledSet;

        public TestReleaseResourcesStreamSync(HoodieDeltaStreamer.Config config, SparkSession session, SchemaProvider provider, TypedProperties properties, JavaSparkContext sparkContext, FileSystem fileSystem, Configuration hadoopConf, Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
            super((HoodieStreamer.Config)config, session, provider, properties, sparkContext, fileSystem, hadoopConf, onInitializingHoodieWriteClient);
            this.releaseResourcesCalledSet = new HashSet<String>();
        }

        protected void releaseResources(String instantTime) {
            // Delegate first; the instant is recorded only if the superclass call returns normally.
            super.releaseResources(instantTime);
            releaseResourcesCalledSet.add(instantTime);
        }
    }

    /**
     * {@link StreamSync} with no overrides; only adapts a {@link HoodieDeltaStreamer.Config} to the superclass
     * constructor.
     */
    static class DummyStreamSync
    extends StreamSync {
        public DummyStreamSync(HoodieDeltaStreamer.Config config, SparkSession session, SchemaProvider provider, TypedProperties properties, JavaSparkContext sparkContext, FileSystem fileSystem, Configuration hadoopConf, Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
            super((HoodieStreamer.Config)config, session, provider, properties, sparkContext, fileSystem, hadoopConf, onInitializingHoodieWriteClient);
        }
    }
}

