/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.LogicalClock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SystemClock;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.sources.HoodieRetryingKafkaConsumer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOffsetGen {
    /** Logger shared by this class and the nested {@code CheckpointUtils}. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class);
    /** Metric name under which the lag computed by {@code delayOffsetCalculation} is reported. */
    private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount";
    /** Orders offset ranges by their partition number (natural ascending order). */
    private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparing(OffsetRange::partition);
    /** Parameters handed to the Kafka consumer and admin client; produced by {@code excludeHoodieConfigs}. */
    private final Map<String, Object> kafkaParams;
    /** The unfiltered configuration this generator was constructed with. */
    private final TypedProperties props;
    /** Topic name read from {@code KafkaSourceConfig.KAFKA_TOPIC_NAME}. */
    protected final String topicName;
    /** Strategy used to pick start offsets when no usable checkpoint exists; resolved in the constructor. */
    private KafkaSourceConfig.KafkaResetOffsetStrategies autoResetValue;
    /** Value read from {@code KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE}; compared against "timestamp" and "single_offset". */
    private final String kafkaCheckpointType;
    /** Time source used when computing the retention cut-off timestamp. */
    private final LogicalClock clock;

    /**
     * Builds an offset generator from the supplied configuration.
     *
     * <p>Derives the Kafka client parameters via {@link #excludeHoodieConfigs(TypedProperties)},
     * requires {@code KafkaSourceConfig.KAFKA_TOPIC_NAME} to be present, and resolves the
     * auto-offset-reset strategy from {@code KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET},
     * falling back to that config's default when the key is absent.
     *
     * @param props configuration holding both Hudi and Kafka settings
     * @param clock time source used for retention-based offset adjustment
     * @throws HoodieStreamerException if the configured reset strategy does not equal the
     *         lower-cased name of any {@code KafkaResetOffsetStrategies} constant
     */
    public KafkaOffsetGen(TypedProperties props, LogicalClock clock) {
        this.props = props;
        this.kafkaParams = KafkaOffsetGen.excludeHoodieConfigs(props);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(KafkaSourceConfig.KAFKA_TOPIC_NAME));
        this.topicName = ConfigUtils.getStringWithAltKeys((Properties) props, KafkaSourceConfig.KAFKA_TOPIC_NAME);
        this.kafkaCheckpointType = ConfigUtils.getStringWithAltKeys((Properties) props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE, true);

        // Locale.ROOT keeps the lower-casing locale-independent: with a Turkish/Azeri default
        // locale "EARLIEST".toLowerCase() yields a dotless 'ı' and would never match "earliest".
        String defaultResetStrategy = ((KafkaSourceConfig.KafkaResetOffsetStrategies) KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.defaultValue())
            .name().toLowerCase(Locale.ROOT);
        String kafkaAutoResetOffsetsStr = props.getString(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetStrategy);

        KafkaSourceConfig.KafkaResetOffsetStrategies resolved = null;
        for (KafkaSourceConfig.KafkaResetOffsetStrategies candidate : KafkaSourceConfig.KafkaResetOffsetStrategies.values()) {
            if (candidate.name().toLowerCase(Locale.ROOT).equals(kafkaAutoResetOffsetsStr)) {
                resolved = candidate;
                break;
            }
        }
        if (resolved == null) {
            throw new HoodieStreamerException(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key() + " config set to unknown value " + kafkaAutoResetOffsetsStr);
        }
        this.autoResetValue = resolved;

        if (resolved == KafkaSourceConfig.KafkaResetOffsetStrategies.GROUP) {
            // The GROUP strategy is handled by this class (see getGroupOffsets), so the value
            // passed on to the Kafka client is replaced with the config's default.
            // NOTE(review): presumably because "group" is not a value the Kafka client accepts - confirm.
            this.kafkaParams.put(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetStrategy);
        }
        this.clock = clock;
    }

    /**
     * Builds an offset generator that uses a {@link SystemClock} as its time source.
     *
     * @param props configuration holding both Hudi and Kafka settings
     */
    public KafkaOffsetGen(TypedProperties props) {
        this(props, new SystemClock());
    }

    /**
     * Computes the next offset ranges, taking the event budget and the minimum number of
     * partitions from configuration.
     *
     * @param lastCheckpoint checkpoint of the previous run, if any
     * @param sourceLimit    maximum number of events to read; {@code Long.MAX_VALUE} is treated as
     *                       "not configured" and replaced by
     *                       {@code KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE}
     * @param metrics        sink for the source-delay metric
     * @return the offset ranges produced by the four-argument overload
     */
    public OffsetRange[] getNextOffsetRanges(Option<Checkpoint> lastCheckpoint, long sourceLimit, HoodieIngestionMetrics metrics) {
        long configuredMaxEvents = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
        boolean limitNotConfigured = sourceLimit == Long.MAX_VALUE;
        if (limitNotConfigured) {
            LOG.info("SourceLimit not configured, set numEvents to default value : {}", configuredMaxEvents);
        }
        long eventBudget = limitNotConfigured ? configuredMaxEvents : sourceLimit;

        long minPartitions = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
        LOG.info("getNextOffsetRanges set config {} to {}", KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), minPartitions);
        return this.getNextOffsetRanges(lastCheckpoint, eventBudget, minPartitions, metrics);
    }

    /**
     * Computes the offset ranges to read next from {@link #topicName}.
     *
     * <p>Start offsets come from the last checkpoint when it can be turned into a string accepted
     * by {@link CheckpointUtils#checkTopicCheckpoint(Option)}; otherwise they are chosen according
     * to the configured auto-reset strategy. End offsets are the current end offsets of every
     * partition of the topic. The consumer is closed before the ranges are computed.
     *
     * @param lastCheckpoint checkpoint of the previous run, if any
     * @param numEvents      maximum number of events to allocate across all ranges
     * @param minPartitions  minimum number of ranges to aim for
     * @param metrics        sink for the source-delay metric; only updated when a checkpoint is used
     * @return offset ranges produced by {@link CheckpointUtils#computeOffsetRanges}
     * @throws HoodieException if the topic does not exist, or the checkpoint type is
     *         {@code single_offset} and the topic does not have exactly one partition
     */
    public OffsetRange[] getNextOffsetRanges(Option<Checkpoint> lastCheckpoint, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
        Map<TopicPartition, Long> fromOffsets;
        Map<TopicPartition, Long> toOffsets;
        try (HoodieRetryingKafkaConsumer consumer = new HoodieRetryingKafkaConsumer(this.props, this.kafkaParams)) {
            if (!this.checkTopicExists(consumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List<PartitionInfo> partitionInfoList = this.fetchPartitionInfos(consumer, this.topicName);
            Set<TopicPartition> topicPartitions = partitionInfoList.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());

            Option<String> lastCheckpointStr = this.resolveCheckpointString(consumer, lastCheckpoint, partitionInfoList, topicPartitions);
            boolean usableCheckpoint = lastCheckpointStr.isPresent()
                && !lastCheckpointStr.get().isEmpty()
                && CheckpointUtils.checkTopicCheckpoint(lastCheckpointStr);
            if (usableCheckpoint) {
                fromOffsets = this.fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
                metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, this.delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
            } else {
                fromOffsets = this.resolveAutoResetOffsets(consumer, topicPartitions);
            }
            toOffsets = consumer.endOffsets(topicPartitions);
        }
        return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
    }

    /**
     * Converts the last checkpoint into the {@code topic,partition:offset,...} string form,
     * honouring the configured checkpoint type ("timestamp", "single_offset" or anything else).
     */
    private Option<String> resolveCheckpointString(KafkaConsumer consumer, Option<Checkpoint> lastCheckpoint, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions) {
        if ("timestamp".equalsIgnoreCase(this.kafkaCheckpointType) && this.isValidTimestampCheckpointType(lastCheckpoint)) {
            long timestamp = Long.parseLong(lastCheckpoint.get().getCheckpointKey());
            return this.getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, this.topicName, timestamp);
        }
        if ("single_offset".equalsIgnoreCase(this.kafkaCheckpointType)) {
            if (partitionInfoList.size() != 1) {
                throw new HoodieException("Kafka topic " + this.topicName + " has " + partitionInfoList.size() + " partitions (more than 1). single_offset checkpoint type is not applicable.");
            }
            if (this.isValidOffsetCheckpointType(lastCheckpoint)) {
                // The bare offset is attributed to partition 0 of the single-partition topic.
                return Option.of(this.topicName + ",0:" + lastCheckpoint.get().getCheckpointKey());
            }
        }
        if (lastCheckpoint.isPresent()) {
            return Option.of(lastCheckpoint.get().getCheckpointKey());
        }
        return Option.empty();
    }

    /** Picks start offsets according to the configured auto-reset strategy when no checkpoint is usable. */
    private Map<TopicPartition, Long> resolveAutoResetOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
        switch (this.autoResetValue) {
            case EARLIEST: {
                Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
                return this.resolveFromOffsetsWithRetention(consumer, earliestOffsets, topicPartitions);
            }
            case LATEST:
                return consumer.endOffsets(topicPartitions);
            case GROUP:
                return this.getGroupOffsets(consumer, topicPartitions);
            default:
                throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
        }
    }

    /**
     * Looks up the partitions of a topic through the given consumer.
     *
     * @param consumer  consumer used for the metadata lookup
     * @param topicName topic whose partitions are requested
     * @return the partition metadata reported by the consumer
     * @throws HoodieStreamerException if the consumer returns no metadata ({@code null}) for the topic
     */
    private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
        boolean deprecatedTimeoutConfigured = ConfigUtils.containsConfigProperty(this.props, KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT);
        if (deprecatedTimeoutConfigured) {
            // The timeout setting is no longer honoured; point the user at the retry settings instead.
            LOG.warn("{} is deprecated and is not taking effect anymore. Use {}, {} and {} for setting up retrying configuration of KafkaConsumer",
                KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT.key(),
                KafkaSourceConfig.INITIAL_RETRY_INTERVAL_MS.key(),
                KafkaSourceConfig.MAX_RETRY_INTERVAL_MS.key(),
                KafkaSourceConfig.MAX_RETRY_COUNT.key());
        }
        List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
        if (partitions != null) {
            return partitions;
        }
        throw new HoodieStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
    }

    /**
     * Returns the checkpointed offsets, unless any of them lies before the earliest offset still
     * available for its partition.
     *
     * <p>When at least one checkpointed offset is out of bounds, either fails (if
     * {@code KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS} is true) or logs a warning and restarts
     * all partitions from the earliest offsets, adjusted by
     * {@link #resolveFromOffsetsWithRetention}.
     *
     * @param consumer          consumer used to query the earliest offsets
     * @param lastCheckpointStr checkpoint in {@code topic,partition:offset,...} form; must be present
     * @param topicPartitions   partitions of the topic
     * @return the start offsets to use
     * @throws HoodieStreamerException on out-of-bound checkpoints when fail-on-data-loss is enabled
     */
    private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());

        // Collect every partition whose checkpoint points at data Kafka no longer holds.
        List<TopicPartition> stalePartitions = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> checkpointed : checkpointOffsets.entrySet()) {
            if (checkpointed.getValue() < earliestOffsets.get(checkpointed.getKey())) {
                stalePartitions.add(checkpointed.getKey());
            }
        }
        if (stalePartitions.isEmpty()) {
            return checkpointOffsets;
        }

        String outOfBoundOffsets = stalePartitions.stream()
            .map(tp -> tp.toString() + ":{checkpoint=" + checkpointOffsets.get(tp) + ",earliestOffset=" + earliestOffsets.get(tp) + "}")
            .collect(Collectors.joining(","));
        String message = "Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. Kafka partitions that have out-of-bound checkpoints: " + outOfBoundOffsets + " .";
        if (ConfigUtils.getBooleanWithAltKeys((Properties) this.props, KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
            throw new HoodieStreamerException(message);
        }
        LOG.warn("{} If you want Hudi Streamer to fail on such cases, set \"{}\" to \"true\".", message, KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key());
        return this.resolveFromOffsetsWithRetention(consumer, earliestOffsets, topicPartitions);
    }

    /**
     * Shape of a checkpoint key that can be used as a timestamp: ASCII digits with an optional
     * leading '+'. Decimals and negative values are rejected because the caller parses the key
     * with {@code Long.parseLong} and passes it to {@code KafkaConsumer.offsetsForTimes}.
     */
    private static final Pattern TIMESTAMP_CHECKPOINT_PATTERN = Pattern.compile("\\+?[0-9]+");

    /**
     * Tells whether the last checkpoint can be interpreted as a timestamp.
     *
     * @param lastCheckpoint checkpoint of the previous run, if any
     * @return {@code true} when a checkpoint is present and its key is a non-negative whole number
     *         that is exactly 10 or 13 characters long; {@code false} otherwise
     */
    private Boolean isValidTimestampCheckpointType(Option<Checkpoint> lastCheckpoint) {
        if (!lastCheckpoint.isPresent()) {
            return false;
        }
        String checkpointStr = lastCheckpoint.get().getCheckpointKey();
        int length = checkpointStr.length();
        // NOTE(review): the two accepted lengths look like epoch seconds (10) and epoch millis (13) - confirm.
        boolean timestampLength = length == 13 || length == 10;
        return timestampLength && TIMESTAMP_CHECKPOINT_PATTERN.matcher(checkpointStr).matches();
    }

    /**
     * Tells whether the last checkpoint can be interpreted as a single Kafka offset.
     *
     * <p>The key must parse with {@code Long.parseLong} and be non-negative. Parsing as a signed
     * long (rather than unsigned) matters because the resulting checkpoint string is later read
     * back by {@link CheckpointUtils#strToOffsets(String)}, which uses {@code Long.parseLong};
     * values above {@code Long.MAX_VALUE} would pass an unsigned check and then fail there.
     *
     * @param lastCheckpointStr checkpoint of the previous run, if any
     * @return {@code true} when a checkpoint is present and its key is a valid non-negative offset;
     *         {@code false} otherwise (a warning is logged when the key is present but invalid)
     */
    private Boolean isValidOffsetCheckpointType(Option<Checkpoint> lastCheckpointStr) {
        if (!lastCheckpointStr.isPresent()) {
            return false;
        }
        String checkpointKey = lastCheckpointStr.get().getCheckpointKey();
        try {
            if (Long.parseLong(checkpointKey) >= 0L) {
                return true;
            }
        } catch (NumberFormatException ignored) {
            // Not a number at all: reported through the shared warning below.
        }
        LOG.warn("Checkpoint type is set to single_offset, but provided value of checkpoint=\"{}\" is not a valid number", checkpointKey);
        return false;
    }

    /**
     * Sums, over all partitions reported by the consumer, how far the checkpoint trails the
     * current end offset.
     *
     * <p>Partitions missing from the checkpoint count from offset 0; partitions whose checkpoint
     * is at or beyond the end offset contribute nothing.
     *
     * @param lastCheckpointStr checkpoint in {@code topic,partition:offset,...} form; must be present
     * @param topicPartitions   partitions whose end offsets are queried
     * @param consumer          consumer used to query end offsets
     * @return total number of offsets between the checkpoint and the end of the topic
     */
    private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
        Map<TopicPartition, Long> checkpointed = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        long totalLag = 0L;
        for (Map.Entry<TopicPartition, Long> end : endOffsets.entrySet()) {
            long lag = end.getValue() - checkpointed.getOrDefault(end.getKey(), 0L);
            totalLag += Math.max(lag, 0L);
        }
        return totalLag;
    }

    /**
     * Builds a {@code topic,partition:offset,...} checkpoint string from a timestamp lookup.
     *
     * <p>For each partition returned by {@code offsetsForTimes}, the offset found for the
     * timestamp is used; when the lookup yields no result for a partition, that partition's
     * earliest offset is used instead.
     *
     * @param consumer          consumer used for the offset lookups
     * @param partitionInfoList partitions to look the timestamp up for
     * @param topicPartitions   partitions whose earliest offsets serve as fallback
     * @param topicName         topic name placed at the front of the checkpoint string
     * @param timestamp         timestamp passed to {@code offsetsForTimes} for every partition
     * @return the assembled checkpoint string (always present)
     */
    private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions, String topicName, Long timestamp) {
        Map<TopicPartition, Long> timestampsToSearch = partitionInfoList.stream()
            .collect(Collectors.toMap(info -> new TopicPartition(info.topic(), info.partition()), info -> timestamp));
        Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> lookupResult = consumer.offsetsForTimes(timestampsToSearch);

        StringBuilder checkpoint = new StringBuilder(topicName);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> hit : lookupResult.entrySet()) {
            TopicPartition partition = hit.getKey();
            checkpoint.append(",").append(partition.partition()).append(":");
            if (hit.getValue() == null) {
                // Nothing found for the timestamp: fall back to the earliest offset.
                checkpoint.append(earliestOffsets.get(partition));
            } else {
                checkpoint.append(hit.getValue().offset());
            }
        }
        return Option.of(checkpoint.toString());
    }

    /**
     * Moves start offsets forward so that reading begins a configurable buffer after the point
     * where the topic's retention would cut off.
     *
     * <p>The cut-off timestamp is {@code now - retention.ms + buffer}. For each partition the new
     * start offset is the larger of the original offset and the offset found for that timestamp;
     * partitions with no lookup result keep their original offset. The input map is returned
     * unchanged when the buffer ({@code KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES}) is not
     * positive, when the topic retention is unknown or not positive, or when a
     * {@link KafkaException} is raised along the way.
     *
     * @param consumer        consumer used for the timestamp lookup
     * @param fromOffsets     candidate start offsets per partition
     * @param topicPartitions partitions to adjust
     * @return adjusted start offsets, or {@code fromOffsets} itself when no adjustment applies
     */
    @VisibleForTesting
    Map<TopicPartition, Long> resolveFromOffsetsWithRetention(KafkaConsumer consumer, Map<TopicPartition, Long> fromOffsets, Set<TopicPartition> topicPartitions) {
        try {
            long bufferMinutes = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES);
            if (bufferMinutes <= 0L) {
                LOG.debug("Not modifying fromOffsets as {} is not configured or set to a value <= 0", KafkaSourceConfig.OFFSET_SKIP_BUFFER_MINUTES.key());
                return fromOffsets;
            }
            Long retentionMs = this.getTopicRetentionMs(this.getTopicName());
            if (retentionMs == null || retentionMs <= 0L) {
                LOG.debug("Not modifying fromOffsets as topic {} retention is missing or set to a value <= 0", this.getTopicName());
                return fromOffsets;
            }

            long retentionTs = this.clock.currentEpoch() - retentionMs + TimeUnit.MINUTES.toMillis(bufferMinutes);
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (TopicPartition partition : topicPartitions) {
                timestampsToSearch.put(partition, retentionTs);
            }
            Map<TopicPartition, OffsetAndTimestamp> lookupResult = consumer.offsetsForTimes(timestampsToSearch);

            List<TopicPartition> unresolved = new ArrayList<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> hit : lookupResult.entrySet()) {
                if (hit.getValue() == null) {
                    unresolved.add(hit.getKey());
                }
            }
            if (!unresolved.isEmpty()) {
                LOG.warn("OffsetAndTimestamp not available for partitions: {} since {}", unresolved, retentionTs);
            }

            Map<TopicPartition, Long> skippedOffsetsPerPartition = new HashMap<>();
            Map<TopicPartition, Long> newFromOffsets = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> hit : lookupResult.entrySet()) {
                TopicPartition partition = hit.getKey();
                long originalOffset = fromOffsets.get(partition);
                // Never move backwards: keep the original offset when it is already ahead.
                long adjustedOffset = hit.getValue() == null
                    ? originalOffset
                    : Math.max(hit.getValue().offset(), originalOffset);
                skippedOffsetsPerPartition.put(partition, Math.max(adjustedOffset - originalOffset, 0L));
                newFromOffsets.put(partition, adjustedOffset);
            }
            LOG.warn("Adjusted fromOffsets with retention; oldFromOffsets: {}, newFromOffsets: {}, skippedOffsetsPerPartition: {}", fromOffsets, newFromOffsets, skippedOffsetsPerPartition);
            return newFromOffsets;
        } catch (KafkaException e) {
            LOG.error("Error resolving fromOffsets with retention, falling back to fromOffsets", e);
            return fromOffsets;
        }
    }

    /**
     * Checks whether {@link #topicName} is among the topics listed by the consumer.
     *
     * @param consumer consumer used to list topics
     * @return {@code true} if the topic listing contains the configured topic name
     */
    public boolean checkTopicExists(KafkaConsumer consumer) {
        return consumer.listTopics().containsKey(this.topicName);
    }

    /**
     * @return the topic name this generator was configured with
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the Kafka client parameters held by this generator (the internal map itself, not a copy)
     */
    public Map<String, Object> getKafkaParams() {
        return kafkaParams;
    }

    /**
     * Copies the properties intended for the Kafka client into a new map.
     *
     * <p>Keys starting with {@code hoodie.} are dropped, except those under the
     * {@code hoodie.streamer.source.kafka.value.deserializer.} and
     * {@code hoodie.deltastreamer.source.kafka.value.deserializer.} prefixes, which are kept.
     *
     * @param props full configuration
     * @return a new mutable map with the retained entries, keyed by the key's string form
     */
    public static Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
        Map<String, Object> kafkaParams = new HashMap<>();
        for (Object rawKey : props.keySet()) {
            String key = rawKey.toString();
            boolean hoodieScoped = key.startsWith("hoodie.");
            boolean deserializerSetting = key.startsWith("hoodie.streamer.source.kafka.value.deserializer.")
                || key.startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
            if (!hoodieScoped || deserializerSetting) {
                kafkaParams.put(key, props.get((Object) key));
            }
        }
        return kafkaParams;
    }

    /**
     * Commits the offsets encoded in a checkpoint string to Kafka, best effort.
     *
     * <p>Requires {@code group.id} to be configured. A {@link CommitFailedException} or
     * {@link TimeoutException} during the commit is logged as a warning and not propagated.
     *
     * @param checkpointStr checkpoint in {@code topic,partition:offset,...} form
     */
    public void commitOffsetToKafka(String checkpointStr) {
        ConfigUtils.checkRequiredProperties(this.props, Collections.singletonList("group.id"));
        Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(offsetMap.size());
        try (HoodieRetryingKafkaConsumer consumer = new HoodieRetryingKafkaConsumer(this.props, this.kafkaParams)) {
            for (Map.Entry<TopicPartition, Long> checkpointed : offsetMap.entrySet()) {
                offsetsToCommit.put(checkpointed.getKey(), new OffsetAndMetadata(checkpointed.getValue().longValue()));
            }
            consumer.commitSync(offsetsToCommit);
        } catch (CommitFailedException | TimeoutException e) {
            LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
        }
    }

    /**
     * Resolves start offsets from the offsets committed by the consumer group.
     *
     * <p>If any partition has no committed offset, the committed offsets gathered so far are
     * discarded and the current end offsets of all partitions are returned instead.
     *
     * @param consumer        consumer used to query committed and end offsets
     * @param topicPartitions partitions to resolve
     * @return committed offset per partition, or the end offsets when a commit is missing
     */
    private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> fromOffsets = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
            if (committedOffsetAndMetadata == null) {
                LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
                return consumer.endOffsets(topicPartitions);
            }
            fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
        }
        return fromOffsets;
    }

    /**
     * Reads the {@code retention.ms} setting of a topic through a short-lived admin client.
     *
     * @param topicName topic whose configuration is queried
     * @return the configured retention in milliseconds, or {@code null} when the setting is
     *         absent, when describing the topic fails with an {@link ExecutionException} or
     *         {@link KafkaException}, or when the calling thread is interrupted (the interrupt
     *         flag is restored in that case)
     */
    @VisibleForTesting
    Long getTopicRetentionMs(String topicName) {
        try (AdminClient client = AdminClient.create(this.getKafkaParams())) {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            Map<ConfigResource, Config> describedConfigs = client.describeConfigs(Collections.singleton(topicResource)).all().get();
            ConfigEntry retentionConfig = describedConfigs.get(topicResource).get("retention.ms");
            if (retentionConfig != null && retentionConfig.value() != null) {
                return Long.parseLong(retentionConfig.value());
            }
            LOG.info("{} config missing for topic {}", "retention.ms", topicName);
            return null;
        } catch (ExecutionException | KafkaException e) {
            LOG.error("Error getting retention config for topic {}", topicName, e);
            return null;
        } catch (InterruptedException ex) {
            LOG.error("Interrupted while fetching topic {} configuration", topicName);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static class CheckpointUtils {
        /**
         * Loose shape check used by {@code checkTopicCheckpoint}: the whole string must contain a
         * comma that is followed, somewhere later, by a colon (as in {@code topic,partition:offset}).
         */
        private static final Pattern PATTERN = Pattern.compile(".*,.*:.*");

        /**
         * Parses a checkpoint string of the form {@code topic,partition:offset,partition:offset,...}.
         *
         * @param checkpointStr checkpoint string; the first comma-separated token is the topic
         * @return a new map from topic partition to offset (empty when only the topic is given)
         * @throws NumberFormatException if a partition or offset is not a valid number
         * @throws ArrayIndexOutOfBoundsException if a token lacks the {@code partition:offset} shape
         */
        public static Map<TopicPartition, Long> strToOffsets(String checkpointStr) {
            String[] tokens = checkpointStr.split(",");
            String topic = tokens[0];
            Map<TopicPartition, Long> offsets = new HashMap<>();
            // Everything after the topic is a partition:offset pair.
            for (String partitionAndOffset : Arrays.asList(tokens).subList(1, tokens.length)) {
                String[] pair = partitionAndOffset.split(":");
                int partition = Integer.parseInt(pair[0]);
                long offset = Long.parseLong(pair[1]);
                offsets.put(new TopicPartition(topic, partition), offset);
            }
            return offsets;
        }

        /**
         * Serializes offset ranges into a checkpoint string of the form
         * {@code topic,partition:untilOffset,...}.
         *
         * <p>Ranges are first merged per topic partition; the topic name is taken from the first
         * merged range.
         *
         * @param ranges ranges to serialize; must contain at least one element
         * @return the checkpoint string
         */
        public static String offsetsToStr(OffsetRange[] ranges) {
            OffsetRange[] merged = CheckpointUtils.mergeRangesByTopicPartition(ranges);
            StringBuilder checkpoint = new StringBuilder(merged[0].topic());
            for (OffsetRange range : merged) {
                checkpoint.append(",").append(String.format("%s:%d", range.partition(), range.untilOffset()));
            }
            return checkpoint.toString();
        }

        /**
         * Splits the span between start and end offsets into ranges holding at most
         * {@code numEvents} events in total.
         *
         * <p>Events are handed out round-robin over the partitions in chunks of
         * {@code max(1, actualEvents / max(minPartitions, partitionCount))}, so one partition can
         * end up with several consecutive ranges. Partitions present in {@code fromOffsetMap} that
         * received no range get an empty range at their start offset. When there is nothing to
         * read ({@code actualEvents == 0}) the unsplit per-partition ranges are returned.
         *
         * @param fromOffsetMap start offset per partition; partitions missing here start at 0
         * @param toOffsetMap   end offset per partition
         * @param numEvents     upper bound on the total number of events across all ranges
         * @param minPartitions minimum number of chunks to aim for; values below the number of
         *                      partitions (including zero or negative) are raised to it
         * @return ranges sorted by partition
         */
        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, Map<TopicPartition, Long> toOffsetMap, long numEvents, long minPartitions) {
            // One candidate range per "to" partition, defaulting the start offset to 0.
            OffsetRange[] ranges = toOffsetMap.keySet().stream()
                .map(tp -> {
                    long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
                    return OffsetRange.create(tp, fromOffset, toOffsetMap.get(tp));
                })
                .sorted(SORT_BY_PARTITION)
                .toArray(OffsetRange[]::new);
            LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, minPartitions, ranges);

            long actualNumEvents = Math.min(CheckpointUtils.totalNewMessages(ranges), numEvents);
            // Clamp to at least 1 so an empty toOffsetMap combined with minPartitions <= 0
            // cannot cause a division by zero below.
            long effectiveMinPartitions = Math.max(1L, Math.max(minPartitions, (long) toOffsetMap.size()));
            // Cap on the size of each chunk, so events are spread evenly over the ranges.
            long eventsPerPartition = Math.max(1L, actualNumEvents / effectiveMinPartitions);

            long allocatedEvents = 0L;
            Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
            Map<TopicPartition, Long> partitionToAllocatedOffset = new HashMap<>();
            // Keep cycling over the partitions until the whole budget has been handed out.
            while (allocatedEvents < actualNumEvents) {
                for (OffsetRange range : ranges) {
                    if (allocatedEvents == actualNumEvents) {
                        break;
                    }
                    TopicPartition partition = range.topicPartition();
                    // Continue where the previous chunk of this partition ended, if there was one.
                    long startOffset = partitionToAllocatedOffset.containsKey(partition)
                        ? partitionToAllocatedOffset.get(partition)
                        : range.fromOffset();
                    long eventsForThisPartition = Math.min(eventsPerPartition, actualNumEvents - allocatedEvents);
                    long candidateEnd = startOffset + eventsForThisPartition;
                    // candidateEnd <= startOffset means the addition overflowed; read to the end then.
                    long toOffset = candidateEnd > startOffset
                        ? Math.min(range.untilOffset(), candidateEnd)
                        : range.untilOffset();
                    allocatedEvents += toOffset - startOffset;
                    OffsetRange thisRange = OffsetRange.create(partition, startOffset, toOffset);

                    List<OffsetRange> allocatedForPartition = finalRanges.get(partition);
                    if (allocatedForPartition == null) {
                        // The first chunk of a partition is always recorded, even when empty.
                        finalRanges.put(partition, new ArrayList<>(Collections.singleton(thisRange)));
                        partitionToAllocatedOffset.put(partition, thisRange.untilOffset());
                    } else if (toOffset > startOffset) {
                        allocatedForPartition.add(thisRange);
                        partitionToAllocatedOffset.put(partition, thisRange.untilOffset());
                    }
                }
            }

            // Partitions with a start offset but no allocated chunk get an empty range.
            for (Map.Entry<TopicPartition, Long> from : fromOffsetMap.entrySet()) {
                if (!finalRanges.containsKey(from.getKey())) {
                    OffsetRange emptyRange = OffsetRange.create(from.getKey(), from.getValue(), from.getValue());
                    finalRanges.put(from.getKey(), Collections.singletonList(emptyRange));
                }
            }

            OffsetRange[] sortedRangeArray = finalRanges.values().stream()
                .flatMap(List::stream)
                .sorted(SORT_BY_PARTITION)
                .toArray(OffsetRange[]::new);
            if (actualNumEvents == 0L) {
                // Nothing to read: hand back the unsplit per-partition ranges.
                sortedRangeArray = ranges;
            }
            LOG.info("final ranges {}", Arrays.toString(sortedRangeArray));
            return sortedRangeArray;
        }

        /**
         * Collapses all ranges of the same topic partition into one range spanning from the
         * smallest start offset to the largest end offset.
         *
         * @param oldRanges ranges to merge
         * @return one range per topic partition, sorted by partition
         */
        public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
            Map<TopicPartition, List<OffsetRange>> rangesByPartition = Arrays.stream(oldRanges)
                .collect(Collectors.groupingBy(OffsetRange::topicPartition));
            List<OffsetRange> merged = new ArrayList<>();
            for (Map.Entry<TopicPartition, List<OffsetRange>> group : rangesByPartition.entrySet()) {
                List<OffsetRange> partitionRanges = group.getValue();
                long lowestFrom = partitionRanges.stream().mapToLong(OffsetRange::fromOffset).min().getAsLong();
                long highestUntil = partitionRanges.stream().mapToLong(OffsetRange::untilOffset).max().getAsLong();
                merged.add(OffsetRange.create(group.getKey(), lowestFrom, highestUntil));
            }
            merged.sort(SORT_BY_PARTITION);
            return merged.toArray(new OffsetRange[0]);
        }

        /**
         * Adds up the message counts of the given ranges.
         *
         * @param ranges ranges to total
         * @return sum of {@code OffsetRange.count()} over all ranges (0 for an empty array)
         */
        public static long totalNewMessages(OffsetRange[] ranges) {
            long total = 0L;
            for (OffsetRange range : ranges) {
                total += range.count();
            }
            return total;
        }

        /**
         * Checks whether a checkpoint string has the rough {@code topic,partition:offset} shape.
         *
         * @param lastCheckpointStr checkpoint string; must be present
         * @return {@code true} if the entire string matches {@code .*,.*:.*}
         */
        public static boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
            return PATTERN.matcher(lastCheckpointStr.get()).matches();
        }
    }
}

