/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.deployment;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
import io.quarkus.deployment.builditem.CuratedApplicationShutdownBuildItem;
import io.quarkus.deployment.builditem.DevServicesResultBuildItem;
import io.quarkus.deployment.builditem.DevServicesSharedNetworkBuildItem;
import io.quarkus.deployment.builditem.DockerStatusBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.console.ConsoleInstalledBuildItem;
import io.quarkus.deployment.console.StartupLogCompressor;
import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig;
import io.quarkus.deployment.logging.LoggingSetupBuildItem;
import io.quarkus.devservices.common.ConfigureUtil;
import io.quarkus.devservices.common.ContainerLocator;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaDevServicesBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaNativeContainer;
import io.quarkus.kafka.client.deployment.RedpandaBuildTimeConfig;
import io.quarkus.kafka.client.deployment.RedpandaKafkaContainer;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.strimzi.test.container.StrimziKafkaContainer;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

@BuildSteps(onlyIfNot={IsNormal.class}, onlyIf={GlobalDevServicesConfig.Enabled.class})
public class DevServicesKafkaProcessor {
    private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class);
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    static final String DEV_SERVICE_LABEL = "quarkus-dev-service-kafka";
    static final int KAFKA_PORT = 9092;
    private static final ContainerLocator kafkaContainerLocator = new ContainerLocator("quarkus-dev-service-kafka", 9092);
    static volatile DevServicesResultBuildItem.RunningDevService devService;
    static volatile KafkaDevServiceCfg cfg;
    static volatile boolean first;

    @BuildStep
    public DevServicesResultBuildItem startKafkaDevService(DockerStatusBuildItem dockerStatusBuildItem, LaunchModeBuildItem launchMode, KafkaBuildTimeConfig kafkaClientBuildTimeConfig, List<DevServicesSharedNetworkBuildItem> devServicesSharedNetworkBuildItem, Optional<ConsoleInstalledBuildItem> consoleInstalledBuildItem, CuratedApplicationShutdownBuildItem closeBuildItem, LoggingSetupBuildItem loggingSetupBuildItem, GlobalDevServicesConfig devServicesConfig) {
        KafkaDevServiceCfg configuration = this.getConfiguration(kafkaClientBuildTimeConfig);
        if (devService != null) {
            boolean shouldShutdownTheBroker;
            boolean bl = shouldShutdownTheBroker = !configuration.equals(cfg);
            if (!shouldShutdownTheBroker) {
                return devService.toBuildItem();
            }
            this.shutdownBroker();
            cfg = null;
        }
        StartupLogCompressor compressor = new StartupLogCompressor((launchMode.isTest() ? "(test) " : "") + "Kafka Dev Services Starting:", consoleInstalledBuildItem, loggingSetupBuildItem);
        try {
            devService = this.startKafka(dockerStatusBuildItem, configuration, launchMode, !devServicesSharedNetworkBuildItem.isEmpty(), devServicesConfig.timeout);
            if (devService == null) {
                compressor.closeAndDumpCaptured();
            } else {
                compressor.close();
            }
        }
        catch (Throwable t) {
            compressor.closeAndDumpCaptured();
            throw new RuntimeException(t);
        }
        if (devService == null) {
            return null;
        }
        if (first) {
            first = false;
            Runnable closeTask = () -> {
                if (devService != null) {
                    this.shutdownBroker();
                }
                first = true;
                devService = null;
                cfg = null;
            };
            closeBuildItem.addCloseTask(closeTask, true);
        }
        cfg = configuration;
        if (devService.isOwner()) {
            log.infof("Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=%s", (Object)DevServicesKafkaProcessor.getKafkaBootstrapServers());
        }
        this.createTopicPartitions(DevServicesKafkaProcessor.getKafkaBootstrapServers(), configuration);
        return devService.toBuildItem();
    }

    public static String getKafkaBootstrapServers() {
        return (String)devService.getConfig().get(KAFKA_BOOTSTRAP_SERVERS);
    }

    public void createTopicPartitions(String bootstrapServers, KafkaDevServiceCfg configuration) {
        Map<String, Integer> topicPartitions = configuration.topicPartitions;
        if (topicPartitions.isEmpty()) {
            return;
        }
        Map props = Map.ofEntries(Map.entry("bootstrap.servers", bootstrapServers), Map.entry("client.id", "kafka-devservices"));
        try (AdminClient adminClient = KafkaAdminClient.create(props);){
            long adminClientTimeout = configuration.topicPartitionsTimeout.toMillis();
            Set currentTopics = (Set)adminClient.listTopics().names().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            Map partitions = (Map)adminClient.describeTopics((Collection)currentTopics).allTopicNames().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            List<NewTopic> newTopics = topicPartitions.entrySet().stream().filter(e -> {
                TopicDescription topicDescription = (TopicDescription)partitions.get(e.getKey());
                if (topicDescription == null) {
                    return true;
                }
                log.warnf("Topic '%s' already exists with %s partition(s)", e.getKey(), (Object)topicDescription.partitions().size());
                return false;
            }).map(e -> new NewTopic((String)e.getKey(), ((Integer)e.getValue()).intValue(), 1)).collect(Collectors.toList());
            CreateTopicsResult topics = adminClient.createTopics(newTopics);
            topics.all().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            HashMap newTopicPartitions = new HashMap();
            partitions.forEach((key, value) -> newTopicPartitions.put(key, value.partitions().size()));
            newTopics.forEach(t -> newTopicPartitions.put(t.name(), t.numPartitions()));
            log.infof("Dev Services for Kafka broker contains following topics with partitions: %s", newTopicPartitions);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e2) {
            log.errorf((Throwable)e2, "Failed to create topics: %s", topicPartitions);
        }
    }

    private void shutdownBroker() {
        if (devService != null) {
            try {
                devService.close();
            }
            catch (Throwable e) {
                log.error((Object)"Failed to stop the Kafka broker", e);
            }
            finally {
                devService = null;
            }
        }
    }

    private DevServicesResultBuildItem.RunningDevService startKafka(DockerStatusBuildItem dockerStatusBuildItem, KafkaDevServiceCfg config, LaunchModeBuildItem launchMode, boolean useSharedNetwork, Optional<Duration> timeout) {
        if (!config.devServicesEnabled) {
            log.debug((Object)"Not starting dev services for Kafka, as it has been disabled in the config.");
            return null;
        }
        if (ConfigUtils.isPropertyPresent((String)KAFKA_BOOTSTRAP_SERVERS)) {
            log.debug((Object)"Not starting dev services for Kafka, the kafka.bootstrap.servers is configured.");
            return null;
        }
        if (!this.hasKafkaChannelWithoutBootstrapServers()) {
            log.debug((Object)"Not starting dev services for Kafka, all the channels are configured.");
            return null;
        }
        if (!dockerStatusBuildItem.isDockerAvailable()) {
            log.warn((Object)"Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers).");
            return null;
        }
        Optional maybeContainerAddress = kafkaContainerLocator.locateContainer(config.serviceName, config.shared, launchMode.getLaunchMode());
        Supplier<DevServicesResultBuildItem.RunningDevService> defaultKafkaBrokerSupplier = () -> {
            switch (config.provider) {
                case REDPANDA: {
                    RedpandaKafkaContainer redpanda = new RedpandaKafkaContainer(DockerImageName.parse((String)config.imageName).asCompatibleSubstituteFor("redpandadata/redpanda"), config.fixedExposedPort, launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, useSharedNetwork, config.redpanda);
                    timeout.ifPresent(arg_0 -> ((RedpandaKafkaContainer)redpanda).withStartupTimeout(arg_0));
                    redpanda.withEnv(config.containerEnv);
                    redpanda.start();
                    return new DevServicesResultBuildItem.RunningDevService(Feature.KAFKA_CLIENT.getName(), redpanda.getContainerId(), () -> ((RedpandaKafkaContainer)redpanda).close(), KAFKA_BOOTSTRAP_SERVERS, redpanda.getBootstrapServers());
                }
                case STRIMZI: {
                    StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.imageName).withBrokerId(1).withKraft().waitForRunning();
                    ConfigureUtil.configureSharedNetwork((GenericContainer)strimzi, (String)"kafka");
                    if (config.serviceName != null) {
                        strimzi.withLabel(DEV_SERVICE_LABEL, config.serviceName);
                    }
                    if (config.fixedExposedPort != 0) {
                        strimzi.withPort(config.fixedExposedPort.intValue());
                    }
                    timeout.ifPresent(arg_0 -> ((StrimziKafkaContainer)strimzi).withStartupTimeout(arg_0));
                    strimzi.withEnv(config.containerEnv);
                    strimzi.start();
                    return new DevServicesResultBuildItem.RunningDevService(Feature.KAFKA_CLIENT.getName(), strimzi.getContainerId(), () -> ((StrimziKafkaContainer)strimzi).close(), KAFKA_BOOTSTRAP_SERVERS, strimzi.getBootstrapServers());
                }
                case KAFKA_NATIVE: {
                    KafkaNativeContainer kafkaNative = new KafkaNativeContainer(DockerImageName.parse((String)config.imageName), config.fixedExposedPort, launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, useSharedNetwork);
                    timeout.ifPresent(arg_0 -> ((KafkaNativeContainer)kafkaNative).withStartupTimeout(arg_0));
                    kafkaNative.withEnv(config.containerEnv);
                    kafkaNative.start();
                    return new DevServicesResultBuildItem.RunningDevService(Feature.KAFKA_CLIENT.getName(), kafkaNative.getContainerId(), () -> ((KafkaNativeContainer)kafkaNative).close(), KAFKA_BOOTSTRAP_SERVERS, kafkaNative.getBootstrapServers());
                }
            }
            return null;
        };
        return maybeContainerAddress.map(containerAddress -> new DevServicesResultBuildItem.RunningDevService(Feature.KAFKA_CLIENT.getName(), containerAddress.getId(), null, KAFKA_BOOTSTRAP_SERVERS, containerAddress.getUrl())).orElseGet(defaultKafkaBrokerSupplier);
    }

    private boolean hasKafkaChannelWithoutBootstrapServers() {
        Config config = ConfigProvider.getConfig();
        for (String name : config.getPropertyNames()) {
            boolean isIncoming = name.startsWith("mp.messaging.incoming.");
            boolean isOutgoing = name.startsWith("mp.messaging.outgoing.");
            boolean isConnector = name.endsWith(".connector");
            boolean isKafka = isConnector && "smallrye-kafka".equals(config.getOptionalValue(name, String.class).orElse("ignored"));
            boolean isConfigured = false;
            if ((isIncoming || isOutgoing) && isKafka) {
                isConfigured = ConfigUtils.isPropertyPresent((String)name.replace(".connector", ".bootstrap.servers"));
            }
            if (isConfigured) continue;
            return true;
        }
        return false;
    }

    private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig cfg) {
        KafkaDevServicesBuildTimeConfig devServicesConfig = cfg.devservices;
        return new KafkaDevServiceCfg(devServicesConfig);
    }

    static {
        first = true;
    }

    private static final class KafkaDevServiceCfg {
        private final boolean devServicesEnabled;
        private final String imageName;
        private final Integer fixedExposedPort;
        private final boolean shared;
        private final String serviceName;
        private final Map<String, Integer> topicPartitions;
        private final Duration topicPartitionsTimeout;
        private final Map<String, String> containerEnv;
        private final KafkaDevServicesBuildTimeConfig.Provider provider;
        private final RedpandaBuildTimeConfig redpanda;

        public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
            this.devServicesEnabled = config.enabled.orElse(true);
            this.provider = config.provider;
            this.imageName = config.imageName.orElseGet(this.provider::getDefaultImageName);
            this.fixedExposedPort = config.port.orElse(0);
            this.shared = config.shared;
            this.serviceName = config.serviceName;
            this.topicPartitions = config.topicPartitions;
            this.topicPartitionsTimeout = config.topicPartitionsTimeout;
            this.containerEnv = config.containerEnv;
            this.redpanda = config.redpanda;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            KafkaDevServiceCfg that = (KafkaDevServiceCfg)o;
            return this.devServicesEnabled == that.devServicesEnabled && Objects.equals((Object)this.provider, (Object)that.provider) && Objects.equals(this.imageName, that.imageName) && Objects.equals(this.fixedExposedPort, that.fixedExposedPort) && Objects.equals(this.containerEnv, that.containerEnv);
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.devServicesEnabled, this.provider, this.imageName, this.fixedExposedPort, this.containerEnv});
        }
    }
}

