/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestSourceFormatAdapter {
    protected static SparkSession spark;
    protected static JavaSparkContext jsc;
    private static final String DUMMY_CHECKPOINT = "dummy_checkpoint";
    private TestRowDataSource testRowDataSource;
    private TestJsonDataSource testJsonDataSource;

    @BeforeAll
    public static void start() {
        spark = SparkSession.builder().config(HoodieClientTestUtils.getSparkConfForTest((String)TestSourceFormatAdapter.class.getName())).getOrCreate();
        jsc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
    }

    @AfterAll
    public static void shutdown() {
        jsc.close();
        spark.close();
    }

    @AfterEach
    public void teardown() {
        this.testRowDataSource = null;
        this.testJsonDataSource = null;
    }

    private void setupRowSource(Dataset<Row> ds, TypedProperties properties, SchemaProvider schemaProvider) {
        InputBatch batch = new InputBatch(Option.of(ds), DUMMY_CHECKPOINT, schemaProvider);
        this.testRowDataSource = new TestRowDataSource(properties, jsc, spark, schemaProvider, (InputBatch<Dataset<Row>>)batch);
    }

    private void setupJsonSource(JavaRDD<String> ds, Schema schema) {
        BasicSchemaProvider basicSchemaProvider = new BasicSchemaProvider(schema);
        InputBatch batch = new InputBatch(Option.of(ds), DUMMY_CHECKPOINT, (SchemaProvider)basicSchemaProvider);
        this.testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, spark, basicSchemaProvider, (InputBatch<JavaRDD<String>>)batch);
    }

    private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema, SchemaProvider schemaProvider) {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.put((Object)HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), (Object)true);
        typedProperties.put((Object)HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), (Object)"__");
        this.setupRowSource((Dataset<Row>)spark.read().schema(unsanitizedSchema).json(rdd), typedProperties, schemaProvider);
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter((Source)this.testRowDataSource, Option.empty(), Option.of((Object)typedProperties));
        return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of((Object)DUMMY_CHECKPOINT), 10L);
    }

    private InputBatch<Dataset<Row>> fetchJsonData(JavaRDD<String> rdd, StructType sanitizedSchema) {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.put((Object)HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), (Object)true);
        typedProperties.put((Object)HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), (Object)"__");
        this.setupJsonSource(rdd, SchemaConverters.toAvroType((DataType)sanitizedSchema, (boolean)false, (String)"record", (String)""));
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter((Source)this.testJsonDataSource, Option.empty(), Option.of((Object)typedProperties));
        return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of((Object)DUMMY_CHECKPOINT), 10L);
    }

    private void verifySanitization(InputBatch<Dataset<Row>> inputBatch, String sanitizedDataFile, StructType sanitizedSchema) {
        JavaRDD expectedRDD = jsc.textFile(sanitizedDataFile);
        Assertions.assertTrue((boolean)inputBatch.getBatch().isPresent());
        Dataset ds = (Dataset)inputBatch.getBatch().get();
        Assertions.assertEquals((int)2, (int)ds.collectAsList().size());
        Assertions.assertEquals((Object)sanitizedSchema, (Object)ds.schema());
        if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
            Assertions.assertEquals((Object)AvroConversionUtils.convertStructTypeToAvroSchema((DataType)sanitizedSchema, (String)"hoodie_source", (String)"hoodie.source"), (Object)inputBatch.getSchemaProvider().getSourceSchema());
        }
        Assertions.assertEquals((Object)expectedRDD.collect(), (Object)ds.toJSON().collectAsList());
    }

    @ParameterizedTest
    @MethodSource(value={"provideDataFiles"})
    public void testRowSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
        JavaRDD unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
        InputBatch.NullSchemaProvider schemaProvider = InputBatch.NullSchemaProvider.getInstance();
        this.verifySanitization(this.fetchRowData((JavaRDD<String>)unsanitizedRDD, unsanitizedSchema, (SchemaProvider)schemaProvider), sanitizedDataFile, sanitizedSchema);
        this.verifySanitization(this.fetchRowData((JavaRDD<String>)unsanitizedRDD, unsanitizedSchema, null), sanitizedDataFile, sanitizedSchema);
    }

    @ParameterizedTest
    @MethodSource(value={"provideDataFiles"})
    public void testJsonSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
        JavaRDD unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
        this.verifySanitization(this.fetchJsonData((JavaRDD<String>)unsanitizedRDD, sanitizedSchema), sanitizedDataFile, sanitizedSchema);
    }

    private static Stream<Arguments> provideDataFiles() {
        return SanitizationTestUtils.provideDataFiles();
    }

    public static class BasicSchemaProvider
    extends SchemaProvider {
        private final Schema schema;

        public BasicSchemaProvider(Schema schema) {
            this(null, null, schema);
        }

        public BasicSchemaProvider(TypedProperties props, JavaSparkContext jssc, Schema schema) {
            super(props, jssc);
            this.schema = schema;
        }

        public Schema getSourceSchema() {
            return this.schema;
        }
    }

    public static class TestJsonDataSource
    extends Source<JavaRDD<String>> {
        private final InputBatch<JavaRDD<String>> batch;

        public TestJsonDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, InputBatch<JavaRDD<String>> batch) {
            super(props, sparkContext, sparkSession, schemaProvider, Source.SourceType.JSON);
            this.batch = batch;
        }

        protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
            return this.batch;
        }
    }

    public static class TestRowDataSource
    extends RowSource {
        private final InputBatch<Dataset<Row>> batch;

        public TestRowDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, InputBatch<Dataset<Row>> batch) {
            super(props, sparkContext, sparkSession, schemaProvider);
            this.batch = batch;
        }

        protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
            return Pair.of((Object)this.batch.getBatch(), (Object)this.batch.getCheckpointForNextBatch());
        }
    }
}

