/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.embedded;

import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration;
import io.micronaut.configuration.kafka.embedded.KafkaEmbeddedConfiguration;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.core.io.socket.SocketUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requirements(value={@Requires(env={"test", "dev"}), @Requires(classes={KafkaServer.class, kafka.utils.TestUtils.class, TestUtils.class}), @Requires(property="kafka.embedded.enabled")})
@Deprecated
public class KafkaEmbedded
implements BeanCreatedEventListener<AbstractKafkaConfiguration>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaEmbedded.class);
    private EmbeddedZookeeper zkServer;
    private KafkaServer kafkaServer;
    private final KafkaEmbeddedConfiguration embeddedConfiguration;
    private final AtomicBoolean init = new AtomicBoolean(false);

    public KafkaEmbedded(KafkaEmbeddedConfiguration embeddedConfiguration) {
        this.embeddedConfiguration = embeddedConfiguration;
    }

    public synchronized AbstractKafkaConfiguration onCreated(BeanCreatedEvent<AbstractKafkaConfiguration> event) {
        boolean randomPort;
        LOG.warn("Embedded Kafka is deprecated. For Testing please use Test Containers instead: https://www.testcontainers.org/modules/kafka/");
        AbstractKafkaConfiguration config = (AbstractKafkaConfiguration)event.getBean();
        if (this.kafkaServer != null) {
            return config;
        }
        String bootstrapServer = config.getConfig().getProperty("bootstrap.servers");
        String[] hostAndPort = bootstrapServer.split(",")[0].split(":");
        int kafkaPort = -1;
        if (hostAndPort.length == 2) {
            try {
                kafkaPort = Integer.parseInt(hostAndPort[1]);
            }
            catch (NumberFormatException e) {
                return config;
            }
        }
        boolean bl = randomPort = kafkaPort == -1;
        if (this.embeddedConfiguration.isEnabled()) {
            int retries = 0;
            do {
                int targetPort;
                int n = targetPort = randomPort ? SocketUtils.findAvailableTcpPort() : kafkaPort;
                if (this.kafkaServer != null || targetPort <= -1 || !SocketUtils.isTcpPortAvailable((int)targetPort) || !this.init.compareAndSet(false, true)) continue;
                try {
                    if (this.zkServer == null) {
                        this.initZooKeeper();
                    }
                    Properties brokerProps = this.embeddedConfiguration.getProperties();
                    String zkConnect = "127.0.0.1:" + this.zkServer.port();
                    brokerProps.setProperty("zookeeper.connect", zkConnect);
                    brokerProps.putIfAbsent("broker.id", "0");
                    brokerProps.put("port", (Object)targetPort);
                    brokerProps.putIfAbsent("offsets.topic.replication.factor", "1");
                    brokerProps.computeIfAbsent("log.dirs", (Function<? super Object, ?>)((Function<Object, Object>)o -> {
                        try {
                            return Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString();
                        }
                        catch (IOException e) {
                            throw new ConfigurationException("Error creating log directory for embedded Kafka server: " + e.getMessage(), (Throwable)e);
                        }
                    }));
                    brokerProps.setProperty("listeners", "PLAINTEXT://127.0.0.1:" + targetPort);
                    KafkaConfig kafkaConfig = new KafkaConfig((Map)brokerProps);
                    this.kafkaServer = kafka.utils.TestUtils.createServer((KafkaConfig)kafkaConfig, (Time)new MockTime());
                    Integer numPartitions = kafkaConfig.numPartitions();
                    LOG.info("Started Embedded Kafka on Port: {}", (Object)targetPort);
                    this.createTopics(targetPort, numPartitions);
                    return config;
                }
                catch (Throwable e) {
                    if (!e.getMessage().contains("Address already in use")) {
                        throw new ConfigurationException("Error starting embedded Kafka server: " + e.getMessage(), e);
                    }
                    ++retries;
                }
            } while (retries < 3);
            throw new ConfigurationException("Error starting embedded Kafka server. Could not start after attempting port binding several times");
        }
        if (kafkaPort > -1) {
            try {
                this.createTopics(kafkaPort, 1);
            }
            catch (Throwable e) {
                throw new ConfigurationException("Error creating Kafka Topics: " + e.getMessage(), e);
            }
        }
        return config;
    }

    private void createTopics(int targetPort, Integer numPartitions) throws InterruptedException, ExecutionException {
        List<String> topics = this.embeddedConfiguration.getTopics();
        LOG.debug("Creating Kafka Topics in Embedded Kafka: {}", topics);
        if (!topics.isEmpty()) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "127.0.0.1:" + targetPort);
            AdminClient adminClient = AdminClient.create((Properties)properties);
            CreateTopicsResult result = adminClient.createTopics((Collection)topics.stream().map(s -> new NewTopic(s, numPartitions.intValue(), 1)).collect(Collectors.toList()));
            result.all().get();
            LOG.info("Created Kafka Topics in Embedded Kafka: {}", topics);
        }
    }

    @Override
    @PreDestroy
    public void close() {
        new Thread(() -> {
            if (this.kafkaServer != null) {
                try {
                    this.kafkaServer.shutdown();
                }
                catch (Exception e) {
                    LOG.warn("Error shutting down embedded Kafka Server: {}", (Object)e.getMessage(), (Object)e);
                }
            }
            if (this.zkServer != null) {
                try {
                    this.zkServer.shutdown();
                }
                catch (Exception e) {
                    LOG.warn("Error shutting down embedded ZooKeeper: {}", (Object)e.getMessage(), (Object)e);
                }
            }
        }, "embedded-kafka-shutdown-thread").start();
    }

    public Optional<KafkaServer> getKafkaServer() {
        return Optional.ofNullable(this.kafkaServer);
    }

    public Optional<Integer> getZkPort() {
        if (this.zkServer != null) {
            return Optional.of(this.zkServer.port());
        }
        return Optional.empty();
    }

    private void initZooKeeper() {
        this.zkServer = new EmbeddedZookeeper();
    }
}

