/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.streams.runtime;

import io.quarkus.arc.Unremovable;
import io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import io.quarkus.kafka.streams.runtime.KafkaStreamsSupport;
import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
import io.quarkus.kafka.streams.runtime.KeyConfig;
import io.quarkus.kafka.streams.runtime.KeyStoreConfig;
import io.quarkus.kafka.streams.runtime.SaslConfig;
import io.quarkus.kafka.streams.runtime.SslConfig;
import io.quarkus.kafka.streams.runtime.TrustStoreConfig;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

@Singleton
public class KafkaStreamsProducer {
    private static final Logger LOGGER = Logger.getLogger((String)KafkaStreamsProducer.class.getName());
    private static volatile boolean shutdown = false;
    private final ExecutorService executorService;
    private final StreamsConfig streamsConfig;
    private final KafkaStreams kafkaStreams;
    private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
    private final Admin kafkaAdminClient;
    private final Duration topicsTimeout;
    private final List<String> trimmedTopics;

    @Inject
    public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig, ExecutorService executorService, Instance<Topology> topology, Instance<KafkaClientSupplier> kafkaClientSupplier, @Identifier(value="default-kafka-broker") Instance<Map<String, Object>> defaultConfiguration, Instance<KafkaStreams.StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener, Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
        shutdown = false;
        if (topology.isUnsatisfied()) {
            LOGGER.warn((Object)"No Topology producer; Kafka Streams will not be started");
            this.executorService = null;
            this.streamsConfig = null;
            this.kafkaStreams = null;
            this.kafkaStreamsTopologyManager = null;
            this.kafkaAdminClient = null;
            this.topicsTimeout = null;
            this.trimmedTopics = null;
            return;
        }
        Properties buildTimeProperties = kafkaStreamsSupport.getProperties();
        String bootstrapServersConfig = KafkaStreamsProducer.asString(runtimeConfig.bootstrapServers);
        if ("localhost:9092".equalsIgnoreCase(bootstrapServersConfig)) {
            bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class).orElse(bootstrapServersConfig);
        }
        Map cfg = Collections.emptyMap();
        if (!defaultConfiguration.isUnsatisfied()) {
            cfg = (Map)defaultConfiguration.get();
        }
        Properties kafkaStreamsProperties = KafkaStreamsProducer.getStreamsProperties(buildTimeProperties, cfg, bootstrapServersConfig, runtimeConfig);
        this.kafkaAdminClient = Admin.create((Properties)KafkaStreamsProducer.getAdminClientConfig(kafkaStreamsProperties));
        this.executorService = executorService;
        this.topicsTimeout = runtimeConfig.topicsTimeout;
        this.trimmedTopics = this.isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
        this.streamsConfig = new StreamsConfig((Map)kafkaStreamsProperties);
        this.kafkaStreams = KafkaStreamsProducer.initializeKafkaStreams(this.streamsConfig, (Topology)topology.get(), kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
        this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(this.kafkaAdminClient);
    }

    private boolean isTopicsCheckEnabled() {
        return this.topicsTimeout.compareTo(Duration.ZERO) > 0;
    }

    public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
        if (this.kafkaStreams != null) {
            kafkaStreamsEvent.fire((Object)this.kafkaStreams);
            this.executorService.execute(() -> {
                if (this.isTopicsCheckEnabled()) {
                    try {
                        KafkaStreamsProducer.waitForTopicsToBeCreated(this.kafkaAdminClient, this.trimmedTopics, this.topicsTimeout);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (!shutdown) {
                    LOGGER.debug((Object)"Starting Kafka Streams pipeline");
                    this.kafkaStreams.start();
                }
            });
        }
    }

    @Produces
    @Singleton
    @Unremovable
    public KafkaStreams getKafkaStreams() {
        return this.kafkaStreams;
    }

    @Produces
    @Singleton
    @Unremovable
    public StreamsConfig getStreamsConfig() {
        return this.streamsConfig;
    }

    @Produces
    @Singleton
    @Unremovable
    public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
        return this.kafkaStreamsTopologyManager;
    }

    void onStop(@Observes ShutdownEvent event) {
        shutdown = true;
        if (this.kafkaStreams != null) {
            LOGGER.debug((Object)"Stopping Kafka Streams pipeline");
            this.kafkaStreams.close();
        }
        if (this.kafkaAdminClient != null) {
            this.kafkaAdminClient.close(Duration.ZERO);
        }
    }

    private static KafkaStreams initializeKafkaStreams(StreamsConfig streamsConfig, Topology topology, Instance<KafkaClientSupplier> kafkaClientSupplier, Instance<KafkaStreams.StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener, Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
        KafkaStreams kafkaStreams = kafkaClientSupplier.isUnsatisfied() ? new KafkaStreams(topology, streamsConfig) : new KafkaStreams(topology, streamsConfig, (KafkaClientSupplier)kafkaClientSupplier.get());
        if (!stateListener.isUnsatisfied()) {
            kafkaStreams.setStateListener((KafkaStreams.StateListener)stateListener.get());
        }
        if (!globalStateRestoreListener.isUnsatisfied()) {
            kafkaStreams.setGlobalStateRestoreListener((StateRestoreListener)globalStateRestoreListener.get());
        }
        if (!uncaughtExceptionHandlerListener.isUnsatisfied()) {
            kafkaStreams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler)uncaughtExceptionHandlerListener.get());
        }
        return kafkaStreams;
    }

    private static Properties getStreamsProperties(Properties properties, Map<String, Object> cfg, String bootstrapServersConfig, KafkaStreamsRuntimeConfig runtimeConfig) {
        SslConfig ssl;
        Properties streamsProperties = new Properties();
        streamsProperties.putAll((Map<?, ?>)properties);
        streamsProperties.putAll(cfg);
        streamsProperties.putAll((Map<?, ?>)KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties());
        streamsProperties.putAll((Map<?, ?>)KafkaStreamsPropertiesUtil.appKafkaStreamsProperties());
        streamsProperties.put("bootstrap.servers", bootstrapServersConfig);
        streamsProperties.put("application.id", runtimeConfig.applicationId);
        if (runtimeConfig.applicationServer.isPresent()) {
            streamsProperties.put("application.server", runtimeConfig.applicationServer.get());
        }
        if (runtimeConfig.schemaRegistryUrl.isPresent()) {
            streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
        }
        KafkaStreamsProducer.setProperty(runtimeConfig.securityProtocol, streamsProperties, "security.protocol");
        SaslConfig sc = runtimeConfig.sasl;
        if (sc != null) {
            KafkaStreamsProducer.setProperty(sc.mechanism, streamsProperties, "sasl.mechanism");
            KafkaStreamsProducer.setProperty(sc.jaasConfig, streamsProperties, "sasl.jaas.config");
            KafkaStreamsProducer.setProperty(sc.clientCallbackHandlerClass, streamsProperties, "sasl.client.callback.handler.class");
            KafkaStreamsProducer.setProperty(sc.loginCallbackHandlerClass, streamsProperties, "sasl.login.callback.handler.class");
            KafkaStreamsProducer.setProperty(sc.loginClass, streamsProperties, "sasl.login.class");
            KafkaStreamsProducer.setProperty(sc.kerberosServiceName, streamsProperties, "sasl.kerberos.service.name");
            KafkaStreamsProducer.setProperty(sc.kerberosKinitCmd, streamsProperties, "sasl.kerberos.kinit.cmd");
            KafkaStreamsProducer.setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties, "sasl.kerberos.ticket.renew.window.factor");
            KafkaStreamsProducer.setProperty(sc.kerberosTicketRenewJitter, streamsProperties, "sasl.kerberos.ticket.renew.jitter");
            KafkaStreamsProducer.setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, "sasl.kerberos.min.time.before.relogin");
            KafkaStreamsProducer.setProperty(sc.loginRefreshWindowFactor, streamsProperties, "sasl.login.refresh.window.factor");
            KafkaStreamsProducer.setProperty(sc.loginRefreshWindowJitter, streamsProperties, "sasl.login.refresh.window.jitter");
            KafkaStreamsProducer.setProperty(sc.loginRefreshMinPeriod, streamsProperties, "sasl.login.refresh.min.period.seconds", DurationToSecondsFunction.INSTANCE);
            KafkaStreamsProducer.setProperty(sc.loginRefreshBuffer, streamsProperties, "sasl.login.refresh.buffer.seconds", DurationToSecondsFunction.INSTANCE);
        }
        if ((ssl = runtimeConfig.ssl) != null) {
            KafkaStreamsProducer.setProperty(ssl.protocol, streamsProperties, "ssl.protocol");
            KafkaStreamsProducer.setProperty(ssl.provider, streamsProperties, "ssl.provider");
            KafkaStreamsProducer.setProperty(ssl.cipherSuites, streamsProperties, "ssl.cipher.suites");
            KafkaStreamsProducer.setProperty(ssl.enabledProtocols, streamsProperties, "ssl.enabled.protocols");
            KafkaStreamsProducer.setTrustStoreConfig(ssl.truststore, streamsProperties);
            KafkaStreamsProducer.setKeyStoreConfig(ssl.keystore, streamsProperties);
            KafkaStreamsProducer.setKeyConfig(ssl.key, streamsProperties);
            KafkaStreamsProducer.setProperty(ssl.keymanagerAlgorithm, streamsProperties, "ssl.keymanager.algorithm");
            KafkaStreamsProducer.setProperty(ssl.trustmanagerAlgorithm, streamsProperties, "ssl.trustmanager.algorithm");
            Optional<String> eia = Optional.of(ssl.endpointIdentificationAlgorithm.orElse(""));
            KafkaStreamsProducer.setProperty(eia, streamsProperties, "ssl.endpoint.identification.algorithm");
            KafkaStreamsProducer.setProperty(ssl.secureRandomImplementation, streamsProperties, "ssl.secure.random.implementation");
        }
        return streamsProperties;
    }

    private static void setTrustStoreConfig(TrustStoreConfig tsc, Properties properties) {
        if (tsc != null) {
            KafkaStreamsProducer.setProperty(tsc.type, properties, "ssl.truststore.type");
            KafkaStreamsProducer.setProperty(tsc.location, properties, "ssl.truststore.location");
            KafkaStreamsProducer.setProperty(tsc.password, properties, "ssl.truststore.password");
            KafkaStreamsProducer.setProperty(tsc.certificates, properties, "ssl.truststore.certificates");
        }
    }

    private static void setKeyStoreConfig(KeyStoreConfig ksc, Properties properties) {
        if (ksc != null) {
            KafkaStreamsProducer.setProperty(ksc.type, properties, "ssl.keystore.type");
            KafkaStreamsProducer.setProperty(ksc.location, properties, "ssl.keystore.location");
            KafkaStreamsProducer.setProperty(ksc.password, properties, "ssl.keystore.password");
            KafkaStreamsProducer.setProperty(ksc.key, properties, "ssl.keystore.key");
            KafkaStreamsProducer.setProperty(ksc.certificateChain, properties, "ssl.keystore.certificate.chain");
        }
    }

    private static void setKeyConfig(KeyConfig kc, Properties properties) {
        if (kc != null) {
            KafkaStreamsProducer.setProperty(kc.password, properties, "ssl.key.password");
        }
    }

    private static <T> void setProperty(Optional<T> property, Properties properties, String key) {
        KafkaStreamsProducer.setProperty(property, properties, key, Objects::toString);
    }

    private static <T> void setProperty(Optional<T> property, Properties properties, String key, Function<T, String> fn) {
        if (property.isPresent()) {
            properties.put(key, fn.apply(property.get()));
        }
    }

    private static String asString(List<InetSocketAddress> addresses) {
        return addresses.stream().map(KafkaStreamsProducer::toHostPort).collect(Collectors.joining(","));
    }

    private static String toHostPort(InetSocketAddress inetSocketAddress) {
        return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void waitForTopicsToBeCreated(Admin adminClient, Collection<String> topicsToAwait, Duration timeout) throws InterruptedException {
        HashSet<String> lastMissingTopics = null;
        while (!shutdown) {
            try {
                ListTopicsResult topics = adminClient.listTopics();
                Set existingTopics = (Set)topics.names().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                if (existingTopics.containsAll(topicsToAwait)) {
                    LOGGER.debug((Object)("All expected topics created: " + String.valueOf(topicsToAwait)));
                    return;
                }
                HashSet<String> missingTopics = new HashSet<String>(topicsToAwait);
                missingTopics.removeAll(existingTopics);
                if (missingTopics.equals(lastMissingTopics)) {
                    LOGGER.debug((Object)("Waiting for topic(s) to be created: " + String.valueOf(missingTopics)));
                    continue;
                }
                LOGGER.warn((Object)("Waiting for topic(s) to be created: " + String.valueOf(missingTopics)));
                lastMissingTopics = missingTopics;
            }
            catch (ExecutionException | TimeoutException e) {
                LOGGER.error((Object)"Failed to get topic names from broker", (Throwable)e);
            }
            finally {
                Thread.sleep(1000L);
            }
        }
    }

    private static Properties getAdminClientConfig(Properties properties) {
        Properties adminClientConfig = new Properties(properties);
        if (properties.containsKey("tls-configuration-name")) {
            adminClientConfig.put("tls-configuration-name", properties.get("tls-configuration-name"));
        }
        for (String knownAdminClientConfig : AdminClientConfig.configNames()) {
            if (properties.containsKey("admin." + knownAdminClientConfig)) {
                adminClientConfig.put(knownAdminClientConfig, properties.get("admin." + knownAdminClientConfig));
                continue;
            }
            if (!properties.containsKey(knownAdminClientConfig)) continue;
            adminClientConfig.put(knownAdminClientConfig, properties.get(knownAdminClientConfig));
        }
        return adminClientConfig;
    }

    private static final class DurationToSecondsFunction
    implements Function<Duration, String> {
        private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();

        private DurationToSecondsFunction() {
        }

        @Override
        public String apply(Duration d) {
            return String.valueOf(d.getSeconds());
        }
    }
}

