/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.debezium;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DebeziumSource
extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumSource.class);
    private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    private static final String OVERRIDE_CHECKPOINT_STRING = "hoodie.debezium.override.initial.checkpoint.key";
    private static final String CONNECT_NAME_KEY = "connect.name";
    private static final String DATE_CONNECT_NAME = "custom.debezium.DateString";
    private final KafkaOffsetGen offsetGen;
    private final HoodieIngestionMetrics metrics;
    private final SchemaRegistryProvider schemaRegistryProvider;
    private final String deserializerClassName;

    public DebeziumSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
        super(props, sparkContext, sparkSession, schemaProvider);
        props.put((Object)NATIVE_KAFKA_KEY_DESERIALIZER_PROP, (Object)StringDeserializer.class.getName());
        this.deserializerClassName = ConfigUtils.getStringWithAltKeys((Properties)props, KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS, (boolean)true);
        try {
            props.put((Object)NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, (Object)Class.forName(this.deserializerClassName).getName());
        }
        catch (ClassNotFoundException e) {
            String error = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            LOG.error(error);
            throw new HoodieReadFromSourceException(error, e);
        }
        this.schemaRegistryProvider = schemaProvider == null || !(schemaProvider instanceof SchemaRegistryProvider) ? new SchemaRegistryProvider(props, sparkContext) : (SchemaRegistryProvider)schemaProvider;
        this.offsetGen = new KafkaOffsetGen(props);
        this.metrics = metrics;
    }

    @Override
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        String overrideCheckpointStr = this.props.getString(OVERRIDE_CHECKPOINT_STRING, "");
        OffsetRange[] offsetRanges = this.offsetGen.getNextOffsetRanges(lastCheckpoint, sourceLimit, this.metrics);
        long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + this.offsetGen.getTopicName());
        if (totalNewMsgs == 0L) {
            return Pair.of((Object)Option.of((Object)this.sparkSession.emptyDataFrame()), (Object)new StreamerCheckpointV2(overrideCheckpointStr.isEmpty() ? KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr));
        }
        try {
            String schemaStr = this.schemaRegistryProvider.fetchSchemaFromRegistry(ConfigUtils.getStringWithAltKeys((Properties)this.props, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
            Dataset<Row> dataset = this.toDataset(offsetRanges, this.offsetGen, schemaStr);
            LOG.info(String.format("Spark schema of Kafka Payload for topic %s:\n%s", this.offsetGen.getTopicName(), dataset.schema().treeString()));
            LOG.info(String.format("New checkpoint string: %s", KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)));
            return Pair.of((Object)Option.of(dataset), (Object)new StreamerCheckpointV2(overrideCheckpointStr.isEmpty() ? KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr));
        }
        catch (Exception e) {
            LOG.error("Fatal error reading and parsing incoming debezium event", (Throwable)e);
            throw new HoodieReadFromSourceException("Fatal error reading and parsing incoming debezium event", e);
        }
    }

    protected abstract Dataset<Row> processDataset(Dataset<Row> var1);

    private Dataset<Row> toDataset(OffsetRange[] offsetRanges, KafkaOffsetGen offsetGen, String schemaStr) {
        AvroConvertor convertor = new AvroConvertor(schemaStr);
        Dataset kafkaData = this.deserializerClassName.equals(StringDeserializer.class.getName()) ? AvroConversionUtils.createDataFrame((RDD)KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function & Serializable)obj -> convertor.fromJson((String)obj.value())).rdd(), (String)schemaStr, (SparkSession)this.sparkSession) : AvroConversionUtils.createDataFrame((RDD)KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function & Serializable)obj -> (GenericRecord)obj.value()).rdd(), (String)schemaStr, (SparkSession)this.sparkSession);
        Dataset<Row> debeziumDataset = this.processDataset((Dataset<Row>)kafkaData);
        return DebeziumSource.convertArrayColumnsToString(DebeziumSource.convertColumnToNullable(this.sparkSession, DebeziumSource.convertDateColumns(debeziumDataset, new Schema.Parser().parse(schemaStr))));
    }

    public static Dataset<Row> convertDateColumns(Dataset<Row> dataset, Schema schema) {
        if (schema.getField("before") != null) {
            List dateFields = ((Schema)schema.getField("before").schema().getTypes().get(1)).getFields().stream().filter(field -> {
                if (field.schema().getType() == Schema.Type.UNION) {
                    return field.schema().getTypes().stream().anyMatch(schemaInUnion -> DATE_CONNECT_NAME.equals(schemaInUnion.getProp(CONNECT_NAME_KEY)));
                }
                return DATE_CONNECT_NAME.equals(field.schema().getProp(CONNECT_NAME_KEY));
            }).map(Schema.Field::name).collect(Collectors.toList());
            LOG.info("Date fields: " + dateFields.toString());
            for (String dateCol : dateFields) {
                dataset = dataset.withColumn(dateCol, functions.col((String)dateCol).cast(DataTypes.DateType));
            }
        }
        return dataset;
    }

    private static Dataset<Row> convertColumnToNullable(SparkSession sparkSession, Dataset<Row> dataset) {
        List<String> columns = Arrays.asList(dataset.columns());
        StructField[] modifiedStructFields = (StructField[])Arrays.stream(dataset.schema().fields()).map(field -> columns.contains(field.name()) ? new StructField(field.name(), field.dataType(), true, field.metadata()) : field).toArray(StructField[]::new);
        return sparkSession.createDataFrame(dataset.rdd(), new StructType(modifiedStructFields));
    }

    private static Dataset<Row> convertArrayColumnsToString(Dataset<Row> dataset) {
        List arrayColumns = Arrays.stream(dataset.schema().fields()).filter(field -> field.dataType().typeName().toLowerCase().startsWith("array")).map(StructField::name).collect(Collectors.toList());
        for (String colName : arrayColumns) {
            dataset = dataset.withColumn(colName, functions.col((String)colName).cast(DataTypes.StringType));
        }
        return dataset;
    }

    @Override
    public void onCommit(String lastCkptStr) {
        if (ConfigUtils.getBooleanWithAltKeys((Properties)this.props, KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET)) {
            this.offsetGen.commitOffsetToKafka(lastCkptStr);
        }
    }
}

