/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStreamerTestBase {
    /** Kafka topics already created by {@link #addKafkaData}; static, so shared by every test instance. */
    protected static Set<String> createdTopicNames = new HashSet<String>();
    /** Written to {@code hoodie.datasource.write.table.type} and passed to {@code makeConfig}; not assigned in this class. */
    protected String tableType;
    /** Base path of the table under test; used when building the config and when reading the table back in assertions. */
    protected String tableBasePath;
    /** Table name; the error table name and path are derived from it by appending {@code "ERROR"}. */
    protected String tableName;
    /** When true, inline clustering properties are added to the streamer config. */
    protected Boolean shouldCluster;
    /** Drives the inline-compaction property and (negated) {@code cfg.forceDisableCompaction}. */
    protected Boolean shouldCompact;
    /** Value written to {@code hoodie.datasource.write.row.writer.enable}. */
    protected Boolean rowWriterEnable;
    /** When true, raises the commit threshold used for inline compaction/clustering by one. */
    protected Boolean addFilegroups;
    /** When true, raises the commit threshold used for inline compaction/clustering by one. */
    protected Boolean multiLogFiles;
    /** Reset to false in {@link #setupTest}; passed to {@code makeConfig} and gates the schema publishing in {@link #addData}. */
    protected Boolean useSchemaProvider;
    /** Reset to false in {@link #setupTest}; forwarded to {@code prepareParquetDFSSource}. */
    protected Boolean hasTransformer;
    /** Reset to {@code ""} in {@link #setupTest}; forwarded to {@code prepareParquetDFSSource}. */
    protected String sourceSchemaFile;
    /** Reset to {@code ""} in {@link #setupTest}; forwarded to {@code prepareParquetDFSSource}. */
    protected String targetSchemaFile;
    /** Selects the Kafka source and producer path instead of the parquet DFS source. */
    protected boolean useKafkaSource;
    /** Reset to false in {@link #setupTest}; when true, error-table properties are added to the config. */
    protected boolean withErrorTable;
    /** When true, the default config includes {@code TestIdentityTransformer}. */
    protected boolean useTransformer;
    /** Not read anywhere in this class; presumably consumed by subclasses - TODO confirm. */
    protected boolean userProvidedSchema;
    /** Not referenced in this class; presumably holds the streamer created by subclasses - TODO confirm. */
    protected HoodieStreamer deltaStreamer;

    /**
     * Runs once before all tests and points {@code defaultSchemaProviderClassName}
     * (declared outside this class) at {@link TestSchemaProvider}.
     */
    @BeforeAll
    public static void initKafka() {
        final String providerClassName = TestSchemaProvider.class.getName();
        defaultSchemaProviderClassName = providerClassName;
    }

    /**
     * Per-test setup: runs the parent setup, then clears the static state captured by
     * {@link TestErrorTable}, resets the per-test switches to their defaults and derives
     * the topic name from the current test number.
     */
    @Override
    @BeforeEach
    public void setupTest() {
        super.setupTest();

        // Start every test with empty error-table capture buffers.
        TestErrorTable.errorEvents = new ArrayList<>();
        TestErrorTable.commited = new HashMap<>();

        // Schema file names default to "no file".
        this.sourceSchemaFile = "";
        this.targetSchemaFile = "";

        // Feature switches default to off.
        this.useSchemaProvider = Boolean.FALSE;
        this.hasTransformer = Boolean.FALSE;
        this.withErrorTable = false;

        topicName = "topic" + testNum;
    }

    /**
     * Per-test cleanup: runs the parent teardown, then clears the target schema
     * override held statically by {@link TestSchemaProvider}.
     *
     * @throws Exception if the parent teardown fails
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
        // Same effect as resetTargetSchema(): the static override becomes null again.
        TestSchemaProvider.setTargetSchema(null);
    }

    /**
     * Runs once after all tests and sets {@code defaultSchemaProviderClassName} to
     * {@link FilebasedSchemaProvider}, undoing the change made in {@link #initKafka()}.
     */
    @AfterAll
    static void teardownAll() {
        final String fileBasedProviderName = FilebasedSchemaProvider.class.getName();
        defaultSchemaProviderClassName = fileBasedProviderName;
    }

    /**
     * Builds the streamer config with "set null for missing columns" enabled.
     *
     * @return the config produced by {@link #getDeltaStreamerConfig(boolean)} with {@code true}
     * @throws IOException propagated from the delegated overload
     */
    protected HoodieDeltaStreamer.Config getDeltaStreamerConfig() throws IOException {
        final boolean nullForDeletedCols = true;
        return getDeltaStreamerConfig(nullForDeletedCols);
    }

    /**
     * Builds the streamer config, adding the identity transformer when
     * {@code useTransformer} is set and no transformer otherwise.
     *
     * @param nullForDeletedCols value for the "set null for missing columns" write option
     * @return the config produced by the {@code (String[], boolean)} overload
     * @throws IOException propagated from the delegated overload
     */
    protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(boolean nullForDeletedCols) throws IOException {
        if (!this.useTransformer) {
            return this.getDeltaStreamerConfig(new String[0], nullForDeletedCols);
        }
        String identityTransformer = TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName();
        return this.getDeltaStreamerConfig(new String[] {identityTransformer}, nullForDeletedCols);
    }

    /**
     * Builds the streamer config for the given transformers, starting from an empty
     * set of extra properties.
     *
     * @param transformerClasses fully-qualified transformer class names (may be empty)
     * @param nullForDeletedCols value for the "set null for missing columns" write option
     * @return the config produced by the three-argument overload
     * @throws IOException propagated from the delegated overload
     */
    protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformerClasses, boolean nullForDeletedCols) throws IOException {
        TypedProperties emptyExtraProps = new TypedProperties();
        return this.getDeltaStreamerConfig(transformerClasses, nullForDeletedCols, emptyExtraProps);
    }

    /**
     * Builds the streamer config for the current test settings.
     *
     * <p>The supplied {@code extraProps} instance is mutated: writer, compaction,
     * clustering and (optionally) error-table properties are added to it before it is
     * handed to the source-preparation helper. The source is either Kafka or parquet
     * DFS depending on {@code useKafkaSource}.
     *
     * @param transformerClasses fully-qualified transformer class names (may be empty)
     * @param nullForDeletedCols value for the "set null for missing columns" write option
     * @param extraProps         properties to extend; modified in place
     * @return the config, with {@code forceDisableCompaction} set to the negation of {@code shouldCompact}
     * @throws IOException declared for the source-preparation helpers
     */
    protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformerClasses, boolean nullForDeletedCols, TypedProperties extraProps) throws IOException {
        // Basic writer settings.
        extraProps.setProperty("hoodie.datasource.write.table.type", this.tableType);
        extraProps.setProperty("hoodie.datasource.write.row.writer.enable", this.rowWriterEnable.toString());
        extraProps.setProperty(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS().key(), Boolean.toString(nullForDeletedCols));
        extraProps.setProperty("hoodie.parquet.small.file.limit", "0");

        // Commit threshold: two, plus one for each of the optional extra phases.
        int commitThreshold = 2;
        commitThreshold += this.addFilegroups ? 1 : 0;
        commitThreshold += this.multiLogFiles ? 1 : 0;
        final String commitThresholdText = Integer.toString(commitThreshold);

        final boolean compactInline = this.shouldCompact;
        extraProps.setProperty(HoodieCompactionConfig.INLINE_COMPACT.key(), Boolean.toString(compactInline));
        if (compactInline) {
            extraProps.setProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), commitThresholdText);
        }

        if (this.shouldCluster) {
            extraProps.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
            extraProps.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), commitThresholdText);
            extraProps.setProperty(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "_row_key");
        }

        if (this.withErrorTable) {
            final String errorTableName = this.tableName + "ERROR";
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), "true");
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), "true");
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), "true");
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), errorTableName);
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), basePath + errorTableName);
            extraProps.setProperty(HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS.key(), TestErrorTable.class.getName());
            extraProps.setProperty("hoodie.base.path", this.tableBasePath);
        }

        List<String> transformerClassNames = new ArrayList<>();
        for (String transformerClass : transformerClasses) {
            transformerClassNames.add(transformerClass);
        }

        // Prepare the source first, then build the config from the matching source class and props file.
        final String sourceClassName;
        final String propsFileName;
        if (this.useKafkaSource) {
            propsFileName = "test-avro-kafka-dfs-source.properties";
            this.prepareAvroKafkaDFSSource(propsFileName, null, topicName, "partition_path", extraProps);
            sourceClassName = AvroKafkaSource.class.getName();
        } else {
            propsFileName = "test-parquet-dfs-source.properties";
            this.prepareParquetDFSSource(false, this.hasTransformer, this.sourceSchemaFile, this.targetSchemaFile, propsFileName, PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
            sourceClassName = ParquetDFSSource.class.getName();
        }

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(this.tableBasePath, WriteOperationType.UPSERT, sourceClassName, transformerClassNames, propsFileName, false, this.useSchemaProvider, 100000, false, null, this.tableType, "timestamp", null);
        cfg.forceDisableCompaction = !this.shouldCompact;
        return cfg;
    }

    /**
     * Publishes a batch of input rows to whichever source the test is using.
     *
     * <p>When the schema provider is in use, the batch's schema first becomes the
     * provider's source schema; on the first batch of an error-table test the target
     * schema is additionally set to the same schema with {@code _row_key} passed
     * through {@code getSchemaColumnNotNullable}.
     *
     * @param df      rows to publish
     * @param isFirst whether this is the first batch of the test
     */
    protected void addData(Dataset<Row> df, Boolean isFirst) {
        if (this.useSchemaProvider) {
            StructType batchSchema = df.schema();
            TestSchemaProvider.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(batchSchema, "hoodie_source", "hoodie.source");
            if (this.withErrorTable && isFirst) {
                DataType keyNotNullable = TestHoodieSparkUtils.getSchemaColumnNotNullable(batchSchema, "_row_key");
                TestSchemaProvider.setTargetSchema(AvroConversionUtils.convertStructTypeToAvroSchema(keyNotNullable, "idk", "idk"));
            }
        }

        if (!this.useKafkaSource) {
            this.addParquetData(df, isFirst);
            return;
        }
        this.addKafkaData(df, isFirst);
    }

    /**
     * Writes the rows as parquet under {@code PARQUET_SOURCE_ROOT}: the first batch
     * overwrites whatever is there, later batches append.
     *
     * @param df      rows to write
     * @param isFirst whether this is the first batch of the test
     */
    protected void addParquetData(Dataset<Row> df, Boolean isFirst) {
        final SaveMode writeMode = isFirst ? SaveMode.Overwrite : SaveMode.Append;
        df.write()
            .format("parquet")
            .mode(writeMode)
            .save(PARQUET_SOURCE_ROOT);
    }

    /**
     * Sends the rows to the test's Kafka topic as Avro-encoded byte arrays.
     *
     * <p>On the first batch the topic is created unless this class has already created
     * a topic with that name. Rows are converted to Avro records, collected to the
     * driver, and sent to partition 0 under the fixed key {@code "key"}. The producer
     * is closed by try-with-resources when the method returns.
     *
     * @param df      rows to publish
     * @param isFirst whether this is the first batch of the test
     */
    protected void addKafkaData(Dataset<Row> df, Boolean isFirst) {
        if (isFirst && !createdTopicNames.contains(topicName)) {
            this.testUtils.createTopic(topicName);
            createdTopicNames.add(topicName);
        }
        // Typed element list: iterating a raw List as GenericRecord does not type-check.
        List<GenericRecord> records = HoodieSparkUtils.createRdd(df, "hoodie_source", "hoodie.source", false, Option.empty()).toJavaRDD().collect();
        // Key/value types match the String key serializer and byte[] value serializer in getProducerProperties().
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(this.getProducerProperties())) {
            for (GenericRecord record : records) {
                producer.send(new ProducerRecord<>(topicName, 0, "key", HoodieAvroUtils.avroToBytes(record)));
            }
        }
    }

    /**
     * Builds the configuration for the Kafka producer used by {@link #addKafkaData}.
     *
     * @return properties targeting the test broker, with a String key serializer, a
     *         byte-array value serializer and {@code acks=all}
     */
    protected Properties getProducerProperties() {
        final String byteArraySerializerName = ByteArraySerializer.class.getName();
        final String stringSerializerName = StringSerializer.class.getName();

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.testUtils.brokerAddress());
        producerProps.put("key.serializer", stringSerializerName);
        producerProps.put("value.serializer", byteArraySerializerName);
        // NOTE(review): "value.deserializer" is given a serializer class here, and it looks
        // like a consumer-side setting - confirm whether it is needed at all.
        producerProps.put("value.deserializer", byteArraySerializerName);
        producerProps.put("auto.register.schemas", "false");
        producerProps.put("acks", "all");
        return producerProps;
    }

    /**
     * Asserts the number of files backing the table.
     *
     * <p>With {@code isCow} set this delegates to {@link #assertBaseFileOnlyNumber(int)};
     * otherwise it counts the distinct (commit time, file name) pairs read from the table.
     *
     * @param expected expected count
     * @param isCow    whether to use the base-file-only check
     */
    protected void assertFileNumber(int expected, boolean isCow) {
        if (isCow) {
            this.assertBaseFileOnlyNumber(expected);
            return;
        }
        Dataset<Row> commitAndFile = sparkSession.read()
            .format("hudi")
            .load(this.tableBasePath)
            .select("_hoodie_commit_time", "_hoodie_file_name");
        long distinctPairs = commitAndFile.distinct().count();
        Assertions.assertEquals((long) expected, distinctPairs);
    }

    /**
     * Asserts that every row read from the table comes from a file whose name matches
     * {@code '%.parquet'}, and that the number of distinct file names equals {@code expected}.
     *
     * @param expected expected number of distinct file names
     */
    protected void assertBaseFileOnlyNumber(int expected) {
        Dataset<Row> fileNames = sparkSession.read()
            .format("hudi")
            .load(this.tableBasePath)
            .select("_hoodie_file_name");
        fileNames.createOrReplaceTempView("assertFileNumberPostCompactCluster");

        // Every row must come from a parquet file: the filtered count equals the total.
        long totalRows = fileNames.count();
        long parquetRows = sparkSession.sql("select * from assertFileNumberPostCompactCluster where _hoodie_file_name like '%.parquet'").count();
        Assertions.assertEquals(totalRows, parquetRows);

        long distinctFiles = fileNames.distinct().count();
        Assertions.assertEquals((long) expected, distinctFiles);
    }

    /**
     * Clears the SQL cache, reads the table and asserts its row count.
     *
     * @param expected expected number of rows
     */
    protected void assertRecordCount(int expected) {
        sqlContext.clearCache();
        Dataset<Row> table = sqlContext.read().format("org.apache.hudi").load(this.tableBasePath);
        long actualRows = table.count();
        Assertions.assertEquals((long) expected, actualRows);
    }

    /**
     * Creates the two-field fare struct ({@code amount}, {@code currency}).
     *
     * @param amountType Spark type of the {@code amount} field
     * @return the struct type with no columns dropped
     */
    protected StructType createFareStruct(DataType amountType) {
        final Boolean keepAllColumns = Boolean.FALSE;
        return this.createFareStruct(amountType, keepAllColumns);
    }

    /**
     * Creates the fare struct: always a nullable {@code amount} field of the given
     * type, plus a nullable string {@code currency} field unless columns are dropped.
     *
     * @param amountType Spark type of the {@code amount} field
     * @param dropCols   when true, the {@code currency} field is omitted
     * @return the resulting struct type
     */
    protected StructType createFareStruct(DataType amountType, Boolean dropCols) {
        final boolean includeCurrency = !dropCols;
        List<StructField> fareFields = new ArrayList<>();
        fareFields.add(new StructField("amount", amountType, true, Metadata.empty()));
        if (includeCurrency) {
            fareFields.add(new StructField("currency", DataTypes.StringType, true, Metadata.empty()));
        }
        return DataTypes.createStructType(fareFields.toArray(new StructField[0]));
    }

    /**
     * Error-table writer for tests that keeps everything in static, in-memory state
     * instead of writing a table.
     *
     * <p>Error events are buffered in {@link #errorEvents}; each call to
     * {@link #upsertAndCommit} moves the buffered events (unioned into a single RDD)
     * into {@link #commited} under the base-table instant time and clears the buffer.
     * Both collections are static and mutable, and are replaced by the enclosing
     * test's {@code setupTest()}.
     */
    public static class TestErrorTable
    extends BaseErrorTableWriter {
        /** Error RDDs added since the last commit. */
        public static List<JavaRDD> errorEvents = new ArrayList<>();
        /** Per base-table instant: the unioned error RDD, or empty when nothing was buffered at commit time. */
        public static Map<String, Option<JavaRDD>> commited = new HashMap<>();

        /** Passes all arguments straight through to the parent writer. */
        public TestErrorTable(HoodieStreamer.Config cfg, SparkSession sparkSession, TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fileSystem) {
            super(cfg, sparkSession, props, hoodieSparkContext, fileSystem);
        }

        /**
         * Buffers an RDD of error events until the next {@link #upsertAndCommit}.
         *
         * @param errorEvent the error events to buffer
         */
        public void addErrorEvents(JavaRDD errorEvent) {
            errorEvents.add(errorEvent);
        }

        /**
         * Records the buffered error events against the given instant and clears the buffer.
         *
         * @param baseTableInstantTime key under which the result is stored in {@link #commited}
         * @param commitedInstantTime  ignored by this implementation
         * @return always {@code true}
         */
        public boolean upsertAndCommit(String baseTableInstantTime, Option commitedInstantTime) {
            if (errorEvents.isEmpty()) {
                // Nothing buffered: still record the instant, with an empty value.
                Option<JavaRDD> nothing = Option.empty();
                commited.put(baseTableInstantTime, nothing);
                return true;
            }
            JavaRDD errorsCombined = errorEvents.get(0);
            for (JavaRDD next : errorEvents.subList(1, errorEvents.size())) {
                errorsCombined = errorsCombined.union(next);
            }
            // Typed local instead of a cast between unrelated Option parameterizations.
            Option<JavaRDD> combined = Option.of(errorsCombined);
            commited.put(baseTableInstantTime, combined);
            errorEvents = new ArrayList<>();
            return true;
        }

        /**
         * Always reports no error events; this test writer does not read anything back.
         *
         * @param baseTableInstantTime ignored
         * @param commitedInstantTime  ignored
         * @return an empty option
         */
        public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option commitedInstantTime) {
            return Option.empty();
        }
    }

    /**
     * Schema provider whose schemas are plain static fields, so tests can set them directly.
     *
     * <p>The target schema falls back to the source schema when no override is set.
     */
    public static class TestSchemaProvider
    extends SchemaProvider {
        /** Schema returned by {@link #getSourceSchema()}; assigned directly by tests. */
        public static Schema sourceSchema;
        /** Optional target schema override; {@code null} means "use the source schema". */
        public static Schema targetSchema = null;

        /** Passes both arguments straight through to the parent provider. */
        public TestSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
            super(props, jssc);
        }

        /** @return the current static source schema */
        public Schema getSourceSchema() {
            return sourceSchema;
        }

        /** @return the target schema override when set, otherwise the source schema */
        public Schema getTargetSchema() {
            if (targetSchema == null) {
                return sourceSchema;
            }
            return targetSchema;
        }

        /**
         * Sets the target schema override.
         *
         * @param targetSchema the schema to return from {@link #getTargetSchema()}, or {@code null} to clear it
         */
        public static void setTargetSchema(Schema targetSchema) {
            TestSchemaProvider.targetSchema = targetSchema;
        }

        /** Clears the target schema override so the source schema is used again. */
        public static void resetTargetSchema() {
            setTargetSchema(null);
        }
    }
}

