/*
 * Decompiled with CFR 0.152.
 */
package com.github.grantneale.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LagBasedPartitionAssignor
implements ConsumerPartitionAssignor,
Configurable {
    private static final Logger LOGGER = LoggerFactory.getLogger(LagBasedPartitionAssignor.class);
    private Properties consumerGroupProps;
    private Properties metadataConsumerProps;
    private KafkaConsumer<byte[], byte[]> metadataConsumer;

    public void configure(Map<String, ?> configs) {
        this.consumerGroupProps = new Properties();
        for (Map.Entry<String, ?> prop : configs.entrySet()) {
            this.consumerGroupProps.put(prop.getKey(), prop.getValue());
        }
        String groupId = this.consumerGroupProps.getProperty("group.id");
        if (groupId == null) {
            throw new IllegalArgumentException("group.id cannot be null when using partition.assignment.strategy=" + this.getClass().getName());
        }
        this.metadataConsumerProps = new Properties();
        this.metadataConsumerProps.putAll((Map<?, ?>)this.consumerGroupProps);
        this.metadataConsumerProps.put("enable.auto.commit", "false");
        String clientId = groupId + ".assignor";
        this.metadataConsumerProps.put("client.id", clientId);
        LOGGER.debug("Configured LagBasedPartitionAssignor with values:\n\tgroup.id = {}\n\tclient.id = {}\n", (Object)groupId, (Object)clientId);
    }

    public String name() {
        return "lag";
    }

    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster metadata, ConsumerPartitionAssignor.GroupSubscription subscriptions) {
        HashSet<String> allSubscribedTopics = new HashSet<String>();
        HashMap<String, List<String>> topicSubscriptions = new HashMap<String, List<String>>();
        for (Map.Entry subscriptionEntry : subscriptions.groupSubscription().entrySet()) {
            List topics = ((ConsumerPartitionAssignor.Subscription)subscriptionEntry.getValue()).topics();
            allSubscribedTopics.addAll(topics);
            topicSubscriptions.put((String)subscriptionEntry.getKey(), topics);
        }
        Map<String, List<TopicPartitionLag>> topicLags = this.readTopicPartitionLags(metadata, allSubscribedTopics);
        Map<String, List<TopicPartition>> rawAssignments = LagBasedPartitionAssignor.assign(topicLags, topicSubscriptions);
        HashMap<String, ConsumerPartitionAssignor.Assignment> assignments = new HashMap<String, ConsumerPartitionAssignor.Assignment>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet()) {
            assignments.put(assignmentEntry.getKey(), new ConsumerPartitionAssignor.Assignment(assignmentEntry.getValue()));
        }
        return new ConsumerPartitionAssignor.GroupAssignment(assignments);
    }

    static Map<String, List<TopicPartition>> assign(Map<String, List<TopicPartitionLag>> partitionLagPerTopic, Map<String, List<String>> subscriptions) {
        HashMap<String, List<TopicPartition>> assignment = new HashMap<String, List<TopicPartition>>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList());
        }
        Map<String, List<String>> consumersPerTopic = LagBasedPartitionAssignor.consumersPerTopic(subscriptions);
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            LagBasedPartitionAssignor.assignTopic(assignment, topicEntry.getKey(), topicEntry.getValue(), partitionLagPerTopic.getOrDefault(topicEntry.getKey(), Collections.emptyList()));
        }
        return assignment;
    }

    private static void assignTopic(Map<String, List<TopicPartition>> assignment, String topic, List<String> consumers, List<TopicPartitionLag> partitionLags) {
        if (consumers.isEmpty()) {
            return;
        }
        HashMap<String, Long> consumerTotalLags = new HashMap<String, Long>(consumers.size());
        for (String string : consumers) {
            consumerTotalLags.put(string, 0L);
        }
        HashMap<String, Integer> consumerTotalPartitions = new HashMap<String, Integer>(consumers.size());
        for (String memberId : consumers) {
            consumerTotalPartitions.put(memberId, 0);
        }
        partitionLags.sort((p1, p2) -> {
            if (p1.getLag() == p2.getLag()) {
                return Integer.compare(p1.getPartition(), p2.getPartition());
            }
            return Long.compare(p2.getLag(), p1.getLag());
        });
        for (TopicPartitionLag partition : partitionLags) {
            String memberId = (String)Collections.min(consumerTotalLags.entrySet(), (c1, c2) -> {
                int comparePartitionCount = Integer.compare((Integer)consumerTotalPartitions.get(c1.getKey()), (Integer)consumerTotalPartitions.get(c2.getKey()));
                if (comparePartitionCount != 0) {
                    return comparePartitionCount;
                }
                int compareTotalLags = Long.compare((Long)c1.getValue(), (Long)c2.getValue());
                if (compareTotalLags != 0) {
                    return compareTotalLags;
                }
                return ((String)c1.getKey()).compareTo((String)c2.getKey());
            }).getKey();
            assignment.get(memberId).add(new TopicPartition(partition.getTopic(), partition.getPartition()));
            consumerTotalLags.put(memberId, consumerTotalLags.getOrDefault(memberId, 0L) + partition.getLag());
            consumerTotalPartitions.put(memberId, consumerTotalPartitions.getOrDefault(memberId, 0) + 1);
            LOGGER.trace("Assigned partition {}-{} to consumer {}.  partition_lag={}, consumer_current_total_lag={}", new Object[]{partition.getTopic(), partition.getPartition(), memberId, partition.getLag(), consumerTotalLags.get(memberId)});
        }
        if (LOGGER.isDebugEnabled()) {
            StringBuilder stringBuilder = new StringBuilder();
            for (Map.Entry entry : consumerTotalLags.entrySet()) {
                String memberId = (String)entry.getKey();
                stringBuilder.append(String.format("\t%s (total_lag=%d)\n", memberId, consumerTotalLags.get(memberId)));
                for (TopicPartition tp : assignment.getOrDefault(memberId, Collections.emptyList())) {
                    stringBuilder.append(String.format("\t\t%s\n", tp));
                }
            }
            LOGGER.debug("Assignment for {}:\n{}", (Object)topic, (Object)stringBuilder);
        }
    }

    private Map<String, List<TopicPartitionLag>> readTopicPartitionLags(Cluster metadata, Set<String> allSubscribedTopics) {
        if (this.metadataConsumer == null) {
            this.metadataConsumer = new KafkaConsumer(this.metadataConsumerProps);
        }
        HashMap<String, List<TopicPartitionLag>> topicPartitionLags = new HashMap<String, List<TopicPartitionLag>>();
        for (String topic : allSubscribedTopics) {
            List topicPartitionInfo = metadata.partitionsForTopic(topic);
            if (topicPartitionInfo != null && !topicPartitionInfo.isEmpty()) {
                List topicPartitions = topicPartitionInfo.stream().map(p -> new TopicPartition(p.topic(), p.partition())).collect(Collectors.toList());
                topicPartitionLags.put(topic, new ArrayList());
                Map topicBeginOffsets = this.metadataConsumer.beginningOffsets(topicPartitions);
                Map topicEndOffsets = this.metadataConsumer.endOffsets(topicPartitions);
                Map partitionMetadata = this.metadataConsumer.committed(new HashSet(topicPartitions));
                for (TopicPartition partition : topicPartitions) {
                    String autoOffsetResetMode = this.consumerGroupProps.getProperty("auto.offset.reset", "latest");
                    long lag = LagBasedPartitionAssignor.computePartitionLag((OffsetAndMetadata)partitionMetadata.get(partition), topicBeginOffsets.getOrDefault(partition, 0L), topicEndOffsets.getOrDefault(partition, 0L), autoOffsetResetMode);
                    ((List)topicPartitionLags.get(topic)).add(new TopicPartitionLag(topic, partition.partition(), lag));
                }
                continue;
            }
            LOGGER.warn("Skipping assignment for topic {} since no metadata is available", (Object)topic);
        }
        return topicPartitionLags;
    }

    static long computePartitionLag(OffsetAndMetadata partitionMetadata, long beginOffset, long endOffset, String autoOffsetResetMode) {
        long nextOffset = partitionMetadata != null ? partitionMetadata.offset() : (autoOffsetResetMode.equalsIgnoreCase("latest") ? endOffset : beginOffset);
        return Long.max(endOffset - nextOffset, 0L);
    }

    private static Map<String, List<String>> consumersPerTopic(Map<String, List<String>> subscriptions) {
        HashMap<String, List<String>> consumersPerTopic = new HashMap<String, List<String>>();
        for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue()) {
                List topicConsumers = consumersPerTopic.computeIfAbsent(topic, k -> new ArrayList());
                topicConsumers.add(consumerId);
            }
        }
        return consumersPerTopic;
    }

    static class TopicPartitionLag {
        private final String topic;
        private final int partition;
        private final long lag;

        TopicPartitionLag(String topic, int partition, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.lag = lag;
        }

        String getTopic() {
            return this.topic;
        }

        int getPartition() {
            return this.partition;
        }

        long getLag() {
            return this.lag;
        }
    }
}

