/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Functional tests for compaction on MERGE_ON_READ tables written through
 * {@link SparkRDDWriteClient}. The scenarios interleave regular writes, deletes,
 * uncommitted delta commits and rollbacks with a scheduled compaction and check
 * the number of records readable from the table after each step.
 */
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFunctionalTestHarness {
    /** Table name used by every write config built in this class. */
    private static final String TABLE_NAME = "test-trip-table";

    /**
     * Avro schema (JSON) passed to every write config in this class. The value is a single
     * compile-time constant; it is split per field only for readability.
     */
    private static final String TRIP_SCHEMA =
            "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
            + "{\"name\": \"timestamp\",\"type\": \"long\"},"
            + "{\"name\": \"_row_key\", \"type\": \"string\"},"
            + "{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },"
            + "{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},"
            + "{\"name\": \"rider\", \"type\": \"string\"},"
            + "{\"name\": \"driver\", \"type\": \"string\"},"
            + "{\"name\": \"begin_lat\", \"type\": \"double\"},"
            + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
            + "{\"name\": \"end_lat\", \"type\": \"double\"},"
            + "{\"name\": \"end_lon\", \"type\": \"double\"},"
            + "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
            + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
            + "{\"name\": \"weight\", \"type\": \"float\"},"
            + "{\"name\": \"nation\", \"type\": \"bytes\"},"
            + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
            + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
            + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"
            + "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"
            + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
            + "{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"
            + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    /** Generator for insert/update records; recreated before every test. */
    private HoodieTestDataGenerator dataGen;
    /** Write client under test; created by each test and closed in {@link #teardown()}. */
    private SparkRDDWriteClient client;
    /** Meta client of the table under test; reloaded by {@link #writeData}. */
    private HoodieTableMetaClient metaClient;

    /** Argument source: every (enableMetadataTable, enableTimelineServer) combination. */
    private static Stream<Arguments> writeLogTest() {
        return Stream.of(
                Arguments.of(true, true),
                Arguments.of(true, false),
                Arguments.of(false, true),
                Arguments.of(false, false));
    }

    /** Argument source: every (payload class name, index type) combination under test. */
    private static Stream<Arguments> writePayloadTest() {
        return Stream.of(
                Arguments.of(DefaultHoodieRecordPayload.class.getName(), HoodieIndex.IndexType.BUCKET),
                Arguments.of(DefaultHoodieRecordPayload.class.getName(), HoodieIndex.IndexType.GLOBAL_SIMPLE),
                Arguments.of(PartialUpdateAvroPayload.class.getName(), HoodieIndex.IndexType.BUCKET),
                Arguments.of(PartialUpdateAvroPayload.class.getName(), HoodieIndex.IndexType.GLOBAL_SIMPLE));
    }

    @BeforeEach
    public void setup() {
        dataGen = new HoodieTestDataGenerator();
    }

    @AfterEach
    public void teardown() throws IOException {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Writes two committed batches, schedules a compaction, then performs a third write that is
     * committed only after the compaction was scheduled, and finally runs the compaction.
     * The readable record count is asserted after every step.
     *
     * @param payloadClass fully qualified payload class name set on the write config
     * @param indexType    index type set on the write config; BUCKET additionally selects the bucket layout
     */
    @ParameterizedTest
    @MethodSource("writePayloadTest")
    public void testWriteDuringCompaction(String payloadClass, HoodieIndex.IndexType indexType) throws IOException {
        Properties props = getPropertiesForKeyGen(true);
        HoodieLayoutConfig layoutConfig = indexType == HoodieIndex.IndexType.BUCKET
                ? bucketLayoutConfig()
                : HoodieLayoutConfig.newBuilder().build();
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .forTable(TABLE_NAME)
                .withPath(basePath())
                .withSchema(TRIP_SCHEMA)
                .withParallelism(2, 2)
                .withAutoCommit(false)
                .withWritePayLoad(payloadClass)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .compactionSmallFileSize(0L)
                        .build())
                .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024L).build())
                .withLayoutConfig(layoutConfig)
                .withIndexConfig(HoodieIndexConfig.newBuilder()
                        .fromProperties(props)
                        .withIndexType(indexType)
                        .withBucketNum("1")
                        .build())
                .build();
        initTableAndClient(props, config);

        // First batch: 100 inserts, committed.
        String instant1 = client.createNewInstantTime();
        writeData(instant1, dataGen.generateInserts(instant1, 100), true);

        // Second batch: 100 updates plus 100 new inserts, committed.
        String instant2 = client.createNewInstantTime();
        List<HoodieRecord> updates1 = dataGen.generateUpdates(instant2, 100);
        List<HoodieRecord> newRecords1 = dataGen.generateInserts(instant2, 100);
        writeData(instant2, concat(newRecords1, updates1), true);
        org.junit.jupiter.api.Assertions.assertEquals(200L, readTableTotalRecordsNum());

        String compactionTime = (String) client.scheduleCompaction(Option.empty()).get();

        // Third batch is written after the compaction was scheduled and is left uncommitted at first.
        String instant3 = client.createNewInstantTime();
        List<HoodieRecord> updates2 = dataGen.generateUpdates(instant3, 200);
        List<HoodieRecord> newRecords2 = dataGen.generateInserts(instant3, 100);
        List<WriteStatus> writeStatuses = writeData(instant3, concat(newRecords2, updates2), false);
        // The uncommitted batch must not be readable yet.
        org.junit.jupiter.api.Assertions.assertEquals(200L, readTableTotalRecordsNum());

        commitToTable(instant3, writeStatuses);
        org.junit.jupiter.api.Assertions.assertEquals(300L, readTableTotalRecordsNum());

        // Auto commit is switched on only for the compaction run.
        config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, "true");
        client.compact(compactionTime);
        org.junit.jupiter.api.Assertions.assertEquals(300L, readTableTotalRecordsNum());
    }

    /**
     * Upserts the same 100 records twice, schedules a compaction, deletes the keys of the first
     * 50 records, upserts all 100 records again and expects 100 readable records.
     *
     * <p>The file group reader flag is set to "false" on the shared Hadoop configuration for the
     * duration of the test and set to "true" afterwards, whether or not the test body succeeds.
     *
     * @param enableMetadataTable  whether the metadata table is enabled on the write config
     * @param enableTimelineServer whether the embedded timeline server is enabled on the write config
     */
    @ParameterizedTest
    @MethodSource("writeLogTest")
    public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
        try {
            jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
            Properties props = getPropertiesForKeyGen(true);
            HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                    .forTable(TABLE_NAME)
                    .withPath(basePath())
                    .withSchema(TRIP_SCHEMA)
                    .withParallelism(2, 2)
                    .withAutoCommit(true)
                    .withEmbeddedTimelineServerEnabled(enableTimelineServer)
                    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
                    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                            .withMaxNumDeltaCommitsBeforeCompaction(1)
                            .build())
                    .withLayoutConfig(bucketLayoutConfig())
                    .withIndexConfig(HoodieIndexConfig.newBuilder()
                            .fromProperties(props)
                            .withIndexType(HoodieIndex.IndexType.BUCKET)
                            .withBucketNum("1")
                            .build())
                    .build();
            initTableAndClient(props, config);

            List<HoodieRecord> records = dataGen.generateInserts("001", 100);
            JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
            client.upsert(writeRecords, client.startCommit());
            client.upsert(writeRecords, client.startCommit());
            client.scheduleCompaction(Option.empty());

            // Delete the keys of the first 50 records, then write all 100 records again.
            JavaRDD<?> deleteRecords = jsc().parallelize(
                    records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList()), 2);
            client.delete(deleteRecords, client.startCommit());
            client.upsert(writeRecords, client.startCommit());
            org.junit.jupiter.api.Assertions.assertEquals(100L, readTableTotalRecordsNum());
        } finally {
            jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        }
    }

    /**
     * Leaves a delta commit uncommitted, deletes every second log file found under the base path,
     * writes a further committed delta commit, optionally rolls the uncommitted one back, and then
     * schedules and runs a compaction. Every file named in the compaction plan must exist.
     *
     * @param enableMetadataTable whether the metadata table is enabled on the write config
     * @param runRollback         when true the uncommitted instant is rolled back before compaction is
     *                            scheduled; when false a late commit of that instant is expected to
     *                            fail with {@link HoodieWriteConflictException}
     */
    @ParameterizedTest
    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
    void testCompactionSchedulingWithUncommittedLogFileOrRollback(boolean enableMetadataTable, boolean runRollback) throws IOException {
        Properties props = getPropertiesForKeyGen(true);
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .forTable(TABLE_NAME)
                .withPath(basePath())
                .withSchema(TRIP_SCHEMA)
                .withParallelism(2, 2)
                .withAutoCommit(false)
                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                        .withMaxNumDeltaCommitsBeforeCompaction(1)
                        .build())
                .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(20480L).build())
                .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
                .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
                .withHeartbeatIntervalInMs(120000)
                .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                        .build())
                .build();
        initTableAndClient(props, config);

        // Instant 1: 100 inserts, committed.
        String instant1 = client.createNewInstantTime();
        writeData(instant1, dataGen.generateInserts(instant1, 100), true);
        org.junit.jupiter.api.Assertions.assertEquals(100L, readTableTotalRecordsNum());
        validateFileListingInMetadataTable();

        // Instant 2: 100 updates, written but NOT committed.
        String instant2 = client.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = writeData(instant2, dataGen.generateUpdates(instant2, 100), false);

        // Delete every second log file (in listing order) that lives outside the ".hoodie" folder.
        int numTotalLogFiles = 0;
        for (StoragePathInfo file : hoodieStorage().listFiles(new StoragePath(basePath()))) {
            if (!isLogFileOutsideMetaFolder(file)) {
                continue;
            }
            numTotalLogFiles++;
            if (numTotalLogFiles % 2 == 0) {
                hoodieStorage().deleteFile(file.getPath());
            }
        }
        int numLogFilesAfterDeletion = 0;
        for (StoragePathInfo file : hoodieStorage().listFiles(new StoragePath(basePath()))) {
            if (isLogFileOutsideMetaFolder(file)) {
                numLogFilesAfterDeletion++;
            }
        }

        if (enableMetadataTable) {
            // The metadata table timeline must still end at the last committed data instant.
            HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
                    .setConf(storageConf())
                    .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath()))
                    .build();
            org.junit.jupiter.api.Assertions.assertEquals(
                    instant1, metadataMetaClient.getActiveTimeline().lastInstant().get().requestedTime());
        }
        // Some, but not all, log files must have survived the deletion.
        org.junit.jupiter.api.Assertions.assertTrue(
                numLogFilesAfterDeletion > 0 && numLogFilesAfterDeletion < numTotalLogFiles);
        org.junit.jupiter.api.Assertions.assertEquals(
                instant2, metaClient.getActiveTimeline().lastInstant().get().requestedTime());
        org.junit.jupiter.api.Assertions.assertEquals(
                instant1, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().requestedTime());
        org.junit.jupiter.api.Assertions.assertEquals(100L, readTableTotalRecordsNum());

        // Instant 3: 100 updates, committed while instant 2 is still pending.
        String instant3 = client.createNewInstantTime();
        writeData(instant3, dataGen.generateUpdates(instant3, 100), true);

        if (runRollback) {
            client.rollback(instant2);
            validateLogFilesExistInRollbackPlan();
            validateFileListingInMetadataTable();
        }

        String compactionInstant = (String) client.scheduleCompaction(Option.empty()).get();
        validateFilesExistInCompactionPlan(compactionInstant);
        if (!runRollback) {
            org.junit.jupiter.api.Assertions.assertThrows(
                    HoodieWriteConflictException.class, () -> commitToTable(instant2, writeStatuses2));
        }

        // Auto commit is switched on only for the compaction run.
        config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, "true");
        client.compact(compactionInstant);
        if (runRollback) {
            validateFileListingInMetadataTable();
        }
        org.junit.jupiter.api.Assertions.assertEquals(100L, readTableTotalRecordsNum());
        org.junit.jupiter.api.Assertions.assertEquals(
                compactionInstant,
                metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant().get().requestedTime());
    }

    /** Builds the layout config that pairs the BUCKET layout with {@link SparkBucketIndexPartitioner}. */
    private static HoodieLayoutConfig bucketLayoutConfig() {
        return HoodieLayoutConfig.newBuilder()
                .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
                .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
                .build();
    }

    /**
     * Copies the write config properties into {@code props}, creates the MERGE_ON_READ table from
     * them and creates the write client, storing both in the corresponding fields.
     */
    private void initTableAndClient(Properties props, HoodieWriteConfig config) throws IOException {
        props.putAll(config.getProps());
        metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
        client = getHoodieWriteClient(config);
    }

    /** Returns a new list holding the elements of {@code first} followed by those of {@code second}. */
    private static List<HoodieRecord> concat(List<HoodieRecord> first, List<HoodieRecord> second) {
        return Stream.concat(first.stream(), second.stream()).collect(Collectors.toList());
    }

    /** Returns true for a regular file whose path does not contain ".hoodie" and that is a log file. */
    private static boolean isLogFileOutsideMetaFolder(StoragePathInfo file) {
        return file.isFile()
                && !file.getPath().toString().contains(".hoodie")
                && FSUtils.isLogFile(file.getPath());
    }

    /** Asserts that {@code path} exists in storage, naming the path in the failure message. */
    private void assertFileExists(StoragePath path) {
        try {
            org.junit.jupiter.api.Assertions.assertTrue(
                    hoodieStorage().exists(path), "Expected file to exist: " + path);
        } catch (IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Loads the plan of the latest rollback instant on the timeline and asserts that it has at
     * least one rollback request and that every log file listed for deletion exists in storage.
     */
    private void validateLogFilesExistInRollbackPlan() throws IOException {
        metaClient.reloadActiveTimeline();
        HoodieInstant rollbackInstant = metaClient.getActiveTimeline()
                .filter(instant -> "rollback".equals(instant.getAction()))
                .lastInstant()
                .get();
        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
        org.junit.jupiter.api.Assertions.assertFalse(
                rollbackPlan.getRollbackRequests().isEmpty(), "Rollback plan has no rollback requests");
        rollbackPlan.getRollbackRequests().forEach(request -> {
            StoragePath partitionPath = new StoragePath(basePath(), request.getPartitionPath());
            request.getLogBlocksToBeDeleted().keySet()
                    .forEach(logFile -> assertFileExists(new StoragePath(partitionPath, logFile)));
        });
    }

    /**
     * Asserts that the compaction plan of {@code compactionInstant} has at least one operation and
     * that the data file and every delta file of each operation exist in storage.
     */
    private void validateFilesExistInCompactionPlan(String compactionInstant) {
        HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, compactionInstant);
        org.junit.jupiter.api.Assertions.assertFalse(
                compactionPlan.getOperations().isEmpty(), "Compaction plan has no operations");
        compactionPlan.getOperations().forEach(op -> {
            StoragePath partitionPath = new StoragePath(basePath(), op.getPartitionPath());
            assertFileExists(new StoragePath(partitionPath, op.getDataFilePath()));
            op.getDeltaFilePaths()
                    .forEach(logFilePath -> assertFileExists(new StoragePath(partitionPath, logFilePath)));
        });
    }

    /**
     * Lists the files of every partition twice, once with the metadata table disabled and once
     * with it enabled, and asserts that both listings contain the same paths per partition.
     */
    private void validateFileListingInMetadataTable() {
        String[] partitionPaths = FSUtils.getAllPartitionPaths(context(), hoodieStorage(), basePath(), false).stream()
                .map(partition -> new StoragePath(basePath(), partition).toString())
                .toArray(String[]::new);
        Map<String, List<StoragePathInfo>> filesFromStorage = FSUtils.getFilesInPartitions(
                context(), hoodieStorage(), HoodieMetadataConfig.newBuilder().enable(false).build(),
                basePath(), partitionPaths);
        Map<String, List<StoragePathInfo>> filesFromMetadataTable = FSUtils.getFilesInPartitions(
                context(), hoodieStorage(), HoodieMetadataConfig.newBuilder().enable(true).build(),
                basePath(), partitionPaths);
        org.junit.jupiter.api.Assertions.assertEquals(filesFromStorage.size(), filesFromMetadataTable.size());
        for (String partition : filesFromStorage.keySet()) {
            List<String> pathsFromStorage = sortedPathStrings(filesFromStorage.get(partition));
            List<String> pathsFromMetadataTable = sortedPathStrings(filesFromMetadataTable.get(partition));
            org.junit.jupiter.api.Assertions.assertEquals(pathsFromStorage.size(), pathsFromMetadataTable.size());
            org.junit.jupiter.api.Assertions.assertEquals(pathsFromStorage, pathsFromMetadataTable);
        }
    }

    /** Sorts the entries by their natural order and returns their paths as strings. */
    private static List<String> sortedPathStrings(List<StoragePathInfo> files) {
        return files.stream()
                .sorted()
                .map(file -> file.getPath().toString())
                .collect(Collectors.toList());
    }

    /** Returns the number of records read through the input format from all generator partitions. */
    private long readTableTotalRecordsNum() {
        List<String> inputPaths = Arrays.stream(dataGen.getPartitionPaths())
                .map(partition -> Paths.get(basePath(), partition).toString())
                .collect(Collectors.toList());
        return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths, basePath()).size();
    }

    /**
     * Starts a commit at {@code instant}, upserts {@code hoodieRecords} and asserts that the write
     * produced no errors. The meta client is reloaded before and after the write.
     *
     * @param instant       instant time to start the commit with
     * @param hoodieRecords records to upsert
     * @param doCommit      when true the write is committed and the commit is asserted to succeed
     * @return the collected write statuses of the upsert
     */
    private List<WriteStatus> writeData(String instant, List<HoodieRecord> hoodieRecords, boolean doCommit) {
        metaClient = HoodieTableMetaClient.reload(metaClient);
        JavaRDD<HoodieRecord> records = jsc().parallelize(hoodieRecords, 2);
        client.startCommitWithTime(instant);
        // The client field is a raw type, so the element type of the collected list is unchecked.
        @SuppressWarnings("unchecked")
        List<WriteStatus> writeStatuses = client.upsert(records, instant).collect();
        Assertions.assertNoWriteErrors(writeStatuses);
        if (doCommit) {
            commitToTable(instant, writeStatuses);
        }
        metaClient = HoodieTableMetaClient.reload(metaClient);
        return writeStatuses;
    }

    /** Commits the write stats of {@code writeStatuses} at {@code instant} and asserts the commit succeeded. */
    private void commitToTable(String instant, List<WriteStatus> writeStatuses) {
        boolean committed = client.commitStats(
                instant,
                writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                Option.empty(),
                metaClient.getCommitActionType());
        org.junit.jupiter.api.Assertions.assertTrue(committed);
    }
}

