/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.Timestamps;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.BaseTestKafkaSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ProtoKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.test.proto.Nested;
import org.apache.hudi.utilities.test.proto.Sample;
import org.apache.hudi.utilities.test.proto.SampleEnum;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestProtoKafkaSource
extends BaseTestKafkaSource {
    private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
    private static final Random RANDOM = new Random();
    private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081";

    @Override
    protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.streamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
        props.setProperty("group.id", UUID.randomUUID().toString());
        props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
        return props;
    }

    @Override
    protected SourceFormatAdapter createSource(TypedProperties props) {
        this.schemaProvider = new ProtoClassBasedSchemaProvider(props, this.jsc());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, this.jsc(), this.spark(), this.metrics, (StreamContext)new DefaultStreamContext(this.schemaProvider, this.sourceProfile));
        return new SourceFormatAdapter((Source)protoKafkaSource);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testProtoKafkaSourceWithConfluentProtoDeserialization(boolean persistSourceRdd) {
        String topic = "hoodie_test_testProtoKafkaSourceWithConfluentDeserializer_" + persistSourceRdd;
        this.testUtils.createTopic(topic, 2);
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");
        props.put((Object)KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(), (Object)"io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        props.put((Object)"schema.registry.url", (Object)MOCK_REGISTRY_URL);
        props.put((Object)"hoodie.streamer.schemaprovider.registry.url", (Object)MOCK_REGISTRY_URL);
        props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
        props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), String.valueOf(persistSourceRdd));
        props.remove((Object)ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
        SchemaRegistryProvider schemaProvider = new SchemaRegistryProvider(props, this.jsc());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, this.jsc(), this.spark(), (SchemaProvider)schemaProvider, this.metrics);
        List<Sample> messages = TestProtoKafkaSource.createSampleMessages(1000);
        this.sendMessagesToKafkaWithConfluentSerializer(topic, 2, messages);
        JavaRDD messagesRead = (JavaRDD)protoKafkaSource.fetchNext(Option.empty(), 1000L).getBatch().get();
        Assertions.assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()), new HashSet(messagesRead.map((Function & Serializable)message -> PRINTER.print((MessageOrBuilder)message)).collect()));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testProtoKafkaSourceWithFlattenWrappedPrimitives(boolean persistSourceRdd) {
        String topic = "hoodie_test_test_proto_kafka_source_flatten_persist_source_rdd_" + persistSourceRdd;
        this.testUtils.createTopic(topic, 2);
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");
        props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
        props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), Boolean.toString(persistSourceRdd));
        ProtoClassBasedSchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, this.jsc());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, this.jsc(), this.spark(), (SchemaProvider)schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter((Source)protoKafkaSource);
        Assertions.assertEquals((Object)Option.empty(), (Object)kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        this.sendMessagesToKafka(topic, 1000, 2);
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals((long)900L, (long)((JavaRDD)fetch1.getBatch().get()).count());
        Dataset fetch1AsRows = (Dataset)kafkaSource.fetchNewDataInRowFormat(Option.empty(), 900L).getBatch().get();
        Assertions.assertEquals((long)900L, (long)fetch1AsRows.count());
        this.sendMessagesToKafka(topic, 1000, 2);
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)1100L, (long)((Dataset)fetch2.getBatch().get()).count());
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)((Dataset)fetch2.getBatch().get()).count(), (long)((JavaRDD)fetch3.getBatch().get()).count());
        Assertions.assertEquals((Object)fetch2.getCheckpointForNextBatch(), (Object)fetch3.getCheckpointForNextBatch());
        InputBatch fetch3AsRows = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)((Dataset)fetch2.getBatch().get()).count(), (long)((Dataset)fetch3AsRows.getBatch().get()).count());
        Assertions.assertEquals((Object)fetch2.getCheckpointForNextBatch(), (Object)fetch3AsRows.getCheckpointForNextBatch());
        InputBatch fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of((Object)fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((Object)Option.empty(), (Object)fetch4.getBatch());
        InputBatch fetch4AsRows = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((Object)Option.empty(), (Object)fetch4AsRows.getBatch());
    }

    private static List<Sample> createSampleMessages(int count) {
        return IntStream.range(0, count).mapToObj(unused -> {
            Sample.Builder builder = Sample.newBuilder().setPrimitiveDouble(RANDOM.nextDouble()).setPrimitiveFloat(RANDOM.nextFloat()).setPrimitiveInt(RANDOM.nextInt()).setPrimitiveLong(RANDOM.nextLong()).setPrimitiveUnsignedInt(RANDOM.nextInt()).setPrimitiveUnsignedLong(RANDOM.nextLong()).setPrimitiveSignedInt(RANDOM.nextInt()).setPrimitiveSignedLong(RANDOM.nextLong()).setPrimitiveFixedInt(RANDOM.nextInt()).setPrimitiveFixedLong(RANDOM.nextLong()).setPrimitiveFixedSignedInt(RANDOM.nextInt()).setPrimitiveFixedSignedLong(RANDOM.nextLong()).setPrimitiveBoolean(RANDOM.nextBoolean()).setPrimitiveString(UUID.randomUUID().toString()).setPrimitiveBytes(ByteString.copyFrom((byte[])StringUtils.getUTF8Bytes((String)UUID.randomUUID().toString())));
            if (RANDOM.nextBoolean()) {
                HashMap<String, Integer> primitiveMap = new HashMap<String, Integer>();
                primitiveMap.put(UUID.randomUUID().toString(), RANDOM.nextInt());
                HashMap<String, Nested> messageMap = new HashMap<String, Nested>();
                messageMap.put(UUID.randomUUID().toString(), TestProtoKafkaSource.generateRandomNestedMessage());
                builder.addAllRepeatedPrimitive(Arrays.asList(RANDOM.nextInt(), RANDOM.nextInt())).putAllMapPrimitive(primitiveMap).setNestedMessage(TestProtoKafkaSource.generateRandomNestedMessage()).addAllRepeatedMessage(Arrays.asList(TestProtoKafkaSource.generateRandomNestedMessage(), TestProtoKafkaSource.generateRandomNestedMessage())).putAllMapMessage(messageMap).setWrappedString(StringValue.of((String)UUID.randomUUID().toString())).setWrappedInt(Int32Value.of((int)RANDOM.nextInt())).setWrappedLong(Int64Value.of((long)RANDOM.nextLong())).setWrappedUnsignedInt(UInt32Value.of((int)RANDOM.nextInt())).setWrappedUnsignedLong(UInt64Value.of((long)RANDOM.nextLong())).setWrappedDouble(DoubleValue.of((double)RANDOM.nextDouble())).setWrappedFloat(FloatValue.of((float)RANDOM.nextFloat())).setWrappedBoolean(BoolValue.of((boolean)RANDOM.nextBoolean())).setWrappedBytes(BytesValue.of((ByteString)ByteString.copyFrom((byte[])StringUtils.getUTF8Bytes((String)UUID.randomUUID().toString())))).setEnum(SampleEnum.SECOND).setTimestamp(Timestamps.fromMillis((long)System.currentTimeMillis()));
            }
            return builder.build();
        }).collect(Collectors.toList());
    }

    private static Nested generateRandomNestedMessage() {
        return Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build();
    }

    @Override
    protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
        List<Sample> messages = TestProtoKafkaSource.createSampleMessages(count);
        try (KafkaProducer producer = new KafkaProducer(this.getProducerProperties(false));){
            for (int i = 0; i < messages.size(); ++i) {
                producer.send(new ProducerRecord(topic, (Object)Integer.toString(i % numPartitions), (Object)messages.get(i).toByteArray()));
            }
        }
    }

    private void sendMessagesToKafkaWithConfluentSerializer(String topic, int numPartitions, List<Sample> messages) {
        try (KafkaProducer producer = new KafkaProducer(this.getProducerProperties(true));){
            for (int i = 0; i < messages.size(); ++i) {
                producer.send(new ProducerRecord(topic, (Object)Integer.toString(i % numPartitions), (Object)messages.get(i)));
            }
        }
    }

    private Properties getProducerProperties(boolean useConfluentProtobufSerializer) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.testUtils.brokerAddress());
        if (useConfluentProtobufSerializer) {
            props.put("value.serializer", KafkaProtobufSerializer.class.getName());
            props.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
            props.put("schema.registry.url", MOCK_REGISTRY_URL);
            props.put("auto.register.schemas", "true");
        } else {
            props.put("value.serializer", ByteArraySerializer.class.getName());
        }
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        return props;
    }

    private String protoToJson(Message input) {
        try {
            return PRINTER.print((MessageOrBuilder)input);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to convert proto to json", e);
        }
    }
}

