/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TestAvroOrcUtils;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieDeltaStreamerTestBase
extends UtilitiesTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieDeltaStreamerTestBase.class);
    static final Random RANDOM = new Random();

    // Names of properties files. The ones referenced in this part of the file are written
    // directly under the DFS base path (see prepareInitialConfigs / writeCommonPropsToFile).
    static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
    static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
    static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties";
    static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
    static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
    static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
    static final String PROPS_FILENAME_INFER_COMPLEX_KEYGEN = "test-infer-complex-keygen.properties";
    static final String PROPS_FILENAME_INFER_NONPARTITIONED_KEYGEN = "test-infer-nonpartitioned-keygen.properties";
    static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_AVRO_KAFKA = "test-avro-kafka-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_SQL_SOURCE = "test-sql-source-source.properties";
    static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";

    // Default file names used by the single-file Parquet / ORC preparation helpers.
    static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
    static final String FIRST_ORC_FILE_NAME = "1.orc";

    // Source roots; assigned in initClass() by appending a suffix to basePath.
    static String PARQUET_SOURCE_ROOT;
    static String ORC_SOURCE_ROOT;
    static String JSON_KAFKA_SOURCE_ROOT;

    // Record counts; presumably the per-source defaults used by tests (usage is not visible here).
    static final int PARQUET_NUM_RECORDS = 5;
    static final int ORC_NUM_RECORDS = 5;
    static final int CSV_NUM_RECORDS = 3;
    static final int JSON_KAFKA_NUM_RECORDS = 5;
    static final int SQL_SOURCE_NUM_RECORDS = 1000;
    String kafkaCheckpointType = "string";

    // Command-line style parameter names and sample values; presumably consumed by
    // argument-parsing tests elsewhere (usage is not visible in this part of the file).
    static final String TGT_BASE_PATH_PARAM = "--target-base-path";
    static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
    static final String TABLE_TYPE_PARAM = "--table-type";
    static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
    static final String TARGET_TABLE_PARAM = "--target-table";
    static final String TARGET_TABLE_VALUE = "test";
    static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
    static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
    static final String SOURCE_LIMIT_PARAM = "--source-limit";
    static final String SOURCE_LIMIT_VALUE = "500";
    static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
    static final String HOODIE_CONF_PARAM = "--hoodie-conf";
    static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
    static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";

    // Set to "topic" + testNum in prepareTestSetup().
    protected static String topicName;
    protected static String defaultSchemaProviderClassName;
    // Only read (never assigned) in the visible part of this file.
    protected static int testNum;
    // Reader options applied by the assert*/count helpers; re-created in setupTest(), nulled after each test.
    Map<String, String> hudiOpts = new HashMap<String, String>();
    // Kafka test helper; created in prepareTestSetup(), torn down in cleanupKafkaTestUtils().
    public KafkaTestUtils testUtils;

    /**
     * Per-test setup: resets per-test state via {@link #setupTest()}, creates and sets up a new
     * {@link KafkaTestUtils}, derives {@code topicName} from {@code testNum}, and writes the initial
     * configuration files using the broker address reported by the Kafka helper.
     *
     * @throws IOException if writing the initial configuration files fails
     */
    @BeforeEach
    protected void prepareTestSetup() throws IOException {
        setupTest();
        testUtils = new KafkaTestUtils();
        testUtils.setup();
        topicName = "topic" + testNum;
        String brokerAddress = testUtils.brokerAddress();
        prepareInitialConfigs(storage, basePath, brokerAddress);
    }

    /**
     * Per-test cleanup: tears down the Kafka helper if one was created and drops the per-test
     * reader options.
     */
    @AfterEach
    public void cleanupKafkaTestUtils() {
        KafkaTestUtils kafka = this.testUtils;
        if (kafka != null) {
            kafka.teardown();
            this.testUtils = null;
        }
        // Assigning null is a no-op when the field is already null, so no guard is needed.
        this.hudiOpts = null;
    }

    /**
     * Class-level setup: initialises the shared test services and derives the source root
     * locations from {@code basePath}.
     *
     * <p>NOTE(review): the suffixes are appended to {@code basePath} without a separator —
     * confirm whether {@code basePath} is expected to end with one.
     *
     * @throws Exception if initialising the test services fails
     */
    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices(false, true, false);
        final String root = basePath;
        PARQUET_SOURCE_ROOT = root + "parquetFiles";
        ORC_SOURCE_ROOT = root + "orcFiles";
        JSON_KAFKA_SOURCE_ROOT = root + "jsonKafkaFiles";
    }

    /**
     * Class-level teardown; delegates to {@code UtilitiesTestBase.cleanUpUtilitiesTestServices()}.
     */
    @AfterAll
    public static void tearDown() {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
    }

    /**
     * Resets state shared between tests: turns off {@code TestDataSource.returnEmptyBatch} and
     * starts the test with an empty set of reader options.
     */
    public void setupTest() {
        TestDataSource.returnEmptyBatch = false;
        hudiOpts = new HashMap<>();
    }

    /**
     * Populates the DFS base path with the configuration and schema files used by the streamer
     * tests: copies the bundled {@code streamer-config/*} files, writes the common source
     * properties, and then generates several additional properties files (downstream source,
     * invalid key generator, key-generator inference, multi-table source, invalid table config and
     * invalid hive-sync config).
     *
     * @param storage       storage the files are written to
     * @param dfsBasePath   base path under which all files are placed
     * @param brokerAddress Kafka broker address written into the multi-table source properties
     * @throws IOException if copying or saving any file fails
     */
    protected static void prepareInitialConfigs(HoodieStorage storage, String dfsBasePath, String brokerAddress) throws IOException {
        // {source name, destination relative to dfsBasePath}; copied in exactly this order.
        String[][] resourceCopies = {
            {"streamer-config/base.properties", "/base.properties"},
            {"streamer-config/base.properties", "/config/base.properties"},
            {"streamer-config/sql-transformer.properties", "/sql-transformer.properties"},
            {"streamer-config/source.avsc", "/source.avsc"},
            {"streamer-config/source_evolved.avsc", "/source_evolved.avsc"},
            {"streamer-config/source-flattened.avsc", "/source-flattened.avsc"},
            {"streamer-config/target.avsc", "/target.avsc"},
            {"streamer-config/target-flattened.avsc", "/target-flattened.avsc"},
            {"streamer-config/source_short_trip_uber.avsc", "/source_short_trip_uber.avsc"},
            {"streamer-config/source_uber.avsc", "/source_uber.avsc"},
            {"streamer-config/source_uber_encoded_decimal.json", "/source_uber_encoded_decimal.json"},
            {"streamer-config/source_uber_encoded_decimal.avsc", "/source_uber_encoded_decimal.avsc"},
            {"streamer-config/target_short_trip_uber.avsc", "/target_short_trip_uber.avsc"},
            {"streamer-config/target_uber.avsc", "/target_uber.avsc"},
            {"streamer-config/invalid_hive_sync_uber_config.properties", "/config/invalid_hive_sync_uber_config.properties"},
            {"streamer-config/uber_config.properties", "/config/uber_config.properties"},
            {"streamer-config/short_trip_uber_config.properties", "/config/short_trip_uber_config.properties"},
            {"streamer-config/clusteringjob.properties", "/clusteringjob.properties"},
            {"streamer-config/indexer.properties", "/indexer.properties"},
        };
        for (String[] copy : resourceCopies) {
            UtilitiesTestBase.Helpers.copyToDFS(copy[0], storage, dfsBasePath + copy[1]);
        }

        writeCommonPropsToFile(storage, dfsBasePath);

        // Downstream source: reads and writes with the target schema.
        TypedProperties downstream = new TypedProperties();
        downstream.setProperty("include", "base.properties");
        downstream.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        downstream.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        downstream.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
        downstream.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        UtilitiesTestBase.Helpers.savePropsToDFS(downstream, storage, dfsBasePath + "/test-downstream-source.properties");

        // Same shape as the common props but with a key generator class name of "invalid".
        TypedProperties invalidKeygen = new TypedProperties();
        invalidKeygen.setProperty("include", "sql-transformer.properties");
        invalidKeygen.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
        invalidKeygen.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        invalidKeygen.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        invalidKeygen.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        invalidKeygen.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidKeygen, storage, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);

        // No key generator class set: two record key fields, first with and then without a partition field.
        TypedProperties keygenInference = new TypedProperties();
        keygenInference.setProperty("include", "base.properties");
        keygenInference.setProperty("hoodie.datasource.write.recordkey.field", "timestamp,_row_key");
        keygenInference.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        keygenInference.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        keygenInference.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        UtilitiesTestBase.Helpers.savePropsToDFS(keygenInference, storage, dfsBasePath + "/" + PROPS_FILENAME_INFER_COMPLEX_KEYGEN);
        // The same properties object is reused with an empty partition path field.
        keygenInference.setProperty("hoodie.datasource.write.partitionpath.field", "");
        UtilitiesTestBase.Helpers.savePropsToDFS(keygenInference, storage, dfsBasePath + "/" + PROPS_FILENAME_INFER_NONPARTITIONED_KEYGEN);

        // Multi-table source: common + Kafka + Hive properties.
        TypedProperties multiTable = new TypedProperties();
        populateAllCommonProps(multiTable, dfsBasePath, brokerAddress);
        UtilitiesTestBase.Helpers.savePropsToDFS(multiTable, storage, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);

        TypedProperties invalidTableConfig = new TypedProperties();
        populateInvalidTableConfigFilePathProps(invalidTableConfig, dfsBasePath);
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidTableConfig, storage, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);

        TypedProperties invalidHiveSync = new TypedProperties();
        invalidHiveSync.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        invalidHiveSync.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSync, storage, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
    }

    /**
     * Writes the common test source properties (key generator, record key / partition path fields,
     * source and target schema files and hive-sync settings) to
     * {@code dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE}.
     *
     * @param storage     storage the properties file is written to
     * @param dfsBasePath base path that holds the schema files and receives the properties file
     * @throws IOException if saving the properties file fails
     */
    protected static void writeCommonPropsToFile(HoodieStorage storage, String dfsBasePath) throws IOException {
        TypedProperties commonProps = new TypedProperties();

        // Write-side settings.
        commonProps.setProperty("include", "sql-transformer.properties");
        commonProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
        commonProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        commonProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        commonProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        commonProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");

        // Hive sync settings.
        commonProps.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
        commonProps.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
        commonProps.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips");
        commonProps.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
        commonProps.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());

        String target = dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE;
        UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, storage, target);
    }

    /**
     * Fills {@code props} with a single-table ingestion setup whose table config file is
     * {@code config/invalid_uber_config.properties}. That file is not among the files copied by
     * {@code prepareInitialConfigs} in this class, so the path presumably does not exist on purpose.
     *
     * @param props       properties to populate
     * @param dfsBasePath base path used to build the config file location
     */
    protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) {
        String keyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
        String tableConfigFile = dfsBasePath + "/config/invalid_uber_config.properties";

        props.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
        props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", tableConfigFile);
    }

    /**
     * Populates {@code props} with the common ingestion, Kafka and Hive properties, in that order.
     *
     * @param props         properties to populate
     * @param dfsBasePath   base path used for the per-table config file locations
     * @param brokerAddress Kafka broker address
     */
    protected static void populateAllCommonProps(TypedProperties props, String dfsBasePath, String brokerAddress) {
        populateCommonProps(props, dfsBasePath);
        populateCommonKafkaProps(props, brokerAddress);
        populateCommonHiveProps(props);
    }

    /**
     * Populates {@code props} with the key generator settings and a two-table ingestion setup
     * ({@code short_trip_db.dummy_table_short_trip} and {@code uber_db.dummy_table_uber}), each
     * pointing at its config file under {@code dfsBasePath + "/config"}.
     *
     * @param props       properties to populate
     * @param dfsBasePath base path used for the per-table config file locations
     */
    protected static void populateCommonProps(TypedProperties props, String dfsBasePath) {
        String keyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
        props.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");

        props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
        String uberConfigFile = dfsBasePath + "/config/uber_config.properties";
        props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", uberConfigFile);
        String shortTripConfigFile = dfsBasePath + "/config/short_trip_uber_config.properties";
        props.setProperty("hoodie.streamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", shortTripConfigFile);
    }

    /**
     * Populates {@code props} with the Kafka connection settings shared by the tests: broker
     * address, earliest offset reset, string serializers and a max-events value of 5000.
     *
     * @param props         properties to populate
     * @param brokerAddress Kafka broker address stored under {@code bootstrap.servers}
     */
    protected static void populateCommonKafkaProps(TypedProperties props, String brokerAddress) {
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        props.setProperty("bootstrap.servers", brokerAddress);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.serializer", stringSerializer);
        props.setProperty("value.serializer", stringSerializer);
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", String.valueOf(5000));
    }

    /**
     * Populates {@code props} with the hive-sync settings shared by the tests: JDBC URL of the
     * test Hive service, database {@code testdb2}, partition field {@code datestr} and the
     * multi-part-keys partition extractor.
     *
     * @param props properties to populate
     */
    protected static void populateCommonHiveProps(TypedProperties props) {
        String extractorClass = MultiPartKeysValueExtractor.class.getName();
        props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
        props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb2");
        props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
        props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), extractorClass);
    }

    /**
     * Writes a Parquet file with {@code numRecords} generated records under
     * {@code PARQUET_SOURCE_ROOT}.
     *
     * @param numRecords number of records to generate
     * @throws IOException if writing the file fails
     */
    protected static void prepareParquetDFSFiles(int numRecords) throws IOException {
        HoodieDeltaStreamerTestBase.prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
    }

    /**
     * Writes {@code numRecords} generated records to {@code FIRST_PARQUET_FILE_NAME} under
     * {@code baseParquetPath} using the default schema, then closes the data generator returned
     * by the delegate.
     *
     * @param numRecords      number of records to generate
     * @param baseParquetPath directory the file is written to
     * @throws IOException if writing the file fails
     */
    protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException {
        HoodieDeltaStreamerTestBase.prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null).close();
    }

    /**
     * Spreads {@code numRecords} generated records over {@code numFiles} Parquet files named
     * {@code 00000.parquet}, {@code 00001.parquet}, ... under {@code baseParquetPath}. Every file
     * gets {@code numRecords / numFiles} records and the last file additionally receives the
     * remainder; files that would hold no records are not written.
     *
     * @param numRecords      total number of records to generate
     * @param baseParquetPath directory the files are written to
     * @param numFiles        number of files to spread the records over; must be positive
     * @throws IllegalArgumentException if {@code numFiles} is zero or negative
     * @throws IOException              if writing a file fails
     */
    protected static void prepareParquetDFSMultiFiles(int numRecords, String baseParquetPath, int numFiles) throws IOException {
        if (numFiles <= 0) {
            throw new IllegalArgumentException("Number of files must be greater than zero");
        }
        final int baseCount = numRecords / numFiles;
        final int remainder = numRecords % numFiles;
        final int lastIndex = numFiles - 1;
        for (int fileIndex = 0; fileIndex < numFiles; fileIndex++) {
            int count = baseCount;
            if (fileIndex == lastIndex) {
                count += remainder;
            }
            if (count > 0) {
                String fileName = String.format("%05d", fileIndex) + ".parquet";
                // Each file uses its own generator, which is closed as soon as the file is written.
                prepareParquetDFSFiles(count, baseParquetPath, fileName, false, null, null).close();
            }
        }
    }

    /**
     * Convenience overload that delegates to the seven-argument variant with
     * {@code makeDatesAmbiguous} set to {@code false}.
     *
     * @return the data generator used to produce the records; the caller decides when to close it
     * @throws IOException if writing the file fails
     */
    protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, String schemaStr, Schema schema) throws IOException {
        return HoodieDeltaStreamerTestBase.prepareParquetDFSFiles(numRecords, baseParquetPath, fileName, useCustomSchema, schemaStr, schema, false);
    }

    /**
     * Generates {@code numRecords} insert records for instant {@code "000"} and writes them as a
     * Parquet file at {@code baseParquetPath + "/" + fileName}.
     *
     * @param numRecords         number of records to generate
     * @param baseParquetPath    directory the file is written to
     * @param fileName           name of the Parquet file
     * @param useCustomSchema    when {@code true}, records are generated as per {@code schemaStr}
     *                           and converted using {@code schema}
     * @param schemaStr          schema string used for generation when {@code useCustomSchema} is set
     * @param schema             schema used for conversion when {@code useCustomSchema} is set
     * @param makeDatesAmbiguous forwarded to the {@link HoodieTestDataGenerator} constructor
     * @return the data generator that produced the records; it is not closed here
     * @throws IOException if writing the file fails
     */
    protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, String schemaStr, Schema schema, boolean makeDatesAmbiguous) throws IOException {
        String targetFile = baseParquetPath + "/" + fileName;
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator(makeDatesAmbiguous);
        if (!useCustomSchema) {
            UtilitiesTestBase.Helpers.saveParquetToDFS(
                UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", Integer.valueOf(numRecords))),
                new Path(targetFile));
            return generator;
        }
        // NOTE(review): the file is saved with AVRO_TRIP_SCHEMA rather than the supplied schema — confirm this is intended.
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInsertsAsPerSchema("000", Integer.valueOf(numRecords), schemaStr), schema),
            new Path(targetFile),
            HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        return generator;
    }

    /**
     * Generates {@code numRecords} update records for {@code timestamp} with the supplied
     * generator and writes them as a Parquet file at {@code baseParquetPath + "/" + fileName}.
     *
     * @param numRecords      number of update records to generate
     * @param baseParquetPath directory the file is written to
     * @param fileName        name of the Parquet file
     * @param useCustomSchema when {@code true}, updates are generated as per {@code schemaStr}
     *                        and converted using {@code schema}
     * @param schemaStr       schema string used for generation when {@code useCustomSchema} is set
     * @param schema          schema used for conversion when {@code useCustomSchema} is set
     * @param dataGenerator   generator that produces the updates
     * @param timestamp       instant time passed to the generator
     * @throws IOException if writing the file fails
     */
    protected static void prepareParquetDFSUpdates(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, String schemaStr, Schema schema, HoodieTestDataGenerator dataGenerator, String timestamp) throws IOException {
        String targetFile = baseParquetPath + "/" + fileName;
        if (!useCustomSchema) {
            UtilitiesTestBase.Helpers.saveParquetToDFS(
                UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateUpdates(timestamp, Integer.valueOf(numRecords))),
                new Path(targetFile));
            return;
        }
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateUpdatesAsPerSchema(timestamp, Integer.valueOf(numRecords), schemaStr), schema),
            new Path(targetFile),
            HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
    }

    /**
     * Writes the Parquet DFS source properties with the defaults used by most tests:
     * {@code source.avsc} / {@code target.avsc} schemas, {@code PROPS_FILENAME_TEST_PARQUET} as
     * the properties file, {@code PARQUET_SOURCE_ROOT} as the source root, no common props,
     * {@code partition_path} as the partition field and the record key field included.
     *
     * @param useSchemaProvider whether schema file properties are written
     * @param hasTransformer    whether the target schema file property is written as well
     * @param emptyBatchParam   value for the empty-batch setting; skipped when null or empty
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException {
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", emptyBatchParam, false);
    }

    /**
     * Writes the default Parquet DFS source properties without an empty-batch setting
     * (delegates with an empty {@code emptyBatchParam}).
     *
     * @param useSchemaProvider whether schema file properties are written
     * @param hasTransformer    whether the target schema file property is written as well
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
    }

    /**
     * Overload without an empty-batch setting; delegates with an empty {@code emptyBatchParam}
     * and {@code skipRecordKeyField} set to {@code false}.
     *
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps, partitionPath, "", false);
    }

    /**
     * Overload that always writes the record key field; delegates with
     * {@code skipRecordKeyField} set to {@code false}.
     *
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath, String emptyBatchParam) throws IOException {
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps, partitionPath, emptyBatchParam, false);
    }

    /**
     * Overload without extra properties; delegates with {@code extraProps} set to {@code null}.
     *
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath, String emptyBatchParam, boolean skipRecordKeyField) throws IOException {
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps, partitionPath, emptyBatchParam, null, skipRecordKeyField);
    }

    /**
     * Builds the properties for a Parquet DFS source and saves them to
     * {@code basePath + "/" + propsFileName}.
     *
     * @param useSchemaProvider  when {@code true}, the source schema file property is written
     * @param hasTransformer     when {@code true} together with {@code useSchemaProvider}, the
     *                           target schema file property is written as well
     * @param sourceSchemaFile   source schema file name, relative to {@code basePath}
     * @param targetSchemaFile   target schema file name, relative to {@code basePath}
     * @param propsFileName      name of the properties file to write
     * @param parquetSourceRoot  value for {@code hoodie.streamer.source.dfs.root}
     * @param addCommonProps     when {@code true}, the common ingestion props are added first
     * @param partitionPath      value for the partition path field property
     * @param emptyBatchParam    value for the empty-batch setting; skipped when null or empty
     * @param extraProps         properties copied in as the starting point; the sibling overload
     *                           without this parameter passes {@code null}
     * @param skipRecordKeyField when {@code true}, the record key field property is not written
     * @throws IOException if saving the properties file fails
     */
    protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath, String emptyBatchParam, TypedProperties extraProps, boolean skipRecordKeyField) throws IOException {
        TypedProperties sourceProps = TypedProperties.copy((Properties)extraProps);
        if (addCommonProps) {
            populateCommonProps(sourceProps, basePath);
        }

        sourceProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
        sourceProps.setProperty("include", "base.properties");
        sourceProps.setProperty("hoodie.embed.timeline.server", "false");
        if (!skipRecordKeyField) {
            sourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        }
        sourceProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);

        if (useSchemaProvider) {
            sourceProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + sourceSchemaFile);
        }
        // The target schema is only relevant when a schema provider is in use.
        if (useSchemaProvider && hasTransformer) {
            sourceProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/" + targetSchemaFile);
        }

        sourceProps.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
        boolean hasEmptyBatchParam = !StringUtils.isNullOrEmpty((String)emptyBatchParam);
        if (hasEmptyBatchParam) {
            sourceProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
        }

        String propsFile = basePath + "/" + propsFileName;
        UtilitiesTestBase.Helpers.savePropsToDFS(sourceProps, storage, propsFile);
    }

    /**
     * Builds the properties for an Avro Kafka source and saves them to
     * {@code basePath + "/" + propsFileName}.
     *
     * <p>The max-events property is written exactly once: the explicit value when one is given,
     * otherwise the default of {@code KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE}. An earlier
     * hard-coded write of 5000 to the same key was always overwritten and has been removed.
     *
     * @param propsFileName                  name of the properties file to write
     * @param maxEventsToReadFromKafkaSource max events to read, or {@code null} to use the default
     * @param topicName                      Kafka topic to read from
     * @param partitionPath                  value for the partition path field property
     * @param extraProps                     properties copied in as the starting point
     * @throws IOException if saving the properties file fails
     */
    protected void prepareAvroKafkaDFSSource(String propsFileName, Long maxEventsToReadFromKafkaSource, String topicName, String partitionPath, TypedProperties extraProps) throws IOException {
        TypedProperties props = TypedProperties.copy((Properties)extraProps);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.put((Object)HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), (Object)"false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.embed.timeline.server", "false");
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
        props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName());
        String maxEvents = maxEventsToReadFromKafkaSource != null
            ? String.valueOf(maxEventsToReadFromKafkaSource)
            : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue());
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEvents);
        // A random group id is generated on every call.
        props.setProperty("group.id", UUID.randomUUID().toString());
        UtilitiesTestBase.Helpers.savePropsToDFS(props, storage, basePath + "/" + propsFileName);
    }

    /**
     * Writes an ORC file with {@code numRecords} generated records under {@code ORC_SOURCE_ROOT}.
     *
     * @param numRecords number of records to generate
     * @throws IOException if writing the file fails
     */
    protected static void prepareORCDFSFiles(int numRecords) throws IOException {
        HoodieDeltaStreamerTestBase.prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
    }

    /**
     * Writes {@code numRecords} generated records to {@code FIRST_ORC_FILE_NAME} under
     * {@code baseORCPath} using the default schema.
     *
     * @param numRecords  number of records to generate
     * @param baseORCPath directory the file is written to
     * @throws IOException if writing the file fails
     */
    protected static void prepareORCDFSFiles(int numRecords, String baseORCPath) throws IOException {
        HoodieDeltaStreamerTestBase.prepareORCDFSFiles(numRecords, baseORCPath, FIRST_ORC_FILE_NAME, false, null, null);
    }

    /**
     * Generates {@code numRecords} insert records for instant {@code "000"} and writes them as an
     * ORC file at {@code baseORCPath + "/" + fileName}.
     *
     * <p>The data generator is created and used only inside this method, so it is closed here
     * once the file has been written (or writing has failed). The Parquet helpers in this class
     * return their generator and leave closing to the caller.
     *
     * @param numRecords      number of records to generate
     * @param baseORCPath     directory the file is written to
     * @param fileName        name of the ORC file
     * @param useCustomSchema when {@code true}, records are generated as per {@code schemaStr}
     *                        and converted using {@code schema}
     * @param schemaStr       schema string used for generation when {@code useCustomSchema} is set
     * @param schema          schema used for conversion when {@code useCustomSchema} is set
     * @throws IOException if writing the file fails
     */
    protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, String fileName, boolean useCustomSchema, String schemaStr, Schema schema) throws IOException {
        String path = baseORCPath + "/" + fileName;
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        try {
            if (useCustomSchema) {
                UtilitiesTestBase.Helpers.saveORCToDFS(UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInsertsAsPerSchema("000", Integer.valueOf(numRecords), schemaStr), schema), new Path(path), TestAvroOrcUtils.ORC_TRIP_SCHEMA);
            } else {
                UtilitiesTestBase.Helpers.saveORCToDFS(UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInserts("000", Integer.valueOf(numRecords))), new Path(path));
            }
        } finally {
            // Nobody else holds a reference to this generator, so release it here.
            dataGenerator.close();
        }
    }

    /**
     * Builds {@code key=value} configuration strings for table services. The max-unique-records
     * entry is always present; each of the remaining entries is added only when its value is
     * non-empty, in the order auto clean, inline clustering, inline clustering max commits,
     * async clustering, async clustering max commits.
     *
     * @param totalRecords           value for the max-unique-records setting
     * @param autoClean              auto clean value, or empty/null to omit
     * @param inlineCluster          inline clustering value, or empty/null to omit
     * @param inlineClusterMaxCommit inline clustering max commits value, or empty/null to omit
     * @param asyncCluster           async clustering value, or empty/null to omit
     * @param asyncClusterMaxCommit  async clustering max commits value, or empty/null to omit
     * @return the list of configuration strings
     */
    static List<String> getTableServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
        List<String> configs = new ArrayList<>();
        configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));

        // {config key, supplied value}; entries with an empty value are skipped.
        String[][] optionalSettings = {
            {HoodieCleanConfig.AUTO_CLEAN.key(), autoClean},
            {HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster},
            {HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), inlineClusterMaxCommit},
            {HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster},
            {HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), asyncClusterMaxCommit},
        };
        for (String[] setting : optionalSettings) {
            String value = setting[1];
            if (StringUtils.nonEmpty((String)value)) {
                configs.add(String.format("%s=%s", setting[0], value));
            }
        }
        return configs;
    }

    /**
     * Adds a completed commit without extra metadata to the table's timeline.
     *
     * @param metaClient meta client of the table
     * @throws IOException declared for callers; propagated from the delegate
     */
    static void addCommitToTimeline(HoodieTableMetaClient metaClient) throws IOException {
        HoodieDeltaStreamerTestBase.addCommitToTimeline(metaClient, Collections.emptyMap());
    }

    /**
     * Adds a completed {@code "commit"} instant with operation type {@code UPSERT} and the given
     * extra metadata to the table's timeline.
     *
     * @param metaClient    meta client of the table
     * @param extraMetadata extra metadata entries attached to the commit
     * @throws IOException declared for callers; propagated from the delegate
     */
    static void addCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
        HoodieDeltaStreamerTestBase.addCommitToTimeline(metaClient, WriteOperationType.UPSERT, "commit", extraMetadata);
    }

    /**
     * Adds a completed {@code "clustering"} instant with operation type {@code CLUSTER} and the
     * given extra metadata to the table's timeline.
     *
     * @param metaClient    meta client of the table
     * @param extraMetadata extra metadata entries attached to the commit
     * @throws IOException declared for callers; propagated from the delegate
     */
    static void addClusterCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException {
        HoodieDeltaStreamerTestBase.addCommitToTimeline(metaClient, WriteOperationType.CLUSTER, "clustering", extraMetadata);
    }

    /**
     * Creates a new instant of the given action on the table's active timeline and drives it
     * through requested, inflight and completed, attaching commit metadata that carries the
     * operation type and the extra metadata entries.
     *
     * <p>For the {@code "clustering"} action the metadata object is a
     * {@link HoodieReplaceCommitMetadata} and the instant is completed via
     * {@code transitionClusterInflightToComplete}; every other action uses a plain
     * {@link HoodieCommitMetadata} and {@code saveAsComplete}.
     *
     * @param metaClient         meta client of the table
     * @param writeOperationType operation type stored in the commit metadata
     * @param commitActiontype   timeline action of the new instant
     * @param extraMetadata      extra metadata entries attached to the commit
     */
    static void addCommitToTimeline(HoodieTableMetaClient metaClient, WriteOperationType writeOperationType, String commitActiontype, Map<String, String> extraMetadata) {
        boolean isClustering = commitActiontype.equals("clustering");
        // Declared as the common supertype: the non-clustering branch creates a plain
        // HoodieCommitMetadata, which cannot be assigned to a HoodieReplaceCommitMetadata variable.
        HoodieCommitMetadata commitMetadata = isClustering ? new HoodieReplaceCommitMetadata() : new HoodieCommitMetadata();
        commitMetadata.setOperationType(writeOperationType);
        extraMetadata.forEach((arg_0, arg_1) -> HoodieDeltaStreamerTestBase.lambda$addCommitToTimeline$0(commitMetadata, arg_0, arg_1));
        String commitTime = metaClient.createNewInstantTime();
        metaClient.getActiveTimeline().createNewInstant(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, commitActiontype, commitTime));
        HoodieInstant inflightInstant = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime);
        metaClient.getActiveTimeline().createNewInstant(inflightInstant);
        if (isClustering) {
            // Safe downcast: the clustering branch above always creates a HoodieReplaceCommitMetadata.
            metaClient.getActiveTimeline().transitionClusterInflightToComplete(true, inflightInstant, (HoodieReplaceCommitMetadata)commitMetadata);
        } else {
            metaClient.getActiveTimeline().saveAsComplete(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime), Option.of(commitMetadata));
        }
    }

    /**
     * Asserts that the Hudi table at {@code tablePath} contains exactly {@code expected} records.
     * The SQL context cache is cleared first and the table is read with the current reader options.
     *
     * @param expected   expected number of records
     * @param tablePath  path of the table to read
     * @param sqlContext SQL context used for the read
     */
    void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        long actual = sqlContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(tablePath)
            .count();
        Assertions.assertEquals(expected, actual);
    }

    /**
     * Asserts that the Hudi table at {@code tablePath} contains exactly {@code expected} distinct
     * values in the {@code _hoodie_record_key} column.
     *
     * @param expected   expected number of distinct record keys
     * @param tablePath  path of the table to read
     * @param sqlContext SQL context used for the read
     */
    void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        long distinctKeys = sqlContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(tablePath)
            .select("_hoodie_record_key")
            .distinct()
            .count();
        Assertions.assertEquals(expected, distinctKeys);
    }

    /**
     * Returns the number of rows per {@code _hoodie_commit_time} in the table at {@code tablePath},
     * sorted by commit time.
     *
     * @param tablePath path of the table to load
     * @param sqlContext SQL context used for the read; its cache is cleared first
     * @return one row per commit time, as produced by the group-by/count query
     */
    List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        return sqlContext.read()
            .options(hudiOpts)
            .format("org.apache.hudi")
            .load(tablePath)
            .groupBy("_hoodie_commit_time")
            .count()
            .sort("_hoodie_commit_time")
            .collectAsList();
    }

    /**
     * Asserts that exactly {@code expected} rows of the table at {@code tablePath} have a non-null
     * {@code haversine_distance} column.
     *
     * <p>The table is exposed to SQL as the temp view {@code tmp_trips}, replacing any view
     * previously registered under that name.
     *
     * @param expected expected number of rows with a non-null {@code haversine_distance}
     * @param tablePath path of the table to load
     * @param sqlContext SQL context used for the read and query; its cache is cleared first
     */
    void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
        sqlContext.read()
            .options(hudiOpts)
            .format("org.apache.hudi")
            .load(tablePath)
            .createOrReplaceTempView("tmp_trips");
        long recordCount = sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count();
        Assertions.assertEquals(expected, recordCount);
    }

    /**
     * Asserts that exactly {@code expected} rows of the table at {@code tablePath} have
     * {@code haversine_distance = 1.0}.
     *
     * <p>The table is exposed to SQL as the temp view {@code tmp_trips}, replacing any view
     * previously registered under that name.
     *
     * @param expected expected number of rows whose {@code haversine_distance} equals 1.0
     * @param tablePath path of the table to load
     * @param sqlContext SQL context used for the read and query; its cache is cleared first
     */
    void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
        sqlContext.read()
            .options(hudiOpts)
            .format("org.apache.hudi")
            .load(tablePath)
            .createOrReplaceTempView("tmp_trips");
        long recordCount = sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count();
        Assertions.assertEquals(expected, recordCount);
    }

    /**
     * Counts the rows of the table at {@code basePath} per value of
     * {@link HoodieRecord#PARTITION_PATH_METADATA_FIELD}.
     *
     * @param basePath path of the table to load
     * @param sqlContext SQL context used for the read; its cache is cleared first
     * @return a new mutable map from partition path to row count
     */
    Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
        sqlContext.clearCache();
        // Typed as List<Row> so the per-row accessors below are resolved on Row rather than Object.
        List<Row> rows = sqlContext.read()
            .options(hudiOpts)
            .format("org.apache.hudi")
            .load(basePath)
            .groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
            .count()
            .collectAsList();
        Map<String, Long> partitionRecordCount = new HashMap<>();
        for (Row row : rows) {
            // Column 0 is the grouping key (partition path), column 1 the count.
            partitionRecordCount.put(row.getString(0), row.getLong(1));
        }
        return partitionRecordCount;
    }

    /**
     * Asserts that the table at {@code basePath} has no row whose
     * {@link HoodieRecord#PARTITION_PATH_METADATA_FIELD} equals {@code partitionToValidate}.
     *
     * <p>The comparison is made through the Column API, so {@code partitionToValidate} is always
     * treated as a literal string value. Pass the raw partition path without SQL quoting.
     *
     * @param basePath path of the table to load
     * @param sqlContext SQL context used for the read; its cache is cleared first
     * @param partitionToValidate partition path that must not appear in the table
     */
    void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
        sqlContext.clearCache();
        org.apache.spark.sql.Dataset<Row> table = sqlContext.read()
            .options(hudiOpts)
            .format("org.apache.hudi")
            .load(basePath);
        // Splicing the value unquoted into a SQL filter string lets it be parsed as an expression
        // (e.g. 2016/03/15 becomes a division), which makes the check pass regardless of the data.
        long matchingRows = table
            .filter(table.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD).equalTo(partitionToValidate))
            .count();
        Assertions.assertEquals(0L, matchingRows);
    }

    /**
     * Decompiler-generated lambda body: stores one key/value pair in the extra metadata map of
     * {@code commitMetadata}.
     */
    private static /* synthetic */ void lambda$addCommitToTimeline$0(HoodieCommitMetadata commitMetadata, String k, String v) {
        Map<String, String> extraMetadata = commitMetadata.getExtraMetadata();
        extraMetadata.put(k, v);
    }

    // Static initializer for class-level defaults; the field declarations are outside this block.
    static {
        // Class name of FilebasedSchemaProvider; TestHelpers.makeConfig copies it into
        // cfg.schemaProviderClassName when useSchemaProviderClass is true.
        defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        // Starting value of testNum (presumably a per-test sequence number; confirm at its usages).
        testNum = 1;
    }

    public static class TestHelpers {
        /**
         * Builds a streamer config whose only transformer is
         * {@code TestHoodieDeltaStreamer.DropAllTransformer}.
         *
         * @param basePath target base path of the table
         * @param op write operation to configure
         * @return the config produced by {@code makeConfig(basePath, op, transformerClassNames)}
         */
        static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
            String dropAllTransformer = TestHoodieDeltaStreamer.DropAllTransformer.class.getName();
            List<String> transformerClassNames = Collections.singletonList(dropAllTransformer);
            return makeConfig(basePath, op, transformerClassNames);
        }

        /**
         * Builds a streamer config whose only transformer is
         * {@code TestHoodieDeltaStreamer.TripsWithDistanceTransformer}.
         *
         * @param basePath target base path of the table
         * @param op write operation to configure
         * @return the config produced by {@code makeConfig(basePath, op, transformerClassNames)}
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
            String distanceTransformer = TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName();
            List<String> transformerClassNames = Collections.singletonList(distanceTransformer);
            return makeConfig(basePath, op, transformerClassNames);
        }

        /**
         * Builds a streamer config with the given transformers, using
         * {@code PROPS_FILENAME_TEST_SOURCE} as the properties file and Hive sync disabled.
         *
         * @param basePath target base path of the table
         * @param op write operation to configure
         * @param transformerClassNames transformer class names to apply
         * @return the config produced by the five-argument {@code makeConfig} overload
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
            String propsFilename = HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_SOURCE;
            boolean enableHiveSync = false;
            return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync);
        }

        /**
         * Builds a streamer config that uses the schema provider class, sets the payload class to
         * {@link OverwriteWithLatestAvroPayload} and leaves the table type unspecified ({@code null}).
         *
         * @param basePath target base path of the table
         * @param op write operation to configure
         * @param transformerClassNames transformer class names to apply
         * @param propsFilename properties file name, resolved against {@code basePath} downstream
         * @param enableHiveSync whether Hive sync is enabled
         * @return the config produced by the nine-argument {@code makeConfig} overload
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync) {
            boolean useSchemaProviderClass = true;
            boolean updatePayloadClass = true;
            String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
            String tableType = null;
            return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync,
                useSchemaProviderClass, updatePayloadClass, payloadClassName, tableType);
        }

        /**
         * Builds a streamer config that reads from {@code TestDataSource} with a source limit of
         * 1000, {@code "timestamp"} as the source ordering field and no checkpoint.
         *
         * @param basePath target base path of the table
         * @param op write operation to configure
         * @param transformerClassNames transformer class names to apply
         * @param propsFilename properties file name, resolved against {@code basePath} downstream
         * @param enableHiveSync whether Hive sync is enabled
         * @param useSchemaProviderClass whether the default schema provider class is set
         * @param updatePayloadClass whether {@code payloadClassName} is applied to the config
         * @param payloadClassName payload class name to use when {@code updatePayloadClass} is true
         * @param tableType table type, or {@code null} for the default
         * @return the config produced by the thirteen-argument {@code makeConfig} overload
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) {
            String sourceClassName = TestDataSource.class.getName();
            int sourceLimit = 1000;
            String sourceOrderingField = "timestamp";
            String checkpoint = null;
            return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync,
                useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, tableType,
                sourceOrderingField, checkpoint);
        }

        /**
         * Builds a streamer config with {@code allowCommitOnNoCheckpointChange} disabled; all other
         * arguments are forwarded unchanged to the fourteen-argument {@code makeConfig} overload.
         *
         * @return the config produced by the fourteen-argument {@code makeConfig} overload
         */
        public static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint) {
            boolean allowCommitOnNoCheckpointChange = false;
            return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync,
                useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, tableType,
                sourceOrderingField, checkpoint, allowCommitOnNoCheckpointChange);
        }

        /**
         * Creates and fully populates a {@link HoodieDeltaStreamer.Config} for the table
         * {@code hoodie_trips}.
         *
         * <p>After the plain fields are set, the record merge mode, payload class name and merge
         * strategy id are replaced by the values returned from
         * {@code HoodieTableConfig.inferCorrectMergingBehavior} for the current table version.
         *
         * @param basePath target base path; also the directory the properties file is resolved in
         * @param op write operation
         * @param sourceClassName source class name
         * @param transformerClassNames transformer class names
         * @param propsFilename properties file name, appended to {@code basePath + "/"}
         * @param enableHiveSync whether Hive sync is enabled
         * @param useSchemaProviderClass whether {@code defaultSchemaProviderClassName} is set
         * @param sourceLimit source limit
         * @param updatePayloadClass whether {@code payloadClassName} is applied
         * @param payloadClassName payload class name used when {@code updatePayloadClass} is true
         * @param tableType table type; {@code null} selects {@code TABLE_TYPE_VALUE}
         * @param sourceOrderingField source ordering field
         * @param checkpoint checkpoint value
         * @param allowCommitOnNoCheckpointChange value for the config field of the same name
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint, boolean allowCommitOnNoCheckpointChange) {
            HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();

            // Target table.
            config.targetBasePath = basePath;
            config.targetTableName = "hoodie_trips";
            if (tableType == null) {
                config.tableType = HoodieDeltaStreamerTestBase.TABLE_TYPE_VALUE;
            } else {
                config.tableType = tableType;
            }

            // Source, transformation and write behaviour.
            config.sourceClassName = sourceClassName;
            config.transformerClassNames = transformerClassNames;
            config.operation = op;
            config.enableHiveSync = enableHiveSync;
            config.sourceOrderingField = sourceOrderingField;
            config.propsFilePath = basePath + "/" + propsFilename;
            config.sourceLimit = sourceLimit;
            config.checkpoint = checkpoint;

            // Optional overrides.
            if (updatePayloadClass) {
                config.payloadClassName = payloadClassName;
            }
            if (useSchemaProviderClass) {
                config.schemaProviderClassName = defaultSchemaProviderClassName;
            }
            config.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;

            // Must run after payloadClassName and sourceOrderingField are set, since both are inputs.
            Triple<RecordMergeMode, String, String> mergeSettings = HoodieTableConfig.inferCorrectMergingBehavior(
                config.recordMergeMode,
                config.payloadClassName,
                config.recordMergeStrategyId,
                config.sourceOrderingField,
                HoodieTableVersion.current());
            config.recordMergeMode = mergeSettings.getLeft();
            config.payloadClassName = mergeSettings.getMiddle();
            config.recordMergeStrategyId = mergeSettings.getRight();
            return config;
        }

        /**
         * Creates a {@link HoodieDeltaStreamer.Config} for the table {@code hoodie_trips_copy} that
         * reads incrementally from another Hudi table through {@code HoodieIncrSource}.
         *
         * @param srcBasePath base path of the upstream table, written to
         *     {@code hoodie.streamer.source.hoodieincr.path}
         * @param basePath target base path; the properties file is
         *     {@code basePath + "/test-downstream-source.properties"}
         * @param op write operation
         * @param addReadLatestOnMissingCkpt value written to
         *     {@code hoodie.streamer.source.hoodieincr.read_latest_on_missing_ckpt}
         * @param schemaProviderClassName schema provider class name; left unset when {@code null}
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
            HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();
            config.targetBasePath = basePath;
            config.targetTableName = "hoodie_trips_copy";
            config.tableType = HoodieDeltaStreamerTestBase.TABLE_TYPE_VALUE;
            config.sourceClassName = HoodieIncrSource.class.getName();
            config.operation = op;
            config.sourceOrderingField = "timestamp";
            config.propsFilePath = basePath + "/test-downstream-source.properties";
            config.sourceLimit = 1000L;
            if (schemaProviderClassName != null) {
                config.schemaProviderClassName = schemaProviderClassName;
            }

            // Extra "key=value" overrides, kept in a mutable list.
            List<String> overrides = new ArrayList<String>();
            overrides.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + "=true");
            overrides.add("hoodie.streamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
            overrides.add("hoodie.streamer.source.hoodieincr.path=" + srcBasePath);
            overrides.add("hoodie.streamer.source.hoodieincr.partition.fields=datestr");
            config.configs = overrides;
            return config;
        }

        /**
         * Asserts that the commit-and-replace timeline of the table at {@code tablePath} holds at
         * least {@code minExpected} completed instants.
         *
         * @param minExpected minimum number of completed instants
         * @param tablePath path of the table to inspect
         */
        static void assertAtleastNCompactionCommits(int minExpected, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage, tablePath);
            HoodieTimeline completed = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completed.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the delta-commit timeline of the table at {@code tablePath} holds at least
         * {@code minExpected} completed instants.
         *
         * @param minExpected minimum number of completed delta commits
         * @param tablePath path of the table to inspect
         */
        static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completed = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completed.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the commit-and-replace timeline of the table at {@code tablePath} holds at
         * least {@code minExpected} completed instants after {@code lastSuccessfulCommit}.
         *
         * @param minExpected minimum number of completed instants
         * @param lastSuccessfulCommit instant time after which instants are counted
         * @param tablePath path of the table to inspect
         */
        static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedAfter = metaClient.getActiveTimeline()
                .getCommitAndReplaceTimeline()
                .findInstantsAfter(lastSuccessfulCommit)
                .filterCompletedInstants();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedAfter.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the reloaded delta-commit timeline of the table at {@code tablePath} holds at
         * least {@code minExpected} completed instants after {@code lastSuccessfulCommit}.
         *
         * @param minExpected minimum number of completed delta commits
         * @param lastSuccessfulCommit instant time after which instants are counted
         * @param tablePath path of the table to inspect
         */
        static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedAfter = metaClient.reloadActiveTimeline()
                .getDeltaCommitTimeline()
                .findInstantsAfter(lastSuccessfulCommit)
                .filterCompletedInstants();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedAfter.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts the number of completed commits of the table at {@code tablePath} and the
         * checkpoint stored in the metadata of the last one.
         *
         * <p>The checkpoint is read from {@code streamer.checkpoint.key.v2} when the table version is
         * {@link HoodieTableVersion#EIGHT}, and from {@code deltastreamer.checkpoint.key} otherwise.
         *
         * @param expected expected checkpoint value
         * @param tablePath path of the table to inspect
         * @param totalCommits expected number of completed instants in the commits timeline
         * @return the last completed instant of the commits timeline
         * @throws IOException declared for reading the commit metadata
         */
        static HoodieInstant assertCommitMetadata(String expected, String tablePath, int totalCommits) throws IOException {
            HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

            // Validate the timeline before dereferencing its last instant, so an empty timeline
            // fails with a descriptive assertion instead of an exception from Option.get().
            Assertions.assertEquals(totalCommits, timeline.countInstants());
            Assertions.assertTrue(timeline.lastInstant().isPresent(), "No completed commit found for table " + tablePath);

            HoodieInstant lastInstant = timeline.lastInstant().get();
            HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(lastInstant);
            String checkpointKey = meta.getTableConfig().getTableVersion() == HoodieTableVersion.EIGHT
                ? "streamer.checkpoint.key.v2"
                : "deltastreamer.checkpoint.key";
            Assertions.assertEquals(expected, commitMetadata.getMetadata(checkpointKey));
            return lastInstant;
        }

        /**
         * Polls {@code condition} on a background thread until it returns {@code true} or
         * {@code dsFuture} is done, and waits at most {@code timeoutInSecs} seconds for that
         * polling to finish.
         *
         * <p>The condition is evaluated every 2 seconds with the argument {@code true}. Anything it
         * throws is logged at debug level and treated as "not yet satisfied". The polling thread is
         * always stopped before this method returns, including on timeout.
         *
         * @param condition check to evaluate; its argument is always {@code true}
         * @param dsFuture future of the running streamer; polling stops once it is done
         * @param timeoutInSecs maximum time to wait, in seconds
         * @throws Exception whatever {@link Future#get(long, TimeUnit)} raises, for example a
         *     {@code TimeoutException} when the timeout elapses first
         */
        static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception {
            java.util.concurrent.ExecutorService poller = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> res = poller.submit(() -> {
                    boolean ret = false;
                    while (!ret && !dsFuture.isDone() && !Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(2000L);
                            ret = condition.apply(true);
                            if (ret) {
                                LOG.info("Condition completed successfully");
                            }
                        }
                        catch (InterruptedException interrupted) {
                            // Raised by shutdownNow() below: keep the interrupt status and stop polling.
                            Thread.currentThread().interrupt();
                            break;
                        }
                        catch (Throwable error) {
                            LOG.debug("Got error waiting for condition", error);
                            ret = false;
                        }
                    }
                    return ret;
                });
                res.get(timeoutInSecs, TimeUnit.SECONDS);
            }
            finally {
                // Without this the executor thread outlives the call and, after a timeout,
                // keeps polling for as long as dsFuture is running.
                poller.shutdownNow();
            }
        }

        /**
         * Blocks the calling thread until {@code booleanSupplier} returns {@code true}, re-checking
         * it every 5 milliseconds.
         *
         * <p>The wait is not cut short by an interrupt. If the thread is interrupted while
         * sleeping, the event is logged at debug level, waiting continues, and the thread's
         * interrupt status is restored before the method returns.
         *
         * @param booleanSupplier condition to wait for
         */
        static void waitFor(BooleanSupplier booleanSupplier) {
            boolean interrupted = false;
            try {
                while (!booleanSupplier.getAsBoolean()) {
                    try {
                        Thread.sleep(5L);
                    }
                    catch (InterruptedException error) {
                        // Thread.sleep clears the flag when it throws; remember it for the caller.
                        interrupted = true;
                        LOG.debug("Got error waiting for condition", error);
                    }
                }
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Asserts that the active timeline of the table at {@code tablePath} holds at least
         * {@code minExpected} completed instants.
         *
         * @param minExpected minimum number of completed instants
         * @param tablePath path of the table to inspect
         */
        static void assertAtLeastNCommits(int minExpected, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completed = metaClient.getActiveTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completed.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the completed replace timeline of the table at {@code tablePath} holds at
         * least {@code minExpected} instants.
         *
         * @param minExpected minimum number of completed replace instants
         * @param tablePath path of the table to inspect
         */
        static void assertAtLeastNReplaceCommits(int minExpected, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedReplace = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedReplace.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the reloaded timeline of the table at {@code tablePath} contains exactly one
         * pending index instant.
         *
         * @param tablePath path of the table to inspect
         */
        static void assertPendingIndexCommit(String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline pendingIndex = metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = pendingIndex.countInstants();
            Assertions.assertEquals(1, actual, "Got=" + actual + ", exp=1");
        }

        /**
         * Asserts that the reloaded timeline of the table at {@code tablePath} contains exactly one
         * completed index instant.
         *
         * @param tablePath path of the table to inspect
         */
        static void assertCompletedIndexCommit(String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedIndex = metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedIndex.countInstants();
            Assertions.assertEquals(1, actual, "Got=" + actual + ", exp=1");
        }

        /**
         * Asserts that the completed replace timeline of the table at {@code tablePath} is empty.
         *
         * @param tablePath path of the table to inspect
         */
        static void assertNoReplaceCommits(String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedReplace = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedReplace.countInstants();
            Assertions.assertEquals(0, actual, "Got=" + actual + ", exp =" + 0);
        }

        /**
         * Asserts that the pending clustering timeline of the table at {@code tablePath} holds at
         * least {@code minExpected} instants.
         *
         * @param minExpected minimum number of pending clustering instants
         * @param tablePath path of the table to inspect
         */
        static void assertAtLeastNClusterRequests(int minExpected, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline pendingClustering = metaClient.getActiveTimeline().filterPendingClusteringTimeline();
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = pendingClustering.countInstants();
            Assertions.assertTrue(actual >= minExpected, "Got=" + actual + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the table at {@code tablePath} has at least {@code minExpectedRollback}
         * completed rollbacks, and at least {@code minExpectedCommits} completed instants whose
         * requested time is greater than that of the first completed rollback.
         *
         * @param minExpectedRollback minimum number of completed rollback instants
         * @param minExpectedCommits minimum number of completed instants after the first rollback
         * @param tablePath path of the table to inspect
         */
        static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minExpectedCommits, String tablePath) {
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storage.getConf(), tablePath);
            HoodieTimeline completedRollbacks = metaClient.getActiveTimeline().getRollbackTimeline().filterCompletedInstants();
            LOG.info("Rollback Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int rollbackCount = completedRollbacks.countInstants();
            Assertions.assertTrue(rollbackCount >= minExpectedRollback, "Got=" + rollbackCount + ", exp >=" + minExpectedRollback);

            // Count the completed instants requested strictly after the first completed rollback.
            HoodieInstant firstRollback = completedRollbacks.getInstants().get(0);
            HoodieTimeline completedAfterRollback = metaClient.getActiveTimeline()
                .filterCompletedInstants()
                .filter(instant -> InstantComparison.compareTimestamps(
                    instant.requestedTime(), InstantComparison.GREATER_THAN, firstRollback.requestedTime()));
            int commitCount = completedAfterRollback.countInstants();
            Assertions.assertTrue(commitCount >= minExpectedCommits, "Got=" + commitCount + ", exp >=" + minExpectedCommits);
        }
    }
}

