/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="functional")
public class TestHoodieSparkMergeOnReadTableIncrementalRead
extends SparkClientFunctionalTestHarness {
    private JobConf roSnapshotJobConf;
    private JobConf roJobConf;
    private JobConf rtJobConf;

    @BeforeEach
    void setUp() {
        this.roSnapshotJobConf = new JobConf((Configuration)this.storageConf().unwrap());
        this.roJobConf = new JobConf((Configuration)this.storageConf().unwrap());
        this.rtJobConf = new JobConf((Configuration)this.storageConf().unwrap());
    }

    @Test
    public void testIncrementalReadsWithCompaction() throws Exception {
        String partitionPath = "2020/02/20";
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[]{"2020/02/20"});
        Properties props = this.getPropertiesForKeyGen(true);
        props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
        HoodieWriteConfig cfg = this.getConfigBuilder(true).build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String commitTime1 = "001";
            client.startCommitWithTime(commitTime1);
            List records001 = dataGen.generateInserts(commitTime1, Integer.valueOf(200));
            Stream dataFiles = this.insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1);
            Assertions.assertTrue((boolean)dataFiles.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            FileStatus[] snapshotROFiles = this.getROSnapshotFiles("2020/02/20");
            this.validateFiles("2020/02/20", 1, snapshotROFiles, false, this.roSnapshotJobConf, 200, commitTime1);
            FileStatus[] incrementalROFiles = this.getROIncrementalFiles("2020/02/20", true);
            this.validateFiles("2020/02/20", 1, incrementalROFiles, false, this.roJobConf, 200, commitTime1);
            Path firstFilePath = incrementalROFiles[0].getPath();
            FileStatus[] incrementalRTFiles = this.getRTIncrementalFiles("2020/02/20");
            this.validateFiles("2020/02/20", 1, incrementalRTFiles, true, this.rtJobConf, 200, commitTime1);
            Assertions.assertEquals((Object)firstFilePath, (Object)incrementalRTFiles[0].getPath());
            String updateTime = "004";
            client.startCommitWithTime(updateTime);
            List records004 = dataGen.generateUpdates(updateTime, Integer.valueOf(100));
            this.updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime, false);
            incrementalROFiles = this.getROIncrementalFiles("2020/02/20", false);
            this.validateFiles("2020/02/20", 1, incrementalROFiles, false, this.roJobConf, 200, commitTime1);
            Assertions.assertEquals((Object)firstFilePath, (Object)incrementalROFiles[0].getPath());
            incrementalRTFiles = this.getRTIncrementalFiles("2020/02/20");
            this.validateFiles("2020/02/20", 1, incrementalRTFiles, true, this.rtJobConf, 200, commitTime1, updateTime);
            String compactionCommitTime = "005";
            client.scheduleCompactionAtInstant("005", Option.empty());
            incrementalROFiles = this.getROIncrementalFiles("2020/02/20", true);
            this.validateFiles("2020/02/20", 1, incrementalROFiles, false, this.roJobConf, 200, commitTime1);
            incrementalRTFiles = this.getRTIncrementalFiles("2020/02/20");
            this.validateFiles("2020/02/20", 1, incrementalRTFiles, true, this.rtJobConf, 200, commitTime1, updateTime);
            String insertsTime = "006";
            List records006 = dataGen.generateInserts(insertsTime, Integer.valueOf(200));
            client.startCommitWithTime(insertsTime);
            dataFiles = this.insertRecordsToMORTable(metaClient, records006, client, cfg, insertsTime);
            Assertions.assertTrue((boolean)dataFiles.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            snapshotROFiles = this.getROSnapshotFiles("2020/02/20");
            this.validateFiles("2020/02/20", 2, snapshotROFiles, false, this.roSnapshotJobConf, 400, commitTime1, insertsTime);
            incrementalROFiles = this.getROIncrementalFiles("2020/02/20", true);
            Assertions.assertEquals((Object)firstFilePath, (Object)incrementalROFiles[0].getPath());
            this.validateFiles("2020/02/20", 1, incrementalROFiles, false, this.roJobConf, 200, commitTime1);
            incrementalROFiles = this.getROIncrementalFiles("2020/02/20", false);
            this.validateFiles("2020/02/20", 2, incrementalROFiles, false, this.roJobConf, 400, commitTime1, insertsTime);
            incrementalRTFiles = this.getRTIncrementalFiles("2020/02/20");
            this.validateFiles("2020/02/20", 2, incrementalRTFiles, true, this.rtJobConf, 400, commitTime1, updateTime, insertsTime);
            client.compact(compactionCommitTime);
            snapshotROFiles = this.getROSnapshotFiles("2020/02/20");
            this.validateFiles("2020/02/20", 2, snapshotROFiles, false, this.roSnapshotJobConf, 400, commitTime1, updateTime, insertsTime);
            incrementalROFiles = this.getROIncrementalFiles("2020/02/20", "002", -1, true);
            Assertions.assertTrue((incrementalROFiles.length == 2 ? 1 : 0) != 0);
            this.validateFiles("2020/02/20", 2, incrementalROFiles, false, this.roJobConf, 400, commitTime1, updateTime, insertsTime);
        }
    }

    private FileStatus[] getROSnapshotFiles(String partitionPath) throws Exception {
        FileInputFormat.setInputPaths((JobConf)this.roSnapshotJobConf, (String)Paths.get(this.basePath(), partitionPath).toString());
        return this.listStatus((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.roSnapshotJobConf, false);
    }

    private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction) throws Exception {
        return this.getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction);
    }

    private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction) throws Exception {
        this.setupIncremental(this.roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
        FileInputFormat.setInputPaths((JobConf)this.roJobConf, (String)Paths.get(this.basePath(), partitionPath).toString());
        return this.listStatus((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.roJobConf, false);
    }

    private FileStatus[] getRTIncrementalFiles(String partitionPath) throws Exception {
        return this.getRTIncrementalFiles(partitionPath, "000", -1);
    }

    private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) throws Exception {
        this.setupIncremental(this.rtJobConf, startCommitTime, numCommitsToPull, false);
        FileInputFormat.setInputPaths((JobConf)this.rtJobConf, (String)Paths.get(this.basePath(), partitionPath).toString());
        return this.listStatus((HoodieFileFormat)HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), this.rtJobConf, true);
    }

    private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
        String modePropertyName = String.format("hoodie.%s.consume.mode", "raw_trips");
        jobConf.set(modePropertyName, "INCREMENTAL");
        String startCommitTimestampName = String.format("hoodie.%s.consume.start.timestamp", "raw_trips");
        jobConf.set(startCommitTimestampName, startCommit);
        String maxCommitPulls = String.format("hoodie.%s.consume.max.commits", "raw_trips");
        jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
        String stopAtCompactionPropName = String.format("hoodie.%s.ro.stop.at.compaction", "raw_trips");
        jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
    }

    private void validateFiles(String partitionPath, int expectedNumFiles, FileStatus[] files, boolean realtime, JobConf jobConf, int expectedRecords, String ... expectedCommits) {
        Assertions.assertEquals((int)expectedNumFiles, (int)files.length);
        Set expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
        List records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat((StorageConfiguration)this.storageConf(), Collections.singletonList(Paths.get(this.basePath(), partitionPath).toString()), (String)this.basePath(), (JobConf)jobConf, (boolean)realtime);
        Assertions.assertEquals((int)expectedRecords, (int)records.size());
        Set actualCommits = records.stream().map(r -> r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
        Assertions.assertEquals(expectedCommitsSet, actualCommits);
    }

    private FileStatus[] listStatus(HoodieFileFormat baseFileFormat, JobConf jobConf, boolean realtime) throws IOException {
        FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat((HoodieFileFormat)baseFileFormat, (boolean)realtime, (Configuration)jobConf);
        switch (baseFileFormat) {
            case PARQUET: {
                if (realtime) {
                    return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf);
                }
                return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
            }
            case HFILE: {
                if (realtime) {
                    return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf);
                }
                return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf);
            }
        }
        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
    }
}

