/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestCheckpointUtils {
    private static final String TEST_TOPIC_NAME = "hoodie_test";

    @Test
    public void testStringToOffsets() {
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}), (long)1000000L, (long)0L);
        String checkpointStr = KafkaOffsetGen.CheckpointUtils.offsetsToStr((OffsetRange[])ranges);
        Map offsetMap = KafkaOffsetGen.CheckpointUtils.strToOffsets((String)checkpointStr);
        Assertions.assertEquals((int)2, (int)offsetMap.size());
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>(2);
        TopicPartition partition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
        TopicPartition partition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
        topicPartitions.add(partition0);
        topicPartitions.add(partition1);
        Assertions.assertEquals(topicPartitions, offsetMap.keySet());
        Assertions.assertEquals((long)300000L, (Long)((Long)offsetMap.get(partition0)));
        Assertions.assertEquals((long)350000L, (Long)((Long)offsetMap.get(partition1)));
    }

    @Test
    public void testOffsetToString() {
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}), (long)1000000L, (long)0L);
        Assertions.assertEquals((Object)"hoodie_test,0:300000,1:350000", (Object)KafkaOffsetGen.CheckpointUtils.offsetsToStr((OffsetRange[])ranges));
        ranges = new OffsetRange[]{OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)0L, (long)100L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)100L, (long)200L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)100L, (long)200L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)200L, (long)300L)};
        Assertions.assertEquals((Object)"hoodie_test,0:200,1:300", (Object)KafkaOffsetGen.CheckpointUtils.offsetsToStr((OffsetRange[])ranges));
    }

    @Test
    public void testComputeOffsetRangesWithoutMinPartitions() {
        long totalMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])new OffsetRange[]{OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)0L, (long)100L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)100L, (long)200L)});
        Assertions.assertEquals((long)200L, (long)totalMsgs);
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}), (long)1000000L, (long)0L);
        Assertions.assertEquals((long)200000L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}), (long)10000L, (long)0L);
        Assertions.assertEquals((long)10000L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((long)200000L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)205000L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)250000L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)255000L, (long)ranges[1].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000L, 350000L, 100000L}), (long)1000000L, (long)0L);
        Assertions.assertEquals((long)300000L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((int)3, (int)ranges.length);
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010L, 350000L, 10000L}), (long)100000L, (long)0L);
        Assertions.assertEquals((long)100000L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((int)5, (int)ranges.length);
        Assertions.assertEquals((int)0, (int)ranges[0].partition());
        Assertions.assertEquals((long)10L, (long)ranges[0].count());
        Assertions.assertEquals((int)1, (int)ranges[1].partition());
        Assertions.assertEquals((long)33333L, (long)ranges[1].count());
        Assertions.assertEquals((long)33333L, (long)ranges[2].count());
        Assertions.assertEquals((long)23324L, (long)ranges[3].count());
        Assertions.assertEquals((int)2, (int)ranges[4].partition());
        Assertions.assertEquals((long)10000L, (long)ranges[4].count());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010L, 350000L, 10000L}), (long)1000000L, (long)0L);
        Assertions.assertEquals((long)110010L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((long)10L, (long)ranges[0].count());
        Assertions.assertEquals((long)36670L, (long)ranges[1].count());
        Assertions.assertEquals((long)36670L, (long)ranges[2].count());
        Assertions.assertEquals((long)26660L, (long)ranges[3].count());
        Assertions.assertEquals((long)10000L, (long)ranges[4].count());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0L, 0L, 0L, 0L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100L, 1000L, 1000L, 1000L, 1000L}), (long)1001L, (long)0L);
        Assertions.assertEquals((long)1001L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((long)100L, (long)ranges[0].count());
        Assertions.assertEquals((long)200L, (long)ranges[1].count());
        Assertions.assertEquals((long)101L, (long)ranges[2].count());
        Assertions.assertEquals((long)200L, (long)ranges[3].count());
        Assertions.assertEquals((long)200L, (long)ranges[4].count());
        Assertions.assertEquals((long)200L, (long)ranges[5].count());
    }

    @Test
    public void testComputeOffsetRangesWithMinPartitions() {
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{1000L}), (long)300L, (long)0L);
        Assertions.assertEquals((int)1, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)300L, (long)ranges[0].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{0L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{1000L, 1000L}), (long)300L, (long)0L);
        Assertions.assertEquals((int)2, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)150L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)150L, (long)ranges[1].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2}, new long[]{0L, 0L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1, 2}, new long[]{1000L, 1000L, 1000L}), (long)300L, (long)3L);
        Assertions.assertEquals((int)3, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[1].untilOffset());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[1].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{1000L}), (long)300L, (long)3L);
        Assertions.assertEquals((int)3, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)100L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)200L, (long)ranges[1].untilOffset());
        Assertions.assertEquals((long)200L, (long)ranges[2].fromOffset());
        Assertions.assertEquals((long)300L, (long)ranges[2].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{0L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 500L}), (long)600L, (long)3L);
        Assertions.assertEquals((int)4, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)200L, (long)ranges[1].untilOffset());
        Assertions.assertEquals((long)200L, (long)ranges[2].fromOffset());
        Assertions.assertEquals((long)400L, (long)ranges[2].untilOffset());
        Assertions.assertEquals((long)400L, (long)ranges[3].fromOffset());
        Assertions.assertEquals((long)500L, (long)ranges[3].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{100L}), (long)600L, (long)3L);
        Assertions.assertEquals((int)4, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)33L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)33L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)66L, (long)ranges[1].untilOffset());
        Assertions.assertEquals((long)66L, (long)ranges[2].fromOffset());
        Assertions.assertEquals((long)99L, (long)ranges[2].untilOffset());
        Assertions.assertEquals((long)99L, (long)ranges[3].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[3].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 600L}), (long)600L, (long)3L);
        Assertions.assertEquals((int)4, (int)ranges.length);
        Assertions.assertEquals((int)0, (int)ranges[0].partition());
        Assertions.assertEquals((long)100L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((int)1, (int)ranges[1].partition());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)200L, (long)ranges[1].untilOffset());
        Assertions.assertEquals((int)1, (int)ranges[2].partition());
        Assertions.assertEquals((long)200L, (long)ranges[2].fromOffset());
        Assertions.assertEquals((long)400L, (long)ranges[2].untilOffset());
        Assertions.assertEquals((long)400L, (long)ranges[3].fromOffset());
        Assertions.assertEquals((long)600L, (long)ranges[3].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 0L}), (long)600L, (long)3L);
        Assertions.assertEquals((long)0L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((int)2, (int)ranges.length);
        Assertions.assertEquals((int)0, (int)ranges[0].partition());
        Assertions.assertEquals((long)100L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((int)1, (int)ranges[1].partition());
        Assertions.assertEquals((long)0L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)0L, (long)ranges[1].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{2L}), (long)600L, (long)3L);
        Assertions.assertEquals((int)2, (int)ranges.length);
        Assertions.assertEquals((long)0L, (long)ranges[0].fromOffset());
        Assertions.assertEquals((long)1L, (long)ranges[0].untilOffset());
        Assertions.assertEquals((long)1L, (long)ranges[1].fromOffset());
        Assertions.assertEquals((long)2L, (long)ranges[1].untilOffset());
    }

    @Test
    public void testSplitAndMergeRanges() {
        OffsetRange range = OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)0L, (long)100L);
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{0L, 0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0, 1}, new long[]{100L, 500L}), (long)600L, (long)4L);
        Assertions.assertEquals((int)5, (int)ranges.length);
        OffsetRange[] mergedRanges = KafkaOffsetGen.CheckpointUtils.mergeRangesByTopicPartition((OffsetRange[])ranges);
        Assertions.assertEquals((int)2, (int)mergedRanges.length);
        Assertions.assertEquals((int)0, (int)mergedRanges[0].partition());
        Assertions.assertEquals((long)0L, (long)mergedRanges[0].fromOffset());
        Assertions.assertEquals((long)100L, (long)mergedRanges[0].untilOffset());
        Assertions.assertEquals((int)1, (int)mergedRanges[1].partition());
        Assertions.assertEquals((long)0L, (long)mergedRanges[1].fromOffset());
        Assertions.assertEquals((long)500L, (long)mergedRanges[1].untilOffset());
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{0L}), TestCheckpointUtils.makeOffsetMap(new int[]{0}, new long[]{1000L}), (long)300L, (long)3L);
        Assertions.assertEquals((int)3, (int)ranges.length);
        mergedRanges = KafkaOffsetGen.CheckpointUtils.mergeRangesByTopicPartition((OffsetRange[])ranges);
        Assertions.assertEquals((int)1, (int)mergedRanges.length);
        Assertions.assertEquals((long)0L, (long)mergedRanges[0].fromOffset());
        Assertions.assertEquals((long)300L, (long)mergedRanges[0].untilOffset());
    }

    @Test
    public void testNumAllocatedEventsGreaterThanNumActualEvents() {
        int[] partitions = new int[]{0, 1, 2, 3, 4};
        long[] committedOffsets = new long[]{76888767L, 76725043L, 76899767L, 76833267L, 76952055L};
        long[] latestOffsets = new long[]{77005407L, 76768151L, 76985456L, 76917973L, 77080447L};
        long numEvents = 400000L;
        long minPartitions = 20L;
        Object[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(partitions, committedOffsets), TestCheckpointUtils.makeOffsetMap(partitions, latestOffsets), (long)numEvents, (long)minPartitions);
        long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges);
        Assertions.assertEquals((long)400000L, (long)totalNewMsgs);
        for (OffsetRange offsetRange2 : ranges) {
            if (offsetRange2.fromOffset() <= offsetRange2.untilOffset()) continue;
            throw new IllegalArgumentException("Invalid offset range " + offsetRange2);
        }
        long eventPerPartition = numEvents / minPartitions;
        long rangesWhereDiffIsLessThanEventsPerPartition = Arrays.stream(ranges).filter(offsetRange -> offsetRange.untilOffset() - offsetRange.fromOffset() <= eventPerPartition).count();
        Assertions.assertEquals((long)ranges.length, (long)rangesWhereDiffIsLessThanEventsPerPartition);
        Object[] expectedRanges = new OffsetRange[]{OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)76888767L, (long)76908767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)76908767L, (long)76928767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)76928767L, (long)76948767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)76948767L, (long)76968767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)76968767L, (long)76988767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)76725043L, (long)76745043L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)76745043L, (long)76765043L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)76765043L, (long)76768151L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)76899767L, (long)76919767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)76919767L, (long)76939767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)76939767L, (long)76959767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)76959767L, (long)76979767L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)76979767L, (long)76985456L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)76833267L, (long)76853267L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)76853267L, (long)76873267L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)76873267L, (long)76893267L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)76893267L, (long)76913267L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)76913267L, (long)76917973L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)76952055L, (long)76972055L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)76972055L, (long)76992055L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)76992055L, (long)77012055L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)77012055L, (long)77032055L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)77032055L, (long)77038552L)};
        Assertions.assertArrayEquals((Object[])expectedRanges, (Object[])ranges);
    }

    @Test
    public void testNumAllocatedEventsLesserThanNumActualEvents() {
        Object[] ranges;
        int[] partitions = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17};
        long[] committedOffsets = new long[]{788543084L, 787746335L, 788016034L, 788171708L, 788327954L, 788055939L, 788179691L, 788004145L, 788105897L, 788496138L, 788317057L, 788325907L, 788287519L, 787958075L, 788403560L, 788118894L, 788383733L, 787273821L};
        long[] latestOffsets = new long[]{788946534L, 788442557L, 788712188L, 788867819L, 789023943L, 788752030L, 788875648L, 788700234L, 788802091L, 789192155L, 789013192L, 789021874L, 788983544L, 788654092L, 789099516L, 788814985L, 789079650L, 787273821L};
        long numEvents = 10000000L;
        long minPartitions = 36L;
        for (OffsetRange offsetRange2 : ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(TestCheckpointUtils.makeOffsetMap(partitions, committedOffsets), TestCheckpointUtils.makeOffsetMap(partitions, latestOffsets), (long)numEvents, (long)minPartitions)) {
            if (offsetRange2.fromOffset() <= offsetRange2.untilOffset()) continue;
            throw new IllegalArgumentException("Invalid offset range " + offsetRange2);
        }
        Assertions.assertEquals((long)10000000L, (long)KafkaOffsetGen.CheckpointUtils.totalNewMessages((OffsetRange[])ranges));
        Assertions.assertEquals((int)41, (int)ranges.length);
        long eventPerPartition = numEvents / minPartitions;
        long rangesWhereDiffIsLessThanEventsPerPartition = Arrays.stream(ranges).filter(offsetRange -> offsetRange.untilOffset() - offsetRange.fromOffset() <= eventPerPartition).count();
        Assertions.assertEquals((long)ranges.length, (long)rangesWhereDiffIsLessThanEventsPerPartition);
        Object[] expectedRanges = new OffsetRange[]{OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)788543084L, (long)788820861L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)0, (long)788820861L, (long)788946534L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)787746335L, (long)788024112L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)788024112L, (long)788301889L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)1, (long)788301889L, (long)788442557L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)788016034L, (long)788293811L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)788293811L, (long)788571588L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)2, (long)788571588L, (long)788712188L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)788171708L, (long)788449485L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)788449485L, (long)788727262L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)3, (long)788727262L, (long)788867819L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)788327954L, (long)788605731L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)788605731L, (long)788883508L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)4, (long)788883508L, (long)789023943L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)5, (long)788055939L, (long)788333716L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)5, (long)788333716L, (long)788611493L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)5, (long)788611493L, (long)788752030L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)6, (long)788179691L, (long)788457468L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)6, (long)788457468L, (long)788735245L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)6, (long)788735245L, (long)788740134L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)7, (long)788004145L, (long)788281922L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)7, (long)788281922L, (long)788559699L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)8, (long)788105897L, (long)788383674L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)8, (long)788383674L, (long)788661451L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)9, (long)788496138L, (long)788773915L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)9, (long)788773915L, (long)789051692L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)10, (long)788317057L, (long)788594834L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)10, (long)788594834L, (long)788872611L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)11, (long)788325907L, (long)788603684L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)11, (long)788603684L, (long)788881461L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)12, (long)788287519L, (long)788565296L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)12, (long)788565296L, (long)788843073L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)13, (long)787958075L, (long)788235852L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)13, (long)788235852L, (long)788513629L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)14, (long)788403560L, (long)788681337L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)14, (long)788681337L, (long)788959114L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)15, (long)788118894L, (long)788396671L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)15, (long)788396671L, (long)788674448L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)16, (long)788383733L, (long)788661510L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)16, (long)788661510L, (long)788939287L), OffsetRange.apply((String)TEST_TOPIC_NAME, (int)17, (long)787273821L, (long)787273821L)};
        Assertions.assertArrayEquals((Object[])expectedRanges, (Object[])ranges);
    }

    private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
        HashMap<TopicPartition, Long> map = new HashMap<TopicPartition, Long>();
        for (int i = 0; i < partitions.length; ++i) {
            map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
        }
        return map;
    }
}

