/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieRetryingKafkaConsumer;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Shared functional tests for Kafka-backed sources.
 *
 * <p>An embedded Kafka broker ({@link KafkaTestUtils}) is started before and torn down after every
 * test. Concrete subclasses supply the format-specific pieces: the source properties, the
 * {@link SourceFormatAdapter} under test, and the producer that writes test messages.
 */
public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
    protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";

    /** Mocked metrics sink; some tests verify the calls recorded on it. */
    protected final HoodieIngestionMetrics metrics = Mockito.mock(HoodieIngestionMetrics.class);

    /** Mocked source-profile supplier; tests stub it when they need a specific profile. */
    protected final Option<SourceProfileSupplier> sourceProfile =
        Option.of(Mockito.mock(SourceProfileSupplier.class));

    /**
     * Schema provider whose source schema is used to turn Avro batches into rows. Not assigned in
     * this class; presumably initialized by subclasses — TODO confirm.
     */
    protected SchemaProvider schemaProvider;

    protected KafkaTestUtils testUtils;

    /** Starts a fresh embedded Kafka broker before each test (despite the name, this is per-test). */
    @BeforeEach
    public void initClass() {
        testUtils = new KafkaTestUtils();
        testUtils.setup();
    }

    /** Tears down the embedded Kafka broker after each test. */
    @AfterEach
    public void cleanupClass() {
        testUtils.teardown();
    }

    /**
     * Builds the properties used to construct the source under test.
     *
     * @param topic         Kafka topic to read from
     * @param maxEvents     tests pass {@code null} or {@code Long.MAX_VALUE}; presumably an upper bound
     *                      on events read from Kafka — the exact meaning is defined by the subclass
     * @param resetStrategy offset reset strategy; tests pass {@code "earliest"} or {@code "latest"}
     * @return properties later handed to {@link #createSource(TypedProperties)}
     */
    protected abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEvents, String resetStrategy);

    /**
     * Creates the source under test, wrapped in a {@link SourceFormatAdapter}.
     *
     * @param props properties produced by {@link #createPropsForKafkaSource(String, Long, String)}
     * @return the adapter used by the tests to fetch Avro- and Row-format batches
     */
    protected abstract SourceFormatAdapter createSource(TypedProperties props);

    /**
     * Produces test messages to Kafka.
     *
     * @param topic         target topic
     * @param count         number of messages to send
     * @param numPartitions tests pass 2, matching the partition count they create the topic with;
     *                      presumably the number of partitions to spread messages over — confirm in subclass
     */
    protected abstract void sendMessagesToKafka(String topic, int count, int numPartitions);

    /**
     * Asserts whether the source RDD was cached, then checks that releasing the source's resources
     * leaves no persistent RDDs behind.
     *
     * @param source           source whose resources are released at the end
     * @param sparkPlan        textual Spark plan inspected for a {@code CachedPartitions} marker
     * @param persistSourceRdd whether exactly one persisted RDD is expected before release
     */
    protected void verifyRddsArePersisted(Source source, String sparkPlan, boolean persistSourceRdd) {
        if (persistSourceRdd) {
            Assertions.assertTrue(sparkPlan.contains("CachedPartitions"));
            Assertions.assertEquals(1, jsc().getPersistentRDDs().size());
        } else {
            Assertions.assertFalse(sparkPlan.contains("CachedPartitions"));
            Assertions.assertEquals(0, jsc().getPersistentRDDs().size());
        }
        source.releaseResources();
        Assertions.assertEquals(0, jsc().getPersistentRDDs().size());
    }

    /**
     * Exercises source limits, checkpoint resumption, and Avro/Row format parity.
     */
    @Test
    public void testKafkaSource() {
        final String topic = TEST_TOPIC_PREFIX + "testKafkaSource";
        testUtils.createTopic(topic, 2);
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        SourceFormatAdapter kafkaSource = createSource(props);

        // An empty topic yields no batch.
        Assertions.assertEquals(Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // 1000 messages available; a source limit of 900 is expected to cap the first fetch at 900.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch<? extends JavaRDD<?>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, avroRecordCount(fetch1));
        Assertions.assertEquals(900L, avroBatchToRows(fetch1, kafkaSource).count());

        // 1000 more: resuming from fetch1's checkpoint should return the 100 left over plus the 1000 new.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch<? extends Dataset<?>> fetch2 =
            kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, rowRecordCount(fetch2));

        // Re-reading from the same checkpoint gives the same result in both formats.
        InputBatch<? extends JavaRDD<?>> fetch3 =
            kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowRecordCount(fetch2), avroRecordCount(fetch3));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());

        InputBatch<? extends Dataset<?>> fetch3AsRows =
            kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowRecordCount(fetch2), rowRecordCount(fetch3AsRows));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());

        // Nothing new after fetch2's checkpoint, in either format.
        InputBatch<? extends JavaRDD<?>> fetch4 =
            kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch4.getBatch());

        InputBatch<? extends Dataset<?>> fetch4AsRows =
            kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch4AsRows.getBatch());
    }

    /**
     * Checks that "earliest" and "latest" reset strategies agree when there is no prior checkpoint.
     */
    @Test
    public void testKafkaSourceResetStrategy() {
        final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceResetStrategy";
        testUtils.createTopic(topic, 2);

        TypedProperties earliestProps = createPropsForKafkaSource(topic, null, "earliest");
        SourceFormatAdapter earliestKafkaSource = createSource(earliestProps);
        TypedProperties latestProps = createPropsForKafkaSource(topic, null, "latest");
        SourceFormatAdapter latestKafkaSource = createSource(latestProps);

        // Empty topic: both strategies should produce the same batch and checkpoint.
        InputBatch<? extends JavaRDD<?>> earFetch0 =
            earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch<? extends JavaRDD<?>> latFetch0 =
            latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
        Assertions.assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());

        // With data present and no prior checkpoint, the next checkpoints are expected to match.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch<? extends JavaRDD<?>> earFetch1 =
            earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch<? extends JavaRDD<?>> latFetch1 =
            latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earFetch1.getCheckpointForNextBatch(), latFetch1.getCheckpointForNextBatch());
    }

    /**
     * Checks that a per-fetch source limit below the configured max-events setting is honored.
     */
    @Test
    public void testProtoKafkaSourceInsertRecordsLessSourceLimit() {
        final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceInsertRecordsLessSourceLimit";
        testUtils.createTopic(topic, 2);
        TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = createSource(props);
        // Set after createSource on the same props instance the source was built from.
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", "500");

        sendMessagesToKafka(topic, 400, 2);
        InputBatch<? extends JavaRDD<?>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
        Assertions.assertEquals(300L, avroRecordCount(fetch1));

        sendMessagesToKafka(topic, 600, 2);
        InputBatch<? extends Dataset<?>> fetch2 =
            kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300L);
        Assertions.assertEquals(300L, rowRecordCount(fetch2));
    }

    /**
     * Checks that {@code onCommit} commits consumed offsets to Kafka when offset committing is enabled,
     * and that it is rejected once {@code group.id} is removed from the properties.
     */
    @Test
    public void testCommitOffsetToKafka() {
        final String topic = TEST_TOPIC_PREFIX + "testCommitOffsetToKafka";
        testUtils.createTopic(topic, 2);
        TopicPartition topicPartition0 = new TopicPartition(topic, 0);
        TopicPartition topicPartition1 = new TopicPartition(topic, 1);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(topicPartition0);
        topicPartitions.add(topicPartition1);

        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        props.put(KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        SourceFormatAdapter kafkaSource = createSource(props);
        Assertions.assertEquals(Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        sendMessagesToKafka(topic, 1000, 2);
        InputBatch<? extends JavaRDD<?>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599L);
        kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch().getCheckpointKey());

        try (HoodieRetryingKafkaConsumer consumer =
                 new HoodieRetryingKafkaConsumer(props, KafkaOffsetGen.excludeHoodieConfigs(props))) {
            consumer.assign(topicPartitions);

            // The 599-record limit is expected to have been split 300/299 across the two partitions.
            OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition0);
            Assertions.assertNotNull(offsetAndMetadata);
            Assertions.assertEquals(300L, offsetAndMetadata.offset());
            offsetAndMetadata = consumer.committed(topicPartition1);
            Assertions.assertNotNull(offsetAndMetadata);
            Assertions.assertEquals(299L, offsetAndMetadata.offset());

            // 1000 messages are expected to land as 500 per partition.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            Assertions.assertEquals(500L, endOffsets.get(topicPartition0));
            Assertions.assertEquals(500L, endOffsets.get(topicPartition1));

            // Consume everything (500 more messages) and commit again.
            sendMessagesToKafka(topic, 500, 2);
            InputBatch<? extends Dataset<?>> fetch2 =
                kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
            kafkaSource.getSource().onCommit(fetch2.getCheckpointForNextBatch().getCheckpointKey());

            offsetAndMetadata = consumer.committed(topicPartition0);
            Assertions.assertNotNull(offsetAndMetadata);
            Assertions.assertEquals(750L, offsetAndMetadata.offset());
            offsetAndMetadata = consumer.committed(topicPartition1);
            Assertions.assertNotNull(offsetAndMetadata);
            Assertions.assertEquals(750L, offsetAndMetadata.offset());

            endOffsets = consumer.endOffsets(topicPartitions);
            Assertions.assertEquals(750L, endOffsets.get(topicPartition0));
            Assertions.assertEquals(750L, endOffsets.get(topicPartition1));
        }

        // Without a consumer group id, committing must be rejected.
        props.remove("group.id");
        Assertions.assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
    }

    /**
     * Checks that, with fail-on-data-loss enabled, resuming from a checkpoint whose data is no longer
     * available in Kafka raises {@link HoodieStreamerException} in both formats.
     */
    @Test
    public void testFailOnDataLoss() throws Exception {
        final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
        Properties topicConfig = new Properties();
        topicConfig.setProperty("retention.ms", "8000");
        testUtils.createTopic(topic, 2, topicConfig);

        TypedProperties failOnDataLossProps = createPropsForKafkaSource(topic, null, "earliest");
        failOnDataLossProps.setProperty(KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true));
        SourceFormatAdapter kafkaSource = createSource(failOnDataLossProps);

        sendMessagesToKafka(topic, 10, 2);
        InputBatch<? extends JavaRDD<?>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2L);
        Assertions.assertEquals(2L, avroRecordCount(fetch1));

        // Wait well beyond the topic's 8s retention so the unconsumed records can be aged out.
        Thread.sleep(30000L);

        String errorMessagePrefix = "Some data may have been lost because they are not available in Kafka any more;"
            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the"
            + " topic was processed. Kafka partitions that have out-of-bound checkpoints:";
        HoodieStreamerException avroError = Assertions.assertThrows(HoodieStreamerException.class,
            () -> kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE));
        Assertions.assertTrue(avroError.getMessage().startsWith(errorMessagePrefix));

        HoodieStreamerException rowError = Assertions.assertThrows(HoodieStreamerException.class,
            () -> kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE));
        Assertions.assertTrue(rowError.getMessage().startsWith(errorMessagePrefix));
    }

    /**
     * Checks that a stubbed {@link SourceProfile} drives the fetch size and is reported to metrics.
     */
    @Test
    public void testKafkaSourceWithOffsetsFromSourceProfile() {
        final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
        testUtils.createTopic(topic, 2);
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        // doReturn avoids depending on the exact generic return type of getSourceProfile().
        Mockito.doReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500L))
            .when(sourceProfile.get()).getSourceProfile();
        SourceFormatAdapter kafkaSource = createSource(props);

        Assertions.assertEquals(Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        sendMessagesToKafka(topic, 1000, 2);
        // The profile's 500 events, not the 900 source limit, is expected to bound this fetch.
        InputBatch<? extends JavaRDD<?>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(500L, avroRecordCount(fetch1));

        // Presumably one report per fetch issued above (the empty fetch and fetch1).
        VerificationMode twice = Mockito.times(2);
        Mockito.verify(metrics, twice).updateStreamerSourceParallelism(4);
        Mockito.verify(metrics, twice).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
    }

    /** Returns the record count of an Avro-format batch; the batch must be present (uses {@code Option#get}). */
    private static long avroRecordCount(InputBatch<? extends JavaRDD<?>> batch) {
        return batch.getBatch().get().count();
    }

    /** Returns the record count of a Row-format batch; the batch must be present (uses {@code Option#get}). */
    private static long rowRecordCount(InputBatch<? extends Dataset<?>> batch) {
        return batch.getBatch().get().count();
    }

    /**
     * Converts a present Avro-format batch into a DataFrame using {@link #schemaProvider}'s source schema
     * and the source's Spark session.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Dataset<?> avroBatchToRows(InputBatch<? extends JavaRDD<?>> batch, SourceFormatAdapter adapter) {
        // The raw RDD bridges the wildcard element type to the typed RDD parameter of
        // AvroConversionUtils.createDataFrame; assumes the Avro fetch path yields Avro records.
        RDD avroRecords = batch.getBatch().get().rdd();
        String sourceSchema = schemaProvider.getSourceSchema().toString();
        SparkSession spark = adapter.getSource().getSparkSession();
        return AvroConversionUtils.createDataFrame(avroRecords, sourceSchema, spark);
    }

    /** Fixed {@link SourceProfile} used to stub {@link SourceProfileSupplier} in tests. */
    static class TestSourceProfile implements SourceProfile<Long> {
        private final long maxSourceBytes;
        private final int sourcePartitions;
        private final long numEvents;

        /**
         * @param maxSourceBytes   value returned by {@link #getMaxSourceBytes()}
         * @param sourcePartitions value returned by {@link #getSourcePartitions()}
         * @param numEvents        value returned by {@link #getSourceSpecificContext()}
         */
        public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long numEvents) {
            this.maxSourceBytes = maxSourceBytes;
            this.sourcePartitions = sourcePartitions;
            this.numEvents = numEvents;
        }

        @Override
        public long getMaxSourceBytes() {
            return maxSourceBytes;
        }

        @Override
        public int getSourcePartitions() {
            return sourcePartitions;
        }

        @Override
        public Long getSourceSpecificContext() {
            return numEvents;
        }
    }
}

