/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.crypto.tink.subtle.Base64;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestAvroKafkaSource
extends SparkClientFunctionalTestHarness {
    protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
    protected static HoodieTestDataGenerator dataGen;
    protected static String SCHEMA_PATH;
    protected final HoodieIngestionMetrics metrics = (HoodieIngestionMetrics)Mockito.mock(HoodieIngestionMetrics.class);
    protected SchemaProvider schemaProvider;
    protected KafkaTestUtils testUtils;

    @BeforeAll
    public static void initClass() {
        dataGen = new HoodieTestDataGenerator(57069L);
    }

    @BeforeEach
    public void setup() {
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    @AfterEach
    public void tearDown() {
        this.testUtils.teardown();
    }

    protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.streamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    void sendMessagesToKafka(String topic, int count, int numPartitions) {
        List genericRecords = dataGen.generateGenericRecords(count);
        Properties config = this.getProducerProperties();
        try (KafkaProducer producer = new KafkaProducer(config);){
            for (int i = 0; i < genericRecords.size(); ++i) {
                producer.send(new ProducerRecord(topic, Integer.valueOf(i % numPartitions), (Object)"key", (Object)HoodieAvroUtils.avroToBytes((IndexedRecord)((IndexedRecord)genericRecords.get(i)))));
            }
        }
    }

    void sendMessagesToKafkaWithNullKafkaKey(String topic, int count, int numPartitions) {
        List genericRecords = dataGen.generateGenericRecords(count);
        Properties config = this.getProducerProperties();
        try (KafkaProducer producer = new KafkaProducer(config);){
            for (int i = 0; i < genericRecords.size(); ++i) {
                producer.send(new ProducerRecord(topic, Integer.valueOf(i % numPartitions), null, (Object)HoodieAvroUtils.avroToBytes((IndexedRecord)((IndexedRecord)genericRecords.get(i)))));
            }
        }
    }

    void sendMessagesToKafkaWithNullKafkaValue(String topic, int count, int numPartitions) {
        Properties config = this.getProducerProperties();
        try (KafkaProducer producer = new KafkaProducer(config);){
            for (int i = 0; i < count; ++i) {
                producer.send(new ProducerRecord(topic, Integer.valueOf(i % numPartitions), (Object)"key", null));
            }
        }
    }

    private Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.testUtils.brokerAddress());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("value.deserializer", ByteArraySerializer.class.getName());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("auto.register.schemas", "false");
        props.put("acks", "all");
        return props;
    }

    @Test
    void testKafkaSource_InvalidHostException() throws IOException {
        UtilitiesTestBase.Helpers.saveStringsToDFS(new String[]{dataGen.generateGenericRecord().getSchema().toString()}, this.hoodieStorage(), SCHEMA_PATH);
        String topic = "hoodie_avro_test_testKafkaOffsetAppend";
        TypedProperties props = this.createPropsForKafkaSource("hoodie_avro_test_testKafkaOffsetAppend", null, "earliest");
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)SCHEMA_PATH);
        SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        AvroKafkaSource avroSourceWithConfluentConfigException = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        Assertions.assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithConfluentConfigException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));
        props.setProperty("schema.registry.url", "schema-registry-url");
        props.setProperty("bootstrap.servers", "unknownhost");
        AvroKafkaSource avroSourceWithKafkaConfiException = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        Assertions.assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithKafkaConfiException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));
    }

    @Test
    public void testAppendKafkaOffsets() throws IOException {
        UtilitiesTestBase.Helpers.saveStringsToDFS(new String[]{dataGen.generateGenericRecord().getSchema().toString()}, this.hoodieStorage(), SCHEMA_PATH);
        ConsumerRecord recordConsumerRecord = new ConsumerRecord("test", 0, 1L, (Object)"test", (Object)dataGen.generateGenericRecord());
        JavaRDD rdd = this.jsc().parallelize(Arrays.asList(recordConsumerRecord));
        TypedProperties props = new TypedProperties();
        props.put((Object)"hoodie.streamer.source.kafka.topic", (Object)"test");
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)SCHEMA_PATH);
        SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, null);
        GenericRecord withoutKafkaOffsets = (GenericRecord)avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
        props.put((Object)HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), (Object)"true");
        schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, null);
        GenericRecord withKafkaOffsets = (GenericRecord)avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
        Assertions.assertEquals((int)4, (int)(withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size()));
        Assertions.assertEquals((Object)"test", (Object)withKafkaOffsets.get("_hoodie_kafka_source_key").toString());
        ConsumerRecord recordConsumerRecordNullKafkaKey = new ConsumerRecord("test", 0, 1L, null, (Object)dataGen.generateGenericRecord());
        JavaRDD rddNullKafkaKey = this.jsc().parallelize(Arrays.asList(recordConsumerRecordNullKafkaKey));
        avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, null);
        GenericRecord withKafkaOffsetsAndNullKafkaKey = (GenericRecord)avroKafkaSource.maybeAppendKafkaOffsets(rddNullKafkaKey).collect().get(0);
        Assertions.assertNull((Object)withKafkaOffsetsAndNullKafkaKey.get("_hoodie_kafka_source_key"));
    }

    @Test
    public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException {
        UtilitiesTestBase.Helpers.saveStringsToDFS(new String[]{dataGen.generateGenericRecord().getSchema().toString()}, this.hoodieStorage(), SCHEMA_PATH);
        String topic = "hoodie_avro_test_testKafkaOffsetAppend";
        TypedProperties props = this.createPropsForKafkaSource("hoodie_avro_test_testKafkaOffsetAppend", null, "earliest");
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)SCHEMA_PATH);
        SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        props.put((Object)"hoodie.streamer.source.kafka.value.deserializer.class", (Object)ByteArrayDeserializer.class.getName());
        int numPartitions = 2;
        int numMessages = 30;
        this.testUtils.createTopic("hoodie_avro_test_testKafkaOffsetAppend", numPartitions);
        this.sendMessagesToKafka("hoodie_avro_test_testKafkaOffsetAppend", numMessages, numPartitions);
        this.sendMessagesToKafkaWithNullKafkaValue("hoodie_avro_test_testKafkaOffsetAppend", numMessages, numPartitions);
        AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter((Source)avroKafkaSource);
        Dataset c = (Dataset)kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
        List columns = Arrays.stream(c.columns()).collect(Collectors.toList());
        props.put((Object)HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), (Object)"true");
        schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        kafkaSource = new SourceFormatAdapter((Source)avroKafkaSource);
        Dataset d = (Dataset)kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
        Assertions.assertEquals((long)numMessages, (long)d.count());
        for (int i = 0; i < numPartitions; ++i) {
            Assertions.assertEquals((int)(numMessages / numPartitions), (int)d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
        }
        List withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
        Assertions.assertEquals((long)0L, (long)d.drop(new String[]{"_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", "_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key", "city_to_state"}).except(c.drop("city_to_state")).count());
        Assertions.assertEquals((int)4, (int)(withKafkaOffsetColumns.size() - columns.size()));
        List<String> appendList = Arrays.asList("_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", "_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key");
        Assertions.assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size()));
        this.sendMessagesToKafkaWithNullKafkaKey("hoodie_avro_test_testKafkaOffsetAppend", numMessages, numPartitions);
        AvroKafkaSource avroKafkaSourceWithNullKafkaKey = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSourceWithNullKafkaKey = new SourceFormatAdapter((Source)avroKafkaSourceWithNullKafkaKey);
        Dataset nullKafkaKeyDataset = (Dataset)kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
        Assertions.assertEquals((long)numMessages, (long)nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
    }

    @Test
    void testConfigureSchemaDeserializer() throws IOException {
        String topic = "hoodie_avro_test_testAvroSchemaDeserializer";
        TypedProperties props = this.createPropsForKafkaSource("hoodie_avro_test_testAvroSchemaDeserializer", null, "earliest");
        props.put((Object)"hoodie.streamer.source.kafka.value.deserializer.class", (Object)KafkaAvroSchemaDeserializer.class.getName());
        Assertions.assertThrows(HoodieReadFromSourceException.class, () -> new AvroKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics));
        String schemaFilePath = TestAvroKafkaSource.class.getClassLoader().getResource("schema/simple-test-with-default-value.avsc").getPath();
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)schemaFilePath);
        SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor((SchemaProvider)UtilHelpers.createSchemaProvider((String)FilebasedSchemaProvider.class.getName(), (TypedProperties)props, (JavaSparkContext)this.jsc()), (TypedProperties)props, (JavaSparkContext)this.jsc(), new ArrayList());
        AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        Assertions.assertTrue((boolean)avroKafkaSource.props.containsKey((Object)"group.id"));
        String groupId = avroKafkaSource.props.getString("group.id", "");
        Assertions.assertTrue((groupId.length() <= 255 ? 1 : 0) != 0);
        schemaFilePath = TestAvroKafkaSource.class.getClassLoader().getResource("schema/evolved-test-with-default-value.avsc").getPath();
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)schemaFilePath);
        avroKafkaSource = new AvroKafkaSource(props, this.jsc(), this.spark(), schemaProvider, this.metrics);
        String newGroupId = avroKafkaSource.props.getString("group.id", "");
        Assertions.assertNotEquals((Object)groupId, (Object)newGroupId);
        String schemaHash = Base64.encode((byte[])HashID.hash((String)schemaProvider.getSourceSchema().toString(), (HashID.Size)HashID.Size.BITS_128));
        Assertions.assertEquals((Object)StringUtils.concatenateWithThreshold((String)String.format("%s_", groupId), (String)schemaHash, (int)255), (Object)newGroupId);
    }

    static {
        SCHEMA_PATH = "/tmp/schema_file.avsc";
    }
}

