/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="functional")
public class TestHoodieSparkMergeOnReadTableRollback
extends SparkClientFunctionalTestHarness {
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exception {
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
        HoodieWriteConfig cfg = this.getConfig(false, rollbackUsingMarkers);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            List statuses = client.upsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)commit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"001", (Object)((HoodieInstant)commit.get()).requestedTime(), (String)"commit should be 001");
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            records = dataGen.generateUpdates(newCommitTime, records);
            statuses = client.upsert(this.jsc().parallelize(records, 1), newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
            client.rollback(newCommitTime);
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
            String absentCommit = newCommitTime;
            org.junit.jupiter.api.Assertions.assertAll(tableView.getLatestBaseFiles().map(file -> () -> org.junit.jupiter.api.Assertions.assertNotEquals((Object)absentCommit, (Object)file.getCommitTime())));
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
        this.addConfigsForPopulateMetaFields(cfgBuilder, true);
        HoodieWriteConfig cfg = cfgBuilder.build();
        Properties properties = CollectionUtils.copy((Properties)cfg.getProps());
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            String newCommitTime = "000000001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
            List statuses = writeStatusJavaRDD.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)deltaCommit.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"000000001", (Object)((HoodieInstant)deltaCommit.get()).requestedTime(), (String)"Delta commit should be 000000001");
            Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
            Stream dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)dataFilesToRead.findAny().isPresent());
            tableView = this.getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
            dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)dataFilesToRead.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            String commitTime1 = "000000002";
            try (SparkRDDWriteClient secondClient = this.getHoodieWriteClient(this.getHoodieWriteConfigWithSmallFileHandlingOff(true));){
                secondClient.startCommitWithTime("000000002");
                List copyOfRecords = new ArrayList(records);
                copyOfRecords = dataGen.generateUpdates("000000002", copyOfRecords);
                copyOfRecords.addAll(dataGen.generateInserts("000000002", Integer.valueOf(200)));
                List inputPaths = tableView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
                List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath());
                org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)recordsRead.size());
                statuses = secondClient.upsert(this.jsc().parallelize(copyOfRecords, 1), "000000002").collect();
                Assertions.assertNoWriteErrors((List)statuses);
                secondClient.rollback("000000002");
                allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
                List remainingFiles = allFiles.stream().filter(file -> file.getPath().getName().contains("_000000002")).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)remainingFiles.size(), (String)("These files should have been rolled-back when rolling back commit 000000002 but are still remaining. Files: " + remainingFiles));
                inputPaths = tableView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
                recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath());
                org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)recordsRead.size());
            }
            String commitTime2 = "000000003";
            try (SparkRDDWriteClient thirdClient = this.getHoodieWriteClient(this.getHoodieWriteConfigWithSmallFileHandlingOff(true));){
                thirdClient.startCommitWithTime("000000003");
                List copyOfRecords = new ArrayList(records);
                copyOfRecords = dataGen.generateUpdates("000000003", copyOfRecords);
                copyOfRecords.addAll(dataGen.generateInserts("000000003", Integer.valueOf(200)));
                List inputPaths = tableView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
                List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath());
                org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)recordsRead.size());
                writeRecords = this.jsc().parallelize(copyOfRecords, 1);
                writeStatusJavaRDD = thirdClient.upsert(writeRecords, "000000003");
                statuses = writeStatusJavaRDD.collect();
                Assertions.assertNoWriteErrors((List)statuses);
                thirdClient.rollback("000000003");
                allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
                List remainingFiles = allFiles.stream().filter(file -> file.getPath().getName().contains("_000000003")).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)remainingFiles.size(), (String)("These files should have been rolled-back when rolling back commit 000000003 but are still remaining. Files: " + remainingFiles));
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
                tableView = this.getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
                inputPaths = tableView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
                recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath());
                org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)recordsRead.size());
                newCommitTime = "000000004";
                thirdClient.startCommitWithTime(newCommitTime);
                writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
                statuses = writeStatusJavaRDD.collect();
                Assertions.assertNoWriteErrors((List)statuses);
                thirdClient.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
                thirdClient.compact(compactionInstantTime);
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                String compactedCommitTime = ((HoodieInstant)metaClient.getActiveTimeline().reload().lastInstant().get()).requestedTime();
                org.junit.jupiter.api.Assertions.assertTrue((boolean)this.listAllBaseFilesInPath((HoodieTable)hoodieTable).stream().anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
                hoodieTable.rollbackInflightCompaction(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, "compaction", compactedCommitTime));
                allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
                org.junit.jupiter.api.Assertions.assertFalse((boolean)tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
                org.junit.jupiter.api.Assertions.assertAll(tableView.getLatestBaseFiles().map(file -> () -> org.junit.jupiter.api.Assertions.assertNotEquals((Object)compactedCommitTime, (Object)file.getCommitTime())));
            }
        }
    }

    @Test
    void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
        boolean populateMetaFields = true;
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name());
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig cfg = cfgBuilder.build();
        Properties properties = this.getPropertiesForKeyGen(populateMetaFields);
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
            List statuses = writeStatusJavaRDD.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            client.close();
            Option instantCommitMetadataPairOpt = metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)instantCommitMetadataPairOpt.isPresent());
            HoodieInstant commitInstant = (HoodieInstant)((Pair)instantCommitMetadataPairOpt.get()).getKey();
            org.junit.jupiter.api.Assertions.assertEquals((Object)"001", (Object)commitInstant.requestedTime());
            org.junit.jupiter.api.Assertions.assertEquals((Object)"deltacommit", (Object)commitInstant.getAction());
            org.junit.jupiter.api.Assertions.assertEquals((long)200L, (long)this.getTotalRecordsWritten((HoodieCommitMetadata)((Pair)instantCommitMetadataPairOpt.get()).getValue()));
            Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)commit.isPresent());
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
            Stream dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)dataFilesToRead.findAny().isPresent());
            tableView = this.getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
            dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)dataFilesToRead.findAny().isPresent(), (String)"Should list the base files we wrote in the delta commit");
            newCommitTime = "002";
            HoodieWriteConfig smallFileWriteConfig = this.getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).withMarkersType(MarkerType.DIRECT.name()).build();
            try (SparkRDDWriteClient nClient = this.getHoodieWriteClient(smallFileWriteConfig);){
                nClient.startCommitWithTime(newCommitTime);
                List copyOfRecords = new ArrayList(records);
                copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
                copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, Integer.valueOf(200)));
                List dataFiles = tableView.getLatestBaseFiles().map(baseFile -> new Path(baseFile.getPath()).getParent().toString()).collect(Collectors.toList());
                List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), dataFiles, (String)this.basePath());
                org.junit.jupiter.api.Assertions.assertEquals((int)200, (int)recordsRead.size());
                statuses = nClient.upsert(this.jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
                Assertions.assertNoWriteErrors((List)statuses);
                nClient.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
                copyOfRecords.clear();
            }
            newCommitTime = "003";
            client.startCommitWithTime(newCommitTime);
            List newInserts = dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            records = dataGen.generateUpdates(newCommitTime, records);
            records.addAll(newInserts);
            writeRecords = this.jsc().parallelize(records, 1);
            writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
            statuses = writeStatusJavaRDD.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            String compactionInstantTime = "004";
            client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
            newCommitTime = "005";
            client.startCommitWithTime(newCommitTime);
            records = dataGen.generateUpdates(newCommitTime, records);
            writeRecords = this.jsc().parallelize(records, 1);
            writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
            statuses = writeStatusJavaRDD.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            compactionInstantTime = "006";
            client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
            HoodieWriteMetadata compactionMetadata = client.compact(compactionInstantTime);
            client.commitCompaction(compactionInstantTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
            allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
            String compactedCommitTime = ((HoodieInstant)metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()).requestedTime();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
            newCommitTime = "007";
            client.startCommitWithTime(newCommitTime);
            List copyOfRecords = new ArrayList(records);
            copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
            copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, Integer.valueOf(200)));
            statuses = client.upsert(this.jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            copyOfRecords.clear();
            client.restoreToInstant("003", cfg.isMetadataTableEnabled());
            metaClient.reloadActiveTimeline();
            allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(), allFiles);
            dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)dataFilesToRead.filter(hoodieBaseFile -> hoodieBaseFile.getCommitTime().compareTo("003") > 0).findAny().isPresent());
            HoodieTableFileSystemView rtView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
            List fileGroups = rtView.getAllFileGroups().collect(Collectors.toList());
            org.junit.jupiter.api.Assertions.assertFalse((boolean)fileGroups.isEmpty());
            org.junit.jupiter.api.Assertions.assertFalse((boolean)fileGroups.stream().filter(fileGroup -> fileGroup.getAllFileSlices().map(fileSlice -> fileSlice.getBaseInstantTime().compareTo("003") > 0).count() > 0L).findAny().isPresent());
        }
    }

    @Test
    void testRestoreWithCleanedUpCommits() throws Exception {
        boolean populateMetaFields = true;
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name());
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig cfg = cfgBuilder.build();
        Properties properties = populateMetaFields ? new Properties() : this.getPropertiesForKeyGen();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(200));
            JavaRDD writeRecords = this.jsc().parallelize(records, 1);
            JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
            List statuses = writeStatusJavaRDD.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            client.commit(newCommitTime, (Object)this.jsc().parallelize(statuses));
            this.upsertRecords(client, "002", records, dataGen);
            client.savepoint("002", "user1", "comment1");
            this.upsertRecords(client, "003", records, dataGen);
            this.upsertRecords(client, "004", records, dataGen);
            String compactionInstantTime = "006";
            client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
            HoodieWriteMetadata compactionMetadata = client.compact(compactionInstantTime);
            client.commitCompaction(compactionInstantTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
            this.upsertRecords(client, "007", records, dataGen);
            this.upsertRecords(client, "008", records, dataGen);
            String compactionInstantTime1 = "009";
            client.scheduleCompactionAtInstant(compactionInstantTime1, Option.empty());
            HoodieWriteMetadata compactionMetadata1 = client.compact(compactionInstantTime1);
            client.commitCompaction(compactionInstantTime1, (HoodieCommitMetadata)compactionMetadata1.getCommitMetadata().get(), Option.empty());
            this.upsertRecords(client, "010", records, dataGen);
            cfgBuilder = this.getConfigBuilder(false).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withMarkersType(MarkerType.DIRECT.name());
            this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
            HoodieWriteConfig cfg1 = cfgBuilder.build();
            try (SparkRDDWriteClient client1 = this.getHoodieWriteClient(cfg1);){
                client1.clean();
            }
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            this.upsertRecords(client, "011", records, dataGen);
            client.restoreToInstant("002", cfg.isMetadataTableEnabled());
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
            Stream dataFilesToRead = tableView.getLatestBaseFiles();
            org.junit.jupiter.api.Assertions.assertFalse((boolean)dataFilesToRead.anyMatch(file -> InstantComparison.compareTimestamps((String)"002", (BiPredicate)InstantComparison.GREATER_THAN, (String)file.getCommitTime())));
            client.deleteSavepoint("002");
            org.junit.jupiter.api.Assertions.assertFalse((boolean)metaClient.reloadActiveTimeline().getSavePointTimeline().containsInstant("002"));
        }
    }

    private void upsertRecords(SparkRDDWriteClient client, String commitTime, List<HoodieRecord> records, HoodieTestDataGenerator dataGen) throws IOException {
        client.startCommitWithTime(commitTime);
        List<Object> copyOfRecords = new ArrayList<HoodieRecord>(records);
        copyOfRecords = dataGen.generateUpdates(commitTime, copyOfRecords);
        List statuses = client.upsert(this.jsc().parallelize(copyOfRecords, 1), commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        client.commit(commitTime, (Object)this.jsc().parallelize(statuses));
    }

    private long getTotalRecordsWritten(HoodieCommitMetadata commitMetadata) {
        return commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).map(stat -> stat.getNumWrites() + stat.getNumUpdateWrites()).reduce(0L, Long::sum);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(false).withMarkersType(MarkerType.DIRECT.name());
        HoodieWriteConfig cfg = cfgBuilder.build();
        Properties properties = this.getPropertiesForKeyGen(true);
        properties.putAll((Map<?, ?>)cfg.getProps());
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
            List<HoodieRecord> records = this.insertAndGetRecords("001", client, dataGen, 200);
            List<HoodieRecord> updates1 = this.updateAndGetRecords("002", client, dataGen, records);
            List<HoodieRecord> updates2 = this.updateAndGetRecords("003", client, dataGen, records);
            List<HoodieRecord> updates3 = this.updateAndGetRecords("004", client, dataGen, records);
            this.validateRecords(cfg, metaClient, updates3);
            if (!restoreAfterCompaction) {
                client.restoreToInstant("002", cfg.isMetadataTableEnabled());
                this.validateRecords(cfg, metaClient, updates1);
            } else {
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                String compactionInstantTime = "005";
                client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
                HoodieWriteMetadata compactionMetadata = client.compact(compactionInstantTime);
                client.commitCompaction(compactionInstantTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
                this.validateRecords(cfg, metaClient, updates3);
                List<HoodieRecord> updates4 = this.updateAndGetRecords("006", client, dataGen, records);
                List<HoodieRecord> updates5 = this.updateAndGetRecords("007", client, dataGen, records);
                this.validateRecords(cfg, metaClient, updates5);
                client.restoreToInstant("003", cfg.isMetadataTableEnabled());
                this.validateRecords(cfg, metaClient, updates2);
            }
        }
    }

    private List<HoodieRecord> insertAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, int count) {
        client.startCommitWithTime(newCommitTime);
        List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(count));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
        client.commit(newCommitTime, (Object)writeStatusJavaRDD);
        return records;
    }

    private List<HoodieRecord> updateAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, List<HoodieRecord> records) throws IOException {
        client.startCommitWithTime(newCommitTime);
        List updates = dataGen.generateUpdates(newCommitTime, records);
        JavaRDD writeStatusJavaRDD = client.upsert(this.jsc().parallelize(updates, 1), newCommitTime);
        client.commit(newCommitTime, (Object)writeStatusJavaRDD);
        return updates;
    }

    private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaClient, List<HoodieRecord> expectedRecords) throws IOException {
        HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
        List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
        HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
        List inputPaths = tableView.getLatestBaseFiles().map(hf -> new Path(hf.getPath()).getParent().toString()).collect(Collectors.toList());
        List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), inputPaths, (String)this.basePath());
        this.assertRecords(expectedRecords, recordsRead);
    }

    private void assertRecords(List<HoodieRecord> inputRecords, List<GenericRecord> recordsRead) {
        org.junit.jupiter.api.Assertions.assertEquals((int)recordsRead.size(), (int)inputRecords.size());
        HashMap expectedRecords = new HashMap();
        inputRecords.forEach(entry -> {
            try {
                expectedRecords.put(entry.getRecordKey(), (GenericRecord)((HoodieRecordPayload)entry.getData()).getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
        HashMap actualRecords = new HashMap();
        recordsRead.forEach(entry -> actualRecords.put(String.valueOf(entry.get("_row_key")), entry));
        for (Map.Entry entry2 : expectedRecords.entrySet()) {
            org.junit.jupiter.api.Assertions.assertEquals((Object)String.valueOf(((GenericRecord)entry2.getValue()).get("driver")), (Object)String.valueOf(((GenericRecord)actualRecords.get(entry2.getKey())).get("driver")));
        }
    }

    private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
        return this.getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).build();
    }

    private HoodieWriteConfig.Builder getHoodieWriteConfigWithSmallFileHandlingOffBuilder(boolean populateMetaFields) {
        HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().withPath(this.basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024L).withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withEmbeddedTimelineServerEnabled(true).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024L).parquetMaxFileSize(1024L).build()).forTable("test-trip-table");
        if (!populateMetaFields) {
            this.addConfigsForPopulateMetaFields(cfgBuilder, false);
        }
        return cfgBuilder;
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        HoodieWriteConfig config = this.getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = "100";
            writeClient.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            JavaRDD recordsRDD = this.jsc().parallelize(records, 1);
            List writeStatuses = writeClient.insert(recordsRDD, newCommitTime).collect();
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)writeStatuses.stream().filter(writeStatus -> !writeStatus.getStat().getPath().contains("log")).count());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPath().contains("log")));
            boolean rollback = writeClient.rollback(newCommitTime);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)rollback);
            newCommitTime = "101";
            writeClient.startCommitWithTime(newCommitTime);
            records = dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            recordsRDD = this.jsc().parallelize(records, 1);
            writeClient.insert(recordsRDD, newCommitTime).collect();
            Thread.sleep(1000L);
            String lastCommitTime = newCommitTime;
            java.nio.file.Path tempFolder = Files.createTempDirectory(((Object)((Object)this)).getClass().getCanonicalName(), new FileAttribute[0]);
            HashMap<String, String> fileNameMap = new HashMap<String, String>();
            for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
                HoodieInstant toCopy = HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(state, "deltacommit", lastCommitTime);
                File file = Files.createTempFile(tempFolder, null, null, new FileAttribute[0]).toFile();
                this.fs().copyToLocalFile(new Path(metaClient.getTimelinePath().toString(), HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(toCopy)), new Path(file.getAbsolutePath()));
                fileNameMap.put(file.getAbsolutePath(), HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(toCopy));
            }
            Path markerDir = new Path(Files.createTempDirectory(tempFolder, null, new FileAttribute[0]).toAbsolutePath().toString());
            if (rollbackUsingMarkers) {
                this.fs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)), markerDir);
            }
            writeClient.rollback(newCommitTime);
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context());
            TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
            long numLogFiles = 0L;
            for (String partitionPath : dataGen.getPartitionPaths()) {
                org.junit.jupiter.api.Assertions.assertTrue((boolean)tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
                org.junit.jupiter.api.Assertions.assertTrue((boolean)tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0L));
                numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath).filter(fileSlice -> fileSlice.getLogFiles().count() > 0L).count();
            }
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)numLogFiles);
            for (Map.Entry entry : fileNameMap.entrySet()) {
                try {
                    this.fs().copyFromLocalFile(new Path((String)entry.getKey()), new Path(metaClient.getTimelinePath().toString(), (String)entry.getValue()));
                }
                catch (IOException e) {
                    throw new HoodieIOException("Error copying state from local disk.", e);
                }
            }
            if (rollbackUsingMarkers) {
                this.fs().copyFromLocalFile(new Path(markerDir, lastCommitTime), new Path(metaClient.getMarkerFolderPath(lastCommitTime)));
            }
            Thread.sleep(1000L);
            writeClient.rollback(newCommitTime);
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(boolean rollbackUsingMarkers) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        HoodieWriteConfig config = this.getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);){
            String newCommitTime = "100";
            writeClient.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(100));
            JavaRDD recordsRDD = this.jsc().parallelize(records, 1);
            JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime);
            writeClient.commit(newCommitTime, (Object)statuses);
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            table.getHoodieView().sync();
            TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
            long numLogFiles = 0L;
            for (String partitionPath : dataGen.getPartitionPaths()) {
                org.junit.jupiter.api.Assertions.assertTrue((boolean)tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
                org.junit.jupiter.api.Assertions.assertTrue((boolean)tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0L));
                numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath).filter(fileSlice -> fileSlice.getLogFiles().count() > 0L).count();
            }
            org.junit.jupiter.api.Assertions.assertTrue((numLogFiles > 0L ? 1 : 0) != 0);
            newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
            HoodieWriteMetadata compactionMetadata = writeClient.compact(newCommitTime);
            statuses = (JavaRDD)compactionMetadata.getWriteStatuses();
            String extension = table.getBaseFileExtension();
            Collection stats = ((HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get()).getPartitionToWriteStats().values();
            org.junit.jupiter.api.Assertions.assertEquals((long)numLogFiles, (long)stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count());
            org.junit.jupiter.api.Assertions.assertEquals((long)numLogFiles, (long)stats.stream().mapToLong(Collection::size).sum());
            table.getActiveTimeline().reload();
            table.rollbackInflightCompaction(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, "compaction", newCommitTime));
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            tableRTFileSystemView = table.getSliceView();
            ((SyncableFileSystemView)tableRTFileSystemView).reset();
            for (String partitionPath : dataGen.getPartitionPaths()) {
                List fileSlices = this.getFileSystemViewWithUnCommittedSlices(metaClient).getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertTrue((boolean)fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
                org.junit.jupiter.api.Assertions.assertTrue((boolean)fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0L));
            }
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLazyRollbackOfFailedCommit(boolean rollbackUsingMarkers) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), ((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
        HoodieWriteConfig cfg = this.getWriteConfig(true, rollbackUsingMarkers);
        HoodieWriteConfig autoCommitFalseCfg = this.getWriteConfig(false, rollbackUsingMarkers);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
             SparkRDDWriteClient autoCommitFalseClient = this.getHoodieWriteClient(autoCommitFalseCfg);){
            List<HoodieRecord> records = this.insertRecords(client, dataGen, "001");
            List<HoodieRecord> updates1 = this.updateRecords(client, dataGen, "002", records, metaClient, cfg, true);
            List<HoodieRecord> updates2 = this.updateRecords(autoCommitFalseClient, dataGen, "003", records, metaClient, autoCommitFalseCfg, false);
            List<HoodieRecord> updates3 = this.updateRecords(client, dataGen, "004", records, metaClient, cfg, false);
            long numLogFiles = this.getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
            this.doCompaction(autoCommitFalseClient, metaClient, cfg, numLogFiles);
            long numLogFilesAfterCompaction = this.getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
            org.junit.jupiter.api.Assertions.assertNotEquals((long)numLogFiles, (long)numLogFilesAfterCompaction);
            client.rollback("003");
            long numLogFilesAfterRollback = this.getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
            org.junit.jupiter.api.Assertions.assertEquals((long)numLogFilesAfterRollback, (long)numLogFilesAfterCompaction);
        }
    }

    private List<HoodieRecord> insertRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime) {
        client.startCommitWithTime(commitTime);
        List records = dataGen.generateInserts(commitTime, Integer.valueOf(20));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        List statuses = client.upsert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        return records;
    }

    private List<HoodieRecord> updateRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime, List<HoodieRecord> records, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean assertLogFiles) throws IOException {
        client.startCommitWithTime(commitTime);
        records = dataGen.generateUpdates(commitTime, records);
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        List statuses = client.upsert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        if (assertLogFiles) {
            HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            table.getHoodieView().sync();
            TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
            long numLogFiles = 0L;
            for (String partitionPath : dataGen.getPartitionPaths()) {
                List allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertEquals((long)1L, (long)allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
                org.junit.jupiter.api.Assertions.assertTrue((boolean)allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0L));
                numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0L).count();
            }
            org.junit.jupiter.api.Assertions.assertTrue((numLogFiles > 0L ? 1 : 0) != 0);
        }
        return records;
    }

    private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) {
        String instantTime = client.scheduleCompaction(Option.empty()).get().toString();
        HoodieWriteMetadata compactionMetadata = client.compact(instantTime);
        metaClient.reloadActiveTimeline();
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
        String extension = table.getBaseFileExtension();
        Collection stats = ((HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get()).getPartitionToWriteStats().values();
        org.junit.jupiter.api.Assertions.assertEquals((long)numLogFiles, (long)stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count());
        org.junit.jupiter.api.Assertions.assertEquals((long)numLogFiles, (long)stats.stream().mapToLong(Collection::size).sum());
        client.commitCompaction(instantTime, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
        return numLogFiles;
    }

    private long getNumLogFilesInLatestFileSlice(HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, HoodieTestDataGenerator dataGen) {
        metaClient.reloadActiveTimeline();
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
        table.getHoodieView().sync();
        TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
        long numLogFiles = 0L;
        for (String partitionPath : dataGen.getPartitionPaths()) {
            List allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
            numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0L).count();
        }
        return numLogFiles;
    }

    private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsingMarkers) {
        HoodieWriteConfig.Builder cfgBuilder = this.getConfigBuilder(autoCommit).withRollbackUsingMarkers(rollbackUsingMarkers).withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(Boolean.valueOf(false)).withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0x40000000L).withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(3).build());
        return cfgBuilder.build();
    }
}

