/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Properties;
import java.util.UUID;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestKafkaOffsetGen {
    private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
    private HoodieIngestionMetrics metrics = (HoodieIngestionMetrics)Mockito.mock(HoodieIngestionMetrics.class);
    private KafkaTestUtils testUtils;

    @BeforeEach
    public void setup() throws Exception {
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    @AfterEach
    public void teardown() throws Exception {
        this.testUtils.teardown();
    }

    private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
        TypedProperties props = new TypedProperties();
        props.put((Object)"hoodie.streamer.source.kafka.checkpoint.type", (Object)kafkaCheckpointType);
        props.put((Object)"auto.offset.reset", (Object)autoOffsetReset);
        props.put((Object)"hoodie.streamer.source.kafka.topic", (Object)this.testTopicName);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    @Test
    public void testGetNextOffsetRangesFromEarliest() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 1);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("earliest", "string"));
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500L, this.metrics);
        Assertions.assertEquals((int)1, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[0].untilOffset());
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 5000L, this.metrics);
        Assertions.assertEquals((int)1, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)1000L, (long)nextOffsetRanges[0].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesFromLatest() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 1);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("latest", "string"));
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500L, this.metrics);
        Assertions.assertEquals((int)1, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)1000L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)1000L, (long)nextOffsetRanges[0].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesFromCheckpoint() {
        String lastCheckpointString = this.testTopicName + ",0:250";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 1);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("latest", "string"));
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of((Object)lastCheckpointString), 500L, this.metrics);
        Assertions.assertEquals((int)1, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)250L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)750L, (long)nextOffsetRanges[0].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesFromTimestampCheckpointType() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 1);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("latest", "timestamp"));
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of((Object)String.valueOf(System.currentTimeMillis() - 100000L)), 500L, this.metrics);
        Assertions.assertEquals((int)1, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[0].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesFromMultiplePartitions() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 2);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("earliest", "string"));
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499L, this.metrics);
        Assertions.assertEquals((int)3, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)249L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((long)249L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)250L, (long)nextOffsetRanges[1].untilOffset());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[2].fromOffset());
        Assertions.assertEquals((long)249L, (long)nextOffsetRanges[2].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesFromGroup() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 2);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInserts("000", Integer.valueOf(1000)), 2));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("group", "string"));
        String lastCheckpointString = this.testTopicName + ",0:250,1:249";
        kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((long)250L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)400L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((long)249L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)399L, (long)nextOffsetRanges[1].untilOffset());
        lastCheckpointString = this.testTopicName + ",0:400,1:500";
        kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((int)3, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)400L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)450L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((long)450L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[1].untilOffset());
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[1].partition());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[2].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[2].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[2].partition());
        lastCheckpointString = this.testTopicName + ",0:499,1:500";
        kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((int)2, (int)nextOffsetRanges.length);
        Assertions.assertEquals((long)499L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[0].partition());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[1].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[1].partition());
        kafkaOffsetGen = new KafkaOffsetGen(this.getConsumerConfigs("group", "string"));
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)500L, (long)nextOffsetRanges[1].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesWithMinPartitionsForSinglePartition() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 1);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        TypedProperties props = this.getConsumerConfigs("earliest", "string");
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)300L, (long)nextOffsetRanges[0].untilOffset());
        props.put((Object)KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), (Object)2L);
        kafkaOffsetGen = new KafkaOffsetGen(props);
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)300L, (long)nextOffsetRanges[1].untilOffset());
    }

    @Test
    public void testGetNextOffsetRangesWithMinPartitionsForMultiPartition() {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(this.testTopicName, 2);
        this.testUtils.sendMessages(this.testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", Integer.valueOf(1000))));
        TypedProperties props = this.getConsumerConfigs("earliest", "string");
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
        OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((int)2, (int)nextOffsetRanges.length);
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[0].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[1].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[1].untilOffset());
        props.put((Object)KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), (Object)1L);
        kafkaOffsetGen = new KafkaOffsetGen(props);
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((int)2, (int)nextOffsetRanges.length);
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[0].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[1].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[1].untilOffset());
        props.put((Object)KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), (Object)4L);
        kafkaOffsetGen = new KafkaOffsetGen(props);
        nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals((int)4, (int)nextOffsetRanges.length);
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[0].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[0].fromOffset());
        Assertions.assertEquals((long)75L, (long)nextOffsetRanges[0].untilOffset());
        Assertions.assertEquals((int)0, (int)nextOffsetRanges[1].partition());
        Assertions.assertEquals((long)75L, (long)nextOffsetRanges[1].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[1].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[2].partition());
        Assertions.assertEquals((long)0L, (long)nextOffsetRanges[2].fromOffset());
        Assertions.assertEquals((long)75L, (long)nextOffsetRanges[2].untilOffset());
        Assertions.assertEquals((int)1, (int)nextOffsetRanges[3].partition());
        Assertions.assertEquals((long)75L, (long)nextOffsetRanges[3].fromOffset());
        Assertions.assertEquals((long)150L, (long)nextOffsetRanges[3].untilOffset());
    }

    @Test
    public void testCheckTopicExists() {
        TypedProperties props = this.getConsumerConfigs("latest", "string");
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
        this.testUtils.createTopic(this.testTopicName, 1);
        boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer((Properties)props));
        Assertions.assertTrue((boolean)topicExists);
        props.put((Object)"hoodie.streamer.source.kafka.topic", (Object)"random");
        kafkaOffsetGen = new KafkaOffsetGen(props);
        topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer((Properties)props));
        Assertions.assertFalse((boolean)topicExists);
    }

    @Test
    public void testTopicNameNotPresentInProps() {
        Assertions.assertThrows(HoodieNotSupportedException.class, () -> new KafkaOffsetGen(new TypedProperties()));
    }
}

