/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.BaseTestKafkaSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Tuple2;

/**
 * Tests for {@link JsonKafkaSource}.
 *
 * <p>Covers:
 * <ul>
 *   <li>reading JSON payloads from Kafka into Avro and Row batches;</li>
 *   <li>per-fetch and configured upper caps on the number of events;</li>
 *   <li>routing unparseable messages to an error table writer;</li>
 *   <li>appending Kafka metadata columns.</li>
 * </ul>
 */
public class TestJsonKafkaSource extends BaseTestKafkaSource {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("streamer-config/source_short_trip_uber.avsc");

    /** Avro schema ("shortTripRec") used to generate every JSON payload these tests publish. */
    private static final String SHORT_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":["
        + "{\"name\":\"timestamp\",\"type\":\"long\"},"
        + "{\"name\":\"_row_key\",\"type\":\"string\"},"
        + "{\"name\":\"rider\",\"type\":\"string\"},"
        + "{\"name\":\"driver\",\"type\":\"string\"},"
        + "{\"name\":\"fare\",\"type\":\"double\"},"
        + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

    /** Builds a file-based schema provider from the short-trip schema file on the test classpath. */
    @BeforeEach
    public void init() throws Exception {
        URL schemaFile = Objects.requireNonNull(SCHEMA_FILE_URL, "streamer-config/source_short_trip_uber.avsc not found on the test classpath");
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", schemaFile.toURI().getPath());
        this.schemaProvider = new FilebasedSchemaProvider(props, this.jsc());
    }

