/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.streams.AbstractKafkaStreamsConfiguration;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.configuration.kafka.streams.event.AfterKafkaStreamsStart;
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.event.ApplicationEventPublisher;
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;

@Factory
public class KafkaStreamsFactory
implements Closeable {
    private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<KafkaStreams, ConfiguredStreamBuilder>();
    private final ApplicationEventPublisher eventPublisher;

    public KafkaStreamsFactory(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    @EachBean(value=AbstractKafkaStreamsConfiguration.class)
    ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration<?, ?> configuration) {
        return new ConfiguredStreamBuilder(configuration.getConfig());
    }

    public Map<KafkaStreams, ConfiguredStreamBuilder> getStreams() {
        return this.streams;
    }

    @EachBean(value=ConfiguredStreamBuilder.class)
    @Context
    KafkaStreams kafkaStreams(ConfiguredStreamBuilder builder, KStream<?, ?> ... kStreams) {
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(builder.getConfiguration()), builder.getConfiguration());
        this.eventPublisher.publishEvent((Object)new BeforeKafkaStreamStart(kafkaStreams, kStreams));
        this.streams.put(kafkaStreams, builder);
        kafkaStreams.start();
        this.eventPublisher.publishEvent((Object)new AfterKafkaStreamsStart(kafkaStreams, kStreams));
        return kafkaStreams;
    }

    @Singleton
    InteractiveQueryService interactiveQueryService() {
        return new InteractiveQueryService(this.streams.keySet());
    }

    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : this.streams.keySet()) {
            try {
                stream.close(Duration.ofSeconds(3L));
            }
            catch (Exception exception) {}
        }
    }
}

