/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.avro.model.HoodiePath;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Functional tests for bootstrapping a Hudi table from an existing parquet dataset.
 *
 * <p>Covers metadata-only, full-record and mixed bootstrap modes for both COPY_ON_WRITE and
 * MERGE_ON_READ tables. Each test also exercises rollback of a failed bootstrap and a follow-up
 * upsert (plus compaction for MOR). The class is currently disabled under HUDI-7353.
 */
@Disabled("HUDI-7353")
@Tag("functional")
public class TestBootstrap extends HoodieSparkClientTestBase {

    /**
     * Hive column types handed to the input-format readers. There is one entry for each of the 12
     * columns selected by {@link #generateTestRawTripDataset} (excluding the "datestr" partition column).
     */
    public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,string,double,double,double,double,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";

    /**
     * Avro schema string given to every {@link RawTripTestPayload} built by {@link #generateInputBatch}.
     * NOTE(review): this appears to be an inlined copy of HoodieTestDataGenerator's trip example schema
     * (the file is decompiled); consider referencing that constant directly once confirmed.
     */
    private static final String TRIP_PAYLOAD_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    @TempDir
    public Path tmpFolder;

    /** Location of the raw parquet dataset being bootstrapped; set per test in {@link #setUp()}. */
    protected String bootstrapBasePath;

    private HoodieParquetInputFormat roInputFormat;
    private JobConf roJobConf;
    private HoodieParquetRealtimeInputFormat rtInputFormat;
    private JobConf rtJobConf;

    @BeforeEach
    public void setUp() throws Exception {
        bootstrapBasePath = tmpFolder.toAbsolutePath() + "/data";
        initPath();
        initSparkContexts();
        initTestDataGenerator();
        initMetaClient();
        reloadInputFormats();
    }

    @AfterEach
    public void tearDown() throws IOException {
        cleanupSparkContexts();
        cleanupClients();
        cleanupTestDataGenerator();
    }

    /** Recreates the read-optimized and realtime input formats, each with a fresh JobConf copied from the Spark Hadoop conf. */
    private void reloadInputFormats() {
        roInputFormat = new HoodieParquetInputFormat();
        roJobConf = new JobConf(jsc.hadoopConfiguration());
        roInputFormat.setConf(roJobConf);
        rtInputFormat = new HoodieParquetRealtimeInputFormat();
        rtJobConf = new JobConf(jsc.hadoopConfiguration());
        rtInputFormat.setConf(rtJobConf);
    }

