/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.testcontainer.nativekafka;

import com.playtika.testcontainer.nativekafka.properties.NativeKafkaConfigurationProperties;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.kafka.KafkaContainer;

public class NativeKafkaTopicsConfigurer
implements InitializingBean {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NativeKafkaTopicsConfigurer.class);
    private static final int DEFAULT_PARTITION_COUNT = 1;
    private final GenericContainer<?> nativeKafka;
    private final NativeKafkaConfigurationProperties nativeKafkaProperties;

    public void afterPropertiesSet() {
        this.createTopics(this.nativeKafkaProperties.getTopicsToCreate(), this.nativeKafkaProperties.getTopicsConfiguration());
    }

    public void createTopics(Collection<String> topics, Collection<NativeKafkaConfigurationProperties.TopicConfiguration> topicsConfiguration) {
        Map<String, NativeKafkaConfigurationProperties.TopicConfiguration> defaultTopicToTopicConfigurationMap = topics.stream().collect(Collectors.toMap(topic -> topic, topic -> new NativeKafkaConfigurationProperties.TopicConfiguration((String)topic, 1)));
        Map<String, NativeKafkaConfigurationProperties.TopicConfiguration> topicToTopicConfigurationMap = topicsConfiguration.stream().collect(Collectors.toMap(NativeKafkaConfigurationProperties.TopicConfiguration::getTopic, topicConfiguration -> topicConfiguration));
        defaultTopicToTopicConfigurationMap.putAll(topicToTopicConfigurationMap);
        Collection<NativeKafkaConfigurationProperties.TopicConfiguration> topicsConfigurationToCreate = defaultTopicToTopicConfigurationMap.values();
        if (!topicsConfigurationToCreate.isEmpty()) {
            log.info("Creating Native Kafka topics for configuration: {}", topicsConfigurationToCreate);
            this.createTopicsUsingAdminClient(topicsConfigurationToCreate);
            log.info("Created Native Kafka topics for configuration: {}", topicsConfigurationToCreate);
        }
    }

    private void createTopicsUsingAdminClient(Collection<NativeKafkaConfigurationProperties.TopicConfiguration> topicsConfigurationToCreate) {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", ((KafkaContainer)this.nativeKafka).getBootstrapServers());
        try (AdminClient adminClient = AdminClient.create((Properties)adminProps);){
            List newTopics = topicsConfigurationToCreate.stream().map(config -> new NewTopic(config.getTopic(), config.getPartitions(), 1)).collect(Collectors.toList());
            CreateTopicsResult result = adminClient.createTopics(newTopics);
            result.all().get();
            log.debug("Successfully created {} topics using Admin API", (Object)newTopics.size());
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("Failed to create topics using Admin API", (Throwable)e);
            throw new RuntimeException("Failed to create topics", e);
        }
    }

    @Generated
    public NativeKafkaTopicsConfigurer(GenericContainer<?> nativeKafka, NativeKafkaConfigurationProperties nativeKafkaProperties) {
        this.nativeKafka = nativeKafka;
        this.nativeKafkaProperties = nativeKafkaProperties;
    }

    @Generated
    public GenericContainer<?> getNativeKafka() {
        return this.nativeKafka;
    }

    @Generated
    public NativeKafkaConfigurationProperties getNativeKafkaProperties() {
        return this.nativeKafkaProperties;
    }
}

