/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.Set;
import scala.collection.mutable.StringBuilder;
import scala.util.Either;

public class KafkaOffsetGen {
    private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
    private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000L;
    private final HashMap<String, String> kafkaParams;
    private final TypedProperties props;
    protected final String topicName;

    public KafkaOffsetGen(TypedProperties props) {
        this.props = props;
        this.kafkaParams = new HashMap();
        for (Object prop : props.keySet()) {
            this.kafkaParams.put(prop.toString(), props.getString(prop.toString()));
        }
        DataSourceUtils.checkRequiredProperties((TypedProperties)props, Collections.singletonList("hoodie.deltastreamer.source.kafka.topic"));
        this.topicName = props.getString("hoodie.deltastreamer.source.kafka.topic");
    }

    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
        HashMap<Object, Object> fromOffsets;
        KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(this.kafkaParams));
        Either either = cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<String>(Collections.singletonList(this.topicName))));
        if (either.isLeft()) {
            throw new HoodieDeltaStreamerException("Error obtaining partition metadata", (Throwable)((ArrayBuffer)either.left().get()).head());
        }
        scala.collection.immutable.Set topicPartitions = (scala.collection.immutable.Set)either.right().get();
        if (lastCheckpointStr.isPresent()) {
            fromOffsets = this.checkupValidOffsets(cluster, lastCheckpointStr, (scala.collection.immutable.Set<TopicAndPartition>)topicPartitions);
        } else {
            KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(this.props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
            switch (autoResetValue) {
                case SMALLEST: {
                    fromOffsets = new HashMap(ScalaHelpers.toJavaMap((Map)cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
                    break;
                }
                case LARGEST: {
                    fromOffsets = new HashMap(ScalaHelpers.toJavaMap((Map)cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
                    break;
                }
                default: {
                    throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
                }
            }
        }
        HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets = new HashMap<TopicAndPartition, KafkaCluster.LeaderOffset>(ScalaHelpers.toJavaMap((Map)cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
        long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
        OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
        return offsetRanges;
    }

    private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(KafkaCluster cluster, Option<String> lastCheckpointStr, scala.collection.immutable.Set<TopicAndPartition> topicPartitions) {
        HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets = CheckpointUtils.strToOffsets((String)lastCheckpointStr.get());
        HashMap earliestOffsets = new HashMap(ScalaHelpers.toJavaMap((Map)cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
        boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream().anyMatch(offset -> ((KafkaCluster.LeaderOffset)offset.getValue()).offset() < ((KafkaCluster.LeaderOffset)earliestOffsets.get(offset.getKey())).offset());
        return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
    }

    public String getTopicName() {
        return this.topicName;
    }

    public HashMap<String, String> getKafkaParams() {
        return this.kafkaParams;
    }

    static class Config {
        private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
        private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;

        Config() {
        }
    }

    static enum KafkaResetOffsetStrategies {
        LARGEST,
        SMALLEST;

    }

    static class ScalaHelpers {
        ScalaHelpers() {
        }

        public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
            return ((scala.collection.mutable.Map)JavaConverters.mapAsScalaMapConverter(m).asScala()).toMap(Predef.conforms());
        }

        public static scala.collection.immutable.Set<String> toScalaSet(HashSet<String> s) {
            return ((Set)JavaConverters.asScalaSetConverter(s).asScala()).toSet();
        }

        public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
            return (java.util.Map)JavaConverters.mapAsJavaMapConverter(m).asJava();
        }
    }

    public static class CheckpointUtils {
        public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
            HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<TopicAndPartition, KafkaCluster.LeaderOffset>();
            if (checkpointStr.length() == 0) {
                return offsetMap;
            }
            String[] splits = checkpointStr.split(",");
            String topic = splits[0];
            for (int i = 1; i < splits.length; ++i) {
                String[] subSplits = splits[i].split(":");
                offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])), new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1])));
            }
            return offsetMap;
        }

        public static String offsetsToStr(OffsetRange[] ranges) {
            StringBuilder sb = new StringBuilder();
            sb.append(ranges[0].topic() + ",");
            sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset())).collect(Collectors.joining(",")));
            return sb.toString();
        }

        public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsetMap, HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsetMap, long numEvents) {
            Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
            OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
            toOffsetMap.entrySet().stream().map(e -> {
                TopicAndPartition tp = (TopicAndPartition)e.getKey();
                long fromOffset = fromOffsetMap.getOrDefault(tp, new KafkaCluster.LeaderOffset("", -1, 0L)).offset();
                return OffsetRange.create((TopicAndPartition)tp, (long)fromOffset, (long)fromOffset);
            }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
            long allocedEvents = 0L;
            HashSet<Integer> exhaustedPartitions = new HashSet<Integer>();
            while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
                long remainingEvents = numEvents - allocedEvents;
                long eventsPerPartition = (long)Math.ceil(1.0 * (double)remainingEvents / (double)(toOffsetMap.size() - exhaustedPartitions.size()));
                for (int i = 0; i < ranges.length; ++i) {
                    OffsetRange range = ranges[i];
                    if (exhaustedPartitions.contains(range.partition())) continue;
                    long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset();
                    long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
                    if (toOffset == toOffsetMax) {
                        exhaustedPartitions.add(range.partition());
                    }
                    if ((allocedEvents += toOffset - range.untilOffset()) > numEvents) {
                        long offsetsToAdd = Math.min(eventsPerPartition, numEvents - allocedEvents);
                        toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
                    }
                    ranges[i] = OffsetRange.create((TopicAndPartition)range.topicAndPartition(), (long)range.fromOffset(), (long)toOffset);
                }
            }
            return ranges;
        }

        public static long totalNewMessages(OffsetRange[] ranges) {
            return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
        }
    }
}

