/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.streams.runtime;

import io.quarkus.arc.Unremovable;
import io.quarkus.kafka.streams.runtime.KafkaStreamsPropertiesUtil;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import io.quarkus.kafka.streams.runtime.KafkaStreamsSupport;
import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
import io.quarkus.kafka.streams.runtime.SaslConfig;
import io.quarkus.kafka.streams.runtime.SslConfig;
import io.quarkus.kafka.streams.runtime.StoreConfig;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.jboss.logging.Logger;

@Singleton
public class KafkaStreamsProducer {
    private static final Logger LOGGER = Logger.getLogger((String)KafkaStreamsProducer.class.getName());
    private static volatile boolean shutdown = false;
    private final ExecutorService executorService;
    private final KafkaStreams kafkaStreams;
    private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
    private final Admin kafkaAdminClient;

    @Inject
    public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig, Instance<Topology> topology, Instance<KafkaClientSupplier> kafkaClientSupplier, Instance<KafkaStreams.StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener) {
        if (topology.isUnsatisfied()) {
            LOGGER.debug((Object)"No Topology producer; Kafka Streams will not be started");
            this.executorService = null;
            this.kafkaStreams = null;
            this.kafkaStreamsTopologyManager = null;
            this.kafkaAdminClient = null;
            return;
        }
        Properties buildTimeProperties = kafkaStreamsSupport.getProperties();
        String bootstrapServersConfig = KafkaStreamsProducer.asString(runtimeConfig.bootstrapServers);
        Properties kafkaStreamsProperties = KafkaStreamsProducer.getStreamsProperties(buildTimeProperties, bootstrapServersConfig, runtimeConfig);
        this.kafkaAdminClient = Admin.create((Properties)KafkaStreamsProducer.getAdminClientConfig(kafkaStreamsProperties));
        this.executorService = Executors.newSingleThreadExecutor();
        this.kafkaStreams = KafkaStreamsProducer.initializeKafkaStreams(kafkaStreamsProperties, runtimeConfig, this.kafkaAdminClient, (Topology)topology.get(), kafkaClientSupplier, stateListener, globalStateRestoreListener, this.executorService);
        this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(this.kafkaAdminClient);
    }

    @Produces
    @Singleton
    @Unremovable
    @Startup
    public KafkaStreams getKafkaStreams() {
        return this.kafkaStreams;
    }

    @Produces
    @Singleton
    @Unremovable
    @Startup
    public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
        return this.kafkaStreamsTopologyManager;
    }

    void onStop(@Observes ShutdownEvent event) {
        shutdown = true;
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.kafkaStreams != null) {
            LOGGER.debug((Object)"Stopping Kafka Streams pipeline");
            this.kafkaStreams.close();
        }
        if (this.kafkaAdminClient != null) {
            this.kafkaAdminClient.close();
        }
    }

    private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProperties, final KafkaStreamsRuntimeConfig runtimeConfig, final Admin adminClient, Topology topology, Instance<KafkaClientSupplier> kafkaClientSupplier, Instance<KafkaStreams.StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener, ExecutorService executorService) {
        final KafkaStreams kafkaStreams = kafkaClientSupplier.isUnsatisfied() ? new KafkaStreams(topology, kafkaStreamsProperties) : new KafkaStreams(topology, kafkaStreamsProperties, (KafkaClientSupplier)kafkaClientSupplier.get());
        if (!stateListener.isUnsatisfied()) {
            kafkaStreams.setStateListener((KafkaStreams.StateListener)stateListener.get());
        }
        if (!globalStateRestoreListener.isUnsatisfied()) {
            kafkaStreams.setGlobalStateRestoreListener((StateRestoreListener)globalStateRestoreListener.get());
        }
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    KafkaStreamsProducer.waitForTopicsToBeCreated(adminClient, runtimeConfig.getTrimmedTopics());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (!shutdown) {
                    LOGGER.debug((Object)"Starting Kafka Streams pipeline");
                    kafkaStreams.start();
                }
            }
        });
        return kafkaStreams;
    }

    private static Properties getStreamsProperties(Properties properties, String bootstrapServersConfig, KafkaStreamsRuntimeConfig runtimeConfig) {
        SslConfig ssl;
        Properties streamsProperties = new Properties();
        streamsProperties.putAll((Map<?, ?>)properties);
        streamsProperties.putAll((Map<?, ?>)KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties());
        streamsProperties.putAll((Map<?, ?>)KafkaStreamsPropertiesUtil.appKafkaStreamsProperties());
        streamsProperties.put("bootstrap.servers", bootstrapServersConfig);
        streamsProperties.put("application.id", runtimeConfig.applicationId);
        if (runtimeConfig.applicationServer.isPresent()) {
            streamsProperties.put("application.server", runtimeConfig.applicationServer.get());
        }
        if (runtimeConfig.schemaRegistryUrl.isPresent()) {
            streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
        }
        KafkaStreamsProducer.setProperty(runtimeConfig.securityProtocol, streamsProperties, "security.protocol");
        SaslConfig sc = runtimeConfig.sasl;
        if (sc != null) {
            KafkaStreamsProducer.setProperty(sc.jaasConfig, streamsProperties, "sasl.jaas.config");
            KafkaStreamsProducer.setProperty(sc.clientCallbackHandlerClass, streamsProperties, "sasl.client.callback.handler.class");
            KafkaStreamsProducer.setProperty(sc.loginCallbackHandlerClass, streamsProperties, "sasl.login.callback.handler.class");
            KafkaStreamsProducer.setProperty(sc.loginClass, streamsProperties, "sasl.login.class");
            KafkaStreamsProducer.setProperty(sc.kerberosServiceName, streamsProperties, "sasl.kerberos.service.name");
            KafkaStreamsProducer.setProperty(sc.kerberosKinitCmd, streamsProperties, "sasl.kerberos.kinit.cmd");
            KafkaStreamsProducer.setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties, "sasl.kerberos.ticket.renew.window.factor");
            KafkaStreamsProducer.setProperty(sc.kerberosTicketRenewJitter, streamsProperties, "sasl.kerberos.ticket.renew.jitter");
            KafkaStreamsProducer.setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, "sasl.kerberos.min.time.before.relogin");
            KafkaStreamsProducer.setProperty(sc.loginRefreshWindowFactor, streamsProperties, "sasl.login.refresh.window.factor");
            KafkaStreamsProducer.setProperty(sc.loginRefreshWindowJitter, streamsProperties, "sasl.login.refresh.window.jitter");
            KafkaStreamsProducer.setProperty(sc.loginRefreshMinPeriod, streamsProperties, "sasl.login.refresh.min.period.seconds", DurationToSecondsFunction.INSTANCE);
            KafkaStreamsProducer.setProperty(sc.loginRefreshBuffer, streamsProperties, "sasl.login.refresh.buffer.seconds", DurationToSecondsFunction.INSTANCE);
        }
        if ((ssl = runtimeConfig.ssl) != null) {
            KafkaStreamsProducer.setProperty(ssl.protocol, streamsProperties, "ssl.protocol");
            KafkaStreamsProducer.setProperty(ssl.provider, streamsProperties, "ssl.provider");
            KafkaStreamsProducer.setProperty(ssl.cipherSuites, streamsProperties, "ssl.cipher.suites");
            KafkaStreamsProducer.setProperty(ssl.enabledProtocols, streamsProperties, "ssl.enabled.protocols");
            KafkaStreamsProducer.setStoreConfig(ssl.truststore, streamsProperties, "ssl.truststore");
            KafkaStreamsProducer.setStoreConfig(ssl.keystore, streamsProperties, "ssl.keystore");
            KafkaStreamsProducer.setStoreConfig(ssl.key, streamsProperties, "ssl.key");
            KafkaStreamsProducer.setProperty(ssl.keymanagerAlgorithm, streamsProperties, "ssl.keymanager.algorithm");
            KafkaStreamsProducer.setProperty(ssl.trustmanagerAlgorithm, streamsProperties, "ssl.trustmanager.algorithm");
            Optional<String> eia = Optional.of(ssl.endpointIdentificationAlgorithm.orElse(""));
            KafkaStreamsProducer.setProperty(eia, streamsProperties, "ssl.endpoint.identification.algorithm");
            KafkaStreamsProducer.setProperty(ssl.secureRandomImplementation, streamsProperties, "ssl.secure.random.implementation");
        }
        return streamsProperties;
    }

    private static void setStoreConfig(StoreConfig sc, Properties properties, String key) {
        if (sc != null) {
            KafkaStreamsProducer.setProperty(sc.type, properties, key + ".type");
            KafkaStreamsProducer.setProperty(sc.location, properties, key + ".location");
            KafkaStreamsProducer.setProperty(sc.password, properties, key + ".password");
        }
    }

    private static <T> void setProperty(Optional<T> property, Properties properties, String key) {
        KafkaStreamsProducer.setProperty(property, properties, key, Objects::toString);
    }

    private static <T> void setProperty(Optional<T> property, Properties properties, String key, Function<T, String> fn) {
        if (property.isPresent()) {
            properties.put(key, fn.apply(property.get()));
        }
    }

    private static String asString(List<InetSocketAddress> addresses) {
        return addresses.stream().map(KafkaStreamsProducer::toHostPort).collect(Collectors.joining(","));
    }

    private static String toHostPort(InetSocketAddress inetSocketAddress) {
        return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void waitForTopicsToBeCreated(Admin adminClient, Collection<String> topicsToAwait) throws InterruptedException {
        HashSet<String> lastMissingTopics = null;
        while (!shutdown) {
            try {
                ListTopicsResult topics = adminClient.listTopics();
                Set existingTopics = (Set)topics.names().get(10L, TimeUnit.SECONDS);
                if (existingTopics.containsAll(topicsToAwait)) {
                    LOGGER.debug((Object)("All expected topics created: " + topicsToAwait));
                    return;
                }
                HashSet<String> missingTopics = new HashSet<String>(topicsToAwait);
                missingTopics.removeAll(existingTopics);
                if (missingTopics.equals(lastMissingTopics)) {
                    LOGGER.debug((Object)("Waiting for topic(s) to be created: " + missingTopics));
                    continue;
                }
                LOGGER.warn((Object)("Waiting for topic(s) to be created: " + missingTopics));
                lastMissingTopics = missingTopics;
            }
            catch (ExecutionException | TimeoutException e) {
                LOGGER.error((Object)"Failed to get topic names from broker", (Throwable)e);
            }
            finally {
                Thread.sleep(1000L);
            }
        }
    }

    private static Properties getAdminClientConfig(Properties properties) {
        Properties adminClientConfig = new Properties(properties);
        for (String knownAdminClientConfig : AdminClientConfig.configNames()) {
            if (properties.containsKey("admin." + knownAdminClientConfig)) {
                adminClientConfig.put(knownAdminClientConfig, properties.get("admin." + knownAdminClientConfig));
                continue;
            }
            if (!properties.containsKey(knownAdminClientConfig)) continue;
            adminClientConfig.put(knownAdminClientConfig, properties.get(knownAdminClientConfig));
        }
        return adminClientConfig;
    }

    private static final class DurationToSecondsFunction
    implements Function<Duration, String> {
        private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();

        private DurationToSecondsFunction() {
        }

        @Override
        public String apply(Duration d) {
            return String.valueOf(d.getSeconds());
        }
    }
}

