/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.deployment;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsDevServicesSupportedByLaunchMode;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
import io.quarkus.deployment.builditem.DevServicesComposeProjectBuildItem;
import io.quarkus.deployment.builditem.DevServicesResultBuildItem;
import io.quarkus.deployment.builditem.DevServicesSharedNetworkBuildItem;
import io.quarkus.deployment.builditem.DockerStatusBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.Startable;
import io.quarkus.deployment.dev.devservices.DevServicesConfig;
import io.quarkus.devservices.common.ComposeLocator;
import io.quarkus.devservices.common.ConfigureUtil;
import io.quarkus.devservices.common.ContainerLocator;
import io.quarkus.devservices.common.StartableContainer;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaDevServicesBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaNativeContainer;
import io.quarkus.kafka.client.deployment.RedpandaKafkaContainer;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.strimzi.test.container.StrimziKafkaContainer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

@BuildSteps(onlyIf={IsDevServicesSupportedByLaunchMode.class, DevServicesConfig.Enabled.class})
public class DevServicesKafkaProcessor {
    private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class);
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    static final String DEV_SERVICE_LABEL = "quarkus-dev-service-kafka";
    static final int KAFKA_PORT = 9092;
    private static final ContainerLocator kafkaContainerLocator = ContainerLocator.locateContainerWithLabels((int)9092, (String[])new String[]{"quarkus-dev-service-kafka"});

    @BuildStep
    public DevServicesResultBuildItem startKafkaDevService(DockerStatusBuildItem dockerStatusBuildItem, DevServicesComposeProjectBuildItem compose, LaunchModeBuildItem launchMode, KafkaBuildTimeConfig kafkaClientBuildTimeConfig, List<DevServicesSharedNetworkBuildItem> sharedNetwork, DevServicesConfig devServicesConfig) {
        KafkaDevServicesBuildTimeConfig config = kafkaClientBuildTimeConfig.devservices();
        if (this.devServiceDisabled(dockerStatusBuildItem, config)) {
            return null;
        }
        boolean useSharedNetwork = DevServicesSharedNetworkBuildItem.isSharedNetworkRequired((DevServicesConfig)devServicesConfig, sharedNetwork);
        return kafkaContainerLocator.locateContainer(config.serviceName(), config.shared(), launchMode.getLaunchMode()).or(() -> ComposeLocator.locateContainer((DevServicesComposeProjectBuildItem)compose, List.of(config.effectiveImageName(), "kafka", "strimzi", "redpanda"), (int)9092, (LaunchMode)launchMode.getLaunchMode(), (boolean)useSharedNetwork)).map(containerAddress -> {
            this.createTopicPartitions(containerAddress.getUrl(), config);
            return DevServicesResultBuildItem.discovered().feature(Feature.KAFKA_CLIENT).containerId(containerAddress.getId()).config(Map.of(KAFKA_BOOTSTRAP_SERVERS, containerAddress.getUrl())).build();
        }).orElseGet(() -> DevServicesResultBuildItem.owned().feature(Feature.KAFKA_CLIENT).serviceName(config.serviceName()).serviceConfig((Object)config).startable(() -> this.createContainer(compose, config, useSharedNetwork, launchMode)).postStartHook(s -> this.logStartedAndCreateTopicPartitions(s.getConnectionInfo(), config)).configProvider(Map.of(KAFKA_BOOTSTRAP_SERVERS, Startable::getConnectionInfo)).build());
    }

    private Startable createContainer(DevServicesComposeProjectBuildItem composeProjectBuildItem, KafkaDevServicesBuildTimeConfig config, boolean useSharedNetwork, LaunchModeBuildItem launchMode) {
        KafkaNativeContainer startable = switch (config.provider()) {
            default -> throw new IncompatibleClassChangeError();
            case KafkaDevServicesBuildTimeConfig.Provider.REDPANDA -> ((RedpandaKafkaContainer)new RedpandaKafkaContainer(DockerImageName.parse((String)config.effectiveImageName()).asCompatibleSubstituteFor("redpandadata/redpanda"), config.port().orElse(0), composeProjectBuildItem.getDefaultNetworkId(), useSharedNetwork, config.redpanda()).withEnv(config.containerEnv())).withSharedServiceLabel(launchMode.getLaunchMode(), config.serviceName());
            case KafkaDevServicesBuildTimeConfig.Provider.STRIMZI -> {
                StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.effectiveImageName()).withBrokerId(1).withKraft().waitForRunning();
                String hostName = ConfigureUtil.configureNetwork((GenericContainer)strimzi, (String)composeProjectBuildItem.getDefaultNetworkId(), (boolean)useSharedNetwork, (String)"kafka");
                if (useSharedNetwork) {
                    strimzi.withBootstrapServers(c -> String.format("PLAINTEXT://%s:%s", hostName, 9092));
                }
                if (config.port().isPresent() && config.port().get() != 0) {
                    strimzi.withPort(config.port().get().intValue());
                }
                strimzi.withEnv(config.containerEnv());
                ConfigureUtil.configureSharedServiceLabel((GenericContainer)strimzi, (LaunchMode)launchMode.getLaunchMode(), (String)DEV_SERVICE_LABEL, (String)config.serviceName());
                yield new StartableContainer((GenericContainer)strimzi, StrimziKafkaContainer::getBootstrapServers);
            }
            case KafkaDevServicesBuildTimeConfig.Provider.KAFKA_NATIVE -> ((KafkaNativeContainer)new KafkaNativeContainer(DockerImageName.parse((String)config.effectiveImageName()), config.port().orElse(0), composeProjectBuildItem.getDefaultNetworkId(), useSharedNetwork).withEnv(config.containerEnv())).withSharedServiceLabel(launchMode.getLaunchMode(), config.serviceName());
        };
        return startable;
    }

    public void logStartedAndCreateTopicPartitions(String bootstrapServers, KafkaDevServicesBuildTimeConfig configuration) {
        DevServicesKafkaProcessor.logStarted(bootstrapServers);
        this.createTopicPartitions(bootstrapServers, configuration);
    }

    public void createTopicPartitions(String bootstrapServers, KafkaDevServicesBuildTimeConfig configuration) {
        Map<String, Integer> topicPartitions = configuration.topicPartitions();
        if (topicPartitions.isEmpty()) {
            return;
        }
        Map props = Map.ofEntries(Map.entry("bootstrap.servers", bootstrapServers), Map.entry("client.id", "kafka-devservices"));
        try (AdminClient adminClient = KafkaAdminClient.create(props);){
            long adminClientTimeout = configuration.topicPartitionsTimeout().toMillis();
            Set currentTopics = (Set)adminClient.listTopics().names().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            Map partitions = (Map)adminClient.describeTopics((Collection)currentTopics).allTopicNames().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            List<NewTopic> newTopics = topicPartitions.entrySet().stream().filter(e -> {
                TopicDescription topicDescription = (TopicDescription)partitions.get(e.getKey());
                if (topicDescription == null) {
                    return true;
                }
                log.warnf("Topic '%s' already exists with %s partition(s)", e.getKey(), (Object)topicDescription.partitions().size());
                return false;
            }).map(e -> new NewTopic((String)e.getKey(), ((Integer)e.getValue()).intValue(), 1)).collect(Collectors.toList());
            CreateTopicsResult topics = adminClient.createTopics(newTopics);
            topics.all().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            HashMap newTopicPartitions = new HashMap();
            partitions.forEach((key, value) -> newTopicPartitions.put(key, value.partitions().size()));
            newTopics.forEach(t -> newTopicPartitions.put(t.name(), t.numPartitions()));
            log.infof("Dev Services for Kafka broker contains following topics with partitions: %s", newTopicPartitions);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e2) {
            log.errorf((Throwable)e2, "Failed to create topics: %s", topicPartitions);
        }
    }

    private static void logStarted(String bootstrapServers) {
        log.infof("Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=%s", (Object)bootstrapServers);
    }

    private boolean devServiceDisabled(DockerStatusBuildItem dockerStatusBuildItem, KafkaDevServicesBuildTimeConfig config) {
        if (!config.enabled().orElse(true).booleanValue()) {
            log.debug((Object)"Not starting dev services for Kafka, as it has been disabled in the config.");
            return true;
        }
        if (ConfigUtils.isPropertyNonEmpty((String)KAFKA_BOOTSTRAP_SERVERS)) {
            log.debug((Object)"Not starting dev services for Kafka, the kafka.bootstrap.servers is configured.");
            return true;
        }
        if (!this.hasKafkaChannelWithoutBootstrapServers()) {
            log.debug((Object)"Not starting dev services for Kafka, all the channels are configured.");
            return true;
        }
        if (!dockerStatusBuildItem.isContainerRuntimeAvailable()) {
            log.warn((Object)"Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers).");
            return true;
        }
        return false;
    }

    private boolean hasKafkaChannelWithoutBootstrapServers() {
        Config config = ConfigProvider.getConfig();
        for (String name : config.getPropertyNames()) {
            boolean isIncoming = name.startsWith("mp.messaging.incoming.");
            boolean isOutgoing = name.startsWith("mp.messaging.outgoing.");
            boolean isConnector = name.endsWith(".connector");
            boolean isKafka = isConnector && "smallrye-kafka".equals(config.getOptionalValue(name, String.class).orElse("ignored"));
            boolean isConfigured = false;
            if ((isIncoming || isOutgoing) && isKafka) {
                isConfigured = ConfigUtils.isPropertyNonEmpty((String)name.replace(".connector", ".bootstrap.servers"));
            }
            if (isConfigured) continue;
            return true;
        }
        return false;
    }
}