    /**
     * Writes a synthetic trip dataset as parquet to {@code srcPath} and returns the Avro schema of one of
     * the written files.
     *
     * @param timestamp      value stored in every record's "timestamp" column
     * @param numRecords     number of records to generate
     * @param partitionPaths partitions to spread records over; null or empty writes a non-partitioned dataset
     * @param srcPath        output directory (overwritten)
     * @return the Avro schema read back from a written data file
     * @throws IllegalStateException if no data file can be found under {@code srcPath} after the write
     */
    public Schema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths, String srcPath) throws Exception {
        boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
        Dataset<Row> df = generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext);
        df.printSchema();
        if (isPartitioned) {
            df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
        } else {
            df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
        }
        // Any file is fine: all files share the schema of the DataFrame written above.
        HoodieFileStatus anyFile = BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), metaClient.getStorage(), srcPath, context)
            .stream()
            .flatMap(p -> p.getValue().stream())
            .findAny()
            .orElseThrow(() -> new IllegalStateException("No data files found under " + srcPath));
        String filePath = HadoopFSUtils.toPath(anyFile.getPath()).toString();
        try (HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(metaClient.getStorage(), new StoragePath(filePath))) {
            return parquetReader.getSchema();
        }
    }

    @Test
    public void testMetadataBootstrapNonpartitionedCOW() throws Exception {
        testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    @Test
    public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
        testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception {
        testBootstrapCommon(partitioned, deltaCommit, mode, BootstrapMode.METADATA_ONLY);
    }

    /**
     * Shared scenario: bootstrap, simulate a failed bootstrap and roll it back, bootstrap again, upsert
     * a new batch and, for MOR tables, compact. Results are verified after each step.
     *
     * @param partitioned       whether the source dataset is partitioned
     * @param deltaCommit       true for a MERGE_ON_READ table, false for COPY_ON_WRITE
     * @param mode              which bootstrap mode selector to use
     * @param modeForRegexMatch bootstrap mode configured for regex-matched partitions
     */
    private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode, BootstrapMode modeForRegexMatch) throws Exception {
        String keyGeneratorClass = partitioned
            ? SimpleKeyGenerator.class.getCanonicalName()
            : NonpartitionedKeyGenerator.class.getCanonicalName();
        HoodieTableType tableType = deltaCommit ? HoodieTableType.MERGE_ON_READ : HoodieTableType.COPY_ON_WRITE;
        metaClient = HoodieTestUtils.init(basePath, tableType, bootstrapBasePath, true, keyGeneratorClass, "partition_path");

        int totalRecords = 100;
        String bootstrapModeSelectorClass;
        String bootstrapCommitInstantTs;
        boolean checkNumRawFiles;
        boolean isBootstrapIndexCreated;
        int numInstantsAfterBootstrap;
        List<String> bootstrapInstants;
        switch (mode) {
            case FULL_BOOTSTRAP_MODE:
                bootstrapModeSelectorClass = FullRecordBootstrapModeSelector.class.getCanonicalName();
                bootstrapCommitInstantTs = "00000000000002";
                checkNumRawFiles = false;
                isBootstrapIndexCreated = false;
                numInstantsAfterBootstrap = 1;
                bootstrapInstants = Collections.singletonList(bootstrapCommitInstantTs);
                break;
            case METADATA_BOOTSTRAP_MODE:
                bootstrapModeSelectorClass = MetadataOnlyBootstrapModeSelector.class.getCanonicalName();
                bootstrapCommitInstantTs = "00000000000001";
                checkNumRawFiles = true;
                isBootstrapIndexCreated = true;
                numInstantsAfterBootstrap = 1;
                bootstrapInstants = Collections.singletonList(bootstrapCommitInstantTs);
                break;
            case MIXED_BOOTSTRAP_MODE:
                // Both a metadata-only and a full-record bootstrap instant are produced.
                bootstrapModeSelectorClass = TestRandomBootstrapModeSelector.class.getName();
                bootstrapCommitInstantTs = "00000000000002";
                checkNumRawFiles = false;
                isBootstrapIndexCreated = true;
                numInstantsAfterBootstrap = 2;
                bootstrapInstants = Arrays.asList("00000000000001", "00000000000002");
                break;
            default:
                throw new IllegalArgumentException("Unsupported bootstrap mode: " + mode);
        }

        List<String> partitions = partitioned
            ? Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03")
            : Collections.<String>emptyList();
        long timestamp = Instant.now().toEpochMilli();
        Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
        HoodieWriteConfig config = getConfigBuilder(schema.toString())
            .withPreCombineField("timestamp")
            .withAutoCommit(true)
            .withSchema(schema.toString())
            .withKeyGenerator(keyGeneratorClass)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build())
            .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
                .withBootstrapBasePath(bootstrapBasePath)
                .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
                .withBootstrapParallelism(3)
                .withBootstrapModeSelector(bootstrapModeSelectorClass)
                .withBootstrapModeForRegexMatch(modeForRegexMatch)
                .build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .enable(true)
                .withMaxNumDeltaCommitsBeforeCompaction(3)
                .withMetadataIndexColumnStats(false)
                .build())
            .build();

        // Phase 1: bootstrap, then simulate a failure by deleting the completed instant file and roll back.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, config)) {
            client.bootstrap(Option.empty());
            checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
                numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);

            HoodieInstant completedBootstrap = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(
                HoodieInstant.State.COMPLETED, deltaCommit ? "deltacommit" : "commit", bootstrapCommitInstantTs);
            metaClient.getActiveTimeline().reload().getInstantsAsStream()
                .filter(s -> s.equals(completedBootstrap))
                .forEach(instant -> TimelineUtils.deleteInstantFile(metaClient.getStorage(), metaClient.getTimelinePath(),
                    instant, HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR));
            metaClient.reloadActiveTimeline();
            client.getTableServiceClient().rollbackFailedBootstrap();
            metaClient.reloadActiveTimeline();

            Assertions.assertEquals(0, metaClient.getCommitsTimeline().countInstants());
            long filesLeftInTable = BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), metaClient.getStorage(), basePath, context)
                .stream()
                .mapToLong(f -> f.getValue().size())
                .sum();
            Assertions.assertEquals(0L, filesLeftInTable);
            Assertions.assertFalse(BootstrapIndex.getBootstrapIndex(metaClient).useIndex());
        }

        // Phase 2: bootstrap again, then upsert (and compact for MOR).
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, config)) {
            client.bootstrap(Option.empty());
            metaClient.reloadActiveTimeline();
            Assertions.assertEquals(isBootstrapIndexCreated, BootstrapIndex.getBootstrapIndex(metaClient).useIndex(),
                "Unexpected bootstrap index state");
            checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
                numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);

            long updateTimestamp = Instant.now().toEpochMilli();
            String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
            generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
            JavaRDD<HoodieRecord> updateBatch = generateInputBatch(jsc,
                BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), metaClient.getStorage(), updateSPath, context), schema);
            String newInstantTs = client.startCommit();
            client.upsert(updateBatch, newInstantTs);
            // For MOR the read-optimized view still sees the bootstrap-time values until compaction.
            checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1, updateTimestamp,
                deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);

            if (deltaCommit) {
                Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
                Assertions.assertTrue(compactionInstant.isPresent());
                client.compact(compactionInstant.get());
                // After compaction, results are validated as for a non-delta commit (!deltaCommit is false here).
                checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles, numInstantsAfterBootstrap + 2,
                    2, updateTimestamp, updateTimestamp, !deltaCommit, Collections.singletonList(compactionInstant.get()), false);
            }
        }
    }

    @Test
    public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
        testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    @Test
    public void testFullBootstrapOnlyCOW() throws Exception {
        testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
    }

    @Test
    public void testFullBootstrapWithUpdatesMOR() throws Exception {
        testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE);
    }

    @Test
    public void testFullBootstrapWithRegexModeWithOnlyCOW() throws Exception {
        testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE, BootstrapMode.FULL_RECORD);
    }

    @Test
    public void testFullBootstrapWithRegexModeWithUpdatesMOR() throws Exception {
        testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE, BootstrapMode.FULL_RECORD);
    }

    @Test
    public void testMetaAndFullBootstrapCOW() throws Exception {
        testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
    }

    @Test
    public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
        testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
    }

    /** Convenience overload: one file version per raw file and only {@code maxInstant} holding valid records. */
    private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles, int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
        checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants, expTimestamp,
            expROTimestamp, isDeltaCommit, Collections.singletonList(maxInstant), validateRecordsForCommitTime);
    }

    /**
     * Verifies the timeline, the absence of leftover markers, and the records seen through Spark SQL
     * and through the read-optimized and realtime Hive input formats.
     *
     * @param instant                      expected latest completed commit instant
     * @param numVersions                  expected file versions per raw source file (when {@code checkNumRawFiles})
     * @param expTimestamp                 expected "timestamp" value seen by realtime full-schema reads
     * @param expROTimestamp               expected "timestamp" value seen by read-optimized full-schema reads
     * @param isDeltaCommit                when false, Spark SQL cross-checks between table and source are run
     * @param instantsWithValidRecords     commit times that together should hold all {@code totalRecords}
     * @param validateRecordsForCommitTime whether to run the commit-time record count check
     */
    private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles, int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, List<String> instantsWithValidRecords, boolean validateRecordsForCommitTime) throws Exception {
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
        Assertions.assertEquals(instant,
            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().requestedTime());
        verifyNoMarkerInTempFolder();

        Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
        Dataset<Row> original = sqlContext.read().format("parquet").load(bootstrapBasePath);
        bootstrapped.createOrReplaceTempView("bootstrapped");
        original.createOrReplaceTempView("original");

        if (checkNumRawFiles) {
            long numRawFiles = BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), metaClient.getStorage(), bootstrapBasePath, context)
                .stream()
                .mapToLong(x -> x.getValue().size())
                .sum();
            Assertions.assertEquals(numRawFiles * numVersions,
                sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count());
        }

        if (!isDeltaCommit) {
            String predicate = instantsWithValidRecords.stream()
                .map(p -> "\"" + p + "\"")
                .collect(Collectors.joining(", "));
            if (validateRecordsForCommitTime) {
                Assertions.assertEquals((long) totalRecords,
                    sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN (" + predicate + ")").count());
            }
            Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not in (select _hoodie_record_key from bootstrapped)");
            Assertions.assertEquals(0L, missingOriginal.count());
            Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a where a._hoodie_record_key not in (select _row_key from original)");
            Assertions.assertEquals(0L, missingBootstrapped.count());
        }

        // Full-schema reads: record keys must match _row_key and timestamps must reflect the expected snapshot.
        assertFullRecords(readRecordsViaInputFormat(false, false, schema, false, new ArrayList<>()),
            totalRecords, expROTimestamp, "Record :");
        assertFullRecords(readRecordsViaInputFormat(true, true, schema, false, new ArrayList<>()),
            totalRecords, expTimestamp, "Realtime Record :");

        // Projected reads: only uniqueness and count of the projected key column are checked.
        assertUniqueKeys(readRecordsViaInputFormat(false, true, schema, true, HoodieRecord.HOODIE_META_COLUMNS),
            totalRecords, "_hoodie_record_key");
        assertUniqueKeys(readRecordsViaInputFormat(true, true, schema, true, HoodieRecord.HOODIE_META_COLUMNS),
            totalRecords, "_hoodie_record_key");
        assertUniqueKeys(readRecordsViaInputFormat(false, true, schema, true, Arrays.asList("_row_key")),
            totalRecords, "_row_key");
        assertUniqueKeys(readRecordsViaInputFormat(true, true, schema, true, Arrays.asList("_row_key")),
            totalRecords, "_row_key");
    }

    /**
     * Reloads the input formats and reads every partition of the table through the realtime or
     * read-optimized Hive input format.
     *
     * @param realtime           use the realtime JobConf/reader when true, read-optimized otherwise
     * @param useMetadataListing flag forwarded to {@link FSUtils#getAllPartitionPaths} for partition discovery
     * @param projectCols        whether to read only {@code projectedColumns}
     */
    private List<GenericRecord> readRecordsViaInputFormat(boolean realtime, boolean useMetadataListing, Schema schema,
                                                          boolean projectCols, List<String> projectedColumns) throws Exception {
        reloadInputFormats();
        List<String> inputPaths = FSUtils.getAllPartitionPaths(context, storage, basePath, useMetadataListing)
            .stream()
            .map(f -> basePath + "/" + f)
            .collect(Collectors.toList());
        return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()),
            inputPaths, basePath, realtime ? rtJobConf : roJobConf, realtime, schema, TRIP_HIVE_COLUMN_TYPES,
            projectCols, projectedColumns);
    }

    /** Asserts that each record's key matches its _row_key and timestamp, then that keys are unique and complete. */
    private static void assertFullRecords(List<GenericRecord> records, int totalRecords, long expTimestamp, String msgPrefix) {
        Assertions.assertEquals(totalRecords, records.size());
        for (GenericRecord r : records) {
            Assertions.assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), msgPrefix + r);
            Assertions.assertEquals(expTimestamp, ((LongWritable) r.get("timestamp")).get(), msgPrefix + r);
        }
        assertUniqueKeys(records, totalRecords, "_hoodie_record_key");
    }

    /** Asserts that exactly {@code totalRecords} records were read and that {@code keyField} is unique across them. */
    private static void assertUniqueKeys(List<GenericRecord> records, int totalRecords, String keyField) {
        Assertions.assertEquals(totalRecords, records.size());
        HashSet<String> seenKeys = new HashSet<>();
        for (GenericRecord r : records) {
            Assertions.assertTrue(seenKeys.add(r.get(keyField).toString()), "Duplicate key in record: " + r);
        }
        Assertions.assertEquals(totalRecords, seenKeys.size());
    }

    /** Asserts that the table's temp folder contains no entries (no leftover write markers). */
    private void verifyNoMarkerInTempFolder() throws IOException {
        String tempFolderPath = metaClient.getTempFolderPath();
        FileSystem fileSystem = HadoopFSUtils.getFs(tempFolderPath, jsc.hadoopConfiguration());
        Assertions.assertEquals(0, fileSystem.listStatus(new org.apache.hadoop.fs.Path(tempFolderPath)).length);
    }

    /**
     * Reads every parquet file in {@code partitionPaths} and turns each row into a {@link HoodieAvroRecord}
     * keyed by its "_row_key" value and the file's partition.
     *
     * <p>Side effect: sets {@code writerSchema} as the Avro read schema on the Spark context's shared
     * Hadoop configuration (once per file).
     */
    private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
        List<Pair<String, org.apache.hadoop.fs.Path>> fullFilePathsWithPartition = partitionPaths.stream()
            .flatMap(p -> p.getValue().stream().map(x -> Pair.of(p.getKey(), HadoopFSUtils.toPath(x.getPath()))))
            .collect(Collectors.toList());
        List<HoodieRecord> records = fullFilePathsWithPartition.stream().flatMap(p -> {
            Configuration conf = jsc.hadoopConfiguration();
            AvroReadSupport.setAvroReadSchema(conf, writerSchema);
            ParquetReaderIterator<GenericRecord> recIterator;
            try {
                recIterator = new ParquetReaderIterator<>(AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
            } catch (IOException ioe) {
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
            String partitionPath = p.getKey();
            // flatMap closes each mapped stream once consumed; onClose releases the parquet reader then.
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false)
                .onClose(recIterator::close)
                .<HoodieRecord>map(gr -> toHoodieRecord(gr, partitionPath));
        }).collect(Collectors.toList());
        return jsc.parallelize(records);
    }

    /** Wraps one source row in a {@link HoodieAvroRecord} carrying a {@link RawTripTestPayload}. */
    private static HoodieRecord toHoodieRecord(GenericRecord gr, String partitionPath) {
        try {
            String key = gr.get("_row_key").toString();
            return new HoodieAvroRecord<>(new HoodieKey(key, partitionPath),
                new RawTripTestPayload(gr.toString(), key, partitionPath, TRIP_PAYLOAD_SCHEMA));
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /** Builds the base write config with bloom index, "_row_key" record key and "datestr" partition path field. */
    public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
        HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM)
            .withExternalSchemaTrasformation(true);
        TypedProperties properties = new TypedProperties();
        properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
        properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "datestr");
        return builder.withProps(properties);
    }

    /**
     * Generates trip records with ids in {@code [from, to)} as a DataFrame.
     *
     * <p>When {@code partitionPaths} is non-empty, a "datestr" column is added by mapping each record's
     * numeric id (the suffix of "trip_&lt;id&gt;") modulo the number of partitions to an escaped partition path.
     * This registers (or re-registers) the "partgen" UDF on {@code sqlContext}.
     */
    public static Dataset<Row> generateTestRawTripDataset(long timestamp, int from, int to, List<String> partitionPaths, JavaSparkContext jsc, SQLContext sqlContext) {
        boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
        List<String> records = IntStream.range(from, to)
            .mapToObj(i -> new HoodieTestDataGenerator().generateGenericRecord("trip_" + i, Long.toString(timestamp),
                "rider_" + i, "driver_" + i, timestamp, false, false).toString())
            .collect(Collectors.toList());
        if (isPartitioned) {
            sqlContext.udf().register("partgen",
                (UDF1<String, String>) val -> PartitionPathEncodeUtils.escapePathName(
                    partitionPaths.get(Integer.parseInt(val.split("_")[1]) % partitionPaths.size())),
                DataTypes.StringType);
        }
        Dataset<Row> df = sqlContext.read().json(jsc.parallelize(records));
        if (isPartitioned) {
            df = df.withColumn("datestr", functions.callUDF("partgen", new Column("_row_key")));
            df = df.select("timestamp", "_row_key", "partition_path", "rider", "driver", "begin_lat", "begin_lon",
                "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted", "datestr");
        } else {
            df = df.select("timestamp", "_row_key", "partition_path", "rider", "driver", "begin_lat", "begin_lon",
                "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted");
        }
        return df;
    }

    /**
     * Mode selector that alternates METADATA_ONLY / FULL_RECORD across partitions, starting from a
     * randomly chosen mode.
     */
    public static class TestRandomBootstrapModeSelector extends BootstrapModeSelector {
        private int currIdx = new Random().nextInt(2);

        public TestRandomBootstrapModeSelector(HoodieWriteConfig writeConfig) {
            super(writeConfig);
        }

        @Override
        public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
            List<Pair<BootstrapMode, String>> selections = new ArrayList<>(partitions.size());
            for (Pair<String, List<HoodieFileStatus>> partition : partitions) {
                BootstrapMode mode = currIdx == 0 ? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
                currIdx = (currIdx + 1) % 2;
                selections.add(Pair.of(mode, partition.getKey()));
            }
            return selections.stream()
                .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
        }
    }

    /**
     * Full-record bootstrap input provider that derives the Avro schema from the footer of one source
     * parquet file and converts all source files via {@link #generateInputBatch}.
     */
    public static class TestFullBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {

        public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) {
            super(props, context);
        }

        @Override
        public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) {
            HoodieFileStatus anyFile = partitionPaths.stream()
                .flatMap(p -> p.getValue().stream())
                .findAny()
                .orElseThrow(() -> new IllegalStateException("No source files to bootstrap under " + sourceBasePath));
            org.apache.hadoop.fs.Path filePath = HadoopFSUtils.toPath(anyFile.getPath());
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
            MessageType parquetSchema;
            try (ParquetFileReader reader = ParquetFileReader.open(jsc.hadoopConfiguration(), filePath)) {
                parquetSchema = reader.getFooter().getFileMetaData().getSchema();
            } catch (IOException e) {
                throw new HoodieIOException(e.getMessage(), e);
            }
            Schema schema = new AvroSchemaConverter().convert(parquetSchema);
            return generateInputBatch(jsc, partitionPaths, schema);
        }
    }

    /** Which bootstrap selector a test scenario uses. */
    private enum EffectiveMode {
        FULL_BOOTSTRAP_MODE,
        METADATA_BOOTSTRAP_MODE,
        MIXED_BOOTSTRAP_MODE
    }
}

