/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.test.kafka.configuration;

import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.configuration.ZookeeperContainerConfiguration;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

@Configuration
@ConditionalOnProperty(value={"embedded.kafka.enabled"}, havingValue="true", matchIfMissing=true)
@ConditionalOnBean(value={ZookeeperContainerConfiguration.class})
@EnableConfigurationProperties(value={KafkaConfigurationProperties.class})
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);
    public static final String KAFKA_HOST_NAME = "kafka-broker.testcontainer.docker";

    @Bean
    @ConditionalOnMissingBean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaProperties) {
        return new KafkaStatusCheck(kafkaProperties);
    }

    @Bean(name={"kafka"}, destroyMethod="stop")
    public GenericContainer kafka(GenericContainer zookeeper, KafkaStatusCheck kafkaStatusCheck, KafkaConfigurationProperties kafkaProperties, @Value(value="${embedded.zookeeper.containerZookeeperConnect}") String containerZookeeperConnect, ConfigurableEnvironment environment, Network network) {
        int kafkaInternalPort = kafkaProperties.getContainerBrokerPort();
        int kafkaExternalPort = kafkaProperties.getBrokerPort();
        String currentTimestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));
        String kafkaData = Paths.get(kafkaProperties.getDataFileSystemBind(), currentTimestamp).toAbsolutePath().toString();
        log.info("Writing kafka data to: {}", (Object)kafkaData);
        log.info("Starting kafka broker. Docker image: {}", (Object)kafkaProperties.getDockerImage());
        GenericContainer kafka = ((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)new FixedHostPortGenericContainer(kafkaProperties.getDockerImage()).withLogConsumer(ContainerUtils.containerLogsConsumer((Logger)log))).withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_HOST_NAME))).withEnv("KAFKA_ZOOKEEPER_CONNECT", containerZookeeperConnect)).withEnv("KAFKA_BROKER_ID", "-1")).withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "EXTERNAL_PLAINTEXT:PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT")).withEnv("KAFKA_ADVERTISED_LISTENERS", "EXTERNAL_PLAINTEXT://localhost:" + kafkaExternalPort + ",INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort)).withEnv("KAFKA_LISTENERS", "EXTERNAL_PLAINTEXT://0.0.0.0:" + kafkaExternalPort + ",INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaInternalPort)).withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL_PLAINTEXT")).withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")).withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaProperties.getReplicationFactor()))).withEnv("KAFKA_LOG_FLUSH_INTERVAL_MS", String.valueOf(kafkaProperties.getLogFlushIntervalMs()))).withEnv("KAFKA_REPLICA_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getReplicaSocketTimeoutMs()))).withEnv("KAFKA_CONTROLLER_SOCKET_TIMEOUT_MS", String.valueOf(kafkaProperties.getControllerSocketTimeoutMs()))).withFileSystemBind(kafkaData, "/var/lib/kafka/data", BindMode.READ_WRITE)).withExposedPorts(new Integer[]{kafkaInternalPort, kafkaExternalPort})).withFixedExposedPort(kafkaInternalPort, kafkaInternalPort).withFixedExposedPort(kafkaExternalPort, kafkaExternalPort).withNetwork(network)).withNetworkAliases(new String[]{KAFKA_HOST_NAME})).withExtraHost(KAFKA_HOST_NAME, "127.0.0.1")).waitingFor((WaitStrategy)kafkaStatusCheck)).withStartupTimeout(kafkaProperties.getTimeoutDuration());
        kafka.start();
        this.registerKafkaEnvironment(kafka, environment, kafkaProperties);
        return kafka;
    }

    private void registerKafkaEnvironment(GenericContainer kafka, ConfigurableEnvironment environment, KafkaConfigurationProperties kafkaProperties) {
        LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
        String host = kafka.getContainerIpAddress();
        String kafkaBrokerList = String.format("%s:%d", host, kafkaProperties.getBrokerPort());
        map.put("embedded.kafka.brokerList", kafkaBrokerList);
        Integer mappedPort = kafka.getMappedPort(kafkaProperties.getContainerBrokerPort());
        String kafkaBrokerListForContainers = String.format("%s:%d", KAFKA_HOST_NAME, mappedPort);
        map.put("embedded.kafka.containerBrokerList", kafkaBrokerListForContainers);
        MapPropertySource propertySource = new MapPropertySource("embeddedKafkaInfo", map);
        log.info("Started kafka broker. Connection details: {}", map);
        environment.getPropertySources().addFirst((PropertySource)propertySource);
    }

    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(GenericContainer zookeeper, GenericContainer kafka, KafkaConfigurationProperties kafkaProperties, @Value(value="${embedded.zookeeper.containerZookeeperConnect}") String containerZookeeperConnect) {
        return new KafkaTopicsConfigurer(kafka, containerZookeeperConnect, kafkaProperties);
    }
}

