/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.test.kafka;

import com.github.dockerjava.api.DockerClient;
import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import java.util.Collection;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;

/**
 * Creates topics and topic ACLs by executing the {@code kafka-topics} and
 * {@code kafka-acls} command line tools inside the supplied Kafka container.
 *
 * <p>Both tools are pointed at the ZooKeeper connect string given to the
 * constructor. Command failures never abort configuration: a non-zero exit
 * code is reported with a WARN log entry and processing continues.
 */
public class KafkaTopicsConfigurer {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicsConfigurer.class);

    /** Principal that is granted consumer and producer ACLs on the secure topics. */
    private static final String SECURE_TOPICS_USERNAME = "alice";

    /** Textual form of the exit code reported by a successful command. */
    private static final String SUCCESS_EXIT_CODE = "0";

    private final GenericContainer kafka;
    private final String kafkaZookeeperConnect;
    private final KafkaConfigurationProperties properties;

    /**
     * @param kafka                 container in which the Kafka command line tools are executed
     * @param kafkaZookeeperConnect ZooKeeper connect string passed to the tools
     * @param properties            source of the topics to create and the topics to restrict
     */
    public KafkaTopicsConfigurer(GenericContainer kafka, String kafkaZookeeperConnect, KafkaConfigurationProperties properties) {
        this.kafka = kafka;
        this.kafkaZookeeperConnect = kafkaZookeeperConnect;
        this.properties = properties;
    }

    /**
     * Creates the topics listed in the properties and then adds ACLs for the
     * secure topics. Runs as a {@link PostConstruct} callback.
     */
    @PostConstruct
    void configure() {
        this.createTopics(this.properties.getTopicsToCreate());
        this.restrictTopics(SECURE_TOPICS_USERNAME, this.properties.getSecureTopics());
    }

    /**
     * Creates every given topic with one partition and a replication factor
     * of one, skipping topics that already exist ({@code --if-not-exists}).
     * The topics are submitted through a parallel stream, so the individual
     * commands may run concurrently and in any order.
     *
     * @param topics topic names to create; {@code null} or empty means nothing to do
     */
    public void createTopics(Collection<String> topics) {
        if (topics == null || topics.isEmpty()) {
            return;
        }
        log.info("Creating Kafka topics: {}", topics);
        topics.parallelStream().forEach(this::createTopic);
        log.info("Created Kafka topics: {}", topics);
    }

    /** Runs the topic creation command for a single topic and logs its outcome. */
    private void createTopic(String topic) {
        String[] createTopicCmd = this.getCreateTopicCmd(topic, this.kafkaZookeeperConnect);
        ContainerUtils.ExecCmdResult output = this.exec(createTopicCmd);
        log.debug("Topic={} creation cmd='{}' exitCode={} : {}", topic, createTopicCmd, output.getExitCode(), output.getOutput());
        if (failed(output)) {
            // Without this the failure would only be visible at DEBUG level.
            log.warn("Failed to create Kafka topic={} exitCode={} : {}", topic, output.getExitCode(), output.getOutput());
        }
    }

    /**
     * Adds consumer (for any group) and producer ACLs for the given user on
     * each topic, one topic after another.
     *
     * @param username principal name, used as {@code User:<username>}
     * @param topics   topic names to restrict; {@code null} or empty means nothing to do
     */
    private void restrictTopics(String username, Collection<String> topics) {
        if (topics == null || topics.isEmpty()) {
            return;
        }
        log.info("Creating ACLs for Kafka topics: {}", topics);
        for (String topic : topics) {
            String[] topicConsumerACLsCmd = this.getTopicConsumerACLCmd(username, topic, this.kafkaZookeeperConnect);
            String[] topicProducerACLsCmd = this.getTopicProducerACLCmd(username, topic, this.kafkaZookeeperConnect);
            ContainerUtils.ExecCmdResult topicConsumerACLsOutput = this.exec(topicConsumerACLsCmd);
            ContainerUtils.ExecCmdResult topicProducerACLsOutput = this.exec(topicProducerACLsCmd);
            log.debug("Topic={} consumer ACLs cmd='{}' exitCode={} : {}, producer ACLs cmd='{}' exitCode={} : {}", topic, topicConsumerACLsCmd, topicConsumerACLsOutput.getExitCode(), topicConsumerACLsOutput.getOutput(), topicProducerACLsCmd, topicProducerACLsOutput.getExitCode(), topicProducerACLsOutput.getOutput());
            if (failed(topicConsumerACLsOutput)) {
                log.warn("Failed to create consumer ACLs for Kafka topic={} exitCode={} : {}", topic, topicConsumerACLsOutput.getExitCode(), topicConsumerACLsOutput.getOutput());
            }
            if (failed(topicProducerACLsOutput)) {
                log.warn("Failed to create producer ACLs for Kafka topic={} exitCode={} : {}", topic, topicProducerACLsOutput.getExitCode(), topicProducerACLsOutput.getOutput());
            }
        }
        log.info("Created ACLs for Kafka topics: {}", topics);
    }

    /** Executes the command inside the Kafka container and returns its result. */
    private ContainerUtils.ExecCmdResult exec(String[] cmd) {
        return ContainerUtils.execCmd(this.kafka.getDockerClient(), this.kafka.getContainerId(), cmd);
    }

    /**
     * Tells whether the command finished with an exit code other than 0.
     * The code is compared in its textual form so the check does not depend
     * on whether the result exposes it as a primitive or as a boxed value.
     */
    private static boolean failed(ContainerUtils.ExecCmdResult result) {
        return !SUCCESS_EXIT_CODE.equals(String.valueOf(result.getExitCode()));
    }

    /** Builds the {@code kafka-topics --create} command line for one topic. */
    private String[] getCreateTopicCmd(String topicName, String kafkaZookeeperConnect) {
        return new String[]{"kafka-topics", "--create", "--topic", topicName, "--partitions", "1", "--replication-factor", "1", "--if-not-exists", "--zookeeper", kafkaZookeeperConnect};
    }

    /** Builds the {@code kafka-acls --add ... --consumer} command line for one topic and any group. */
    private String[] getTopicConsumerACLCmd(String username, String topicName, String kafkaZookeeperConnect) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + kafkaZookeeperConnect, "--add", "--allow-principal", "User:" + username, "--consumer", "--topic", topicName, "--group", "*"};
    }

    /** Builds the {@code kafka-acls --add ... --producer} command line for one topic. */
    private String[] getTopicProducerACLCmd(String username, String topicName, String kafkaZookeeperConnect) {
        return new String[]{"kafka-acls", "--authorizer-properties", "zookeeper.connect=" + kafkaZookeeperConnect, "--add", "--allow-principal", "User:" + username, "--producer", "--topic", topicName};
    }

    /** @return the container in which the Kafka commands are executed */
    public GenericContainer getKafka() {
        return this.kafka;
    }

    /** @return the ZooKeeper connect string passed to the Kafka commands */
    public String getKafkaZookeeperConnect() {
        return this.kafkaZookeeperConnect;
    }

    /** @return the properties supplying the topics to create and to restrict */
    public KafkaConfigurationProperties getProperties() {
        return this.properties;
    }
}

