/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.test.kafka.configuration;

import com.github.dockerjava.api.model.Capability;
import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.MountableFile;

/**
 * Spring configuration that starts a Kafka broker in a Testcontainers
 * {@link KafkaContainer} (with embedded Zookeeper) and publishes the resulting
 * connection details into the Spring {@link ConfigurableEnvironment}.
 *
 * <p>The configuration is active when the property {@code embedded.kafka.enabled}
 * is either missing or set to {@code true}.
 */
@Configuration
@ConditionalOnProperty(value = "embedded.kafka.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties({KafkaConfigurationProperties.class, ZookeeperConfigurationProperties.class})
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);

    /** Host name, network alias and extra-host entry assigned to the broker container. */
    public static final String KAFKA_HOST_NAME = "kafka-broker.testcontainer.docker";

    /** Formatter for the per-run sub-folder name used by the file system binds; built once and reused. */
    private static final DateTimeFormatter BIND_FOLDER_TIMESTAMP = DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn");

    /**
     * Creates a new docker network for the broker unless a {@link Network} bean
     * is already defined. The network is closed when the bean is destroyed.
     *
     * @return the newly created network
     */
    @Bean(destroyMethod = "close")
    @ConditionalOnMissingBean(Network.class)
    public Network kafkaNetwork() {
        Network network = Network.newNetwork();
        log.info("Created docker Network id={}", network.getId());
        return network;
    }

    /**
     * Provides the status check that {@link #kafka} passes to the container's
     * {@code waitingFor}, unless a bean of that type is already defined.
     *
     * @param kafkaProperties Kafka settings handed to the check
     * @return a new status check
     */
    @Bean
    @ConditionalOnMissingBean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaProperties) {
        return new KafkaStatusCheck(kafkaProperties);
    }

    /**
     * Configures and starts the Kafka broker container, then registers its
     * connection details in the environment. The container is stopped when the
     * bean is destroyed.
     *
     * <p>Three listeners are advertised through {@code getBootstrapServers()}:
     * {@code EXTERNAL_PLAINTEXT} and {@code EXTERNAL_SASL_PLAINTEXT} on the
     * container host with mapped ports, and {@code INTERNAL_PLAINTEXT} on
     * {@link #KAFKA_HOST_NAME} with the unmapped container port.
     *
     * @param kafkaStatusCheck    wait strategy used to decide when the broker is up
     * @param kafkaProperties     ports, image version, timeouts and file system bind settings for Kafka
     * @param zookeeperProperties file system bind settings for the embedded Zookeeper
     * @param environment         environment that receives the {@code embedded.kafka.*} properties
     * @param network             docker network the container is attached to
     * @return the started container
     */
    @Bean(name = "kafka", destroyMethod = "stop")
    public GenericContainer kafka(KafkaStatusCheck kafkaStatusCheck, KafkaConfigurationProperties kafkaProperties, ZookeeperConfigurationProperties zookeeperProperties, ConfigurableEnvironment environment, Network network) {
        final int kafkaInternalPort = kafkaProperties.getContainerBrokerPort();
        final int kafkaExternalPort = kafkaProperties.getBrokerPort();
        final int saslPlaintextKafkaExternalPort = kafkaProperties.getSaslPlaintextBrokerPort();
        String dockerImageVersion = kafkaProperties.getDockerImageVersion();
        log.info("Starting kafka broker. Docker image version: {}", dockerImageVersion);

        KafkaContainer kafka = new KafkaContainer(dockerImageVersion) {
            @Override
            public String getBootstrapServers() {
                // Return value deliberately discarded. NOTE(review): presumably called for the
                // parent's own state validation/side effects - confirm against KafkaContainer.
                super.getBootstrapServers();
                return "EXTERNAL_PLAINTEXT://" + this.getHost() + ":" + this.getMappedPort(kafkaExternalPort)
                        + ",EXTERNAL_SASL_PLAINTEXT://" + this.getHost() + ":" + this.getMappedPort(saslPlaintextKafkaExternalPort)
                        + ",INTERNAL_PLAINTEXT://" + KafkaContainerConfiguration.KAFKA_HOST_NAME + ":" + kafkaInternalPort;
            }
        }
                .withLogConsumer(ContainerUtils.containerLogsConsumer(log))
                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_HOST_NAME))
                .withCreateContainerCmdModifier(cmd -> cmd.withCapAdd(Capability.NET_ADMIN))
                .withEmbeddedZookeeper()
                .withEnv("KAFKA_BROKER_ID", "-1")
                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "EXTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT")
                .withEnv("KAFKA_LISTENERS", "EXTERNAL_PLAINTEXT://0.0.0.0:" + kafkaExternalPort
                        + ",EXTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPlaintextKafkaExternalPort
                        + ",INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaInternalPort
                        + ",BROKER://0.0.0.0:9092")
                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL_PLAINTEXT")
                .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaProperties.getReplicationFactor()))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MS", String.valueOf(kafkaProperties.getLogFlushIntervalMs()))
                .withEnv("KAFKA_REPLICA_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getReplicaSocketTimeoutMs()))
                .withEnv("KAFKA_CONTROLLER_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getControllerSocketTimeoutMs()))
                .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
                .withEnv("ZOOKEEPER_SASL_ENABLED", "false")
                // The JAAS file is copied into the container and referenced through KAFKA_OPTS below.
                .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
                .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
                .withEnv("KAFKA_GC_LOG_OPTS", "-Dnogclog")
                .withExposedPorts(kafkaInternalPort, kafkaExternalPort, saslPlaintextKafkaExternalPort, 9093)
                .withNetwork(network)
                .withNetworkAliases(KAFKA_HOST_NAME)
                .withExtraHost(KAFKA_HOST_NAME, "127.0.0.1")
                .waitingFor(kafkaStatusCheck)
                .withStartupTimeout(kafkaProperties.getTimeoutDuration());

        // Binds must be configured before the container is started.
        this.kafkaFileSystemBind(kafkaProperties, kafka);
        this.zookeeperFileSystemBind(zookeeperProperties, kafka);
        ContainerUtils.startAndLogTime(kafka);
        // Mapped ports are read here, so this has to run after start-up.
        this.registerKafkaEnvironment(kafka, environment, kafkaProperties);
        return kafka;
    }

    /**
     * Binds a timestamped sub-folder of the configured Kafka data folder to
     * {@code /var/lib/kafka/data} when the Kafka file system bind is enabled.
     *
     * @param kafkaProperties source of the bind settings
     * @param kafka           container to configure (not yet started)
     */
    private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaProperties, KafkaContainer kafka) {
        KafkaConfigurationProperties.FileSystemBind fileSystemBind = kafkaProperties.getFileSystemBind();
        if (!fileSystemBind.isEnabled()) {
            return;
        }
        String kafkaData = timestampedPath(fileSystemBind.getDataFolder(), currentBindTimestamp());
        log.info("Writing kafka data to: {}", kafkaData);
        kafka.withFileSystemBind(kafkaData, "/var/lib/kafka/data", BindMode.READ_WRITE);
    }

    /**
     * Binds timestamped sub-folders of the configured Zookeeper data and
     * transaction log folders to {@code /var/lib/zookeeper/data} and
     * {@code /var/lib/zookeeper/log} when the Zookeeper file system bind is
     * enabled. Both folders share the same timestamp.
     *
     * @param zookeeperProperties source of the bind settings
     * @param kafka               container to configure (not yet started)
     */
    private void zookeeperFileSystemBind(ZookeeperConfigurationProperties zookeeperProperties, KafkaContainer kafka) {
        ZookeeperConfigurationProperties.FileSystemBind fileSystemBind = zookeeperProperties.getFileSystemBind();
        if (!fileSystemBind.isEnabled()) {
            return;
        }
        String currentTimestamp = currentBindTimestamp();
        String zkData = timestampedPath(fileSystemBind.getDataFolder(), currentTimestamp);
        log.info("Writing zookeeper data to: {}", zkData);
        String zkTransactionLogs = timestampedPath(fileSystemBind.getTxnLogsFolder(), currentTimestamp);
        log.info("Writing zookeeper transaction logs to: {}", zkTransactionLogs);
        kafka.withFileSystemBind(zkData, "/var/lib/zookeeper/data", BindMode.READ_WRITE)
                .withFileSystemBind(zkTransactionLogs, "/var/lib/zookeeper/log", BindMode.READ_WRITE);
    }

    /** Formats the current local time as the sub-folder name used for file system binds. */
    private static String currentBindTimestamp() {
        return LocalDateTime.now().format(BIND_FOLDER_TIMESTAMP);
    }

    /** Resolves {@code timestamp} under {@code folder} and returns the absolute path as a string. */
    private static String timestampedPath(String folder, String timestamp) {
        return Paths.get(folder, timestamp).toAbsolutePath().toString();
    }

    /**
     * Adds an {@code embeddedKafkaInfo} property source, with highest
     * precedence, holding the broker lists and SASL credentials of the started
     * container.
     *
     * @param kafka           started container whose host and mapped ports are read
     * @param environment     environment that receives the property source
     * @param kafkaProperties source of the configured (unmapped) ports
     */
    private void registerKafkaEnvironment(GenericContainer kafka, ConfigurableEnvironment environment, KafkaConfigurationProperties kafkaProperties) {
        // MapPropertySource expects Map<String, Object>, hence the Object value type.
        LinkedHashMap<String, Object> map = new LinkedHashMap<>();
        // getHost() is the non-deprecated equivalent of getContainerIpAddress().
        String host = kafka.getHost();

        Integer mappedBrokerPort = kafka.getMappedPort(kafkaProperties.getBrokerPort());
        map.put("embedded.kafka.brokerList", String.format("%s:%d", host, mappedBrokerPort));

        Integer mappedSaslBrokerPort = kafka.getMappedPort(kafkaProperties.getSaslPlaintextBrokerPort());
        map.put("embedded.kafka.saslPlaintext.brokerList", String.format("%s:%d", host, mappedSaslBrokerPort));
        // NOTE(review): presumably these credentials must match kafka_server_jaas.conf - verify.
        map.put("embedded.kafka.saslPlaintext.user", "alice");
        map.put("embedded.kafka.saslPlaintext.password", "alice-secret");

        // Address for clients on the same docker network: alias plus unmapped container port.
        Integer containerPort = kafkaProperties.getContainerBrokerPort();
        map.put("embedded.kafka.containerBrokerList", String.format("%s:%d", KAFKA_HOST_NAME, containerPort));

        MapPropertySource propertySource = new MapPropertySource("embeddedKafkaInfo", map);
        log.info("Started kafka broker. Connection details: {}", map);
        environment.getPropertySources().addFirst(propertySource);
    }

    /**
     * Creates the topics configurer for the started Kafka container.
     *
     * @param kafka               the Kafka container bean
     * @param kafkaProperties     Kafka settings passed to the configurer
     * @param zookeeperProperties Zookeeper settings passed to the configurer
     * @return a new topics configurer
     */
    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(GenericContainer kafka, KafkaConfigurationProperties kafkaProperties, ZookeeperConfigurationProperties zookeeperProperties) {
        return new KafkaTopicsConfigurer(kafka, zookeeperProperties, kafkaProperties);
    }
}