    @Override
    protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        return createPropsForJsonKafkaSource(this.testUtils.brokerAddress(), topic, maxEventsToReadFromKafkaSource, resetStrategy);
    }

    /**
     * Creates the consumer properties for a JSON Kafka source.
     *
     * @param brokerAddress                  Kafka bootstrap servers
     * @param topic                          topic to read from
     * @param maxEventsToReadFromKafkaSource configured max events, or {@code null} to use the
     *                                       {@link KafkaSourceConfig#MAX_EVENTS_FROM_KAFKA_SOURCE} default
     * @param resetStrategy                  value for {@code auto.offset.reset}
     * @return a fresh property set with auto-commit disabled and a random consumer group id
     */
    static TypedProperties createPropsForJsonKafkaSource(String brokerAddress, String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        String maxEvents = maxEventsToReadFromKafkaSource != null
            ? String.valueOf(maxEventsToReadFromKafkaSource)
            : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue());
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.streamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", brokerAddress);
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEvents);
        // A random group id keeps each test's consumer offsets isolated from the others.
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    @Override
    protected SourceFormatAdapter createSource(TypedProperties props) {
        return new SourceFormatAdapter(new JsonKafkaSource(props, this.jsc(), this.spark(), this.metrics, new DefaultStreamContext(this.schemaProvider, this.sourceProfile)));
    }

    /** Generates {@code count} short-trip inserts with the given commit time, serialized as JSON strings. */
    private static String[] shortTripJson(HoodieTestDataGenerator generator, String commitTime, int count) {
        return UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema(commitTime, count, SHORT_TRIP_SCHEMA));
    }

    /** Returns the record count of an Avro-format batch; fails if the batch is absent. */
    private static long rddCount(InputBatch<?> batch) {
        return ((JavaRDD<?>) batch.getBatch().get()).count();
    }

    /** Returns the row count of a Row-format batch; fails if the batch is absent. */
    private static long rowCount(InputBatch<?> batch) {
        return ((Dataset<?>) batch.getBatch().get()).count();
    }

    /** Creates a JSON Kafka source adapter over {@code props}, reads everything available, and caches the result. */
    private Dataset<Row> fetchAllRowsCached(TypedProperties props) {
        JsonKafkaSource jsonSource = new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics);
        return new SourceFormatAdapter(jsonSource).fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache();
    }

    @Test
    public void testJsonKafkaSourceFilterNullMsg() {
        String topic = "hoodie_test_testJsonKafkaSourceFilterNullMsg";
        this.testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics));

        // Nothing has been published yet, so there is no batch.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        this.testUtils.sendMessages(topic, shortTripJson(dataGenerator, "000", 1000));
        // An array of 100 null payloads: only the 1000 real records should be read back.
        this.testUtils.sendMessages(topic, new String[100]);
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, rddCount(fetch1));
    }

    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        String topic = "hoodie_test_testJsonKafkaSourceWithDefaultUpperCap";
        this.testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics));

        this.testUtils.sendMessages(topic, shortTripJson(dataGenerator, "000", 1000));
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, rddCount(fetch1));

        // A limit (1500) larger than what is pending returns only the 1000 new records.
        this.testUtils.sendMessages(topic, shortTripJson(dataGenerator, "001", 1000));
        InputBatch<?> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500L);
        Assertions.assertEquals(1000L, rowCount(fetch2));
    }

    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        String topic = "hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap";
        this.testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForKafkaSource(topic, 500L, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics));

        this.testUtils.sendMessages(topic, shortTripJson(dataGenerator, "000", 1000));
        // An explicit per-fetch limit (900) is honoured even though maxEvents is configured as 500.
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, rddCount(fetch1));

        this.testUtils.sendMessages(topic, shortTripJson(dataGenerator, "001", 1000));
        // With Long.MAX_VALUE as the limit, the configured maxEvents (500) caps the fetch.
        InputBatch<?> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, rowCount(fetch2));

        // Re-reading from earlier checkpoints with explicit limits.
        InputBatch<?> fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400L);
        Assertions.assertEquals(400L, rddCount(fetch3));
        InputBatch<?> fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, rddCount(fetch4));

        // Replaying fetch2's range in Avro format yields the same size and the same next checkpoint.
        InputBatch<?> fetch5 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), rddCount(fetch5));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());

        // fetch4 consumed everything that was left.
        InputBatch<?> fetch6 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch6.getBatch());
    }

    @Override
    protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, SHORT_TRIP_SCHEMA), numPartitions));
    }

    /** Like {@link #sendMessagesToKafka}, but publishes every message with a null Kafka key. */
    void sendNullKafkaKeyMessagesToKafka(String topic, int count, int numPartitions) {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey(dataGenerator.generateInsertsAsPerSchema("000", count, SHORT_TRIP_SCHEMA), numPartitions));
    }

    /**
     * Publishes {@code count} short-trip records after stripping the {@code height},
     * {@code current_date} and {@code nation} fields from each JSON payload.
     * Message {@code i} is keyed by {@code i % numPartitions}.
     *
     * @throws UncheckedIOException if a payload cannot be parsed or re-serialized
     */
    @SuppressWarnings("unchecked")
    void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) {
        String[] records = shortTripJson(new HoodieTestDataGenerator(), "000", count);
        Tuple2<String, String>[] keyValues = new Tuple2[count];
        try {
            for (int i = 0; i < count; ++i) {
                Map<?, ?> node = OBJECT_MAPPER.readValue(records[i], Map.class);
                node.remove("height");
                node.remove("current_date");
                node.remove("nation");
                keyValues[i] = new Tuple2<>(Integer.toString(i % numPartitions), OBJECT_MAPPER.writeValueAsString(node));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.testUtils.sendMessages(topic, keyValues);
    }

    /**
     * Creates source properties for {@code topic} with Kafka offset commits enabled and an error table configured.
     *
     * <p>The error table is named {@code tableName} under {@code /tmp/qurantine_table_test/}, and the
     * base path is {@code /tmp/<tableName>}.
     */
    private TypedProperties createErrorTableProps(String topic, String tableName) {
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");
        props.setProperty(KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), "/tmp/qurantine_table_test/" + tableName);
        props.setProperty(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), tableName);
        props.setProperty("hoodie.base.path", "/tmp/" + tableName);
        return props;
    }

    /** Counts the error events recorded so far by {@code writer}; fails if none were recorded. */
    private static long countErrorEvents(BaseErrorTableWriter<?> writer) {
        return ((JavaRDD<?>) writer.getErrorEvents(HoodieActiveTimeline.createNewInstantTime(), Option.empty()).get()).count();
    }

    @Test
    @SuppressWarnings("rawtypes")
    public void testErrorEventsForDataInRowFormat() throws IOException {
        String topic = "hoodie_test_testErrorEventsForDataInRowFormat";
        this.testUtils.createTopic(topic, 2);
        this.sendJsonSafeMessagesToKafka(topic, 1000, 2);
        this.testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});

        TypedProperties props = this.createErrorTableProps(topic, "json_kafka_row_events");
        props.setProperty("hoodie.errortable.validate.targetschema.enable", "true");
        JsonKafkaSource jsonSource = new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics);
        Option<BaseErrorTableWriter> errorTableWriter = Option.of(this.getAnonymousErrorTableWriter(props));
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter, Option.of(props));

        // 1000 valid records come through; the two non-JSON payloads land in the error table.
        Assertions.assertEquals(1000L, rowCount(kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE)));
        Assertions.assertEquals(2L, countErrorEvents(errorTableWriter.get()));
    }

    @Test
    @SuppressWarnings("rawtypes")
    public void testErrorEventsForDataInAvroFormat() throws IOException {
        String topic = "hoodie_test_testErrorEventsForDataInAvroFormat";
        this.testUtils.createTopic(topic, 2);
        this.testUtils.sendMessages(topic, shortTripJson(new HoodieTestDataGenerator(), "000", 1000));
        this.testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});

        TypedProperties props = this.createErrorTableProps(topic, "json_kafka_events");
        JsonKafkaSource jsonSource = new JsonKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics);
        Option<BaseErrorTableWriter> errorTableWriter = Option.of(this.getAnonymousErrorTableWriter(props));
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter, Option.of(props));

        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, rddCount(fetch1));
        Assertions.assertEquals(2L, countErrorEvents(errorTableWriter.get()));
    }

    /**
     * Builds an in-memory error table writer for these tests.
     *
     * <p>It records one placeholder {@link HoodieAvroRecord} per error event, so tests can count events.
     * It never commits anything.
     *
     * <p>The overriding methods keep the raw parameter types; they override by erasure, as in the
     * decompiled original.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties props) {
        return new BaseErrorTableWriter<ErrorEvent<String>>((HoodieStreamer.Config) new HoodieDeltaStreamer.Config(), this.spark(), props, new HoodieSparkEngineContext(this.jsc()), this.fs()) {
            private final List<JavaRDD<HoodieAvroRecord>> errorEvents = new ArrayList<>();

            @Override
            public void addErrorEvents(JavaRDD errorEvent) {
                // Only the number of events matters here, so each event becomes an empty placeholder record.
                this.errorEvents.add(errorEvent.map(r -> new HoodieAvroRecord(new HoodieKey(), null)));
            }

            @Override
            public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option commitedInstantTime) {
                // Report "no events" instead of failing inside an unchecked Optional.get() on an empty reduce.
                if (this.errorEvents.isEmpty()) {
                    return Option.empty();
                }
                return Option.of(this.errorEvents.stream().reduce((left, right) -> left.union(right)).get());
            }

            @Override
            public boolean upsertAndCommit(String baseTableInstantTime, Option commitedInstantTime) {
                return false;
            }
        };
    }

    @Test
    public void testAppendKafkaOffset() {
        String topic = "hoodie_test_testKafkaOffsetAppend";
        int numPartitions = 2;
        int numMessages = 30;
        // Columns expected at the end of the schema when KAFKA_APPEND_OFFSETS is enabled, in this order.
        List<String> kafkaMetadataColumns = Arrays.asList("_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", "_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key");
        this.testUtils.createTopic(topic, numPartitions);
        this.sendMessagesToKafka(topic, numMessages, numPartitions);
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");

        // Baseline read without Kafka metadata columns.
        Dataset<Row> dfNoOffsetInfo = this.fetchAllRowsCached(props);
        Assertions.assertEquals((long) numMessages, dfNoOffsetInfo.count());

        props.setProperty(HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
        Dataset<Row> dfWithOffsetInfo = this.fetchAllRowsCached(props);
        Assertions.assertEquals((long) numMessages, dfWithOffsetInfo.count());
        for (int partition = 0; partition < numPartitions; ++partition) {
            Assertions.assertEquals((long) (numMessages / numPartitions), dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + partition).count());
        }
        // Dropping the metadata columns must give back exactly the baseline rows.
        Assertions.assertEquals(0L, dfWithOffsetInfo.drop(kafkaMetadataColumns.toArray(new String[0])).except(dfNoOffsetInfo).count());

        List<String> columns = Arrays.asList(dfNoOffsetInfo.columns());
        List<String> withKafkaOffsetColumns = Arrays.asList(dfWithOffsetInfo.columns());
        Assertions.assertEquals(kafkaMetadataColumns.size(), withKafkaOffsetColumns.size() - columns.size());
        Assertions.assertEquals(kafkaMetadataColumns, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - kafkaMetadataColumns.size(), withKafkaOffsetColumns.size()));

        // Publish a second round with null keys; reading from "earliest" now sees both rounds.
        this.sendNullKafkaKeyMessagesToKafka(topic, numMessages, numPartitions);
        Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = this.fetchAllRowsCached(props);
        Assertions.assertEquals((long) numMessages, dfWithOffsetInfoAndNullKafkaKey.filter("_hoodie_kafka_source_key is null").count());
        Assertions.assertEquals((long) numMessages, dfWithOffsetInfoAndNullKafkaKey.filter("_hoodie_kafka_source_key is not null").count());
        Assertions.assertEquals((long) (numMessages * 2), dfWithOffsetInfoAndNullKafkaKey.filter("_hoodie_kafka_source_offset is not null").count());

        dfNoOffsetInfo.unpersist();
        dfWithOffsetInfo.unpersist();
        dfWithOffsetInfoAndNullKafkaKey.unpersist();
    }

    @Test
    public void testCreateSource() throws IOException {
        String topic = "hoodie_test_testJsonKafkaSourceCreation";
        this.testUtils.createTopic(topic, 2);
        TypedProperties props = this.createPropsForKafkaSource(topic, null, "earliest");
        Source jsonKafkaSource = UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, this.jsc(), this.spark(), this.metrics, new DefaultStreamContext(this.schemaProvider, this.sourceProfile));
        Assertions.assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
    }
}

