/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.test.kafka.configuration;

import com.github.dockerjava.api.model.Capability;
import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.configuration.ZookeeperContainerConfiguration;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.MountableFile;

/**
 * Spring configuration that starts a Kafka broker in a Docker container and publishes its
 * connection details as {@code embedded.kafka.*} properties.
 *
 * <p>The configuration is active when {@code embedded.kafka.enabled} is {@code true} or absent
 * and a {@link ZookeeperContainerConfiguration} bean is present.
 */
@Configuration
@ConditionalOnProperty(value = "embedded.kafka.enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnBean(ZookeeperContainerConfiguration.class)
@EnableConfigurationProperties(KafkaConfigurationProperties.class)
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);

    /** Host name and network alias given to the broker container. */
    public static final String KAFKA_HOST_NAME = "kafka-broker.testcontainer.docker";

    /** Name of the environment variable inspected to find the externally reachable host. */
    private static final String DOCKER_HOST = "DOCKER_HOST";

    /** Host advertised to external clients when {@code DOCKER_HOST} yields no usable host. */
    private static final String DEFAULT_KAFKA_HOST = "localhost";

    /** Formatter for the per-start sub-folder of the bound data directory; cached because it is immutable. */
    private static final DateTimeFormatter DATA_FOLDER_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn");

    /**
     * Creates the wait strategy used to decide when the broker has started.
     *
     * @param kafkaProperties embedded Kafka settings handed to the check
     * @return a new status check; only registered when no other such bean exists
     */
    @Bean
    @ConditionalOnMissingBean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaProperties) {
        return new KafkaStatusCheck(kafkaProperties);
    }

    /**
     * Configures and starts the Kafka broker container, then registers its connection details in
     * the Spring environment.
     *
     * <p>Three listeners are configured: a plaintext and a SASL/plaintext listener advertised on the
     * host resolved by {@link #kafkaHost()}, and an internal plaintext listener advertised on
     * {@link #KAFKA_HOST_NAME}. All three ports are bound to identical host ports.
     *
     * @param kafkaStatusCheck          wait strategy used to detect broker readiness
     * @param kafkaProperties           embedded Kafka settings (image, ports, timeouts, data bind)
     * @param containerZookeeperConnect value passed to the broker as {@code KAFKA_ZOOKEEPER_CONNECT}
     * @param environment               environment that receives the {@code embedded.kafka.*} properties
     * @param network                   Docker network the container is attached to
     * @return the started container; Spring calls {@code stop} on it when the context is destroyed
     */
    @Bean(name = "kafka", destroyMethod = "stop")
    @DependsOn("zookeeper")
    public GenericContainer kafka(KafkaStatusCheck kafkaStatusCheck,
                                  KafkaConfigurationProperties kafkaProperties,
                                  @Value("${embedded.zookeeper.containerZookeeperConnect}") String containerZookeeperConnect,
                                  ConfigurableEnvironment environment,
                                  Network network) {
        int kafkaInternalPort = kafkaProperties.getContainerBrokerPort();
        int kafkaExternalPort = kafkaProperties.getBrokerPort();
        int saslPlaintextKafkaExternalPort = kafkaProperties.getSaslPlaintextBrokerPort();

        log.info("Starting kafka broker. Docker image: {}", kafkaProperties.getDockerImage());

        // Resolve the external host once so both external listeners agree and the lookup is logged once.
        String externalHost = kafkaHost();
        String advertisedListeners = "EXTERNAL_PLAINTEXT://" + externalHost + ":" + kafkaExternalPort
                + ",EXTERNAL_SASL_PLAINTEXT://" + externalHost + ":" + saslPlaintextKafkaExternalPort
                + ",INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort;
        String listeners = "EXTERNAL_PLAINTEXT://0.0.0.0:" + kafkaExternalPort
                + ",EXTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPlaintextKafkaExternalPort
                + ",INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaInternalPort;

        FixedHostPortGenericContainer<?> kafka =
                new FixedHostPortGenericContainer<>(kafkaProperties.getDockerImage());
        kafka.withLogConsumer(ContainerUtils.containerLogsConsumer(log))
                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_HOST_NAME))
                .withCreateContainerCmdModifier(cmd -> cmd.withCapAdd(Capability.NET_ADMIN))
                .withEnv("KAFKA_ZOOKEEPER_CONNECT", containerZookeeperConnect)
                .withEnv("KAFKA_BROKER_ID", "-1")
                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "EXTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT")
                .withEnv("KAFKA_ADVERTISED_LISTENERS", advertisedListeners)
                .withEnv("KAFKA_LISTENERS", listeners)
                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL_PLAINTEXT")
                .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaProperties.getReplicationFactor()))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MS", String.valueOf(kafkaProperties.getLogFlushIntervalMs()))
                .withEnv("KAFKA_REPLICA_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getReplicaSocketTimeoutMs()))
                .withEnv("KAFKA_CONTROLLER_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getControllerSocketTimeoutMs()))
                .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
                .withEnv("ZOOKEEPER_SASL_ENABLED", "false")
                // The JAAS file is copied from the classpath and referenced through KAFKA_OPTS below.
                .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
                .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
                .withExposedPorts(kafkaInternalPort, kafkaExternalPort, saslPlaintextKafkaExternalPort)
                // Host and container ports are identical so the advertised listener ports stay reachable.
                .withFixedExposedPort(kafkaInternalPort, kafkaInternalPort)
                .withFixedExposedPort(kafkaExternalPort, kafkaExternalPort)
                .withFixedExposedPort(saslPlaintextKafkaExternalPort, saslPlaintextKafkaExternalPort)
                .withNetwork(network)
                .withNetworkAliases(KAFKA_HOST_NAME)
                .withExtraHost(KAFKA_HOST_NAME, "127.0.0.1")
                .waitingFor((WaitStrategy) kafkaStatusCheck)
                .withStartupTimeout(kafkaProperties.getTimeoutDuration());

        KafkaConfigurationProperties.FileSystemBind fileSystemBind = kafkaProperties.getFileSystemBind();
        if (fileSystemBind.isEnabled()) {
            // A time-stamped sub-folder is used for each start of the container.
            String currentTimestamp = LocalDateTime.now().format(DATA_FOLDER_TIMESTAMP_FORMAT);
            String kafkaData = Paths.get(fileSystemBind.getDataFolder(), currentTimestamp)
                    .toAbsolutePath()
                    .toString();
            log.info("Writing kafka data to: {}", kafkaData);
            kafka.withFileSystemBind(kafkaData, "/var/lib/kafka/data", BindMode.READ_WRITE);
        }

        ContainerUtils.startAndLogTime(kafka);
        registerKafkaEnvironment(kafka, environment, kafkaProperties);
        return kafka;
    }

    /**
     * Determines the host that external clients should use to reach the broker.
     *
     * <p>The host component of the {@code DOCKER_HOST} environment variable is used when it is set
     * and contains one. If the variable is unset, cannot be parsed as a URI, or has no host
     * component (for example a {@code unix://} socket address, for which {@link URI#getHost()}
     * returns {@code null}), {@code localhost} is used instead.
     *
     * @return a non-null, non-empty host name
     */
    private String kafkaHost() {
        String dockerHost = System.getenv(DOCKER_HOST);
        if (dockerHost != null) {
            try {
                String parsedHost = new URI(dockerHost).getHost();
                if (parsedHost != null && !parsedHost.isEmpty()) {
                    log.info("From {}={} parsed Kafka host: {}", DOCKER_HOST, dockerHost, parsedHost);
                    return parsedHost;
                }
                // Without this guard "null" would end up inside KAFKA_ADVERTISED_LISTENERS.
                log.info("No host component in {}={}, use localhost instead", DOCKER_HOST, dockerHost);
            } catch (URISyntaxException e) {
                log.info("Failed to parse {}={}, use localhost instead: {}", DOCKER_HOST, dockerHost, e.getMessage());
            }
        }
        log.info("Use localhost as Kafka host");
        return DEFAULT_KAFKA_HOST;
    }

    /**
     * Adds an {@code embeddedKafkaInfo} property source, with highest precedence, describing how to
     * connect to the started broker.
     *
     * @param kafka           the started broker container
     * @param environment     environment to which the property source is added
     * @param kafkaProperties embedded Kafka settings supplying the configured ports
     */
    private void registerKafkaEnvironment(GenericContainer<?> kafka,
                                          ConfigurableEnvironment environment,
                                          KafkaConfigurationProperties kafkaProperties) {
        String host = kafka.getContainerIpAddress();
        Integer mappedPort = kafka.getMappedPort(kafkaProperties.getContainerBrokerPort());

        // Insertion order is kept so the logged connection details are stable.
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("embedded.kafka.brokerList", String.format("%s:%d", host, kafkaProperties.getBrokerPort()));
        map.put("embedded.kafka.saslPlaintext.brokerList", String.format("%s:%d", host, kafkaProperties.getSaslPlaintextBrokerPort()));
        map.put("embedded.kafka.saslPlaintext.user", "alice");
        map.put("embedded.kafka.saslPlaintext.password", "alice-secret");
        map.put("embedded.kafka.containerBrokerList", String.format("%s:%d", KAFKA_HOST_NAME, mappedPort));

        log.info("Started kafka broker. Connection details: {}", map);

        PropertySource<?> propertySource = new MapPropertySource("embeddedKafkaInfo", map);
        environment.getPropertySources().addFirst(propertySource);
    }

    /**
     * Creates the helper that configures topics on the started broker.
     *
     * @param zookeeper                 not used in the body; NOTE(review): presumably declared so that
     *                                  the zookeeper container bean is created first - confirm
     * @param kafka                     the broker container bean
     * @param kafkaProperties           embedded Kafka settings
     * @param containerZookeeperConnect Zookeeper connect string handed to the configurer
     * @return a new topics configurer
     */
    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(GenericContainer zookeeper,
                                                 GenericContainer kafka,
                                                 KafkaConfigurationProperties kafkaProperties,
                                                 @Value("${embedded.zookeeper.containerZookeeperConnect}") String containerZookeeperConnect) {
        return new KafkaTopicsConfigurer(kafka, containerZookeeperConnect, kafkaProperties);
    }
}

