/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DefaultSparkRecordMerger;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.types.DataType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Functional tests for the Spark merge handle on a merge-on-read table, using
 * {@link HoodieRecordMerger} implementations whose {@code shouldFlush} decides which
 * merged records are written out. Each test inserts, updates/deletes/inserts, then
 * deletes again, and checks the visible row count after every commit.
 */
public class TestHoodieMergeHandleWithSparkMerger
extends SparkClientFunctionalTestHarness {
    private static final Schema SCHEMA = TestHoodieMergeHandleWithSparkMerger.getAvroSchema("AvroSchema", "AvroSchemaNS");
    private HoodieTableMetaClient metaClient;

    /** Returns the single partition path that every generated test record is written to. */
    public static String getPartitionPath() {
        return "2023-10-01";
    }

    /** Initializes a merge-on-read table at the harness base path before each test. */
    @BeforeEach
    public void setUp() throws IOException {
        Properties properties = new Properties();
        properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
        properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(), "record_key");
        properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
        properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath(), HoodieTableType.MERGE_ON_READ, properties);
    }

    @Test
    public void testDefaultMerger() throws Exception {
        // DefaultMerger flushes everything: 100 inserted - 17 deleted + 31 newly inserted = 114.
        this.runMergerTest(this.buildDefaultWriteConfig(SCHEMA), DefaultMerger.class, 114);
    }

    @Test
    public void testNoFlushMerger() throws Exception {
        // 64 = 100 - 17 deleted - 19 updated; consistent with NoFlushMerger writing none of
        // the records that pass through it.
        this.runMergerTest(this.buildNoFlushWriteConfig(SCHEMA), NoFlushMerger.class, 64);
    }

    @Test
    public void testCustomMerger() throws Exception {
        // 95 = 114 - 19: the 19 updates are generated with commit time "001", and CustomMerger
        // refuses to flush rows whose first field equals "001".
        this.runMergerTest(this.buildCustomWriteConfig(SCHEMA), CustomMerger.class, 95);
    }

    /**
     * Shared body of the merger tests.
     *
     * @param writeConfig       config whose {@code getRecordMerger()} is under test
     * @param expectedMerger    merger class the config must hand out
     * @param expectedRecordNum row count expected after the second commit
     */
    private void runMergerTest(HoodieWriteConfig writeConfig, Class<? extends HoodieRecordMerger> expectedMerger,
                               int expectedRecordNum) throws Exception {
        HoodieRecordMerger merger = writeConfig.getRecordMerger();
        Assertions.assertTrue(expectedMerger.isInstance(merger),
            "Unexpected merger: " + merger.getClass().getName());
        Assertions.assertTrue(writeConfig.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false));
        this.insertAndUpdate(writeConfig, expectedRecordNum);
    }

    /**
     * Generates {@code numOfRecords} random Spark records in {@link #getPartitionPath()}.
     *
     * @param numOfRecords number of records to generate
     * @param commitTime   commit time stamped into the generated rows
     * @return a mutable list of {@link HoodieSparkRecord}s
     */
    public List<HoodieRecord> generateRecords(int numOfRecords, String commitTime) throws Exception {
        Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithCommitTime(
            this.spark().sqlContext(), numOfRecords, TestHoodieMergeHandleWithSparkMerger.getPartitionPath(), false, commitTime);
        return TestHoodieMergeHandleWithSparkMerger.toSparkRecords(rows);
    }

    /**
     * Generates random rows for the given existing keys, i.e. updates of those records.
     *
     * @param keys       keys of the records to update
     * @param commitTime commit time stamped into the generated rows
     * @return a mutable list of {@link HoodieSparkRecord}s
     */
    public List<HoodieRecord> generateRecordUpdates(List<HoodieKey> keys, String commitTime) throws Exception {
        Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithKeys(this.spark().sqlContext(), keys, false, commitTime);
        return TestHoodieMergeHandleWithSparkMerger.toSparkRecords(rows);
    }

    /** Converts generated rows into {@link HoodieSparkRecord}s keyed by the row's key columns. */
    private static List<HoodieRecord> toSparkRecords(Dataset<Row> rows) {
        List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(rows, SparkDatasetTestUtils.ENCODER);
        // NOTE(review): ordinals 2 and 3 are assumed to be the record-key and partition-path
        // meta columns of SparkDatasetTestUtils.STRUCT_TYPE — confirm if that schema changes.
        return internalRows.stream()
            .<HoodieRecord>map(row -> new HoodieSparkRecord(
                new HoodieKey(row.getString(2), row.getString(3)), row, SparkDatasetTestUtils.STRUCT_TYPE, false))
            .collect(Collectors.toList());
    }

    /**
     * Builds one SPARK-typed DELETE {@link HoodieEmptyRecord} per key.
     * The record key is used as the ordering value.
     *
     * @return a mutable list, so callers may append further records to it
     */
    public List<HoodieRecord> generateEmptyRecords(List<HoodieKey> keys) {
        List<HoodieRecord> records = new ArrayList<>(keys.size());
        for (HoodieKey key : keys) {
            records.add(new HoodieEmptyRecord<>(key, HoodieOperation.DELETE, key.getRecordKey(), HoodieRecord.HoodieRecordType.SPARK));
        }
        return records;
    }

    /** Extracts the {@link HoodieKey} of each record, preserving order. */
    public static List<HoodieKey> getKeys(List<HoodieRecord> records) {
        return records.stream().map(HoodieRecord::getKey).collect(Collectors.toList());
    }

    private static Schema getAvroSchema(String schemaName, String schemaNameSpace) {
        return AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE, schemaName, schemaNameSpace);
    }

    /**
     * Builds the base write config shared by all tests, with these settings:
     * - the Spark record merger
     * - parquet log blocks
     * - the file-group reader
     * - record positions written to log blocks
     */
    public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
        Properties extraProperties = new Properties();
        extraProperties.setProperty(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), "org.apache.hudi.DefaultSparkRecordMerger");
        extraProperties.setProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        extraProperties.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "record_key");
        extraProperties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        extraProperties.setProperty(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
        extraProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
        return this.getConfigBuilder(true).withPath(this.basePath()).withSchema(avroSchema.toString()).withProperties(extraProperties).build();
    }

    public DefaultWriteConfig buildDefaultWriteConfig(Schema avroSchema) {
        return new DefaultWriteConfig(this.getWriteConfig(avroSchema));
    }

    public NoFlushWriteConfig buildNoFlushWriteConfig(Schema avroSchema) {
        return new NoFlushWriteConfig(this.getWriteConfig(avroSchema));
    }

    public CustomWriteConfig buildCustomWriteConfig(Schema avroSchema) {
        return new CustomWriteConfig(this.getWriteConfig(avroSchema));
    }

    /** Builds a file-system view over {@code metaClient}'s active timeline. */
    public HoodieTableFileSystemView getFileSystemView() {
        return new HoodieTableFileSystemView(this.metaClient, this.metaClient.getActiveTimeline());
    }

    public List<FileSlice> getLatestFileSlices(String partitionPath) {
        return this.getFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
    }

    public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId) {
        return this.getFileSystemView().getLatestFileSlice(partitionPath, fileId);
    }

    /**
     * Returns the base file of the latest slice of the given file group.
     *
     * @return the base file, or empty if there is no such slice or the slice has no base
     *     file (previously the latter threw from an unchecked {@code get()})
     */
    public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
        Option<FileSlice> fileSliceOpt = this.getLatestFileSlice(partitionPath, fileId);
        if (!fileSliceOpt.isPresent()) {
            return Option.empty();
        }
        return fileSliceOpt.get().getBaseFile();
    }

    /** Returns the log files of the latest slice of the given file group, or an empty list if none. */
    public List<HoodieLogFile> getLatestLogFiles(String partitionPath, String fileId) {
        Option<FileSlice> fileSliceOpt = this.getLatestFileSlice(partitionPath, fileId);
        if (!fileSliceOpt.isPresent()) {
            return Collections.emptyList();
        }
        return fileSliceOpt.get().getLogFiles().collect(Collectors.toList());
    }

    public List<String> getFileIds(String partitionPath) {
        return this.getLatestFileSlices(partitionPath).stream().map(FileSlice::getFileId).collect(Collectors.toList());
    }

    /**
     * Reads the table through the Hudi Spark datasource and asserts it has {@code numRecords} rows.
     * Uses {@code count()} so the rows are not collected to the driver.
     */
    public void checkDataEquality(int numRecords) {
        Map<String, String> properties = new HashMap<>();
        properties.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), "org.apache.hudi.DefaultSparkRecordMerger");
        properties.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        properties.put("hoodie.payload.ordering.field", HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
        properties.put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        properties.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
        Dataset<Row> rows = this.spark().read().options(properties).format("org.apache.hudi").load(this.basePath());
        Assertions.assertEquals((long) numRecords, rows.count());
    }

    /**
     * Runs three delta commits and checks the visible row count after each one.
     * - 001: insert 100 records.
     * - 002: delete 17, update 19 and insert 31 records.
     * - 003: delete 9 untouched records.
     *
     * @param writeConfig       config under test
     * @param expectedRecordNum row count expected after commit 002; commit 003 expects 9 fewer
     */
    public void insertAndUpdate(HoodieWriteConfig writeConfig, int expectedRecordNum) throws Exception {
        String partitionPath = TestHoodieMergeHandleWithSparkMerger.getPartitionPath();
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable hoodieTable = HoodieSparkTable.create(writeConfig, this.context(), reloadedMetaClient);
        Assertions.assertEquals(HoodieTableType.MERGE_ON_READ, hoodieTable.getMetaClient().getTableType());
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig)) {
            // Commit 001: insert 100 records -> one file group with a base file and no log files.
            String instantTime = "001";
            writeClient.startCommitWithTime(instantTime);
            List<HoodieRecord> records = this.generateRecords(100, instantTime);
            Stream<HoodieBaseFile> baseFileStream = this.insertRecordsToMORTable(reloadedMetaClient, records, writeClient, writeConfig, instantTime);
            Assertions.assertTrue(baseFileStream.findAny().isPresent());
            Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(deltaCommit.isPresent());
            Assertions.assertEquals(instantTime, deltaCommit.get().requestedTime(), "Delta commit should be specified value");
            List<String> fileIds = this.getFileIds(partitionPath);
            Assertions.assertEquals(1, fileIds.size());
            Option<HoodieBaseFile> baseFileOption = this.getLatestBaseFile(partitionPath, fileIds.get(0));
            Assertions.assertTrue(baseFileOption.isPresent());
            List<HoodieLogFile> logFiles = this.getLatestLogFiles(partitionPath, fileIds.get(0));
            Assertions.assertTrue(logFiles.isEmpty());
            this.checkDataEquality(100);

            // Commit 002: keys [0,17) deleted, keys [17,36) updated with commit time "001",
            // plus 31 brand-new records -> 17 + 19 + 31 = 67 records in one batch.
            List<HoodieKey> keys = TestHoodieMergeHandleWithSparkMerger.getKeys(records);
            instantTime = "002";
            writeClient.startCommitWithTime(instantTime);
            List<HoodieRecord> records2 = this.generateEmptyRecords(keys.subList(0, 17));
            List<HoodieRecord> records3 = this.generateRecordUpdates(keys.subList(17, 36), "001");
            List<HoodieRecord> records4 = this.generateRecords(31, instantTime);
            records2.addAll(records3);
            records2.addAll(records4);
            Assertions.assertEquals(67, records2.size());
            this.updateRecordsInMORTable(reloadedMetaClient, records2, writeClient, writeConfig, instantTime, false);
            deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(deltaCommit.isPresent());
            List<String> fileIds2 = this.getFileIds(partitionPath);
            Assertions.assertFalse(fileIds2.isEmpty());
            Assertions.assertEquals(1, fileIds2.size());
            baseFileOption = this.getLatestBaseFile(partitionPath, fileIds2.get(0));
            Assertions.assertTrue(baseFileOption.isPresent());
            this.checkDataEquality(expectedRecordNum);

            // Commit 003: delete keys [50,59), which commit 002 did not touch.
            instantTime = "003";
            writeClient.startCommitWithTime(instantTime);
            List<HoodieRecord> records5 = this.generateEmptyRecords(keys.subList(50, 59));
            Assertions.assertEquals(9, records5.size());
            this.updateRecordsInMORTable(reloadedMetaClient, records5, writeClient, writeConfig, instantTime, false);
            this.checkDataEquality(expectedRecordNum - 9);
        }
    }

    /** Spark merger that flushes a record unless ordinal 0 of its row equals "001". */
    public static class CustomMerger
    extends DefaultSparkRecordMerger {
        @Override
        public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
            // NOTE(review): ordinal 0 is assumed to be the commit-time meta column — confirm.
            InternalRow row = (InternalRow) ((HoodieSparkRecord) record).getData();
            return !row.getString(0).equals("001");
        }
    }

    /** Spark merger that never flushes. */
    public static class NoFlushMerger
    extends DefaultSparkRecordMerger {
        @Override
        public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
            return false;
        }
    }

    /** Spark merger that always flushes. */
    public static class DefaultMerger
    extends DefaultSparkRecordMerger {
        @Override
        public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
            return true;
        }
    }

    /** Write config whose merger is {@link CustomMerger}. */
    public static class CustomWriteConfig
    extends TestHoodieWriteConfig {
        CustomWriteConfig(HoodieWriteConfig writeConfig) {
            super(writeConfig);
        }

        @Override
        public HoodieRecordMerger getRecordMerger() {
            return new CustomMerger();
        }
    }

    /** Write config whose merger is {@link NoFlushMerger}. */
    public static class NoFlushWriteConfig
    extends TestHoodieWriteConfig {
        NoFlushWriteConfig(HoodieWriteConfig writeConfig) {
            super(writeConfig);
        }

        @Override
        public HoodieRecordMerger getRecordMerger() {
            return new NoFlushMerger();
        }
    }

    /** Write config whose merger is {@link DefaultMerger}. */
    public static class DefaultWriteConfig
    extends TestHoodieWriteConfig {
        DefaultWriteConfig(HoodieWriteConfig writeConfig) {
            super(writeConfig);
        }

        @Override
        public HoodieRecordMerger getRecordMerger() {
            return new DefaultMerger();
        }
    }

    /** Copies an existing write config's engine type and properties so subclasses can override the merger. */
    public static class TestHoodieWriteConfig
    extends HoodieWriteConfig {
        TestHoodieWriteConfig(HoodieWriteConfig writeConfig) {
            super(writeConfig.getEngineType(), writeConfig.getProps());
        }
    }
}

