/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestDataSkippingWithMORColstats
extends HoodieSparkClientTestBase {
    private static String matchCond = "trip_type = 'UBERX'";
    private static String nonMatchCond = "trip_type = 'BLACK'";
    private static String[] dropColumns = new String[]{"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
    private Boolean shouldOverwrite;
    Map<String, String> options;
    @TempDir
    public Path basePath;

    @BeforeEach
    public void setUp() throws Exception {
        this.initSparkContexts();
        this.dataGen = new HoodieTestDataGenerator();
        this.shouldOverwrite = true;
        this.options = this.getOptions();
        Properties props = new Properties();
        props.putAll(this.options);
        try {
            this.metaClient = HoodieTableMetaClient.newTableBuilder().fromProperties(props).setTableType(HoodieTableType.MERGE_ON_READ.name()).initTable(this.storageConf.newInstance(), this.basePath.toString());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupSparkContexts();
        this.cleanupTestDataGenerator();
        this.metaClient = null;
    }

    @Test
    public void testBaseFileOnly() {
        this.options.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key(), "1");
        this.options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        Dataset<Row> inserts = this.makeInsertDf("000", 100);
        Dataset batch1 = inserts.where(matchCond);
        Dataset batch2 = inserts.where(nonMatchCond);
        this.doWrite((Dataset<Row>)batch1);
        this.doWrite((Dataset<Row>)batch2);
        List<Path> filesToCorrupt = this.getFilesToCorrupt();
        Assertions.assertEquals((int)1, (int)filesToCorrupt.size());
        filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
        Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1).count());
        Assertions.assertThrows(SparkException.class, () -> this.readMatchingRecords(false).count());
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatches() {
        this.testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
        this.testBaseFileAndLogFileUpdateMatchesHelper(false, true, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesScheduleCompaction() {
        this.testBaseFileAndLogFileUpdateMatchesHelper(true, false, false, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
        this.testBaseFileAndLogFileUpdateMatchesHelper(false, false, true, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
        this.options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        this.testBaseFileAndLogFileUpdateMatchesHelper(false, true, true, false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
        this.options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        this.testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, true);
    }

    private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean shouldScheduleCompaction, Boolean shouldInlineCompact, Boolean shouldDelete, Boolean shouldRollback) {
        Dataset<Row> inserts = this.makeInsertDf("000", 100);
        Dataset batch1 = inserts.where(matchCond);
        Dataset batch2 = inserts.where(nonMatchCond);
        this.doWrite((Dataset<Row>)batch1);
        this.doWrite((Dataset<Row>)batch2);
        if (shouldScheduleCompaction.booleanValue()) {
            this.doWrite(inserts);
            this.scheduleCompaction();
        }
        List<Path> filesToCorrupt = this.getFilesToCorrupt();
        Assertions.assertEquals((int)1, (int)filesToCorrupt.size());
        Dataset recordToUpdate = batch2.limit(1);
        Dataset<Row> updatedRecord = this.makeRecordMatch((Dataset<Row>)recordToUpdate);
        this.doWrite(updatedRecord);
        if (shouldRollback.booleanValue()) {
            this.deleteLatestDeltacommit();
            this.enableInlineCompaction(shouldInlineCompact);
            this.doWrite((Dataset<Row>)recordToUpdate);
            Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1).count());
        } else if (shouldDelete.booleanValue()) {
            this.enableInlineCompaction(shouldInlineCompact);
            this.doDelete(updatedRecord);
            Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1).count());
        } else {
            Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1.union(updatedRecord)).count());
        }
        if (shouldInlineCompact.booleanValue()) {
            filesToCorrupt = this.getFilesToCorrupt();
            filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
            if (shouldDelete.booleanValue() || shouldRollback.booleanValue()) {
                Assertions.assertEquals((int)1, (int)filesToCorrupt.size());
                Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1).count());
            } else {
                this.enableInlineCompaction(true);
                this.doWrite(updatedRecord);
                Assertions.assertEquals((int)0, (int)filesToCorrupt.size());
            }
        } else {
            filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
            Assertions.assertEquals((int)1, (int)filesToCorrupt.size());
            if (shouldRollback.booleanValue()) {
                Assertions.assertDoesNotThrow(() -> this.readMatchingRecords().count(), (String)"The non match file group got rollback correctly");
            } else {
                Assertions.assertThrows(SparkException.class, () -> this.readMatchingRecords().count(), (String)"The corrupt parquets are not excluded");
            }
        }
    }

    @Test
    public void testBaseFileAndLogFileUpdateUnmatches() {
        this.testBaseFileAndLogFileUpdateUnmatchesHelper(false);
    }

    @Test
    public void testBaseFileAndLogFileUpdateUnmatchesScheduleCompaction() {
        this.testBaseFileAndLogFileUpdateUnmatchesHelper(true);
    }

    private void testBaseFileAndLogFileUpdateUnmatchesHelper(Boolean shouldScheduleCompaction) {
        Dataset<Row> inserts = this.makeInsertDf("000", 100);
        Dataset batch1 = inserts.where(matchCond);
        this.doWrite((Dataset<Row>)batch1);
        Dataset batch2 = inserts.where(nonMatchCond);
        Dataset recordToMod = batch2.limit(1);
        Dataset<Row> initialRecordToMod = this.makeRecordMatch((Dataset<Row>)recordToMod);
        Dataset modBatch2 = this.removeRecord((Dataset<Row>)batch2, (Dataset<Row>)recordToMod).union(initialRecordToMod);
        this.doWrite((Dataset<Row>)modBatch2);
        if (shouldScheduleCompaction.booleanValue()) {
            this.doWrite((Dataset<Row>)batch1.union(modBatch2));
            this.scheduleCompaction();
        }
        this.doWrite((Dataset<Row>)recordToMod);
        Assertions.assertEquals((long)0L, (long)this.readMatchingRecords().except(batch1).count());
        List<Path> filesToCorrupt = this.getFilesToCorrupt();
        Assertions.assertEquals((int)1, (int)filesToCorrupt.size());
        filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
        Assertions.assertThrows(SparkException.class, () -> this.readMatchingRecords().count());
    }

    private Map<String, String> getOptions() {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(HoodieMetadataConfig.ENABLE.key(), "true");
        options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
        options.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), "trip_type");
        options.put(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "true");
        options.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
        options.put(HoodieWriteConfig.TBL_NAME.key(), "testTable");
        options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
        options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
        options.put("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
        options.put(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0");
        options.put(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(), "false");
        options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
        return options;
    }

    private void scheduleCompaction() {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(this.basePath.toString()).withRollbackUsingMarkers(false).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("trip_type").build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0L).withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).forTable("testTable").withKeyGenerator("org.apache.hudi.keygen.NonpartitionedKeyGenerator").build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            client.scheduleCompactionAtInstant(client.createNewInstantTime(), Option.empty());
        }
    }

    private Dataset<Row> removeRecord(Dataset<Row> batch, Dataset<Row> recordToRemove) {
        return batch.where("_row_key != '" + ((Row)recordToRemove.first()).getString(1) + "'");
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private List<Path> getFilesToCorrupt() {
        HashSet fileNames = new HashSet();
        this.sparkSession.read().format("hudi").load(this.basePath.toString()).where(matchCond).select("_hoodie_file_name", new String[0]).distinct().collectAsList().forEach(row -> {
            String fileName = row.getString(0);
            if (fileName.contains(".parquet")) {
                fileNames.add(FSUtils.getFileId((String)fileName));
            } else {
                fileNames.add(fileName);
            }
        });
        try (Stream<Path> stream = Files.list(this.basePath);){
            HashMap latestBaseFiles = new HashMap();
            List<Path> files = stream.filter(file -> !Files.isDirectory(file, new LinkOption[0])).filter(file -> file.toString().contains(".parquet")).filter(file -> !file.toString().contains(".crc")).filter(file -> !fileNames.contains(FSUtils.getFileId((String)file.getFileName().toString()))).collect(Collectors.toList());
            files.forEach(f -> {
                String fileID = FSUtils.getFileId((String)f.getFileName().toString());
                if (!latestBaseFiles.containsKey(fileID) || FSUtils.getCommitTime((String)f.getFileName().toString()).compareTo(FSUtils.getCommitTime((String)((Path)latestBaseFiles.get(fileID)).getFileName().toString())) > 0) {
                    latestBaseFiles.put(fileID, f);
                }
            });
            ArrayList<Path> arrayList = new ArrayList<Path>(latestBaseFiles.values());
            return arrayList;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void doWrite(Dataset<Row> df) {
        if (this.shouldOverwrite.booleanValue()) {
            this.shouldOverwrite = false;
            df.write().format("hudi").options(this.options).mode(SaveMode.Overwrite).save(this.basePath.toString());
        } else {
            df.write().format("hudi").options(this.options).mode(SaveMode.Append).save(this.basePath.toString());
        }
    }

    private void doDelete(Dataset<Row> df) {
        df.write().format("hudi").options(this.options).option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath.toString());
    }

    private Dataset<Row> makeRecordMatch(Dataset<Row> rowToMod) {
        return this.updateTripType(rowToMod, "UBERX");
    }

    private Dataset<Row> updateTripType(Dataset<Row> rowToMod, String value) {
        rowToMod.createOrReplaceTempView("rowToMod");
        return this.sparkSession.sqlContext().createDataFrame(this.sparkSession.sql("select _hoodie_is_deleted, _row_key, begin_lat, begin_lon, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare, height, nation, partition, partition_path, rider, seconds_since_epoch, timestamp, tip_history, '" + value + "' as trip_type, weight from rowToMod").rdd(), rowToMod.schema());
    }

    private Dataset<Row> readMatchingRecords() {
        return this.readMatchingRecords(true);
    }

    public Dataset<Row> readMatchingRecords(Boolean useDataSkipping) {
        if (useDataSkipping.booleanValue()) {
            return this.sparkSession.read().format("hudi").options(this.options).load(this.basePath.toString()).where(matchCond).drop(dropColumns);
        }
        return this.sparkSession.read().format("hudi").option(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "false").load(this.basePath.toString()).where(matchCond).drop(dropColumns);
    }

    protected static void corruptFile(Path path) {
        File fileToCorrupt = path.toFile();
        fileToCorrupt.delete();
        try {
            fileToCorrupt.createNewFile();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
        List records = this.dataGen.generateInserts(instantTime, n).stream().map(r -> (String)RawTripTestPayload.recordToString((HoodieRecord)r).get()).collect(Collectors.toList());
        JavaRDD rdd = this.jsc.parallelize(records);
        return this.sparkSession.read().json(rdd).drop("city_to_state");
    }

    public void deleteLatestDeltacommit() {
        String filename = HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName((HoodieInstant)this.metaClient.getActiveTimeline().lastInstant().get());
        File deltacommit = new File(this.metaClient.getBasePath() + "/.hoodie/timeline/" + filename);
        deltacommit.delete();
    }

    public void enableInlineCompaction(Boolean shouldEnable) {
        if (shouldEnable.booleanValue()) {
            this.options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
            this.options.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
        }
    }
}

