/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.LogicalClock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestKafkaOffsetGen {
    // Topic name unique to this test instance: "hoodie_test_" followed by a random UUID.
    private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
    // Mockito mock of the ingestion metrics; handed to getNextOffsetRanges(...) by the tests.
    private HoodieIngestionMetrics metrics = (HoodieIngestionMetrics)Mockito.mock(HoodieIngestionMetrics.class);
    // Kafka test harness; assigned and set up in setup(), torn down in teardown().
    private KafkaTestUtils testUtils;

    /**
     * Creates a new {@link KafkaTestUtils} and calls its {@code setup()} before every test.
     *
     * @throws Exception if the Kafka test harness cannot be created or set up
     */
    @BeforeEach
    public void setup() throws Exception {
        // Assign the field first so teardown() can still reach the harness if setup() fails.
        testUtils = new KafkaTestUtils();
        testUtils.setup();
    }

    /**
     * Tears down the Kafka test harness after every test.
     *
     * <p>JUnit Jupiter still invokes {@code @AfterEach} methods when a {@code @BeforeEach}
     * method throws. If {@link #setup()} failed before the harness was assigned, calling
     * {@code teardown()} on it would raise a {@link NullPointerException} that only adds
     * noise to the real failure, so the reference is checked first.
     *
     * @throws Exception if the Kafka test harness fails to shut down
     */
    @AfterEach
    public void teardown() throws Exception {
        if (testUtils != null) {
            testUtils.teardown();
        }
    }

    /**
     * Builds the properties used to construct a {@code KafkaOffsetGen} (and Kafka consumers)
     * for {@code testTopicName} on the test broker.
     *
     * @param autoOffsetReset     value stored under {@code auto.offset.reset}
     * @param kafkaCheckpointType value stored under {@code hoodie.streamer.source.kafka.checkpoint.type}
     * @return properties with string key/value deserializers and a random {@code group.id}
     */
    private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
        TypedProperties consumerProps = new TypedProperties();
        consumerProps.put("hoodie.streamer.source.kafka.checkpoint.type", kafkaCheckpointType);
        consumerProps.put("auto.offset.reset", autoOffsetReset);
        consumerProps.put("hoodie.streamer.source.kafka.topic", testTopicName);
        consumerProps.setProperty("bootstrap.servers", testUtils.brokerAddress());
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", StringDeserializer.class.getName());
        // A fresh group id per call keeps committed offsets from leaking between generators.
        consumerProps.setProperty("group.id", UUID.randomUUID().toString());
        return consumerProps;
    }

    /**
     * Same as {@link #getConsumerConfigs(String, String)}, but targets {@code topicName}
     * instead of the default {@code testTopicName}.
     *
     * @param topicName           value stored under {@code hoodie.streamer.source.kafka.topic}
     * @param autoOffsetReset     value stored under {@code auto.offset.reset}
     * @param kafkaCheckpointType value stored under {@code hoodie.streamer.source.kafka.checkpoint.type}
     * @return the consumer properties with the topic overridden
     */
    private TypedProperties getConsumerConfigs(String topicName, String autoOffsetReset, String kafkaCheckpointType) {
        TypedProperties consumerProps = getConsumerConfigs(autoOffsetReset, kafkaCheckpointType);
        // Overwrite the topic that the two-argument overload filled in.
        consumerProps.put("hoodie.streamer.source.kafka.topic", topicName);
        return consumerProps;
    }

    /**
     * Without a checkpoint and with {@code auto.offset.reset=earliest}, the single range
     * starts at offset 0 and ends at 500 when 500 is requested, or at 1000 (all records
     * sent to the topic) when 5000 is requested.
     */
    @Test
    public void testGetNextOffsetRangesFromEarliest() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));

        // Requested amount (500) is smaller than what the topic holds.
        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.empty(), 500L, metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(500L, ranges[0].untilOffset());

        // Requested amount (5000) is larger than what the topic holds.
        ranges = offsetGen.getNextOffsetRanges(Option.empty(), 5000L, metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(1000L, ranges[0].untilOffset());
    }

    /**
     * Without a checkpoint and with {@code auto.offset.reset=latest}, the single range is
     * empty and sits at offset 1000, i.e. after the 1000 records already sent.
     */
    @Test
    public void testGetNextOffsetRangesFromLatest() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));

        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.empty(), 500L, metrics);

        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(1000L, ranges[0].fromOffset());
        Assertions.assertEquals(1000L, ranges[0].untilOffset());
    }

    /**
     * With a string checkpoint of {@code <topic>,0:250}, the range resumes at offset 250 and
     * covers the 500 requested records, even though {@code auto.offset.reset} is
     * {@code latest}.
     */
    @Test
    public void testGetNextOffsetRangesFromCheckpoint() {
        String checkpoint = testTopicName + ",0:250";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));

        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2(checkpoint)), 500L, metrics);

        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(250L, ranges[0].fromOffset());
        Assertions.assertEquals(750L, ranges[0].untilOffset());
    }

    /**
     * With checkpoint type {@code timestamp} and a checkpoint 100 seconds before now, the
     * range is expected to start at offset 0 and cover the 500 requested records.
     */
    @Test
    public void testGetNextOffsetRangesFromTimestampCheckpointType() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));

        // The checkpoint string is an epoch-millis timestamp from 100 seconds ago.
        String timestampCheckpoint = String.valueOf(System.currentTimeMillis() - 100000L);
        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2(timestampCheckpoint)), 500L, metrics);

        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(500L, ranges[0].untilOffset());
    }

    /**
     * Exercises checkpoint type {@code single_offset} on a one-partition topic with three
     * checkpoints: a plain offset ({@code "250"}), {@code "-2"}, and an unparsable value
     * ({@code "garbage"}).
     */
    @Test
    public void testGetNextOffsetRangesFromSingleOffsetCheckpoint() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "single_offset"));

        // A plain numeric checkpoint is used as the starting offset.
        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2("250")), 500L, metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(250L, ranges[0].fromOffset());
        Assertions.assertEquals(750L, ranges[0].untilOffset());

        // "-2" with auto.offset.reset=latest is expected to give an empty range at offset 1000.
        ranges = offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2("-2")), 500L, metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(1000L, ranges[0].fromOffset());
        Assertions.assertEquals(1000L, ranges[0].untilOffset());

        // An unparsable checkpoint with auto.offset.reset=earliest is expected to start at 0.
        offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "single_offset"));
        ranges = offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2("garbage")), 5000L, metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(1000L, ranges[0].untilOffset());
    }

    /**
     * Checkpoint type {@code single_offset} on a two-partition topic must fail with a
     * {@link HoodieException} whose message reports the partition count.
     */
    @Test
    public void testGetNextOffsetRangesFromSingleOffsetCheckpointNotApplicable() {
        testUtils.createTopic(testTopicName, 2);
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "single_offset"));
        String checkpoint = "250";

        HoodieException thrown = Assertions.assertThrows(
            HoodieException.class,
            () -> offsetGen.getNextOffsetRanges(Option.of(new StreamerCheckpointV2(checkpoint)), 500L, metrics));

        String expectedPrefix = "Kafka topic " + testTopicName + " has 2 partitions (more than 1)";
        Assertions.assertTrue(thrown.getMessage().startsWith(expectedPrefix));
    }

    /**
     * On a two-partition topic with 499 requested records, three ranges are expected:
     * 0-249, 249-250 and 0-249.
     */
    @Test
    public void testGetNextOffsetRangesFromMultiplePartitions() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 2);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));

        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.empty(), 499L, metrics);

        Assertions.assertEquals(3, ranges.length);
        OffsetRange first = ranges[0];
        Assertions.assertEquals(0L, first.fromOffset());
        Assertions.assertEquals(249L, first.untilOffset());
        OffsetRange second = ranges[1];
        Assertions.assertEquals(249L, second.fromOffset());
        Assertions.assertEquals(250L, second.untilOffset());
        OffsetRange third = ranges[2];
        Assertions.assertEquals(0L, third.fromOffset());
        Assertions.assertEquals(249L, third.untilOffset());
    }

    /**
     * With {@code auto.offset.reset=group}, ranges are expected to start from the offsets
     * last committed to Kafka through {@code commitOffsetToKafka}, both for the generator
     * that committed them and for a newly constructed generator.
     */
    @Test
    public void testGetNextOffsetRangesFromGroup() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 2);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(generator.generateInserts("000", Integer.valueOf(1000)), 2));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));

        // Commit 0:250,1:249 and read 300 records from there.
        offsetGen.commitOffsetToKafka(testTopicName + ",0:250,1:249");
        OffsetRange[] ranges = offsetGen.getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(250L, ranges[0].fromOffset());
        Assertions.assertEquals(400L, ranges[0].untilOffset());
        Assertions.assertEquals(249L, ranges[1].fromOffset());
        Assertions.assertEquals(399L, ranges[1].untilOffset());

        // Commit 0:400,1:500: partition 0 is split in two, partition 1 gets an empty range.
        offsetGen.commitOffsetToKafka(testTopicName + ",0:400,1:500");
        ranges = offsetGen.getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(3, ranges.length);
        Assertions.assertEquals(400L, ranges[0].fromOffset());
        Assertions.assertEquals(450L, ranges[0].untilOffset());
        Assertions.assertEquals(450L, ranges[1].fromOffset());
        Assertions.assertEquals(500L, ranges[1].untilOffset());
        Assertions.assertEquals(0, ranges[1].partition());
        Assertions.assertEquals(500L, ranges[2].fromOffset());
        Assertions.assertEquals(500L, ranges[2].untilOffset());
        Assertions.assertEquals(1, ranges[2].partition());

        // Commit 0:499,1:500: one record left in partition 0, none in partition 1.
        offsetGen.commitOffsetToKafka(testTopicName + ",0:499,1:500");
        ranges = offsetGen.getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(2, ranges.length);
        Assertions.assertEquals(499L, ranges[0].fromOffset());
        Assertions.assertEquals(500L, ranges[0].untilOffset());
        Assertions.assertEquals(0, ranges[0].partition());
        Assertions.assertEquals(500L, ranges[1].fromOffset());
        Assertions.assertEquals(500L, ranges[1].untilOffset());
        Assertions.assertEquals(1, ranges[1].partition());

        // A new generator gets a new random group.id, so nothing is committed for it;
        // both ranges are expected to be empty at offset 500.
        offsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
        ranges = offsetGen.getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(500L, ranges[0].fromOffset());
        Assertions.assertEquals(500L, ranges[0].untilOffset());
        Assertions.assertEquals(500L, ranges[1].fromOffset());
        Assertions.assertEquals(500L, ranges[1].untilOffset());
    }

    /**
     * On a one-partition topic, the default configuration yields 0-300 as the first range;
     * with {@code KAFKA_SOURCE_MIN_PARTITIONS} set to 2 the same span is expected as
     * 0-150 and 150-300.
     */
    @Test
    public void testGetNextOffsetRangesWithMinPartitionsForSinglePartition() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 1);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        TypedProperties consumerProps = getConsumerConfigs("earliest", "string");

        // Default: no minimum partition count configured.
        OffsetRange[] ranges = new KafkaOffsetGen(consumerProps).getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(300L, ranges[0].untilOffset());

        // Minimum of 2 partitions (the fetch-partition timeout is also set to 1).
        consumerProps.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 2L);
        consumerProps.put(KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(), 1L);
        ranges = new KafkaOffsetGen(consumerProps).getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(150L, ranges[0].untilOffset());
        Assertions.assertEquals(150L, ranges[1].fromOffset());
        Assertions.assertEquals(300L, ranges[1].untilOffset());
    }

    /**
     * On a two-partition topic with 300 requested records: the default configuration and a
     * minimum of 1 both yield one 0-150 range per partition, while a minimum of 4 yields
     * 0-75 and 75-150 for each partition.
     */
    @Test
    public void testGetNextOffsetRangesWithMinPartitionsForMultiPartition() {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.createTopic(testTopicName, 2);
        testUtils.sendMessages(testTopicName, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(1000))));
        TypedProperties consumerProps = getConsumerConfigs("earliest", "string");

        // Default: no minimum partition count configured.
        OffsetRange[] ranges = new KafkaOffsetGen(consumerProps).getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(2, ranges.length);
        Assertions.assertEquals(0, ranges[0].partition());
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(150L, ranges[0].untilOffset());
        Assertions.assertEquals(1, ranges[1].partition());
        Assertions.assertEquals(0L, ranges[1].fromOffset());
        Assertions.assertEquals(150L, ranges[1].untilOffset());

        // A minimum below the topic's partition count gives the same result.
        consumerProps.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 1L);
        ranges = new KafkaOffsetGen(consumerProps).getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(2, ranges.length);
        Assertions.assertEquals(0, ranges[0].partition());
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(150L, ranges[0].untilOffset());
        Assertions.assertEquals(1, ranges[1].partition());
        Assertions.assertEquals(0L, ranges[1].fromOffset());
        Assertions.assertEquals(150L, ranges[1].untilOffset());

        // A minimum of 4 splits each Kafka partition into two ranges.
        consumerProps.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 4L);
        ranges = new KafkaOffsetGen(consumerProps).getNextOffsetRanges(Option.empty(), 300L, metrics);
        Assertions.assertEquals(4, ranges.length);
        Assertions.assertEquals(0, ranges[0].partition());
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(75L, ranges[0].untilOffset());
        Assertions.assertEquals(0, ranges[1].partition());
        Assertions.assertEquals(75L, ranges[1].fromOffset());
        Assertions.assertEquals(150L, ranges[1].untilOffset());
        Assertions.assertEquals(1, ranges[2].partition());
        Assertions.assertEquals(0L, ranges[2].fromOffset());
        Assertions.assertEquals(75L, ranges[2].untilOffset());
        Assertions.assertEquals(1, ranges[3].partition());
        Assertions.assertEquals(75L, ranges[3].fromOffset());
        Assertions.assertEquals(150L, ranges[3].untilOffset());
    }

    /**
     * {@code checkTopicExists} returns {@code true} for the topic created by this test and
     * {@code false} once the configured topic is changed to {@code "random"}.
     *
     * <p>Each {@link KafkaConsumer} is opened in a try-with-resources block so it is closed
     * even when an assertion fails; the consumers were previously never closed.
     */
    @Test
    public void testCheckTopicExists() {
        TypedProperties props = getConsumerConfigs("latest", "string");
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
        testUtils.createTopic(testTopicName, 1);
        // The explicit Properties cast keeps the (Properties) constructor overload selected.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>((Properties) props)) {
            Assertions.assertTrue(kafkaOffsetGen.checkTopicExists(consumer));
        }

        // Point the generator at a topic name that this test does not create.
        props.put("hoodie.streamer.source.kafka.topic", "random");
        kafkaOffsetGen = new KafkaOffsetGen(props);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>((Properties) props)) {
            Assertions.assertFalse(kafkaOffsetGen.checkTopicExists(consumer));
        }
    }

    /**
     * Constructing a {@code KafkaOffsetGen} from empty properties (no topic configured)
     * must throw {@link HoodieNotSupportedException}.
     */
    @Test
    public void testTopicNameNotPresentInProps() {
        Assertions.assertThrows(
            HoodieNotSupportedException.class,
            () -> {
                new KafkaOffsetGen(new TypedProperties());
            });
    }

    /**
     * {@code getTopicRetentionMs} returns the topic's {@code retention.ms}.
     *
     * <p>When the parameter is the literal string {@code "null"}, the topic is created
     * without an explicit {@code retention.ms} and the expected value is 7 days in
     * milliseconds; otherwise the topic is created with the given value, which must be
     * returned unchanged.
     *
     * @param retentionMsStr the {@code retention.ms} to configure, or {@code "null"} for none
     */
    @ParameterizedTest
    @CsvSource(value={"null", "86400000", "-1"})
    void testGetTopicRetentionMs(String retentionMsStr) {
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
        Long retentionMs = "null".equals(retentionMsStr) ? null : Long.valueOf(retentionMsStr);

        Long expectedRetentionMs;
        if (retentionMs == null) {
            testUtils.createTopic(testTopicName, 1);
            // No explicit retention.ms on the topic: 7 days is expected.
            expectedRetentionMs = Long.valueOf(TimeUnit.DAYS.toMillis(7L));
        } else {
            Properties topicProps = new Properties();
            topicProps.put("retention.ms", retentionMsStr);
            testUtils.createTopic(testTopicName, 1, topicProps);
            expectedRetentionMs = retentionMs;
        }

        Long topicRetentionMs = kafkaOffsetGen.getTopicRetentionMs(testTopicName);
        Assertions.assertEquals(expectedRetentionMs, topicRetentionMs);
    }

    /**
     * Supplies topic configs that carry no usable {@code retention.ms}: one where the entry
     * exists with a {@code null} value and one that only contains {@code retention.bytes}.
     *
     * @return one {@link Arguments} per topic config
     */
    static Stream<Arguments> getNullRetentionMsTopicConfigTestArgs() {
        Config nullRetentionMs = new Config(Collections.singletonList(new ConfigEntry("retention.ms", null)));
        Config retentionBytesOnly = new Config(Collections.singletonList(new ConfigEntry("retention.bytes", "1024")));
        return Stream.of(Arguments.of(nullRetentionMs), Arguments.of(retentionBytesOnly));
    }

    /**
     * {@code getTopicRetentionMs} returns {@code null} when the topic config described by
     * the mocked {@link AdminClient} has a null or missing {@code retention.ms}.
     *
     * @param topicConfig the topic config the mocked admin client reports
     */
    @ParameterizedTest
    @MethodSource(value={"getNullRetentionMsTopicConfigTestArgs"})
    void testGetTopicRetentionMs_retentionMsConfigIsNullOrNotSet(Config topicConfig) {
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
        try (MockedStatic<AdminClient> adminClientStatic = Mockito.mockStatic(AdminClient.class)) {
            mockDescribeTopicConfigs(adminClientStatic, offsetGen.getKafkaParams(), topicConfig);
            Assertions.assertNull(offsetGen.getTopicRetentionMs(testTopicName));
        }
    }

    /**
     * {@code getTopicRetentionMs} returns {@code null} when the admin client's
     * {@code describeConfigs} call throws a {@link KafkaException}.
     */
    @Test
    void testGetTopicRetentionMs_Failure() {
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
        try (MockedStatic<AdminClient> adminClientStatic = Mockito.mockStatic(AdminClient.class)) {
            AdminClient adminClient = Mockito.mock(AdminClient.class);
            adminClientStatic.when(() -> AdminClient.create((Map)offsetGen.getKafkaParams())).thenReturn(adminClient);

            // Fail the describe call for this test's topic.
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
            Mockito.when(adminClient.describeConfigs(Collections.singleton(topicResource)))
                .thenThrow(new KafkaException("something went wrong"));

            Assertions.assertNull(offsetGen.getTopicRetentionMs(testTopicName));
        }
    }

    /**
     * When {@code OFFSET_SKIP_BUFFER_MINUTES} is not configured,
     * {@code resolveFromOffsetsWithRetention} returns the input offsets unchanged and never
     * calls {@code offsetsForTimes} on the consumer.
     */
    @Test
    void testResolveFromOffsetsWithRetention_skipBufferMinutesNotSet() {
        KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class);
        Map<TopicPartition, Long> fromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:10,1:23", testTopicName));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));

        // Pass a copy so the comparison below is against untouched input.
        Map<TopicPartition, Long> resolved = offsetGen.resolveFromOffsetsWithRetention(mockConsumer, new HashMap<>(fromOffsets), Collections.emptySet());

        Assertions.assertEquals(fromOffsets, resolved);
        Mockito.verify(mockConsumer, Mockito.never()).offsetsForTimes((Map)ArgumentMatchers.any());
    }

    /**
     * With a 30-minute skip buffer configured but an empty topic config (no
     * {@code retention.ms}), {@code resolveFromOffsetsWithRetention} returns the input
     * offsets unchanged, never calls {@code offsetsForTimes}, and
     * {@code getTopicRetentionMs} returns {@code null}.
     */
    @Test
    void testResolveFromOffsetsWithRetention_topicRetentionMsNotSet() {
        KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class);
        Map<TopicPartition, Long> fromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:10,1:23", testTopicName));
        TypedProperties consumerProps = getConsumerConfigs("earliest", "string");
        consumerProps.put(KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES.key(), "30");
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(consumerProps);

        try (MockedStatic<AdminClient> adminClientStatic = Mockito.mockStatic(AdminClient.class)) {
            // The mocked admin client reports a topic config with no entries at all.
            mockDescribeTopicConfigs(adminClientStatic, offsetGen.getKafkaParams(), new Config(Collections.emptyList()));

            Map<TopicPartition, Long> resolved = offsetGen.resolveFromOffsetsWithRetention(mockConsumer, new HashMap<>(fromOffsets), Collections.emptySet());

            Assertions.assertEquals(fromOffsets, resolved);
            Mockito.verify(mockConsumer, Mockito.never()).offsetsForTimes((Map)ArgumentMatchers.any());
            Assertions.assertNull(offsetGen.getTopicRetentionMs(testTopicName));
        }
    }

    /**
     * When the consumer's {@code offsetsForTimes} call throws a {@link KafkaException},
     * {@code resolveFromOffsetsWithRetention} returns the input offsets unchanged.
     *
     * <p>The test configures a 2-hour topic retention and a 30-minute skip buffer, and
     * expects the lookup timestamp for every partition to be
     * {@code now - retention + skipBuffer}, with "now" taken from the mocked clock.
     *
     * <p>{@code fromOffsets} is declared with its full generic type. As a raw {@code Map},
     * the {@code entrySet().stream().collect(...)} chain is typed as {@code Object} and
     * cannot be assigned to {@code Map<TopicPartition, Long>}.
     */
    @Test
    void testResolveFromOffsetsWithRetention_failureWhenGettingOffsetsForTimes() {
        // 2 hours = 7,200,000 ms (previously spelled as the hex literal 0x6DDD00L).
        long retentionMs = TimeUnit.HOURS.toMillis(2L);
        long currentEpochMillis = Instant.now().toEpochMilli();
        long offsetSkipIntervalMinutes = 30L;
        long retentionTs = currentEpochMillis - retentionMs + TimeUnit.MINUTES.toMillis(offsetSkipIntervalMinutes);
        Map<TopicPartition, Long> fromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:10,1:23", testTopicName));
        // Every partition is looked up at the same retention-derived timestamp.
        Map<TopicPartition, Long> topicPartitionsTimestamp = fromOffsets.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> retentionTs));

        KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class);
        LogicalClock mockClock = Mockito.mock(LogicalClock.class);
        Mockito.when(mockConsumer.offsetsForTimes(topicPartitionsTimestamp)).thenThrow(new KafkaException("something went wrong"));
        Mockito.when(mockClock.currentEpoch()).thenReturn(currentEpochMillis);

        TypedProperties consumerConfigs = getConsumerConfigs("earliest", "string");
        consumerConfigs.put(KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES.key(), String.valueOf(offsetSkipIntervalMinutes));
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(consumerConfigs, mockClock);

        try (MockedStatic<AdminClient> staticMock = Mockito.mockStatic(AdminClient.class)) {
            Config topicConfig = new Config(Collections.singletonList(new ConfigEntry("retention.ms", String.valueOf(retentionMs))));
            mockDescribeTopicConfigs(staticMock, kafkaOffsetGen.getKafkaParams(), topicConfig);

            Map<TopicPartition, Long> offsets = kafkaOffsetGen.resolveFromOffsetsWithRetention(mockConsumer, new HashMap<>(fromOffsets), fromOffsets.keySet());

            // The failed lookup must leave the starting offsets untouched.
            Assertions.assertEquals(fromOffsets, offsets);
            Mockito.verify(mockConsumer, Mockito.times(1)).offsetsForTimes(topicPartitionsTimestamp);
            Mockito.verify(mockClock, Mockito.times(1)).currentEpoch();
        }
    }

    /**
     * Supplies cases for {@code testResolveEarliestOffsetsWithRetention}. Each case is
     * (starting offsets, topic partitions, mocked {@code offsetsForTimes} result, expected
     * resolved offsets) for a two-partition topic with a random name:
     * <ol>
     *   <li>both partitions resolve to a later offset, so both move forward;</li>
     *   <li>partition 1 resolves to {@code null}, so only partition 0 moves forward;</li>
     *   <li>both partitions resolve to {@code null}, so nothing changes;</li>
     *   <li>the resolved offsets are behind the starting offsets, so nothing changes.</li>
     * </ol>
     *
     * @return the four argument sets described above
     */
    static Stream<Arguments> resolveEarliestOffsetsWithRetentionTestArgs() {
        long ts = System.currentTimeMillis();
        String topicName = "kafka-topic-" + UUID.randomUUID();
        TopicPartition partition0 = new TopicPartition(topicName, 0);
        TopicPartition partition1 = new TopicPartition(topicName, 1);
        List<TopicPartition> topicPartitions = Arrays.asList(partition0, partition1);
        Map<TopicPartition, Long> fromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:10,1:23", topicName));

        // Case 1: both partitions have an offset at the retention timestamp.
        Map<TopicPartition, OffsetAndTimestamp> bothResolved = new HashMap<>();
        bothResolved.put(partition0, new OffsetAndTimestamp(15L, ts));
        bothResolved.put(partition1, new OffsetAndTimestamp(26L, ts));
        Map<TopicPartition, Long> expectedBothResolved = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:15,1:26", topicName));

        // Case 2: only partition 0 has an offset at the retention timestamp.
        Map<TopicPartition, OffsetAndTimestamp> onlyFirstResolved = new HashMap<>();
        onlyFirstResolved.put(partition0, new OffsetAndTimestamp(15L, ts));
        onlyFirstResolved.put(partition1, null);
        Map<TopicPartition, Long> expectedOnlyFirstResolved = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:15,1:23", topicName));

        // Case 3: neither partition has an offset at the retention timestamp.
        Map<TopicPartition, OffsetAndTimestamp> noneResolved = new HashMap<>();
        noneResolved.put(partition0, null);
        noneResolved.put(partition1, null);

        // Case 4: the starting offsets are already past the resolved ones.
        Map<TopicPartition, OffsetAndTimestamp> resolvedBehindStart = new HashMap<>();
        resolvedBehindStart.put(partition0, new OffsetAndTimestamp(15L, ts));
        resolvedBehindStart.put(partition1, new OffsetAndTimestamp(26L, ts));
        Map<TopicPartition, Long> laterFromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:25,1:37", topicName));
        Map<TopicPartition, Long> expectedLaterFromOffsets = KafkaOffsetGen.CheckpointUtils.strToOffsets(String.format("%s,0:25,1:37", topicName));

        return Stream.of(
            Arguments.of(fromOffsets, topicPartitions, bothResolved, expectedBothResolved),
            Arguments.of(fromOffsets, topicPartitions, onlyFirstResolved, expectedOnlyFirstResolved),
            Arguments.of(fromOffsets, topicPartitions, noneResolved, fromOffsets),
            Arguments.of(laterFromOffsets, topicPartitions, resolvedBehindStart, expectedLaterFromOffsets));
    }

    /**
     * {@code resolveFromOffsetsWithRetention} combines the starting offsets with the
     * consumer's {@code offsetsForTimes} result into the expected offsets, reading the
     * clock and querying the consumer exactly once each.
     *
     * @param fromOffsets        starting offsets handed to the method under test
     * @param topicPartitions    the two partitions of the topic (index 0 and 1 are used)
     * @param offsetAndTimestamp result the mocked consumer returns from {@code offsetsForTimes}
     * @param expectedOffsets    offsets the method under test must return
     */
    @ParameterizedTest
    @MethodSource(value={"resolveEarliestOffsetsWithRetentionTestArgs"})
    void testResolveEarliestOffsetsWithRetention(Map<TopicPartition, Long> fromOffsets, List<TopicPartition> topicPartitions, Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp, Map<TopicPartition, Long> expectedOffsets) {
        String topicName = topicPartitions.get(0).topic();
        long retentionMs = 7200000L;
        long skipOffsetBufferMinutes = 30L;
        long currentEpochMillis = Instant.now().toEpochMilli();
        // Timestamp the consumer is expected to be queried with for every partition.
        long retentionTs = currentEpochMillis - retentionMs + TimeUnit.MINUTES.toMillis(skipOffsetBufferMinutes);

        KafkaConsumer mockConsumer = Mockito.mock(KafkaConsumer.class);
        LogicalClock mockClock = Mockito.mock(LogicalClock.class);
        TypedProperties consumerProps = getConsumerConfigs(topicName, "earliest", "string");
        consumerProps.put(KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES.key(), String.valueOf(skipOffsetBufferMinutes));
        KafkaOffsetGen offsetGen = new KafkaOffsetGen(consumerProps, mockClock);

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(topicPartitions.get(0), retentionTs);
        timestampsToSearch.put(topicPartitions.get(1), retentionTs);
        Mockito.when(mockConsumer.offsetsForTimes(timestampsToSearch)).thenReturn(offsetAndTimestamp);
        Mockito.when(mockClock.currentEpoch()).thenReturn(currentEpochMillis);

        try (MockedStatic<AdminClient> adminClientStatic = Mockito.mockStatic(AdminClient.class)) {
            Config topicConfig = new Config(Collections.singletonList(new ConfigEntry("retention.ms", String.valueOf(retentionMs))));
            mockDescribeTopicConfigs(adminClientStatic, offsetGen.getKafkaParams(), topicConfig, topicName);

            Map<TopicPartition, Long> resolved = offsetGen.resolveFromOffsetsWithRetention(mockConsumer, fromOffsets, new HashSet<>(topicPartitions));

            Assertions.assertEquals(expectedOffsets, resolved);
            Mockito.verify(mockClock, Mockito.times(1)).currentEpoch();
            Mockito.verify(mockConsumer, Mockito.times(1)).offsetsForTimes(timestampsToSearch);
        }
    }

    /**
     * Stubs the admin client for this test's default topic ({@code testTopicName}) by
     * delegating to the four-argument overload.
     *
     * @param staticMock  active static mock of {@link AdminClient}
     * @param kafkaParams parameters that {@code AdminClient.create} is expected to receive
     * @param topicConfig topic config the mocked admin client should report
     */
    void mockDescribeTopicConfigs(MockedStatic<AdminClient> staticMock, Map kafkaParams, Config topicConfig) {
        mockDescribeTopicConfigs(staticMock, kafkaParams, topicConfig, testTopicName);
    }

    /**
     * Stubs {@code AdminClient.create(kafkaParams)} to return a mocked admin client whose
     * {@code describeConfigs} call for {@code topicName} completes immediately with
     * {@code topicConfig}.
     *
     * @param staticMock  active static mock of {@link AdminClient}
     * @param kafkaParams parameters that {@code AdminClient.create} is expected to receive
     * @param topicConfig topic config the mocked admin client should report
     * @param topicName   topic whose config resource is stubbed
     */
    void mockDescribeTopicConfigs(MockedStatic<AdminClient> staticMock, Map kafkaParams, Config topicConfig, String topicName) {
        AdminClient adminClient = Mockito.mock(AdminClient.class);
        staticMock.when(() -> AdminClient.create(kafkaParams)).thenReturn(adminClient);

        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        DescribeConfigsResult describeResult = Mockito.mock(DescribeConfigsResult.class);
        // An already-completed future, so callers of all() get the config without waiting.
        KafkaFuture<Map<ConfigResource, Config>> completedConfigs =
            KafkaFuture.completedFuture(Collections.singletonMap(topicResource, topicConfig));

        Mockito.when(adminClient.describeConfigs(Collections.singleton(topicResource))).thenReturn(describeResult);
        Mockito.when(describeResult.all()).thenReturn(completedConfigs);
    }
}

