/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConsumerCoordinator {
    private static final Logger log = LoggerFactory.getLogger(PulsarConsumerCoordinator.class);

    public static void invokePartitionsAssigned(String groupId, ConsumerConfig config, List<TopicPartition> assignedPartitions) {
        List assignors = PartitionAssignorAdapter.getAssignorInstances((List)config.getList("partition.assignment.strategy"), (Map)config.originals());
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
        ByteBuffer bbInfo = new AssignmentInfo(assignedPartitions).encode();
        ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(assignedPartitions, bbInfo);
        for (ConsumerPartitionAssignor assignor : assignors) {
            assignor.onAssignment(assignment, groupMetadata);
        }
    }

    private static class AssignmentInfo {
        private final int usedVersion = 2;
        private final List<TopicPartition> partitions;

        public AssignmentInfo(List<TopicPartition> partitions) {
            this.partitions = partitions;
        }

        public ByteBuffer encode() {
            ByteBuffer byteBuffer;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(baos);){
                out.writeInt(2);
                this.encodeActiveAndStandbyTaskAssignment(out, this.partitions);
                this.encodePartitionsByHost(out);
                out.flush();
                out.close();
                byteBuffer = ByteBuffer.wrap(baos.toByteArray());
            }
            return byteBuffer;
        }

        private void encodeActiveAndStandbyTaskAssignment(DataOutputStream out, List<TopicPartition> partitions) throws IOException {
            int lastId = 0;
            HashMap<String, Integer> topicGroupIds = new HashMap<String, Integer>();
            out.writeInt(partitions.size());
            for (TopicPartition p : partitions) {
                int topicGroupId;
                if (topicGroupIds.containsKey(p.topic())) {
                    topicGroupId = (Integer)topicGroupIds.get(p.topic());
                } else {
                    topicGroupId = lastId++;
                    topicGroupIds.put(p.topic(), topicGroupId);
                }
                out.writeInt(topicGroupId);
                out.writeInt(p.partition());
            }
            out.writeInt(0);
        }

        private void encodePartitionsByHost(DataOutputStream out) throws IOException {
            out.writeInt(1);
            this.writeHostInfo(out, "fakeHost", 9999);
            this.writeTopicPartitions(out, this.partitions);
        }

        private void writeHostInfo(DataOutputStream out, String host, int port) throws IOException {
            out.writeUTF(host);
            out.writeInt(port);
        }

        private void writeTopicPartitions(DataOutputStream out, List<TopicPartition> partitions) throws IOException {
            out.writeInt(partitions.size());
            for (TopicPartition partition : partitions) {
                out.writeUTF(partition.topic());
                out.writeInt(partition.partition());
            }
        }
    }
}

