/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.HoodieUnsafeUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Tuple2;

@Tag(value="functional")
public class TestHoodieDatasetBulkInsertHelper
extends HoodieSparkClientTestBase {
    private String schemaStr;
    private transient Schema schema;
    private StructType structType;

    public TestHoodieDatasetBulkInsertHelper() throws IOException {
        this.init();
    }

    private static Stream<Arguments> providePreCombineArgs() {
        return Stream.of(Arguments.of((Object[])new Object[]{false}), Arguments.of((Object[])new Object[]{true}));
    }

    private void init() throws IOException {
        this.schemaStr = FileIOUtils.readAsUTFString((InputStream)((Object)((Object)this)).getClass().getResourceAsStream("/exampleSchema.txt"));
        this.schema = DataSourceTestUtils.getStructTypeExampleSchema();
        this.structType = AvroConversionUtils.convertAvroSchemaToStructType((Schema)this.schema);
    }

    @Test
    public void testBulkInsertHelperConcurrently() {
        IntStream.range(0, 2).parallel().forEach(i -> {
            if (i % 2 == 0) {
                this.testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "_row_key");
            } else {
                this.testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "ts");
            }
        });
    }

    private static Stream<Arguments> provideKeyGenArgs() {
        return Stream.of(Arguments.of((Object[])new Object[]{SimpleKeyGenerator.class.getName()}), Arguments.of((Object[])new Object[]{ComplexKeyGenerator.class.getName()}), Arguments.of((Object[])new Object[]{NonpartitionedKeyGenerator.class.getName()}));
    }

    @ParameterizedTest
    @MethodSource(value={"provideKeyGenArgs"})
    public void testBulkInsertHelper(String keyGenClass) {
        this.testBulkInsertHelperFor(keyGenClass, "_row_key");
    }

    private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField) {
        Map<String, String> props = null;
        props = keyGenClass.equals(SimpleKeyGenerator.class.getName()) ? this.getPropsAllSet(recordKeyField) : (keyGenClass.equals(ComplexKeyGenerator.class.getName()) ? this.getPropsForComplexKeyGen(recordKeyField) : this.getPropsForNonPartitionedKeyGen(recordKeyField));
        HoodieWriteConfig config = this.getConfigBuilder(this.schemaStr).withProps(props).combineInput(false, false).build();
        List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
        Dataset dataset = this.sqlContext.createDataFrame(rows, this.structType);
        Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"0000000001");
        StructType resultSchema = result.schema();
        Assertions.assertEquals((long)result.count(), (long)10L);
        Assertions.assertEquals((int)resultSchema.fieldNames().length, (int)(this.structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size()));
        for (Map.Entry entry2 : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
            Assertions.assertEquals((Integer)((Integer)entry2.getValue()), (int)resultSchema.fieldIndex((String)entry2.getKey()));
        }
        boolean isNonPartitionedKeyGen = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
        boolean isComplexKeyGen = keyGenClass.equals(ComplexKeyGenerator.class.getName());
        TypedProperties keyGenProperties = new TypedProperties();
        keyGenProperties.put((Object)KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), (Object)recordKeyField);
        keyGenProperties.put((Object)KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), (Object)"partition");
        ComplexKeyGenerator complexKeyGenerator = new ComplexKeyGenerator(keyGenProperties);
        result.toJavaRDD().foreach((VoidFunction & Serializable)entry -> {
            String recordKey = isComplexKeyGen ? complexKeyGenerator.getRecordKey(entry) : entry.getAs(recordKeyField).toString();
            Assertions.assertEquals((Object)recordKey, (Object)entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
            String partitionPath = isNonPartitionedKeyGen ? "" : entry.getAs("partition").toString();
            Assertions.assertEquals((Object)partitionPath, (Object)entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)));
            Assertions.assertEquals((Object)"", (Object)entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
            Assertions.assertEquals((Object)"", (Object)entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)));
            Assertions.assertEquals((Object)"", (Object)entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)));
        });
        Dataset trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        Assertions.assertTrue((dataset.except(trimmedOutput).count() == 0L ? 1 : 0) != 0);
    }

    @Test
    public void testBulkInsertHelperNoMetaFields() {
        List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
        HoodieWriteConfig config = this.getConfigBuilder(this.schemaStr).withProps(this.getPropsAllSet("_row_key")).withPopulateMetaFields(false).build();
        Dataset dataset = this.sqlContext.createDataFrame(rows, this.structType);
        Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
        StructType resultSchema = result.schema();
        Assertions.assertEquals((long)result.count(), (long)10L);
        Assertions.assertEquals((int)resultSchema.fieldNames().length, (int)(this.structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size()));
        for (Map.Entry entry2 : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
            Assertions.assertTrue((resultSchema.fieldIndex((String)entry2.getKey()) == ((Integer)entry2.getValue()).intValue() ? 1 : 0) != 0);
        }
        result.toJavaRDD().foreach((VoidFunction & Serializable)entry -> {
            Assertions.assertTrue((boolean)entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(""));
            Assertions.assertTrue((boolean)entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(""));
            Assertions.assertTrue((boolean)entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
            Assertions.assertTrue((boolean)entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
            Assertions.assertTrue((boolean)entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
        });
        Dataset trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        Assertions.assertTrue((dataset.except(trimmedOutput).count() == 0L ? 1 : 0) != 0);
    }

    @ParameterizedTest
    @MethodSource(value={"providePreCombineArgs"})
    public void testBulkInsertPreCombine(boolean enablePreCombine) {
        HoodieWriteConfig config = this.getConfigBuilder(this.schemaStr).withProps(this.getPropsAllSet("_row_key")).combineInput(enablePreCombine, enablePreCombine).withPreCombineField("ts").build();
        List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
        Dataset toUpdateDataset = this.sqlContext.createDataFrame(inserts.subList(0, 5), this.structType);
        List<Row> updates = DataSourceTestUtils.updateRowsWithUpdatedTs((Dataset<Row>)toUpdateDataset);
        ArrayList<Row> rows = new ArrayList<Row>();
        rows.addAll(inserts);
        rows.addAll(updates);
        Dataset dataset = this.sqlContext.createDataFrame(rows, this.structType);
        Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
        StructType resultSchema = result.schema();
        Assertions.assertEquals((long)result.count(), (long)(enablePreCombine ? 10L : 15L));
        Assertions.assertEquals((int)resultSchema.fieldNames().length, (int)(this.structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size()));
        for (Map.Entry entry2 : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
            Assertions.assertTrue((resultSchema.fieldIndex((String)entry2.getKey()) == ((Integer)entry2.getValue()).intValue() ? 1 : 0) != 0);
        }
        int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
        int metadataPartitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
        int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        int metadataCommitSeqNoIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
        int metadataFilenameIndex = resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD);
        result.toJavaRDD().collect().forEach(entry -> {
            Assertions.assertTrue((boolean)entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
            Assertions.assertTrue((boolean)entry.get(metadataPartitionPathIndex).equals(entry.getAs("partition")));
            Assertions.assertTrue((boolean)entry.get(metadataCommitSeqNoIndex).equals(""));
            Assertions.assertTrue((boolean)entry.get(metadataCommitTimeIndex).equals(""));
            Assertions.assertTrue((boolean)entry.get(metadataFilenameIndex).equals(""));
        });
        Dataset trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        ExpressionEncoder encoder = this.getEncoder(dataset.schema());
        if (enablePreCombine) {
            Dataset inputSnapshotDf = dataset.groupByKey((MapFunction & Serializable)value -> value.getAs("partition") + ":" + value.getAs("_row_key"), Encoders.STRING()).reduceGroups((ReduceFunction & Serializable)(v1, v2) -> {
                long ts2;
                long ts1 = (Long)v1.getAs("ts");
                if (ts1 >= (ts2 = ((Long)v2.getAs("ts")).longValue())) {
                    return v1;
                }
                return v2;
            }).map((MapFunction & Serializable)value -> (Row)value._2, (Encoder)encoder);
            Assertions.assertEquals((long)0L, (long)inputSnapshotDf.except(trimmedOutput).count());
        } else {
            Assertions.assertEquals((long)0L, (long)dataset.except(trimmedOutput).count());
        }
    }

    private Map<String, String> getPropsAllSet(String recordKey) {
        return this.getProps(recordKey, true, true, true, true);
    }

    private Map<String, String> getProps(boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
        return this.getProps("_row_key", setAll, setKeyGen, setRecordKey, setPartitionPath);
    }

    private Map<String, String> getProps(String recordKey, boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
        HashMap<String, String> props = new HashMap<String, String>();
        if (setAll) {
            props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
            props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
            props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
            props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
        } else {
            if (setKeyGen) {
                props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
            }
            if (setRecordKey) {
                props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
            }
            if (setPartitionPath) {
                props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
            }
        }
        return props;
    }

    private Map<String, String> getPropsForComplexKeyGen(String recordKey) {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName());
        props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
        props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
        props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
        return props;
    }

    private Map<String, String> getPropsForNonPartitionedKeyGen(String recordKey) {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName());
        props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
        props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
        return props;
    }

    @Test
    public void testNoPropsSet() {
        Dataset preparedDF2;
        HoodieWriteConfig config = this.getConfigBuilder(this.schemaStr).build();
        List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
        Dataset dataset = this.sqlContext.createDataFrame(rows, this.structType);
        try {
            preparedDF2 = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
            preparedDF2.count();
            Assertions.fail((String)"Should have thrown exception");
        }
        catch (Exception preparedDF2) {
            // empty catch block
        }
        config = this.getConfigBuilder(this.schemaStr).withProps(this.getProps(false, false, true, true)).build();
        rows = DataSourceTestUtils.generateRandomRows(10);
        dataset = this.sqlContext.createDataFrame(rows, this.structType);
        try {
            preparedDF2 = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
            preparedDF2.count();
            Assertions.fail((String)"Should have thrown exception");
        }
        catch (Exception preparedDF3) {
            // empty catch block
        }
        config = this.getConfigBuilder(this.schemaStr).withProps(this.getProps(false, true, true, false)).build();
        rows = DataSourceTestUtils.generateRandomRows(10);
        dataset = this.sqlContext.createDataFrame(rows, this.structType);
        try {
            preparedDF2 = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
            preparedDF2.count();
            Assertions.fail((String)"Should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private ExpressionEncoder getEncoder(StructType schema) {
        return SparkAdapterSupport$.MODULE$.sparkAdapter().getCatalystExpressionUtils().getEncoder(schema);
    }

    @Test
    public void testBulkInsertParallelismParam() {
        HoodieWriteConfig config = this.getConfigBuilder(this.schemaStr).withProps(this.getPropsAllSet("_row_key")).combineInput(true, true).withPreCombineField("ts").build();
        int checkParallelism = 7;
        config.setValue("hoodie.bulkinsert.shuffle.parallelism", String.valueOf(checkParallelism));
        StageCheckBulkParallelismListener stageCheckBulkParallelismListener = new StageCheckBulkParallelismListener("org.apache.hudi.HoodieDatasetBulkInsertHelper$.dedupeRows");
        this.sqlContext.sparkContext().addSparkListener((SparkListenerInterface)stageCheckBulkParallelismListener);
        List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
        Dataset dataset = this.sqlContext.createDataFrame(inserts, this.structType).repartition(3);
        Assertions.assertNotEquals((int)checkParallelism, (int)HoodieUnsafeUtils.getNumPartitions((Dataset)dataset));
        Assertions.assertNotEquals((int)checkParallelism, (int)this.sqlContext.sparkContext().defaultParallelism());
        Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert((Dataset)dataset, (HoodieWriteConfig)config, (BulkInsertPartitioner)new NonSortPartitionerWithRows(), (String)"000001111");
        result.count();
        Assertions.assertEquals((int)checkParallelism, (int)stageCheckBulkParallelismListener.getParallelism());
        this.sqlContext.sparkContext().removeSparkListener((SparkListenerInterface)stageCheckBulkParallelismListener);
    }

    class StageCheckBulkParallelismListener
    extends SparkListener {
        private boolean checkFlag = false;
        private String checkMessage;
        private int parallelism;

        StageCheckBulkParallelismListener(String checkMessage) {
            this.checkMessage = checkMessage;
        }

        public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
            if (this.checkFlag) {
                this.parallelism = stageSubmitted.stageInfo().numTasks();
                this.checkFlag = false;
            }
            if (stageSubmitted.stageInfo().details().contains(this.checkMessage)) {
                this.checkFlag = true;
            }
        }

        public int getParallelism() {
            return this.parallelism;
        }
    }
}

