/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig$;
import net.mguenther.kafka.junit.EmbeddedConnect;
import net.mguenther.kafka.junit.EmbeddedKafka;
import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
import net.mguenther.kafka.junit.EmbeddedLifecycle;
import net.mguenther.kafka.junit.EmbeddedZooKeeper;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.LeaderAndIsr;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.RecordConsumer;
import net.mguenther.kafka.junit.RecordProducer;
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.SendKeyValuesTransactional;
import net.mguenther.kafka.junit.SendValues;
import net.mguenther.kafka.junit.SendValuesTransactional;
import net.mguenther.kafka.junit.TopicConfig;
import net.mguenther.kafka.junit.TopicManager;
import net.mguenther.kafka.junit.provider.DefaultRecordConsumer;
import net.mguenther.kafka.junit.provider.DefaultRecordProducer;
import net.mguenther.kafka.junit.provider.DefaultTopicManager;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedKafkaCluster
implements EmbeddedLifecycle,
RecordProducer,
RecordConsumer,
TopicManager,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private final EmbeddedKafkaClusterConfig config;
    private EmbeddedZooKeeper zooKeeper;
    private Map<Integer, EmbeddedKafka> brokers;
    private EmbeddedConnect connect;
    private RecordProducer producerDelegate;
    private RecordConsumer consumerDelegate;
    private TopicManager topicManagerDelegate;

    @Override
    public void start() {
        try {
            this.zooKeeper = new EmbeddedZooKeeper(this.config.getZooKeeperConfig());
            this.zooKeeper.start();
            this.brokers = new HashMap<Integer, EmbeddedKafka>();
            for (int i = 0; i < this.config.getKafkaConfig().getNumberOfBrokers(); ++i) {
                int brokerId = i + 1;
                EmbeddedKafka broker = new EmbeddedKafka(brokerId, this.config.getKafkaConfig().listenerFor(i), this.config.getKafkaConfig(), this.zooKeeper.getConnectString());
                broker.start();
                this.brokers.put(broker.getBrokerId(), broker);
            }
            if (this.config.usesConnect()) {
                this.connect = new EmbeddedConnect(this.config.getConnectConfig(), this.getBrokerList(), this.getClusterId());
                this.connect.start();
            }
            this.producerDelegate = new DefaultRecordProducer(this.getBrokerList());
            this.consumerDelegate = new DefaultRecordConsumer(this.getBrokerList());
            this.topicManagerDelegate = new DefaultTopicManager(this.getBrokerList());
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to start the embedded Kafka cluster.", e);
        }
    }

    @Override
    public void stop() {
        if (this.connect != null) {
            this.connect.stop();
        }
        this.brokers.values().forEach(EmbeddedKafka::stop);
        this.zooKeeper.stop();
    }

    public String getBrokerList() {
        return this.brokers.values().stream().filter(EmbeddedKafka::isActive).map(EmbeddedKafka::getBrokerList).collect(Collectors.joining(", "));
    }

    public String getClusterId() {
        return this.brokers.values().stream().map(EmbeddedKafka::getClusterId).findFirst().orElse("");
    }

    public void disconnect(Integer brokerId) {
        if (!this.brokers.containsKey(brokerId)) {
            log.info("There is no broker with ID {}. Omitting the disconnect request.", (Object)brokerId);
            return;
        }
        EmbeddedKafka broker = this.brokers.get(brokerId);
        broker.deactivate();
    }

    public void connect(Integer brokerId) {
        if (!this.brokers.containsKey(brokerId)) {
            log.info("There is no broker with ID {}. Omitting the connection request.", (Object)brokerId);
            return;
        }
        EmbeddedKafka broker = this.brokers.get(brokerId);
        broker.activate();
    }

    public void connect(Set<Integer> brokerIds) {
        brokerIds.forEach(this::connect);
    }

    public Set<Integer> disconnectUntilIsrFallsBelowMinimumSize(String topic) {
        Properties topicConfig = this.topicManagerDelegate.fetchTopicConfig(topic);
        int minimumIsrSize = Integer.parseInt(topicConfig.getProperty(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "1"));
        log.info("Attempting to drop the number of brokers in the ISR for topic {} below {}.", (Object)topic, (Object)minimumIsrSize);
        HashSet<Integer> disconnectedBrokers = new HashSet<Integer>();
        Set leaders = this.topicManagerDelegate.fetchLeaderAndIsr(topic).values().stream().map(LeaderAndIsr::getLeader).collect(Collectors.toSet());
        String joinedSetOfLeaders = leaders.stream().map(String::valueOf).collect(Collectors.joining(", "));
        log.info("Active brokers ({}) in the ISR for topic {} are: {}", new Object[]{leaders.size(), topic, joinedSetOfLeaders});
        int currentSizeOfIsr = leaders.size();
        while (currentSizeOfIsr >= minimumIsrSize) {
            Integer brokerId = (Integer)leaders.stream().limit(1L).findFirst().get();
            this.disconnect(brokerId);
            disconnectedBrokers.add(brokerId);
            leaders.remove(brokerId);
            log.info("Disconnected broker with ID {}. The current size of the ISR for topic {} is {}.", new Object[]{brokerId, topic, --currentSizeOfIsr});
        }
        return Collections.unmodifiableSet(disconnectedBrokers);
    }

    @Override
    public <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendRequest) throws InterruptedException {
        return this.producerDelegate.send(sendRequest);
    }

    @Override
    public <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendRequest) throws InterruptedException {
        return this.producerDelegate.send(sendRequest);
    }

    @Override
    public <V> List<RecordMetadata> send(SendValues<V> sendRequest) throws InterruptedException {
        return this.producerDelegate.send(sendRequest);
    }

    @Override
    public <V> List<RecordMetadata> send(SendValuesTransactional<V> sendRequest) throws InterruptedException {
        return this.producerDelegate.send(sendRequest);
    }

    @Override
    public <V> List<V> readValues(ReadKeyValues<String, V> readRequest) throws InterruptedException {
        return this.consumerDelegate.readValues(readRequest);
    }

    @Override
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readRequest) throws InterruptedException {
        return this.consumerDelegate.read(readRequest);
    }

    @Override
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeRequest) throws InterruptedException {
        return this.consumerDelegate.observeValues(observeRequest);
    }

    @Override
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeRequest) throws InterruptedException {
        return this.consumerDelegate.observe(observeRequest);
    }

    @Override
    public void createTopic(TopicConfig topicConfig) {
        this.topicManagerDelegate.createTopic(topicConfig);
    }

    @Override
    public void deleteTopic(String topic) {
        this.topicManagerDelegate.deleteTopic(topic);
    }

    @Override
    public boolean exists(String topic) {
        return this.topicManagerDelegate.exists(topic);
    }

    @Override
    public Map<Integer, LeaderAndIsr> fetchLeaderAndIsr(String topic) {
        return this.topicManagerDelegate.fetchLeaderAndIsr(topic);
    }

    @Override
    public Properties fetchTopicConfig(String topic) {
        return this.topicManagerDelegate.fetchTopicConfig(topic);
    }

    @Override
    public void close() {
        this.stop();
    }

    public static EmbeddedKafkaCluster provisionWith(EmbeddedKafkaClusterConfig config) {
        return new EmbeddedKafkaCluster(config);
    }

    public static EmbeddedKafkaCluster provisionWith(EmbeddedKafkaClusterConfig.EmbeddedKafkaClusterConfigBuilder builder) {
        return EmbeddedKafkaCluster.provisionWith(builder.build());
    }

    public EmbeddedKafkaCluster(EmbeddedKafkaClusterConfig config) {
        this.config = config;
    }
}

