/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.testcontainer.kafka;

import com.playtika.testcontainer.common.utils.ContainerUtils;
import com.playtika.testcontainer.kafka.properties.KafkaConfigurationProperties;
import com.playtika.testcontainer.kafka.properties.ZookeeperConfigurationProperties;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

public class KafkaTopicsConfigurer
implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicsConfigurer.class);
    private static final int DEFAULT_PARTITION_COUNT = 1;
    private final GenericContainer<?> kafka;
    private final ZookeeperConfigurationProperties zookeeperProperties;
    private final KafkaConfigurationProperties kafkaProperties;

    public void afterPropertiesSet() {
        this.createTopics(this.kafkaProperties.getTopicsToCreate(), this.kafkaProperties.getTopicsConfiguration());
        this.restrictTopics("alice", this.kafkaProperties.getSecureTopics());
    }

    public void createTopics(Collection<String> topics, Collection<KafkaConfigurationProperties.TopicConfiguration> topicsConfiguration) {
        Map<String, KafkaConfigurationProperties.TopicConfiguration> defaultTopicToTopicConfigurationMap = topics.stream().collect(Collectors.toMap(topic -> topic, topic -> new KafkaConfigurationProperties.TopicConfiguration((String)topic, 1)));
        Map<String, KafkaConfigurationProperties.TopicConfiguration> topicToTopicConfigurationMap = topicsConfiguration.stream().collect(Collectors.toMap(KafkaConfigurationProperties.TopicConfiguration::getTopic, topicConfiguration -> topicConfiguration));
        defaultTopicToTopicConfigurationMap.putAll(topicToTopicConfigurationMap);
        Collection<KafkaConfigurationProperties.TopicConfiguration> topicsConfigurationToCreate = defaultTopicToTopicConfigurationMap.values();
        if (!topicsConfigurationToCreate.isEmpty()) {
            log.info("Creating Kafka topics for configuration: {}", topicsConfigurationToCreate);
            topicsConfigurationToCreate.parallelStream().forEach(this::createTopic);
            log.info("Created Kafka topics for configuration: {}", topicsConfigurationToCreate);
        }
    }

    private void createTopic(KafkaConfigurationProperties.TopicConfiguration topicConfiguration) {
        String topic = topicConfiguration.getTopic();
        int partitions = topicConfiguration.getPartitions();
        String containerBrokerList = String.format("%s:%d", "kafka-broker.testcontainer.docker", this.kafkaProperties.getInternalBrokerPort());
        String[] createTopicCmd = this.getCreateTopicCmd(topic, partitions, containerBrokerList);
        Container.ExecResult execResult = ContainerUtils.executeInContainer(this.kafka, (String[])createTopicCmd);
        log.debug("Topic={} creation cmd='{}' execResult={}", new Object[]{topic, createTopicCmd, execResult});
    }

    private void restrictTopics(String username, Collection<String> topics) {
        if (!topics.isEmpty()) {
            log.info("Creating ACLs for Kafka topics: {}", topics);
            for (String topic : topics) {
                String[] topicConsumerACLsCmd = this.getTopicConsumerACLCmd(username, topic, this.zookeeperProperties.getZookeeperConnect());
                String[] topicProducerACLsCmd = this.getTopicProducerACLCmd(username, topic, this.zookeeperProperties.getZookeeperConnect());
                Container.ExecResult topicConsumerACLsOutput = ContainerUtils.executeInContainer(this.kafka, (String[])topicConsumerACLsCmd);
                Container.ExecResult topicProducerACLsOutput = ContainerUtils.executeInContainer(this.kafka, (String[])topicProducerACLsCmd);
                log.debug("Topic={} consumer ACLs cmd='{}' execResult={}, producer ACLs cmd='{}' execResult={}", new Object[]{topic, topicConsumerACLsCmd, topicConsumerACLsOutput, topicProducerACLsCmd, topicProducerACLsOutput.getExitCode()});
            }
            log.info("Created ACLs for Kafka topics: {}", topics);
        }
    }

    private String[] getCreateTopicCmd(String topicName, int partitions, String kafkaContainerBrokerList) {
        return new String[]{"kafka-topics", "--create", "--topic", topicName, "--partitions", String.valueOf(partitions), "--replication-factor", "1", "--if-not-exists", "--bootstrap-server", kafkaContainerBrokerList};
    }

    private String[] getTopicConsumerACLCmd(String username, String topicName, String kafkaZookeeperConnect) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + kafkaZookeeperConnect, "--add", "--allow-principal", "User:" + username, "--consumer", "--topic", topicName, "--group", "*"};
    }

    private String[] getTopicProducerACLCmd(String username, String topicName, String kafkaZookeeperConnect) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + kafkaZookeeperConnect, "--add", "--allow-principal", "User:" + username, "--producer", "--topic", topicName};
    }

    public KafkaTopicsConfigurer(GenericContainer<?> kafka, ZookeeperConfigurationProperties zookeeperProperties, KafkaConfigurationProperties kafkaProperties) {
        this.kafka = kafka;
        this.zookeeperProperties = zookeeperProperties;
        this.kafkaProperties = kafkaProperties;
    }

    public GenericContainer<?> getKafka() {
        return this.kafka;
    }

    public ZookeeperConfigurationProperties getZookeeperProperties() {
        return this.zookeeperProperties;
    }

    public KafkaConfigurationProperties getKafkaProperties() {
        return this.kafkaProperties;
    }
}

