/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.Serializable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.KafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/**
 * Kafka-backed source that yields Avro {@link GenericRecord}s.
 *
 * <p>The Kafka value deserializer is chosen through the property keyed by
 * {@code DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS()}, falling back to that
 * option's default value. Two deserializer classes receive special handling:
 * <ul>
 *   <li>{@link KafkaAvroSchemaDeserializer}: the source schema of the schema provider is passed
 *       to the deserializer via {@link #KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA};</li>
 *   <li>{@link ByteArrayDeserializer}: the raw value bytes are decoded into records with an
 *       {@link AvroConvertor} built from the schema provider's source schema.</li>
 * </ul>
 * Any other deserializer is expected to hand back values that already are
 * {@link GenericRecord}s.
 */
public class AvroKafkaSource extends KafkaSource<GenericRecord> {
    private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);

    /** Common prefix of the properties that configure the Kafka Avro value deserializer. */
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";

    /** Property under which the source schema (as a string) is handed to {@link KafkaAvroSchemaDeserializer}. */
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

    /** Fully-qualified class name of the configured Kafka value deserializer. */
    private final String deserializerClassName;

    /**
     * Creates the source and prepares the Kafka consumer properties.
     *
     * <p>Note that {@code props} is modified in place: the key deserializer, the value
     * deserializer and (for {@link KafkaAvroSchemaDeserializer}) the source schema are written
     * into it before it is handed to {@link KafkaOffsetGen}.
     *
     * @param props          configuration; mutated as described above
     * @param sparkContext   Spark context forwarded to the parent source
     * @param sparkSession   Spark session forwarded to the parent source
     * @param schemaProvider schema provider; must be non-null when
     *                       {@link KafkaAvroSchemaDeserializer} is the configured deserializer
     * @param metrics        metrics sink forwarded to the parent source
     * @throws HoodieIOException if {@link KafkaAvroSchemaDeserializer} is configured but
     *                           {@code schemaProvider} is {@code null}
     * @throws HoodieException   if the configured deserializer class cannot be loaded
     */
    public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
        super(props, sparkContext, sparkSession, schemaProvider, Source.SourceType.AVRO, metrics);
        props.put("key.deserializer", StringDeserializer.class.getName());
        this.deserializerClassName = props.getString(
            DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
            DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
        try {
            // Class.forName doubles as validation that the configured class is on the classpath.
            props.put("value.deserializer", Class.forName(this.deserializerClassName).getName());
            if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
                if (schemaProvider == null) {
                    throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
                }
                props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, schemaProvider.getSourceSchema().toString());
            }
        } catch (ClassNotFoundException e) {
            String error = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            // Log the cause as well so the stack trace is not lost if the rethrown exception is swallowed upstream.
            LOG.error(error, e);
            throw new HoodieException(error, e);
        }
        this.offsetGen = new KafkaOffsetGen(props);
    }

    /**
     * Reads the given Kafka offset ranges and converts every record value into a
     * {@link GenericRecord}.
     *
     * @param offsetRanges Kafka offset ranges to read
     * @return RDD of the record values as Avro generic records
     * @throws HoodieException if {@link ByteArrayDeserializer} is configured but no schema
     *                         provider is available to decode the raw bytes
     */
    @Override
    JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
        if (this.deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
            if (this.schemaProvider == null) {
                throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
            }
            AvroConvertor convertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
            // Explicit type witnesses give the lambda a typed ConsumerRecord parameter; Spark's
            // Function interface is already Serializable, so no intersection cast is required.
            return KafkaUtils.<String, byte[]>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
                .map(record -> convertor.fromAvroBinary(record.value()));
        }
        // Any other deserializer is expected to produce GenericRecord values directly.
        return KafkaUtils.<String, Object>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
            .map(record -> (GenericRecord) record.value());
    }
}

