/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.streams.AbstractKafkaStreamsConfiguration;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.configuration.kafka.streams.event.AfterKafkaStreamsStart;
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Secondary;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bean factory that builds {@link KafkaStreams} instances from
 * {@link ConfiguredStreamBuilder} beans, keeps track of every instance it
 * created, and closes them all when the factory itself is closed.
 */
@Factory
public class KafkaStreamsFactory implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);

    /** Builder property deciding whether a created stream is started; treated as {@code true} when absent. */
    private static final String START_KAFKA_STREAMS_PROPERTY = "start-kafka-streams";

    /** Builder property naming the {@code StreamThreadExceptionResponse} to answer uncaught exceptions with. */
    private static final String UNCAUGHT_EXCEPTION_HANDLER_PROPERTY = "uncaught-exception-handler";

    /** Every stream created by this factory, mapped to the builder it was created from. */
    private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<>();
    private final ApplicationEventPublisher eventPublisher;

    /**
     * @param eventPublisher publisher used to emit the before/after start events
     */
    public KafkaStreamsFactory(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    /**
     * Creates one stream builder per Kafka Streams configuration bean.
     *
     * @param configuration the configuration supplying the properties, name and close timeout
     * @return a builder carrying the configuration's properties, name and close timeout
     */
    @EachBean(value=AbstractKafkaStreamsConfiguration.class)
    ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration<?, ?> configuration) {
        return new ConfiguredStreamBuilder(
                configuration.getConfig(), configuration.getName(), configuration.getCloseTimeout());
    }

    /**
     * @return the map of streams created so far to their builders; this is the
     *         factory's own map, not a copy
     */
    public Map<KafkaStreams, ConfiguredStreamBuilder> getStreams() {
        return this.streams;
    }

    /**
     * Builds the topology of the given builder, wraps it in a {@link KafkaStreams}
     * instance, registers that instance with this factory and, unless the
     * {@code start-kafka-streams} property parses to {@code false}, starts it.
     *
     * <p>When the stream is started, a {@link BeforeKafkaStreamStart} event is
     * published before the stream is registered and an
     * {@link AfterKafkaStreamsStart} event is published after {@code start()}
     * returns.
     *
     * @param name                the name of the stream, used for logging only
     * @param builder             the builder providing the topology and the configuration
     * @param kafkaClientSupplier the client supplier handed to the {@link KafkaStreams} constructor
     * @param kStreams            the streams passed along in the published events
     * @return the created (and possibly started) {@link KafkaStreams} instance
     */
    @EachBean(value=ConfiguredStreamBuilder.class)
    @Context
    KafkaStreams kafkaStreams(@Parameter String name, ConfiguredStreamBuilder builder, KafkaClientSupplier kafkaClientSupplier, KStream<?, ?> ... kStreams) {
        Properties configuration = builder.getConfiguration();
        Topology topology = builder.build(configuration);
        KafkaStreams kafkaStreams = new KafkaStreams(topology, configuration, kafkaClientSupplier);
        this.makeUncaughtExceptionHandler(configuration)
                .ifPresent(handler -> kafkaStreams.setUncaughtExceptionHandler(handler));

        boolean startKafkaStreams = Boolean.parseBoolean(
                configuration.getProperty(START_KAFKA_STREAMS_PROPERTY, Boolean.TRUE.toString()));
        if (startKafkaStreams) {
            this.eventPublisher.publishEvent(new BeforeKafkaStreamStart(kafkaStreams, kStreams));
        }

        // Registered even when not started, so close() also covers streams that
        // are started later by other code.
        this.streams.put(kafkaStreams, builder);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing Application {} with topology:\n{}", name, topology.describe().toString());
        }

        if (startKafkaStreams) {
            kafkaStreams.start();
            this.eventPublisher.publishEvent(new AfterKafkaStreamsStart(kafkaStreams, kStreams));
        }
        return kafkaStreams;
    }

    /**
     * @return a query service constructed over the key set of this factory's stream map
     */
    @Singleton
    InteractiveQueryService interactiveQueryService() {
        return new InteractiveQueryService(this.streams.keySet());
    }

    /**
     * @return a {@link DefaultKafkaClientSupplier}, declared as a secondary bean
     */
    @Singleton
    @Secondary
    KafkaClientSupplier kafkaClientSupplier() {
        return new DefaultKafkaClientSupplier();
    }

    /**
     * Closes every stream created by this factory, waiting up to each builder's
     * close timeout. Closing is best-effort: a timeout or a failure on one
     * stream is logged and the remaining streams are still closed.
     */
    @Override
    @PreDestroy
    public void close() {
        this.streams.forEach((stream, builder) -> {
            try {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Shutting down kafka stream {} ", builder.getName());
                }
                if (!stream.close(builder.getCloseTimeout())) {
                    LOG.warn("Timeout was exceeded while attempting to close kafka stream {}", builder.getName());
                }
            }
            catch (Exception e) {
                // Keep going so one failing stream cannot prevent the others from
                // being closed, but make the failure visible instead of dropping it.
                LOG.warn("Failed to close kafka stream {}", builder.getName(), e);
            }
        });
    }

    /**
     * Builds an uncaught exception handler from the {@code uncaught-exception-handler}
     * property, if that property is set to a non-blank, valid
     * {@code StreamThreadExceptionResponse} name (matched case-insensitively).
     *
     * @param properties the stream configuration to read the property from
     * @return a handler that logs the exception and answers with the configured
     *         response, or empty when the property is missing, blank or not a
     *         valid response name
     */
    Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler(Properties properties) {
        return Optional.ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY))
                .filter(Predicate.not(String::isBlank))
                .flatMap(KafkaStreamsFactory::toExceptionHandler);
    }

    /** Maps a configured response name to a logging handler; empty (with a warning) when the name is invalid. */
    private static Optional<StreamsUncaughtExceptionHandler> toExceptionHandler(String action) {
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse response;
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM's default
            // locale (e.g. a Turkish locale would turn 'i' into a dotted capital
            // and make a valid name fail to match).
            response = StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.valueOf(
                    action.toUpperCase(Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Ignoring illegal exception handler: {}. Please use one of: {}", action, Arrays.asList(StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.values()));
            }
            return Optional.empty();
        }
        StreamsUncaughtExceptionHandler handler = exception -> {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Responding with {} to unexpected exception thrown by kafka stream thread", response, exception);
            }
            return response;
        };
        return Optional.of(handler);
    }
}

