/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestHoodieMergeOnReadTable
extends SparkClientFunctionalTestHarness {
    private HoodieTableMetaClient metaClient;
    private HoodieTestDataGenerator dataGen;

    void setUp(Properties props) throws IOException {
        Properties properties = CollectionUtils.copy((Properties)props);
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        this.dataGen = new HoodieTestDataGenerator();
    }

    @BeforeEach
    void beforeEach() {
        this.jsc().getPersistentRDDs().values().forEach(JavaRDD::unpersist);
    }

    @Test
    public void testMetadataAggregateFromWriteStatus() throws Exception {
        HoodieWriteConfig cfg = this.getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
        this.setUp((Properties)cfg.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            client.startCommitWithTime(newCommitTime);
            List statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            Map allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses((List)statuses);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
            org.junit.jupiter.api.Assertions.assertEquals((Object)String.valueOf(2 * records.size()), allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig cfg = cfgBuilder.build();
        this.setUp((Properties)cfg.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(20));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            List statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
            Option deltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"001", (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Delta commit should be 001");
            Option commit = this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView roView = this.getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
            Stream dataFilesToRead = roView.getLatestBaseFiles();
            Map<String, Long> fileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, BaseFile::getFileSize));
            roView = this.getHoodieTableFileSystemView(this.metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
            dataFilesToRead = roView.getLatestBaseFiles();
            List dataFilesList = dataFilesToRead.collect(Collectors.toList());
            org.junit.jupiter.api.Assertions.assertTrue((dataFilesList.size() > 0 ? 1 : 0) != 0, (String)"Should list the base files we wrote in the delta commit");
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            List newRecords = this.dataGen.generateUpdates(newCommitTime, records);
            newRecords.addAll(this.dataGen.generateInserts(newCommitTime, Integer.valueOf(20)));
            statuses = client.upsert(this.jsc().parallelize(newRecords), newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            deltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"002", (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Latest Delta commit should be 002");
            commit = this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            roView = this.getHoodieTableFileSystemView(this.metaClient, hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
            dataFilesToRead = roView.getLatestBaseFiles();
            List newDataFilesList = dataFilesToRead.collect(Collectors.toList());
            Map<String, Long> fileIdToNewSize = newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, BaseFile::getFileSize));
            org.junit.jupiter.api.Assertions.assertTrue((boolean)fileIdToNewSize.entrySet().stream().anyMatch(entry -> (Long)fileIdToSize.get(entry.getKey()) < (Long)entry.getValue()));
            List inputPaths = roView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
            List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath(), (JobConf)new JobConf((Configuration)this.storageConf().unwrap()), (boolean)true, (boolean)populateMetaFields);
            org.junit.jupiter.api.Assertions.assertEquals((int)40, (int)recordsRead.size(), (String)"Must contain 40 records");
        }
    }

    @Test
    public void testUpsertPartitionerWithTableVersionSix() throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(true);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        cfgBuilder.withWriteTableVersion(6);
        HoodieWriteConfig cfg = cfgBuilder.build();
        Properties props = this.getPropertiesForKeyGen(true);
        props.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6");
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath(), props, HoodieTableType.MERGE_ON_READ);
        this.dataGen = new HoodieTestDataGenerator();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(20));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            List statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
            Option deltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"001", (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Delta commit should be 001");
            Option commit = this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView roView = this.getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
            HashMap<String, String> baseFileMapping = new HashMap<String, String>();
            HashMap<String, List<String>> baseFileToLogFileMapping = new HashMap<String, List<String>>();
            HoodieTableFileSystemView finalRoView = roView;
            Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).forEach(arg_0 -> TestHoodieMergeOnReadTable.lambda$testUpsertPartitionerWithTableVersionSix$2((TableFileSystemView.BaseFileOnlyView)finalRoView, baseFileMapping, baseFileToLogFileMapping, arg_0));
            this.writeAndValidateLogFileBaseInstantTimeMatches(client, "002", records, cfg, baseFileMapping, baseFileToLogFileMapping);
            this.writeAndValidateLogFileBaseInstantTimeMatches(client, "003", records, cfg, baseFileMapping, baseFileToLogFileMapping);
            this.writeAndValidateLogFileBaseInstantTimeMatches(client, "004", records, cfg, baseFileMapping, baseFileToLogFileMapping);
        }
    }

    private void writeAndValidateLogFileBaseInstantTimeMatches(SparkRDDWriteClient client, String newCommitTime, List<HoodieRecord> records, HoodieWriteConfig cfg, Map<String, String> baseFileMapping, Map<String, List<String>> baseFileToLogFileMapping) throws IOException {
        HoodieSparkTable hoodieTable;
        client.startCommitWithTime(newCommitTime);
        List newRecords = this.dataGen.generateUpdates(newCommitTime, records);
        List statuses = client.upsert(this.jsc().parallelize(newRecords), newCommitTime).collect();
        this.validateNewData(newRecords);
        Assertions.assertNoWriteErrors((List)statuses);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        Option deltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
        org.junit.jupiter.api.Assertions.assertEquals((Object)newCommitTime, (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Latest Delta commit should be 002");
        HoodieSparkTable finalHoodieTable = hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
        baseFileMapping.entrySet().forEach(arg_0 -> TestHoodieMergeOnReadTable.lambda$writeAndValidateLogFileBaseInstantTimeMatches$3((HoodieTable)finalHoodieTable, baseFileToLogFileMapping, arg_0));
    }

    private void validateNewData(List<HoodieRecord> newRecords) {
        Dataset inputDf = this.spark().read().json(this.jsc().parallelize(RawTripTestPayload.recordsToStrings(newRecords), 2)).drop("partition");
        List updatedKeys = inputDf.select("_row_key", new String[0]).as(Encoders.STRING()).collectAsList();
        Dataset outputDf = this.spark().read().format("hudi").load(this.basePath());
        outputDf = outputDf.drop(new String[]{HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD});
        outputDf = outputDf.filter(outputDf.col("_row_key").isin(updatedKeys.toArray()));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)TestHoodieMergeOnReadTable.areDataframesEqual((Dataset)inputDf, (Dataset)outputDf, new HashSet<String>(Arrays.asList("_hoodie_is_deleted", "_row_key", "begin_lat", "begin_lon", "current_ts", "distance_in_meters", "driver", "end_lat", "end_lon", "fare"))), (String)"Dataframe mismatch");
    }

    @Test
    public void testLogFileCountsAfterCompaction() throws Exception {
        boolean populateMetaFields = true;
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM, 0x40000000L, HoodieClusteringConfig.newBuilder().build());
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig config = cfgBuilder.build();
        this.setUp((Properties)config.getProps());
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = "100";
            writeClient.startCommitWithTime(newCommitTime);
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            JavaRDD recordsRDD = this.jsc().parallelize(records, 1);
            writeClient.insert(recordsRDD, newCommitTime).collect();
            newCommitTime = "101";
            List updatedRecords = this.dataGen.generateUpdates(newCommitTime, records);
            JavaRDD updatedRecordsRDD = this.jsc().parallelize(updatedRecords, 1);
            SparkRDDReadClient readClient = new SparkRDDReadClient(this.context(), config);
            JavaRDD updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
            writeClient.startCommitWithTime(newCommitTime);
            writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create((StorageConfiguration)writeClient.getEngineContext().getStorageConf(), (HoodieWriteConfig)config, (HoodieEngineContext)writeClient.getEngineContext());){
                HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of((HoodieTableMetaClient)this.metaClient, (Schema)HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, (HoodieTableMetadataWriter)metadataWriter);
                Set<String> allPartitions = updatedRecords.stream().map(record -> record.getPartitionPath()).collect(Collectors.groupingBy(partitionPath -> partitionPath)).keySet();
                org.junit.jupiter.api.Assertions.assertEquals((int)allPartitions.size(), (int)testTable.listAllBaseFiles().size());
                HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
                for (String string : this.dataGen.getPartitionPaths()) {
                    List groupedLogFiles = table.getSliceView().getLatestFileSlices(string).collect(Collectors.toList());
                    for (FileSlice fileSlice : groupedLogFiles) {
                        org.junit.jupiter.api.Assertions.assertEquals((long)1L, (long)fileSlice.getLogFiles().count(), (String)("There should be 1 log file written for the latest data file - " + fileSlice));
                    }
                }
                String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
                HoodieWriteMetadata result = writeClient.compact(compactionInstantTime);
                this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
                table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
                HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline();
                org.junit.jupiter.api.Assertions.assertTrue((boolean)InstantComparison.compareTimestamps((String)((HoodieInstant)timeline.lastInstant().get()).requestedTime(), (BiPredicate)InstantComparison.GREATER_THAN, (String)newCommitTime), (String)"Compaction commit should be > than last insert");
                for (String partitionPath3 : this.dataGen.getPartitionPaths()) {
                    List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath3).collect(Collectors.toList());
                    for (FileSlice slice : groupedLogFiles) {
                        org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)slice.getLogFiles().count(), (String)"After compaction there should be no log files visible on a full view");
                    }
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)((HoodieCommitMetadata)result.getCommitMetadata().get()).getWritePartitionPaths().stream().anyMatch(part -> part.contentEquals(partitionPath3)));
                }
                String[] stringArray = new String[this.dataGen.getPartitionPaths().length];
                for (int i = 0; i < stringArray.length; ++i) {
                    stringArray[i] = String.format("%s/%s/*", this.basePath(), this.dataGen.getPartitionPaths()[i]);
                }
                Dataset actual = HoodieClientTestUtils.read((JavaSparkContext)this.jsc(), (String)this.basePath(), (SQLContext)this.sqlContext(), (HoodieStorage)this.hoodieStorage(), (String[])stringArray);
                List rows = actual.collectAsList();
                org.junit.jupiter.api.Assertions.assertEquals((int)updatedRecords.size(), (int)rows.size());
                for (Row row : rows) {
                    org.junit.jupiter.api.Assertions.assertEquals((Object)row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), (Object)newCommitTime);
                }
            }
        }
    }

    @ParameterizedTest
    @CsvSource(value={"true,avro", "true,parquet", "false,avro", "false,parquet"})
    public void testLogBlocksCountsAfterLogCompaction(boolean populateMetaFields, String logFileFormat) throws Exception {
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).withLogCompactionBlocksThreshold(1).build();
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).withCompactionConfig(compactionConfig);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig config = cfgBuilder.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x40000000L).parquetMaxFileSize(0x40000000L).logFileDataBlockFormat(logFileFormat).build()).build();
        this.setUp((Properties)config.getProps());
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = "100";
            writeClient.startCommitWithTime(newCommitTime);
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            JavaRDD recordsRDD = this.jsc().parallelize(records, 1);
            writeClient.insert(recordsRDD, newCommitTime).collect();
            newCommitTime = "101";
            List updatedRecords = this.dataGen.generateUpdates(newCommitTime, records);
            JavaRDD updatedRecordsRDD = this.jsc().parallelize(updatedRecords, 1);
            HoodieReadClient readClient = new HoodieReadClient(this.context(), config);
            JavaRDD updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
            writeClient.startCommitWithTime(newCommitTime);
            writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
            newCommitTime = "102";
            writeClient.startCommitWithTime(newCommitTime);
            writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create((StorageConfiguration)writeClient.getEngineContext().getStorageConf(), (HoodieWriteConfig)config, (HoodieEngineContext)writeClient.getEngineContext());){
                HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of((HoodieTableMetaClient)this.metaClient, (Schema)HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, (HoodieTableMetadataWriter)metadataWriter);
                Set<String> allPartitions = updatedRecords.stream().map(record -> record.getPartitionPath()).collect(Collectors.groupingBy(partitionPath -> partitionPath)).keySet();
                org.junit.jupiter.api.Assertions.assertEquals((int)allPartitions.size(), (int)testTable.listAllBaseFiles().size());
                HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
                for (String partitionPath2 : this.dataGen.getPartitionPaths()) {
                    List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath2).collect(Collectors.toList());
                    for (FileSlice fileSlice : groupedLogFiles) {
                        org.junit.jupiter.api.Assertions.assertEquals((long)2L, (long)fileSlice.getLogFiles().count(), (String)("There should be 1 log file written for the latest data file - " + fileSlice));
                    }
                }
                String logCompactionInstantTime = writeClient.scheduleLogCompaction(Option.empty()).get().toString();
                HoodieWriteMetadata result = writeClient.logCompact(logCompactionInstantTime);
                this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
                table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
                HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline();
                org.junit.jupiter.api.Assertions.assertTrue((boolean)InstantComparison.compareTimestamps((String)((HoodieInstant)timeline.lastInstant().get()).requestedTime(), (BiPredicate)InstantComparison.GREATER_THAN, (String)newCommitTime), (String)"Compaction commit should be > than last insert");
                for (String partitionPath3 : this.dataGen.getPartitionPaths()) {
                    List fileSlices = table.getSliceView().getLatestFileSlices(partitionPath3).collect(Collectors.toList());
                    org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)fileSlices.size());
                    for (FileSlice slice : fileSlices) {
                        org.junit.jupiter.api.Assertions.assertEquals((long)3L, (long)slice.getLogFiles().count(), (String)"After compaction there will still be one log file.");
                        org.junit.jupiter.api.Assertions.assertNotNull((Object)slice.getBaseFile(), (String)"Base file is not created by log compaction operation.");
                    }
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)((HoodieCommitMetadata)result.getCommitMetadata().get()).getWritePartitionPaths().stream().anyMatch(part -> part.contentEquals(partitionPath3)));
                }
            }
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception {
        HoodieWriteConfig cfg = this.getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).withAvroSchemaValidate(false).withAllowAutoEvolutionColumnDrop(true).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build()).build();
        this.setUp((Properties)cfg.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
            HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
            String commitActionType = table.getMetaClient().getCommitActionType();
            ArrayList instants = new ArrayList();
            String instant0 = this.metaClient.createNewInstantTime();
            HoodieInstant instant = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, instant0);
            activeTimeline.createNewInstant(instant);
            activeTimeline.transitionRequestedToInflight(instant, Option.empty());
            instant = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instant0);
            activeTimeline.saveAsComplete(instant, Option.empty());
            String instant1 = this.metaClient.createNewInstantTime();
            client.startCommitWithTime(instant1);
            List records = this.dataGen.generateInserts(instant1, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            JavaRDD statuses = client.insert(writeRecords, instant1);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instant1, (Object)statuses), (String)"Commit should succeed");
            table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant instantOne = (HoodieInstant)table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get();
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(instantOne, (byte[])table.getActiveTimeline().getInstantDetails(instantOne).get(), HoodieCommitMetadata.class);
            int inserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (Object stat : (List)pstat.getValue()) {
                    inserts = (int)((long)inserts + stat.getNumInserts());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
            String instant2 = this.metaClient.createNewInstantTime();
            client.startCommitWithTime(instant2);
            records = this.dataGen.generateUpdates(instant2, records);
            writeRecords = this.jsc().parallelize(records, 1);
            statuses = client.upsert(writeRecords, instant2);
            inserts = 0;
            int upserts = 0;
            List writeStatusList = statuses.collect();
            for (WriteStatus ws : writeStatusList) {
                inserts = (int)((long)inserts + ws.getStat().getNumInserts());
                upserts = (int)((long)upserts + ws.getStat().getNumUpdateWrites());
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)inserts);
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)upserts);
            client.rollback(instant2);
            table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant instant3 = (HoodieInstant)table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get();
            metadata = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(instant3, (byte[])table.getActiveTimeline().getInstantDetails(instant3).get(), HoodieCommitMetadata.class);
            inserts = 0;
            upserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (HoodieWriteStat stat : (List)pstat.getValue()) {
                    inserts = (int)((long)inserts + stat.getNumInserts());
                    upserts = (int)((long)upserts + stat.getNumUpdateWrites());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
            org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)upserts);
        }
    }

    @Test
    public void testRollingStatsWithSmallFileHandling() throws Exception {
        HoodieWriteConfig cfg = this.getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).withAutoCommit(false).build();
        this.setUp((Properties)cfg.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HashMap<String, Long> fileIdToInsertsMap = new HashMap<String, Long>();
            HashMap<String, Long> fileIdToUpsertsMap = new HashMap<String, Long>();
            String instantTime = "000";
            client.startCommitWithTime(instantTime);
            List records = this.dataGen.generateInserts(instantTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            JavaRDD statuses = client.insert(writeRecords, instantTime);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime, (Object)statuses), (String)"Commit should succeed");
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant instantOne = (HoodieInstant)table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get();
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(instantOne, (byte[])table.getActiveTimeline().getInstantDetails(instantOne).get(), HoodieCommitMetadata.class);
            int inserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (HoodieWriteStat hoodieWriteStat : (List)pstat.getValue()) {
                    inserts = (int)((long)inserts + hoodieWriteStat.getNumInserts());
                    fileIdToInsertsMap.put(hoodieWriteStat.getFileId(), hoodieWriteStat.getNumInserts());
                    fileIdToUpsertsMap.put(hoodieWriteStat.getFileId(), hoodieWriteStat.getNumUpdateWrites());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
            instantTime = "001";
            client.startCommitWithTime(instantTime);
            records = this.dataGen.generateUpdates(instantTime, records);
            records.addAll(this.dataGen.generateInserts(instantTime, Integer.valueOf(200)));
            writeRecords = this.jsc().parallelize(records, 1);
            statuses = client.upsert(writeRecords, instantTime);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime, (Object)statuses), (String)"Commit should succeed");
            table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant instantTwo = (HoodieInstant)table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get();
            metadata = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(instantTwo, (byte[])table.getActiveTimeline().getInstantDetails(instantTwo).get(), HoodieCommitMetadata.class);
            inserts = 0;
            int upserts = 0;
            for (Map.Entry entry : metadata.getPartitionToWriteStats().entrySet()) {
                for (Object stat : (List)entry.getValue()) {
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)fileIdToInsertsMap.containsKey(stat.getFileId()));
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)fileIdToUpsertsMap.containsKey(stat.getFileId()));
                    inserts = (int)((long)inserts + stat.getNumInserts());
                    upserts = (int)((long)upserts + stat.getNumUpdateWrites());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)upserts);
            instantTime = "002";
            client.scheduleCompactionAtInstant(instantTime, Option.of((Object)metadata.getExtraMetadata()));
            HoodieWriteMetadata compactionMetadata = client.compact(instantTime);
            statuses = (JavaRDD)compactionMetadata.getWriteStatuses();
            client.commitCompaction(instantTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
            table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant hoodieInstant = (HoodieInstant)table.getActiveTimeline().getCommitsTimeline().lastInstant().get();
            HoodieCommitMetadata metadata1 = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, (byte[])table.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
            for (Map.Entry entry : metadata.getPartitionToWriteStats().entrySet()) {
                org.junit.jupiter.api.Assertions.assertTrue((boolean)metadata1.getPartitionToWriteStats().containsKey(entry.getKey()));
                org.junit.jupiter.api.Assertions.assertEquals((int)((List)metadata1.getPartitionToWriteStats().get(entry.getKey())).size(), (int)((List)entry.getValue()).size());
            }
            instantTime = "003";
            client.startCommitWithTime(instantTime);
            records = this.dataGen.generateUpdates(instantTime, records);
            records.addAll(this.dataGen.generateInserts(instantTime, Integer.valueOf(200)));
            writeRecords = this.jsc().parallelize(records, 1);
            statuses = client.upsert(writeRecords, instantTime);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.commit(instantTime, (Object)statuses), (String)"Commit should succeed");
            table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context());
            HoodieInstant instant = (HoodieInstant)table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get();
            metadata = (HoodieCommitMetadata)this.metaClient.getCommitMetadataSerDe().deserialize(instant, (byte[])table.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            inserts = 0;
            upserts = 0;
            for (Map.Entry pstat : metadata.getPartitionToWriteStats().entrySet()) {
                for (HoodieWriteStat stat : (List)pstat.getValue()) {
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)fileIdToInsertsMap.containsKey(stat.getFileId()));
                    inserts = (int)((long)inserts + stat.getNumInserts());
                    upserts = (int)((long)upserts + stat.getNumUpdateWrites());
                }
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)inserts);
            org.junit.jupiter.api.Assertions.assertEquals((int)400, (int)upserts);
        }
    }

    @Test
    public void testHandleUpdateWithMultiplePartitions() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(true);
        this.setUp((Properties)cfg.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = this.dataGen.generateInserts(newCommitTime, Integer.valueOf(20));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            List statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            HoodieSparkMergeOnReadTable hoodieTable = (HoodieSparkMergeOnReadTable)HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)this.metaClient);
            Option deltaCommit = this.metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"001", (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Delta commit should be 001");
            Option commit = this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView roView = this.getHoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
            Stream dataFilesToRead = roView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)dataFilesToRead.findAny().isPresent());
            roView = this.getHoodieTableFileSystemView(this.metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
            dataFilesToRead = roView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)dataFilesToRead.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            this.metaClient.reloadActiveTimeline();
            records = this.dataGen.generateUpdates(newCommitTime, records);
            writeRecords = this.jsc().parallelize(records, 1);
            statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            String newDeleteTime = "004";
            String partitionPath = ((HoodieRecord)records.get(0)).getPartitionPath();
            String fileId = ((WriteStatus)statuses.get(0)).getFileId();
            client.startCommitWithTime("004");
            this.metaClient.reloadActiveTimeline();
            List fewRecordsForDelete = this.dataGen.generateDeletesFromExistingRecords(records);
            JavaRDD deleteRDD = this.jsc().parallelize(fewRecordsForDelete, 1);
            hoodieTable.getHoodieView().sync();
            SparkDeleteDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(this.context(), cfg, (HoodieTable)hoodieTable, "004", (HoodieData)HoodieJavaRDD.of((JavaRDD)deleteRDD));
            actionExecutor.getUpsertPartitioner(new WorkloadProfile(HoodieSparkClientTestHarness.buildProfile((JavaRDD)deleteRDD)));
            List deleteStatus = this.jsc().parallelize(Arrays.asList(1)).map(arg_0 -> TestHoodieMergeOnReadTable.lambda$testHandleUpdateWithMultiplePartitions$7cc9a839$1((BaseSparkDeltaCommitActionExecutor)actionExecutor, partitionPath, fileId, fewRecordsForDelete, arg_0)).map(Transformations::flatten).collect();
            WriteStatus status = (WriteStatus)((List)deleteStatus.get(0)).get(0);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)status.hasErrors());
            long numRecordsInPartition = fewRecordsForDelete.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
            org.junit.jupiter.api.Assertions.assertEquals((long)((long)fewRecordsForDelete.size() - numRecordsInPartition), (long)status.getTotalErrorRecords());
        }
    }

    private static /* synthetic */ Iterator lambda$testHandleUpdateWithMultiplePartitions$7cc9a839$1(BaseSparkDeltaCommitActionExecutor actionExecutor, String partitionPath, String fileId, List fewRecordsForDelete, Integer x) throws Exception {
        return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
    }

    private static /* synthetic */ void lambda$writeAndValidateLogFileBaseInstantTimeMatches$3(HoodieTable finalHoodieTable, Map baseFileToLogFileMapping, Map.Entry entry) {
        FileSlice fileSlice = (FileSlice)finalHoodieTable.getSliceView().getLatestFileSlices((String)entry.getKey()).collect(Collectors.toList()).get(0);
        String baseFileName = (String)entry.getValue();
        String baseInstantTime = FSUtils.getCommitTime((String)baseFileName);
        List logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
        for (int counter = 0; counter < logFiles.size(); ++counter) {
            HoodieLogFile logFile = (HoodieLogFile)logFiles.get(counter);
            if (counter == logFiles.size() - 1) {
                ((List)baseFileToLogFileMapping.get(baseFileName)).add(logFile.getFileName());
            } else {
                ((List)baseFileToLogFileMapping.get(baseFileName)).contains(logFile.getFileName());
            }
            org.junit.jupiter.api.Assertions.assertEquals((Object)baseInstantTime, (Object)FSUtils.getDeltaCommitTimeFromLogPath((StoragePath)logFile.getPath()));
        }
    }

    private static /* synthetic */ void lambda$testUpsertPartitionerWithTableVersionSix$2(TableFileSystemView.BaseFileOnlyView finalRoView, Map baseFileMapping, Map baseFileToLogFileMapping, String partitionPath) {
        String baseFileName = ((HoodieBaseFile)finalRoView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()).get(0)).getFileName();
        baseFileMapping.put(partitionPath, baseFileName);
        baseFileToLogFileMapping.put(baseFileName, new ArrayList());
    }
}

