/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.protobuf.Message;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.KafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/**
 * Kafka source that emits protobuf {@link Message} objects.
 *
 * <p>Record values are fetched from Kafka as raw byte arrays and parsed by reflectively calling
 * {@code parseFrom(byte[])} on the class named by
 * {@code ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME}.
 */
public class ProtoKafkaSource extends KafkaSource<Message> {
    /** Name of the protobuf class used to parse record values, as read from the properties. */
    private final String className;

    /**
     * Creates the source and configures the Kafka deserializers it relies on.
     *
     * <p>Note that this writes the {@code key.deserializer} and {@code value.deserializer} entries
     * into the supplied {@code props} before building the offset generator from them.
     *
     * @param props          source configuration; must contain the proto schema class name key
     * @param sparkContext   Spark context handed to the parent source
     * @param sparkSession   Spark session handed to the parent source
     * @param schemaProvider schema provider handed to the parent source
     * @param metrics        metrics sink handed to the parent source
     */
    public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
        super(props, sparkContext, sparkSession, schemaProvider, Source.SourceType.PROTO, metrics);
        DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key()));
        // Values must arrive as raw bytes because parsing is done by ProtoDeserializer, not by Kafka.
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        this.className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key());
        this.offsetGen = new KafkaOffsetGen(props);
    }

    /**
     * Builds an RDD over the given Kafka offset ranges and parses every record value into a
     * protobuf {@link Message}.
     *
     * @param offsetRanges Kafka offset ranges to read
     * @return RDD of parsed protobuf messages
     */
    @Override
    JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) {
        ProtoDeserializer deserializer = new ProtoDeserializer(this.className);
        // Explicit type arguments give the lambda a typed record (String key, byte[] value),
        // so no raw functional-interface cast or value cast is needed.
        return KafkaUtils.<String, byte[]>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
                .map(record -> deserializer.parse(record.value()));
    }

    /**
     * Serializable helper that parses byte arrays into protobuf messages via reflection.
     *
     * <p>Only the class name is serialized; the resolved {@link Class} and {@link Method} are
     * {@code transient} and are looked up again on first use after deserialization.
     */
    private static class ProtoDeserializer
    implements Serializable {
        private final String className;
        private transient Class<?> protoClass;
        private transient Method parseMethod;

        /**
         * @param className name of the protobuf class exposing {@code parseFrom(byte[])}
         */
        public ProtoDeserializer(String className) {
            this.className = className;
        }

        /**
         * Parses the given bytes with the configured class's {@code parseFrom(byte[])} method.
         *
         * @param bytes serialized protobuf payload
         * @return the parsed message
         * @throws HoodieException if the reflective call fails or the parse method itself throws
         */
        public Message parse(byte[] bytes) {
            try {
                // parseFrom is expected to be static, so no receiver is supplied. The (Object) cast
                // makes it explicit that the byte[] is the single argument, not a varargs array.
                return (Message) this.getParseMethod().invoke(null, (Object) bytes);
            }
            catch (IllegalAccessException | InvocationTargetException ex) {
                throw new HoodieException("Failed to parse proto message from kafka", ex);
            }
        }

        /** Resolves the protobuf class on first use and caches it. */
        private Class<?> getProtoClass() {
            if (this.protoClass == null) {
                this.protoClass = ReflectionUtils.getClass(this.className);
            }
            return this.protoClass;
        }

        /**
         * Looks up {@code parseFrom(byte[])} on the protobuf class on first use and caches it.
         *
         * @throws HoodieException if the class does not expose a public {@code parseFrom(byte[])}
         */
        private Method getParseMethod() {
            if (this.parseMethod == null) {
                try {
                    this.parseMethod = this.getProtoClass().getMethod("parseFrom", byte[].class);
                }
                catch (NoSuchMethodException ex) {
                    throw new HoodieException("Unable to get proto parsing method from specified class: " + this.className, ex);
                }
            }
            return this.parseMethod;
        }
    }
}

