/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.test.kafka.configuration;

import com.github.dockerjava.api.model.Link;
import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.configuration.ZookeeperContainerConfiguration;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.StartupCheckStrategy;

@Configuration
@ConditionalOnProperty(value={"embedded.kafka.enabled"}, havingValue="true", matchIfMissing=true)
@ConditionalOnBean(value={ZookeeperContainerConfiguration.class})
@EnableConfigurationProperties(value={KafkaConfigurationProperties.class})
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);

    @Bean
    @ConditionalOnMissingBean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaProperties) {
        return new KafkaStatusCheck(kafkaProperties);
    }

    @Bean(name={"kafka"}, destroyMethod="stop")
    public GenericContainer kafka(GenericContainer zookeeper, KafkaStatusCheck kafkaStatusCheck, KafkaConfigurationProperties kafkaProperties, ZookeeperConfigurationProperties zookeeperProperties, ConfigurableEnvironment environment) {
        String zookeeperHostname = ContainerUtils.getContainerHostname((GenericContainer)zookeeper);
        int kafkaMappingPort = kafkaProperties.getBrokerPort();
        String kafkaAdvertisedListeners = String.format("PLAINTEXT://localhost:%d", kafkaMappingPort);
        String currentTimestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));
        String kafkaData = Paths.get(kafkaProperties.getDataFileSystemBind(), currentTimestamp).toAbsolutePath().toString();
        log.info("Writing kafka data to: {}", (Object)kafkaData);
        log.info("Starting kafka broker. Docker image: {}", (Object)kafkaProperties.getDockerImage());
        FixedHostPortGenericContainer kafka = ((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)((FixedHostPortGenericContainer)new FixedHostPortGenericContainer(kafkaProperties.getDockerImage()).withStartupCheckStrategy((StartupCheckStrategy)kafkaStatusCheck)).withLogConsumer(ContainerUtils.containerLogsConsumer((Logger)log))).withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:" + zookeeperProperties.getZookeeperPort())).withEnv("KAFKA_BROKER_ID", "-1")).withEnv("KAFKA_ADVERTISED_LISTENERS", kafkaAdvertisedListeners)).withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaProperties.getReplicationFactor()))).withFileSystemBind(kafkaData, "/var/lib/kafka/data", BindMode.READ_WRITE)).withCreateContainerCmdModifier(cmd -> cmd.withLinks(new Link[]{new Link(zookeeperHostname, "zookeeper")}))).withExposedPorts(new Integer[]{kafkaMappingPort})).withFixedExposedPort(kafkaMappingPort, kafkaMappingPort);
        kafka.start();
        this.registerKafkaEnvironment((GenericContainer)kafka, environment, kafkaProperties);
        return kafka;
    }

    private void registerKafkaEnvironment(GenericContainer kafka, ConfigurableEnvironment environment, KafkaConfigurationProperties kafkaProperties) {
        Integer mappedPort = kafka.getMappedPort(kafkaProperties.getBrokerPort());
        String host = kafka.getContainerIpAddress();
        String kafkaBrokerList = String.format("%s:%d", host, mappedPort);
        LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
        map.put("embedded.kafka.brokerList", kafkaBrokerList);
        MapPropertySource propertySource = new MapPropertySource("embeddedKafkaInfo", map);
        log.info("Started kafka broker. Connection details: {}", map);
        environment.getPropertySources().addFirst((PropertySource)propertySource);
    }

    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(GenericContainer kafka, KafkaConfigurationProperties kafkaProperties, ZookeeperConfigurationProperties zookeeperProperties) {
        String zookeeperConnect = String.format("%s:%d", "zookeeper", zookeeperProperties.getZookeeperPort());
        return new KafkaTopicsConfigurer(kafka, zookeeperConnect, kafkaProperties);
    }
}

