/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.Serializable;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.KafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka source that yields each batch as a {@link JavaRDD} of Avro {@link GenericRecord}s.
 *
 * <p>The Kafka value deserializer class is read from
 * {@link KafkaSourceConfig#KAFKA_AVRO_VALUE_DESERIALIZER_CLASS}. Two deserializers receive
 * special handling in this class:
 * <ul>
 *   <li>{@link KafkaAvroSchemaDeserializer}: the schema deserializer settings are written into the
 *       Kafka properties via {@link KafkaSourceUtil#configureSchemaDeserializer}.</li>
 *   <li>{@link ByteArrayDeserializer}: the raw bytes are decoded into records by an
 *       {@link AvroConvertor} built from the original schema provider's source schema.</li>
 * </ul>
 */
public class AvroKafkaSource
extends KafkaSource<JavaRDD<GenericRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class);

    /** Property-key prefix for value-deserializer settings. */
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.streamer.source.kafka.value.deserializer.";

    /** Legacy ("deltastreamer") spelling of {@link #KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX}. */
    @Deprecated
    public static final String OLD_KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";

    /** Legacy property key for the value deserializer schema; not referenced inside this class. */
    @Deprecated
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

    /** Fully-qualified class name of the configured Kafka value deserializer. */
    private final String deserializerClassName;

    /**
     * Schema provider exactly as supplied by the caller's stream context, i.e. before it is passed
     * through {@link UtilHelpers#getSchemaProviderForKafkaSource}. May be {@code null} if the
     * stream context carries no provider.
     */
    protected final SchemaProvider originalSchemaProvider;

    /**
     * Creates the source from a bare schema provider by wrapping it in a
     * {@link DefaultStreamContext} with no source profile supplier.
     *
     * @param props          source configuration
     * @param sparkContext   Spark context used to build the Kafka RDDs
     * @param sparkSession   Spark session handed to the parent source
     * @param schemaProvider schema provider for the incoming records
     * @param metrics        ingestion metrics handed to the parent source
     */
    public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
        this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source, registers the Kafka key/value deserializers in the properties and builds
     * the offset generator.
     *
     * @param properties    source configuration
     * @param sparkContext  Spark context used to build the Kafka RDDs
     * @param sparkSession  Spark session handed to the parent source
     * @param metrics       ingestion metrics handed to the parent source
     * @param streamContext supplies the schema provider and the source profile supplier
     * @throws HoodieReadFromSourceException if the configured value deserializer class cannot be loaded
     */
    public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
        super(properties, sparkContext, sparkSession, Source.SourceType.AVRO, metrics,
            new DefaultStreamContext(
                UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext),
                streamContext.getSourceProfileSupplier()));
        this.originalSchemaProvider = streamContext.getSchemaProvider();
        this.props.put("key.deserializer", StringDeserializer.class.getName());
        this.deserializerClassName = ConfigUtils.getStringWithAltKeys((Properties)this.props, KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS, true);
        try {
            // Class.forName doubles as an early check that the deserializer is on the classpath.
            this.props.put("value.deserializer", Class.forName(this.deserializerClassName).getName());
        }
        catch (ClassNotFoundException e) {
            String error = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            // Pass the cause along so the log entry carries the stack trace as well.
            LOG.error(error, e);
            throw new HoodieReadFromSourceException(error, e);
        }
        if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
            KafkaSourceUtil.configureSchemaDeserializer(this.schemaProvider, this.props);
        }
        this.offsetGen = new KafkaOffsetGen(this.props);
    }

    /**
     * Reads the next batch starting from the given checkpoint.
     *
     * <p>When {@link KafkaAvroSchemaDeserializer} is configured, the schema deserializer settings
     * are re-applied and the offset generator is rebuilt before delegating to the parent
     * (presumably so each read sees the provider's current schema - confirm).
     *
     * @param lastCheckpoint checkpoint of the previous read, if any
     * @param sourceLimit    limit passed through to the parent implementation
     * @return the batch produced by the parent implementation
     */
    @Override
    protected InputBatch<JavaRDD<GenericRecord>> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
            KafkaSourceUtil.configureSchemaDeserializer(this.schemaProvider, this.props);
            this.offsetGen = new KafkaOffsetGen(this.props);
        }
        return super.readFromCheckpoint(lastCheckpoint, sourceLimit);
    }

    /**
     * Builds the record RDD for the given Kafka offset ranges. Records with a {@code null} value
     * are dropped.
     *
     * @param offsetRanges Kafka offset ranges to read
     * @return the records read from Kafka, with Kafka offset fields appended when enabled
     * @throws HoodieReadFromSourceException if {@link ByteArrayDeserializer} is configured but no
     *         schema provider is available
     */
    @Override
    protected JavaRDD<GenericRecord> toBatch(OffsetRange[] offsetRanges) {
        JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
        if (this.deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
            // The convertor below dereferences originalSchemaProvider, so it must be guarded too;
            // checking only schemaProvider would let a null original provider surface as an NPE.
            if (this.schemaProvider == null || this.originalSchemaProvider == null) {
                throw new HoodieReadFromSourceException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
            }
            // Decode with the original provider's schema rather than schemaProvider's
            // (presumably because the raw payload carries no Kafka offset fields - confirm).
            AvroConvertor convertor = new AvroConvertor(this.originalSchemaProvider.getSourceSchema());
            kafkaRDD = KafkaUtils.<String, byte[]>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
                .filter(rawRecord -> rawRecord.value() != null)
                .map(rawRecord -> new ConsumerRecord<Object, Object>(
                    rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), rawRecord.key(),
                    convertor.fromAvroBinary(rawRecord.value())));
        } else {
            kafkaRDD = KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent());
        }
        return this.maybeAppendKafkaOffsets(kafkaRDD.filter(consumerRecord -> consumerRecord.value() != null));
    }

    /**
     * Converts consumer records into {@link GenericRecord}s.
     *
     * <p>When {@code shouldAddOffsets} is set, each record goes through
     * {@link AvroConvertor#withKafkaFieldsAppended} using the schema from {@code schemaProvider};
     * otherwise the record value is returned as-is, cast to {@link GenericRecord}.
     *
     * @param kafkaRDD consumer records whose values are expected to be {@link GenericRecord}s
     * @return the converted records
     */
    protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
        if (this.shouldAddOffsets) {
            AvroConvertor convertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
            return kafkaRDD.map(convertor::withKafkaFieldsAppended);
        }
        return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
    }
}

