/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.streams.AbstractKafkaStreamsConfiguration;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.configuration.kafka.streams.event.AfterKafkaStreamsStart;
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bean factory that builds {@link KafkaStreams} instances from
 * {@link ConfiguredStreamBuilder} beans, keeps track of every instance it
 * creates and closes them all when the factory itself is closed.
 */
@Factory
public class KafkaStreamsFactory
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);
    /** Configuration property that controls whether a built stream is started; treated as {@code true} when absent. */
    private static final String START_KAFKA_STREAMS_PROPERTY = "start-kafka-streams";
    /** Maximum time {@link #close()} waits for each stream to shut down. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(3L);
    /** Every stream created by this factory, mapped to the builder it was created from. */
    private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<>();
    private final ApplicationEventPublisher eventPublisher;

    /**
     * Creates the factory.
     *
     * @param eventPublisher publisher used to emit the before/after start events
     */
    public KafkaStreamsFactory(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    /**
     * Creates a stream builder from a Kafka Streams configuration.
     *
     * @param configuration the configuration whose properties seed the builder
     * @return a new builder wrapping {@code configuration.getConfig()}
     */
    @EachBean(value=AbstractKafkaStreamsConfiguration.class)
    ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration<?, ?> configuration) {
        return new ConfiguredStreamBuilder(configuration.getConfig());
    }

    /**
     * Returns the streams created by this factory together with their builders.
     *
     * @return the factory's internal map itself (not a copy)
     */
    public Map<KafkaStreams, ConfiguredStreamBuilder> getStreams() {
        return this.streams;
    }

    /**
     * Builds a {@link KafkaStreams} instance from the given builder, registers
     * it with this factory and, unless the {@code start-kafka-streams}
     * property of the builder's configuration parses to {@code false}, starts it.
     *
     * <p>When the stream is to be started, a {@link BeforeKafkaStreamStart}
     * event is published before it is registered and started, and an
     * {@link AfterKafkaStreamsStart} event is published after
     * {@link KafkaStreams#start()} returns.
     *
     * @param name     name of the stream; only used in the debug log message
     * @param builder  builder providing the topology and the configuration
     * @param kStreams streams passed along in the published events
     * @return the created (and possibly started) Kafka Streams instance
     */
    @EachBean(value=ConfiguredStreamBuilder.class)
    @Context
    KafkaStreams kafkaStreams(@Parameter String name, ConfiguredStreamBuilder builder, KStream<?, ?> ... kStreams) {
        Topology topology = builder.build(builder.getConfiguration());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, builder.getConfiguration());
        String startKafkaStreamsValue = builder.getConfiguration().getProperty(START_KAFKA_STREAMS_PROPERTY, Boolean.TRUE.toString());
        boolean startKafkaStreams = Boolean.parseBoolean(startKafkaStreamsValue);
        if (startKafkaStreams) {
            this.eventPublisher.publishEvent(new BeforeKafkaStreamStart(kafkaStreams, kStreams));
        }
        // Registered even when not started, so close() still releases it.
        this.streams.put(kafkaStreams, builder);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing Application {} with topology:\n{}", name, topology.describe().toString());
        }
        if (startKafkaStreams) {
            kafkaStreams.start();
            this.eventPublisher.publishEvent(new AfterKafkaStreamsStart(kafkaStreams, kStreams));
        }
        return kafkaStreams;
    }

    /**
     * Creates the interactive query service.
     *
     * @return a service constructed with the key set of the streams map,
     *         which is a live view of the registered streams
     */
    @Singleton
    InteractiveQueryService interactiveQueryService() {
        return new InteractiveQueryService(this.streams.keySet());
    }

    /**
     * Closes every stream created by this factory, waiting up to three
     * seconds for each one.
     *
     * <p>Shutdown is best-effort: a stream that throws or does not stop
     * within the timeout is reported in the log and the remaining streams
     * are still closed. No exception is propagated to the caller.
     */
    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : this.streams.keySet()) {
            try {
                // close(Duration) reports a timeout through its return value, not an exception.
                if (!stream.close(CLOSE_TIMEOUT)) {
                    LOG.warn("Kafka Streams instance did not shut down within {}", CLOSE_TIMEOUT);
                }
            }
            catch (Exception e) {
                // Deliberately not rethrown so that one failing stream cannot
                // prevent the others from being closed.
                LOG.warn("Error closing Kafka Streams instance: {}", e.getMessage(), e);
            }
        }
    }
}

