/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes the Kafka offset ranges a streamer source should read next for a single topic.
 *
 * <p>Start offsets come from the last checkpoint when it is usable, otherwise from the configured
 * auto-offset-reset strategy ({@code earliest}, {@code latest} or {@code group}). End offsets are the
 * current end offsets of every partition of the topic. Offset checkpoints are strings of the form
 * {@code topic,partition:offset,partition:offset,...} (see {@link CheckpointUtils}).
 */
public class KafkaOffsetGen {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class);
    private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount";
    private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparingInt(OffsetRange::partition);
    // A timestamp checkpoint must be exactly 10 or 13 ASCII digits. Signs and decimals are rejected here:
    // Long.parseLong cannot parse decimals, and KafkaConsumer#offsetsForTimes rejects negative timestamps.
    private static final Pattern TIMESTAMP_CHECKPOINT_PATTERN = Pattern.compile("\\d{10}|\\d{13}");
    // Pause between partition-metadata lookups while waiting for the topic's partitions to become visible.
    private static final long PARTITION_FETCH_RETRY_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10L);
    public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
    private final Map<String, Object> kafkaParams;
    private final TypedProperties props;
    protected final String topicName;
    private final KafkaSourceConfig.KafkaResetOffsetStrategies autoResetValue;
    private final String kafkaCheckpointType;

    /**
     * Creates an offset generator from streamer properties.
     *
     * <p>Every property whose key does not start with {@code hoodie.} is forwarded to the Kafka consumer.
     * The Hudi value-deserializer prefixes are forwarded too.
     *
     * @param props streamer properties; the Kafka topic name is required
     * @throws HoodieStreamerException if the auto-offset-reset value matches none of the supported strategies
     */
    public KafkaOffsetGen(TypedProperties props) {
        this.props = props;
        this.kafkaParams = this.excludeHoodieConfigs(props);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(KafkaSourceConfig.KAFKA_TOPIC_NAME));
        this.topicName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_TOPIC_NAME);
        this.kafkaCheckpointType = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE, true);
        // Lower-case with Locale.ROOT. With a Turkish default locale, toLowerCase() turns "EARLIEST" into
        // "earlıest" (dotless i), and a correctly configured "earliest" would then fail to match.
        String defaultResetStrategy = KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase(Locale.ROOT);
        String kafkaAutoResetOffsetsStr = props.getString(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetStrategy);
        this.autoResetValue = Arrays.stream(KafkaSourceConfig.KafkaResetOffsetStrategies.values())
            .filter(strategy -> strategy.name().toLowerCase(Locale.ROOT).equals(kafkaAutoResetOffsetsStr))
            .findFirst()
            .orElseThrow(() -> new HoodieStreamerException(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key() + " config set to unknown value " + kafkaAutoResetOffsetsStr));
        if (this.autoResetValue == KafkaSourceConfig.KafkaResetOffsetStrategies.GROUP) {
            // The consumer receives the default strategy instead of "group". Presumably this is because the
            // Kafka consumer does not accept "group" as an auto.offset.reset value. Confirm against the Kafka version.
            this.kafkaParams.put(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetStrategy);
        }
    }

    /**
     * Computes the next offset ranges, bounded by {@code sourceLimit} events.
     *
     * @param lastCheckpointStr last checkpoint, if any
     * @param sourceLimit       maximum number of events to read. {@code Long.MAX_VALUE} means "not configured";
     *                          in that case {@code KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE} is used instead
     * @param metrics           metrics sink, updated with the consumer lag when an offset checkpoint is used
     * @return offset ranges sorted by partition
     */
    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
        long maxEventsToReadFromKafka = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
        long numEvents = sourceLimit;
        if (sourceLimit == Long.MAX_VALUE) {
            numEvents = maxEventsToReadFromKafka;
            LOG.info("SourceLimit not configured, set numEvents to default value : {}", maxEventsToReadFromKafka);
        }
        long minPartitions = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
        LOG.info("getNextOffsetRanges set config {} to {}", KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), minPartitions);
        return this.getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
    }

    /**
     * Computes the next offset ranges to read from the configured topic.
     *
     * @param lastCheckpointStr last checkpoint: either an offset checkpoint ({@code topic,partition:offset,...}) or,
     *                          when the checkpoint type is {@code timestamp}, a 10- or 13-digit timestamp
     * @param numEvents         maximum number of events the returned ranges should cover
     * @param minPartitions     lower bound for the number of chunks the events are divided into; the topic's
     *                          partition count is used if it is larger
     * @param metrics           metrics sink, updated with the consumer lag when an offset checkpoint is used
     * @return offset ranges sorted by partition
     * @throws HoodieException if the topic does not exist
     */
    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
        Map<TopicPartition, Long> fromOffsets;
        Map<TopicPartition, Long> toOffsets;
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(this.kafkaParams)) {
            if (!this.checkTopicExists(consumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List<PartitionInfo> partitionInfoList = this.fetchPartitionInfos(consumer, this.topicName);
            Set<TopicPartition> topicPartitions = partitionInfoList.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());
            Option<String> checkpoint = lastCheckpointStr;
            if (KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(this.kafkaCheckpointType) && this.isValidTimestampCheckpointType(checkpoint)) {
                // Turn the timestamp into an equivalent offset checkpoint so both checkpoint types share one path.
                checkpoint = this.getOffsetsByTimestamp(consumer, topicPartitions, this.topicName, Long.parseLong(checkpoint.get()));
            }
            if (checkpoint.isPresent() && !checkpoint.get().isEmpty() && CheckpointUtils.checkTopicCheckpoint(checkpoint)) {
                fromOffsets = this.fetchValidOffsets(consumer, checkpoint, topicPartitions);
                metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, this.delayOffsetCalculation(checkpoint, topicPartitions, consumer));
            } else {
                fromOffsets = this.getResetOffsets(consumer, topicPartitions);
            }
            toOffsets = consumer.endOffsets(topicPartitions);
        }
        return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
    }

    /**
     * Returns the start offsets dictated by the configured auto-offset-reset strategy.
     */
    private Map<TopicPartition, Long> getResetOffsets(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
        switch (this.autoResetValue) {
            case EARLIEST:
                return consumer.beginningOffsets(topicPartitions);
            case LATEST:
                return consumer.endOffsets(topicPartitions);
            case GROUP:
                return this.getGroupOffsets(consumer, topicPartitions);
            default:
                throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
        }
    }

    /**
     * Polls the topic's partition metadata until it is available.
     *
     * <p>Retries until {@code KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT} milliseconds have elapsed;
     * the value is compared against {@link System#currentTimeMillis()}.
     *
     * @throws HoodieStreamerException if no metadata is available before the timeout
     * @throws HoodieException         if the thread is interrupted while waiting; the interrupt flag is restored
     */
    private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer<?, ?> consumer, String topicName) {
        long timeout = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT);
        long start = System.currentTimeMillis();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
        while (partitionInfos == null) {
            // Compute elapsed time rather than "start + timeout" so a very large timeout cannot overflow.
            long remaining = timeout - (System.currentTimeMillis() - start);
            if (remaining < 0) {
                throw new HoodieStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
            }
            try {
                TimeUnit.MILLISECONDS.sleep(Math.min(PARTITION_FETCH_RETRY_INTERVAL_MS, remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new HoodieException("Interrupted while fetching partitions for topic " + topicName, e);
            }
            partitionInfos = consumer.partitionsFor(topicName);
        }
        return partitionInfos;
    }

    /**
     * Returns the checkpoint offsets, or the earliest offsets of all partitions if the checkpoint cannot be used.
     *
     * <p>A checkpoint cannot be used when some checkpointed partition is behind its earliest available offset,
     * or no longer exists in the topic. In that case this method fails if
     * {@code KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS} is set. Otherwise it logs a warning and falls back
     * to the earliest offsets.
     *
     * @throws HoodieStreamerException on out-of-bound checkpoints when fail-on-data-loss is enabled
     */
    private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer<?, ?> consumer, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        List<TopicPartition> outOfBoundPartitionList = checkpointOffsets.entrySet().stream()
            .filter(entry -> {
                Long earliest = earliestOffsets.get(entry.getKey());
                // A checkpointed partition missing from the topic is treated as out of bounds.
                // Unboxing its null earliest offset would otherwise throw an NPE.
                return earliest == null || entry.getValue() < earliest;
            })
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        if (outOfBoundPartitionList.isEmpty()) {
            return checkpointOffsets;
        }
        String outOfBoundOffsets = outOfBoundPartitionList.stream()
            .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p) + ",earliestOffset=" + earliestOffsets.get(p) + "}")
            .collect(Collectors.joining(","));
        String message = "Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. Kafka partitions that have out-of-bound checkpoints: " + outOfBoundOffsets + " .";
        if (ConfigUtils.getBooleanWithAltKeys(this.props, KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
            throw new HoodieStreamerException(message);
        }
        LOG.warn(message + " If you want Hudi Streamer to fail on such cases, set \"" + KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
        return earliestOffsets;
    }

    /**
     * Returns whether the checkpoint looks like a timestamp: exactly 10 or 13 ASCII digits.
     *
     * <p>NOTE(review): the value is later passed to {@code offsetsForTimes} unchanged, so a 10-digit value is
     * treated as epoch milliseconds. Confirm whether 10-digit values were meant to be epoch seconds.
     */
    private boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) {
        return lastCheckpointStr.isPresent() && TIMESTAMP_CHECKPOINT_PATTERN.matcher(lastCheckpointStr.get()).matches();
    }

    /**
     * Sums, over all partitions, how far each end offset is ahead of the checkpoint offset.
     *
     * <p>Partitions missing from the checkpoint count from offset 0. Negative differences count as 0.
     */
    private long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer<?, ?> consumer) {
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
        long delayCount = 0L;
        for (Map.Entry<TopicPartition, Long> entry : latestOffsets.entrySet()) {
            long lag = entry.getValue() - checkpointOffsets.getOrDefault(entry.getKey(), 0L);
            delayCount += Math.max(lag, 0L);
        }
        return delayCount;
    }

    /**
     * Resolves a timestamp into an offset checkpoint string ({@code topic,partition:offset,...}).
     *
     * <p>Each partition uses the offset of its first message whose timestamp is at or after {@code timestamp}.
     * This is the offset reported by {@code offsetsForTimes}. When a partition has no such message, every
     * message in it is older than the timestamp, so that partition starts at its end offset. It does not
     * re-read the whole partition from the earliest offset.
     */
    private Option<String> getOffsetsByTimestamp(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions, String topicName, long timestamp) {
        Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
            .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);
        Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
        List<String> checkpointParts = new ArrayList<>();
        checkpointParts.add(topicName);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
            OffsetAndTimestamp found = entry.getValue();
            long offset = found != null ? found.offset() : latestOffsets.get(entry.getKey());
            checkpointParts.add(entry.getKey().partition() + ":" + offset);
        }
        return Option.of(String.join(",", checkpointParts));
    }

    /**
     * Returns whether the cluster's topic listing contains the configured topic.
     */
    public boolean checkTopicExists(KafkaConsumer<?, ?> consumer) {
        return consumer.listTopics().containsKey(this.topicName);
    }

    /**
     * Returns the Kafka topic this generator reads from.
     */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * Returns the consumer properties map used by this instance. This is the live map, not a copy.
     */
    public Map<String, Object> getKafkaParams() {
        return this.kafkaParams;
    }

    /**
     * Builds the Kafka consumer properties.
     *
     * <p>Every key not starting with {@code hoodie.} is copied. The Hudi value-deserializer prefixes are
     * also copied.
     */
    private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
        Map<String, Object> consumerParams = new HashMap<>();
        for (Object key : props.keySet()) {
            String name = key.toString();
            boolean forwarded = !name.startsWith("hoodie.")
                || name.startsWith("hoodie.streamer.source.kafka.value.deserializer.")
                || name.startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
            if (forwarded) {
                consumerParams.put(name, props.get(name));
            }
        }
        return consumerParams;
    }

    /**
     * Commits the offsets in {@code checkpointStr} to Kafka for the configured consumer group.
     *
     * <p>Requires {@code group.id} to be set; this is checked via {@code ConfigUtils.checkRequiredProperties}.
     * Commit failures and timeouts are logged and otherwise ignored.
     *
     * @param checkpointStr offset checkpoint of the form {@code topic,partition:offset,...}
     */
    public void commitOffsetToKafka(String checkpointStr) {
        ConfigUtils.checkRequiredProperties(this.props, Collections.singletonList("group.id"));
        Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
        offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(this.kafkaParams)) {
            consumer.commitSync(offsetAndMetadataMap);
        } catch (CommitFailedException | TimeoutException e) {
            LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
        }
    }

    /**
     * Returns the consumer group's committed offset for each partition.
     *
     * <p>Partitions without a commit start from their end offset.
     */
    private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> fromOffsets = new HashMap<>();
        List<TopicPartition> uncommitted = new ArrayList<>();
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            if (committed != null) {
                fromOffsets.put(topicPartition, committed.offset());
            } else {
                uncommitted.add(topicPartition);
            }
        }
        if (!uncommitted.isEmpty()) {
            // Fall back per partition. If one partition lacks a commit (e.g. a newly added partition),
            // resetting every partition to latest would skip records on partitions that do have commits.
            LOG.warn("There are no commits associated with this consumer group for partitions {}, starting to consume them from latest offset", uncommitted);
            fromOffsets.putAll(consumer.endOffsets(uncommitted));
        }
        return fromOffsets;
    }

    /**
     * Helpers for converting between offset checkpoint strings and Spark {@link OffsetRange}s.
     *
     * <p>The checkpoint format is {@code topic,partition:offset,partition:offset,...}.
     */
    public static class CheckpointUtils {
        private static final Pattern PATTERN = Pattern.compile(".*,.*:.*");

        /**
         * Parses a checkpoint string {@code topic,partition:offset,...} into a map from partition to offset.
         */
        public static Map<TopicPartition, Long> strToOffsets(String checkpointStr) {
            Map<TopicPartition, Long> offsetMap = new HashMap<>();
            String[] splits = checkpointStr.split(",");
            String topic = splits[0];
            for (int i = 1; i < splits.length; ++i) {
                String[] subSplits = splits[i].split(":");
                offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
            }
            return offsetMap;
        }

        /**
         * Serializes ranges into a checkpoint string.
         *
         * <p>Ranges are first merged per partition. Each partition is then written as
         * {@code partition:untilOffset}, in partition order.
         *
         * @throws IllegalArgumentException if {@code ranges} is empty, since there is no topic to name
         */
        public static String offsetsToStr(OffsetRange[] ranges) {
            OffsetRange[] merged = mergeRangesByTopicPartition(ranges);
            if (merged.length == 0) {
                throw new IllegalArgumentException("Cannot build a Kafka checkpoint from an empty set of offset ranges");
            }
            // Plain concatenation keeps the digits ASCII whatever the default locale.
            return merged[0].topic() + "," + Arrays.stream(merged)
                .map(r -> r.partition() + ":" + r.untilOffset())
                .collect(Collectors.joining(","));
        }

        /**
         * Splits the events between start and end offsets into ranges, allocated round-robin across partitions.
         *
         * <p>Behavior details:
         * <ul>
         *   <li>Partitions missing from {@code fromOffsetMap} start at offset 0.</li>
         *   <li>At most {@code min(numEvents, total available)} events are allocated.</li>
         *   <li>Each range holds at most {@code max(1, events / max(minPartitions, partitionCount))} events.</li>
         *   <li>Partitions in {@code fromOffsetMap} that received no events are still returned as empty
         *       ranges, presumably so a checkpoint built from the result still lists them.</li>
         *   <li>If no events are available, the initial per-partition ranges are returned as they are.</li>
         * </ul>
         *
         * @return ranges sorted by partition
         */
        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, Map<TopicPartition, Long> toOffsetMap, long numEvents, long minPartitions) {
            OffsetRange[] ranges = toOffsetMap.entrySet().stream()
                .map(e -> OffsetRange.create(e.getKey(), fromOffsetMap.getOrDefault(e.getKey(), 0L), e.getValue()))
                .sorted(SORT_BY_PARTITION)
                .toArray(OffsetRange[]::new);
            LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, minPartitions, ranges);
            long actualNumEvents = Math.min(totalNewMessages(ranges), numEvents);
            // Never divide by zero, even with no partitions and a non-positive minPartitions.
            long chunkCount = Math.max(minPartitions, Math.max(1L, (long) toOffsetMap.size()));
            long eventsPerPartition = Math.max(1L, actualNumEvents / chunkCount);
            long allocatedEvents = 0L;
            Map<TopicPartition, List<OffsetRange>> finalRanges = new HashMap<>();
            Map<TopicPartition, Long> partitionToAllocatedOffset = new HashMap<>();
            while (allocatedEvents < actualNumEvents) {
                for (OffsetRange range : ranges) {
                    if (allocatedEvents == actualNumEvents) {
                        break;
                    }
                    TopicPartition tp = range.topicPartition();
                    long startOffset = partitionToAllocatedOffset.getOrDefault(tp, range.fromOffset());
                    long eventsForThisPartition = Math.min(eventsPerPartition, actualNumEvents - allocatedEvents);
                    // If startOffset + events overflows, take the rest of the partition.
                    long toOffset = startOffset + eventsForThisPartition > startOffset
                        ? Math.min(range.untilOffset(), startOffset + eventsForThisPartition)
                        : range.untilOffset();
                    allocatedEvents += toOffset - startOffset;
                    OffsetRange thisRange = OffsetRange.create(tp, startOffset, toOffset);
                    List<OffsetRange> partitionRanges = finalRanges.get(tp);
                    if (partitionRanges == null) {
                        // The first range of a partition is kept even when empty, so the partition appears in the result.
                        finalRanges.put(tp, new ArrayList<>(Collections.singleton(thisRange)));
                        partitionToAllocatedOffset.put(tp, toOffset);
                    } else if (toOffset > startOffset) {
                        partitionRanges.add(thisRange);
                        partitionToAllocatedOffset.put(tp, toOffset);
                    }
                }
            }
            Map<TopicPartition, List<OffsetRange>> missedRanges = fromOffsetMap.entrySet().stream()
                .filter(kv -> !finalRanges.containsKey(kv.getKey()))
                .map(kv -> Pair.of(kv.getKey(), Collections.singletonList(OffsetRange.create(kv.getKey(), kv.getValue(), kv.getValue()))))
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
            finalRanges.putAll(missedRanges);
            OffsetRange[] sortedRangeArray = finalRanges.values().stream()
                .flatMap(Collection::stream)
                .sorted(SORT_BY_PARTITION)
                .toArray(OffsetRange[]::new);
            if (actualNumEvents == 0L) {
                sortedRangeArray = ranges;
            }
            LOG.info("final ranges {}", Arrays.toString(sortedRangeArray));
            return sortedRangeArray;
        }

        /**
         * Merges ranges of the same topic-partition into one range.
         *
         * <p>The merged range spans from the minimum fromOffset to the maximum untilOffset. The result is
         * sorted by partition.
         */
        public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
            Map<TopicPartition, List<OffsetRange>> tpOffsets = Arrays.stream(oldRanges)
                .collect(Collectors.groupingBy(OffsetRange::topicPartition));
            List<OffsetRange> newRanges = new ArrayList<>(tpOffsets.size());
            for (Map.Entry<TopicPartition, List<OffsetRange>> entry : tpOffsets.entrySet()) {
                // groupingBy never produces empty groups, so min/max are always present.
                long from = entry.getValue().stream().mapToLong(OffsetRange::fromOffset).min().getAsLong();
                long until = entry.getValue().stream().mapToLong(OffsetRange::untilOffset).max().getAsLong();
                newRanges.add(OffsetRange.create(entry.getKey(), from, until));
            }
            newRanges.sort(SORT_BY_PARTITION);
            return newRanges.toArray(new OffsetRange[0]);
        }

        /**
         * Returns the sum of {@code count()} over all ranges.
         */
        public static long totalNewMessages(OffsetRange[] ranges) {
            return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
        }

        /**
         * Returns whether the checkpoint has the shape {@code <something>,<something>:<something>}.
         */
        public static boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
            Matcher matcher = PATTERN.matcher(lastCheckpointStr.get());
            return matcher.matches();
        }
    }
}

