/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.deployment;

import com.github.dockerjava.api.command.InspectContainerResponse;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.builder.item.BuildItem;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.DevServicesNativeConfigResultBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.ServiceStartBuildItem;
import io.quarkus.devservices.common.ContainerLocator;
import io.quarkus.kafka.client.deployment.DevServicesKafkaBrokerBuildItem;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaDevServicesBuildTimeConfig;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

public class DevServicesKafkaProcessor {
    private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class);
    // Port the Kafka API listens on inside the container.
    private static final int KAFKA_PORT = 9092;
    // Configuration key that receives the broker address once a broker is available.
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    // Container label under which the service name is stored on containers started by this processor.
    private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-kafka";
    // Used to look for an already running container before starting a new one.
    private static final ContainerLocator kafkaContainerLocator = new ContainerLocator("quarkus-dev-service-kafka", 9092);
    // Handle that stops the container started by this processor; null when no owned container is running.
    static volatile Closeable closeable;
    // Configuration recorded by the most recent run of the build step; compared on later runs to detect changes.
    static volatile KafkaDevServiceCfg cfg;
    // True until the shutdown hooks have been registered; reset to true by the close task.
    static volatile boolean first;
    // Consulted before attempting to start a container.
    private final IsDockerWorking isDockerWorking = new IsDockerWorking(true);

    /**
     * Build step that makes a Kafka broker available outside of normal (production) mode.
     *
     * <p>If a broker started by a previous invocation is still running, it is kept when the
     * launch mode is not {@code TEST} and the configuration is unchanged; in that case the
     * method returns {@code null} without producing any build item. Otherwise the running
     * broker is stopped and a new start is attempted.
     *
     * <p>When a broker is available, its address is published as the runtime default for
     * {@code kafka.bootstrap.servers} and as a native dev-services configuration result.
     * On the first invocation a close task is registered both on the parent class loader and
     * as a JVM shutdown hook so that the broker is stopped when either goes away.
     *
     * @param launchMode current launch mode
     * @param kafkaClientBuildTimeConfig build-time configuration holding the dev-services settings
     * @param runTimeConfiguration sink for the runtime configuration default
     * @param devServicePropertiesProducer sink for the native dev-services configuration result
     * @param serviceStartBuildItemBuildProducer not used by this method
     * @return the build item carrying the bootstrap servers, or {@code null} when no broker was
     *         started or the running broker was kept
     */
    @BuildStep(onlyIfNot={IsNormal.class})
    public DevServicesKafkaBrokerBuildItem startKafkaDevService(LaunchModeBuildItem launchMode, KafkaBuildTimeConfig kafkaClientBuildTimeConfig, BuildProducer<RunTimeConfigurationDefaultBuildItem> runTimeConfiguration, BuildProducer<DevServicesNativeConfigResultBuildItem> devServicePropertiesProducer, BuildProducer<ServiceStartBuildItem> serviceStartBuildItemBuildProducer) {
        KafkaDevServiceCfg configuration = getConfiguration(kafkaClientBuildTimeConfig);

        if (closeable != null) {
            // Tests always get a fresh broker; otherwise restart only when the configuration changed.
            boolean keepRunningBroker = launchMode.getLaunchMode() != LaunchMode.TEST && configuration.equals(cfg);
            if (keepRunningBroker) {
                return null;
            }
            shutdownBroker();
            cfg = null;
        }

        KafkaBroker kafkaBroker = startKafka(configuration, launchMode);
        DevServicesKafkaBrokerBuildItem bootstrapServers = null;
        if (kafkaBroker != null) {
            // The closeable is null when the broker belongs to another application.
            closeable = kafkaBroker.getCloseable();
            runTimeConfiguration.produce(new RunTimeConfigurationDefaultBuildItem(KAFKA_BOOTSTRAP_SERVERS, kafkaBroker.getBootstrapServers()));
            bootstrapServers = new DevServicesKafkaBrokerBuildItem(kafkaBroker.getBootstrapServers());
        }

        if (first) {
            first = false;
            // Stops the broker and resets the static state so that a later run starts from scratch.
            Runnable closeTask = () -> {
                if (closeable != null) {
                    shutdownBroker();
                }
                first = true;
                closeable = null;
                cfg = null;
            };
            QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader();
            QuarkusClassLoader parentLoader = (QuarkusClassLoader) cl.parent();
            parentLoader.addCloseTask(closeTask);
            Thread closeHookThread = new Thread(closeTask, "Kafka container shutdown thread");
            Runtime.getRuntime().addShutdownHook(closeHookThread);
            // Once the class loader is closed the JVM hook is no longer needed.
            parentLoader.addCloseTask(() -> Runtime.getRuntime().removeShutdownHook(closeHookThread));
        }

        cfg = configuration;

        if (bootstrapServers != null) {
            if (kafkaBroker.isOwner()) {
                log.infof("Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=%s", bootstrapServers.getBootstrapServers());
            }
            devServicePropertiesProducer.produce(new DevServicesNativeConfigResultBuildItem(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers.getBootstrapServers()));
        }
        return bootstrapServers;
    }

    /**
     * Stops the broker referenced by {@code closeable}, if any.
     *
     * <p>Failures while closing are logged and not propagated; the reference is cleared
     * regardless of the outcome.
     */
    private void shutdownBroker() {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable e) {
            log.error("Failed to stop the Kafka broker", e);
        } finally {
            closeable = null;
        }
    }

    /**
     * Locates or starts a Kafka broker according to the given configuration.
     *
     * <p>Nothing is started (and {@code null} is returned) when dev services are disabled,
     * when {@code kafka.bootstrap.servers} is already configured, when
     * {@link #hasKafkaChannelWithoutBootstrapServers()} reports nothing left to configure,
     * or when Docker is not working. Otherwise a container found by the container locator
     * is reused; only if none is found is a new Redpanda container started.
     *
     * @param config dev-services settings
     * @param launchMode current launch mode; the service label is only applied in development mode
     * @return the broker to use, or {@code null} if dev services should not provide one
     */
    private KafkaBroker startKafka(KafkaDevServiceCfg config, LaunchModeBuildItem launchMode) {
        if (!config.devServicesEnabled) {
            log.debug("Not starting dev services for Kafka, as it has been disabled in the config.");
            return null;
        }
        if (ConfigUtils.isPropertyPresent(KAFKA_BOOTSTRAP_SERVERS)) {
            log.debug("Not starting dev services for Kafka, the kafka.bootstrap.servers is configured.");
            return null;
        }
        if (!hasKafkaChannelWithoutBootstrapServers()) {
            log.debug("Not starting dev services for Kafka, all the channels are configured.");
            return null;
        }
        if (!this.isDockerWorking.getAsBoolean()) {
            log.warn("Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers).");
            return null;
        }

        // Only invoked when no existing container is found.
        Supplier<KafkaBroker> defaultKafkaBrokerSupplier = () -> {
            String label = launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null;
            RedPandaKafkaContainer container = new RedPandaKafkaContainer(DockerImageName.parse(config.imageName), config.fixedExposedPort, label);
            container.start();
            return new KafkaBroker(container.getBootstrapServers(), container::close);
        };

        // Chaining on the typed result avoids a raw Optional, which would leave the lambda parameter untyped.
        // A located container is not owned by this processor, hence the null closeable.
        return kafkaContainerLocator.locateContainer(config.serviceName, config.shared, launchMode.getLaunchMode())
                .map(containerAddress -> new KafkaBroker(containerAddress.getUrl(), null))
                .orElseGet(defaultKafkaBrokerSupplier);
    }

    /**
     * Scans the configuration property names to decide whether dev services still have
     * something to configure.
     *
     * <p>NOTE(review): as written, the method returns {@code true} for the first property
     * name that is not an incoming/outgoing {@code smallrye-kafka} connector entry whose
     * matching {@code .bootstrap.servers} property is present. It therefore returns
     * {@code false} only when there are no properties at all, or when every property is
     * such a fully configured connector entry. Confirm whether this is intended before
     * changing it.
     *
     * @return {@code true} unless every visited property is a configured Kafka connector entry
     */
    private boolean hasKafkaChannelWithoutBootstrapServers() {
        Config config = ConfigProvider.getConfig();
        for (String propertyName : config.getPropertyNames()) {
            boolean channelProperty = propertyName.startsWith("mp.messaging.incoming.")
                    || propertyName.startsWith("mp.messaging.outgoing.");
            boolean kafkaConnector = propertyName.endsWith(".connector")
                    && "smallrye-kafka".equals(config.getOptionalValue(propertyName, String.class).orElse("ignored"));
            if (!channelProperty || !kafkaConnector) {
                return true;
            }
            String bootstrapServersProperty = propertyName.replace(".connector", ".bootstrap.servers");
            if (!ConfigUtils.isPropertyPresent(bootstrapServersProperty)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Captures the dev-services part of the build-time configuration in a comparable snapshot.
     *
     * @param cfg Kafka build-time configuration
     * @return snapshot of {@code cfg.devservices}
     */
    private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig cfg) {
        return new KafkaDevServiceCfg(cfg.devservices);
    }

    // Initial state: the shutdown hooks have not been registered yet.
    static {
        first = true;
    }

    private static final class RedPandaKafkaContainer
    extends GenericContainer<RedPandaKafkaContainer> {
        private final int port;
        private static final String STARTER_SCRIPT = "/redpanda.sh";

        private RedPandaKafkaContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName) {
            super(dockerImageName);
            this.port = fixedExposedPort;
            this.withNetwork(Network.SHARED);
            this.withExposedPorts(new Integer[]{9092});
            if (serviceName != null) {
                this.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName);
            }
            if (!dockerImageName.getRepository().equals("vectorized/redpanda")) {
                throw new IllegalArgumentException("Only vectorized/redpanda images are supported");
            }
            this.withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint(new String[]{"sh"}));
            this.withCommand(new String[]{"-c", "while [ ! -f /redpanda.sh ]; do sleep 0.1; done; /redpanda.sh"});
            this.waitingFor((WaitStrategy)Wait.forLogMessage((String)".*Started Kafka API server.*", (int)1));
        }

        protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
            super.containerIsStarting(containerInfo, reused);
            Object command = "#!/bin/bash\n";
            command = (String)command + "/usr/bin/rpk redpanda start --check=false --node-id 0 ";
            command = (String)command + "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
            command = (String)command + "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + this.getHost() + ":" + this.getMappedPort(9092);
            this.copyFileToContainer(Transferable.of((byte[])((String)command).getBytes(StandardCharsets.UTF_8), (int)511), STARTER_SCRIPT);
        }

        protected void configure() {
            super.configure();
            if (this.port > 0) {
                this.addFixedExposedPort(this.port, 9092);
            }
        }

        public String getBootstrapServers() {
            return String.format("PLAINTEXT://%s:%s", this.getContainerIpAddress(), this.getMappedPort(9092));
        }
    }

    /**
     * Immutable snapshot of the dev-services settings, used to detect configuration changes
     * between two runs of the build step.
     *
     * <p>Equality covers every captured setting, so a change to any of them (including
     * {@code shared} and {@code serviceName}, which decide how the container is located and
     * labelled) is seen as a configuration change.
     */
    private static final class KafkaDevServiceCfg {
        private final boolean devServicesEnabled;
        private final String imageName;
        private final Integer fixedExposedPort;
        private final boolean shared;
        private final String serviceName;

        /**
         * @param config build-time dev-services configuration; dev services default to enabled
         *               and the port defaults to 0 when not set
         */
        public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
            this.devServicesEnabled = config.enabled.orElse(true);
            this.imageName = config.imageName;
            this.fixedExposedPort = config.port.orElse(0);
            this.shared = config.shared;
            this.serviceName = config.serviceName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            KafkaDevServiceCfg that = (KafkaDevServiceCfg) o;
            return devServicesEnabled == that.devServicesEnabled
                    && shared == that.shared
                    && Objects.equals(imageName, that.imageName)
                    && Objects.equals(fixedExposedPort, that.fixedExposedPort)
                    && Objects.equals(serviceName, that.serviceName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(devServicesEnabled, imageName, fixedExposedPort, shared, serviceName);
        }
    }

    /**
     * A reachable Kafka broker: its bootstrap address plus, when this processor started it,
     * the handle that stops it.
     */
    private static class KafkaBroker {
        private final String url;
        // Null when the broker was not started by this processor.
        private final Closeable closeable;

        /**
         * @param url bootstrap servers address
         * @param closeable handle that stops the broker, or {@code null} if it is not owned
         */
        public KafkaBroker(String url, Closeable closeable) {
            this.url = url;
            this.closeable = closeable;
        }

        /** @return the bootstrap servers address */
        public String getBootstrapServers() {
            return url;
        }

        /** @return the handle that stops the broker, or {@code null} if it is not owned */
        public Closeable getCloseable() {
            return closeable;
        }

        /** @return {@code true} when a stop handle is present */
        public boolean isOwner() {
            return closeable != null;
        }
    }
}

