/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.config;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.KafkaStreamsCustomizer;
import org.springframework.kafka.config.KafkaStreamsInfrastructureCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.Assert;

public class StreamsBuilderFactoryBean
extends AbstractFactoryBean<StreamsBuilder>
implements SmartLifecycle,
BeanNameAware,
SmartInitializingSingleton {
    public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10L);
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(StreamsBuilderFactoryBean.class));
    private static final String STREAMS_CONFIG_MUST_NOT_BE_NULL = "'streamsConfig' must not be null";
    private static final String CLEANUP_CONFIG_MUST_NOT_BE_NULL = "'cleanupConfig' must not be null";
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private final List<Listener> listeners = new ArrayList<Listener>();
    private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    private Properties properties;
    private CleanupConfig cleanupConfig;
    private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer(){};
    private KafkaStreamsCustomizer kafkaStreamsCustomizer;
    private KafkaStreams.StateListener stateListener;
    private @Nullable StateRestoreListener stateRestoreListener;
    private @Nullable StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler;
    private boolean autoStartup;
    private int phase;
    private Duration closeTimeout;
    private boolean leaveGroupOnClose;
    private @Nullable KafkaStreams kafkaStreams;
    private volatile boolean running;
    private Topology topology;
    private String beanName;

    public StreamsBuilderFactoryBean() {
        this.kafkaStreamsCustomizer = kafkaStreams -> {};
        this.autoStartup = true;
        this.phase = 2147482647;
        this.closeTimeout = DEFAULT_CLOSE_TIMEOUT;
        this.leaveGroupOnClose = false;
        this.cleanupConfig = new CleanupConfig();
    }

    public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig, CleanupConfig cleanupConfig) {
        this.kafkaStreamsCustomizer = kafkaStreams -> {};
        this.autoStartup = true;
        this.phase = 2147482647;
        this.closeTimeout = DEFAULT_CLOSE_TIMEOUT;
        this.leaveGroupOnClose = false;
        Assert.notNull((Object)streamsConfig, (String)STREAMS_CONFIG_MUST_NOT_BE_NULL);
        Assert.notNull((Object)cleanupConfig, (String)CLEANUP_CONFIG_MUST_NOT_BE_NULL);
        this.properties = streamsConfig.asProperties();
        this.cleanupConfig = cleanupConfig;
    }

    public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig) {
        this(streamsConfig, new CleanupConfig());
    }

    public synchronized void setBeanName(String name) {
        this.beanName = name;
    }

    public void setStreamsConfiguration(Properties streamsConfig) {
        Assert.notNull((Object)streamsConfig, (String)STREAMS_CONFIG_MUST_NOT_BE_NULL);
        this.properties = streamsConfig;
    }

    public @Nullable Properties getStreamsConfiguration() {
        return this.properties;
    }

    public void setClientSupplier(KafkaClientSupplier clientSupplier) {
        Assert.notNull((Object)clientSupplier, (String)"'clientSupplier' must not be null");
        this.clientSupplier = clientSupplier;
    }

    public void setInfrastructureCustomizer(KafkaStreamsInfrastructureCustomizer infrastructureCustomizer) {
        Assert.notNull((Object)infrastructureCustomizer, (String)"'infrastructureCustomizer' must not be null");
        this.infrastructureCustomizer = infrastructureCustomizer;
    }

    public void setKafkaStreamsCustomizer(KafkaStreamsCustomizer kafkaStreamsCustomizer) {
        Assert.notNull((Object)kafkaStreamsCustomizer, (String)"'kafkaStreamsCustomizer' must not be null");
        this.kafkaStreamsCustomizer = kafkaStreamsCustomizer;
    }

    public void setStateListener(KafkaStreams.StateListener stateListener) {
        this.stateListener = stateListener;
    }

    public void setStreamsUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
    }

    public @Nullable StreamsUncaughtExceptionHandler getStreamsUncaughtExceptionHandler() {
        return this.streamsUncaughtExceptionHandler;
    }

    public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
        this.stateRestoreListener = stateRestoreListener;
    }

    public void setCloseTimeout(int closeTimeout) {
        this.closeTimeout = Duration.ofSeconds(closeTimeout);
    }

    public void setLeaveGroupOnClose(boolean leaveGroupOnClose) {
        this.leaveGroupOnClose = leaveGroupOnClose;
    }

    public Topology getTopology() {
        return this.topology;
    }

    public Class<?> getObjectType() {
        return StreamsBuilder.class;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public int getPhase() {
        return this.phase;
    }

    public void setCleanupConfig(CleanupConfig cleanupConfig) {
        Assert.notNull((Object)cleanupConfig, (String)CLEANUP_CONFIG_MUST_NOT_BE_NULL);
        this.cleanupConfig = cleanupConfig;
    }

    public synchronized @Nullable KafkaStreams getKafkaStreams() {
        this.lifecycleLock.lock();
        try {
            KafkaStreams kafkaStreams = this.kafkaStreams;
            return kafkaStreams;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    public List<Listener> getListeners() {
        return Collections.unmodifiableList(this.listeners);
    }

    public void addListener(Listener listener) {
        Assert.notNull((Object)listener, (String)"'listener' cannot be null");
        this.listeners.add(listener);
    }

    public boolean removeListener(Listener listener) {
        return this.listeners.remove(listener);
    }

    protected StreamsBuilder createInstance() {
        this.lifecycleLock.lock();
        try {
            if (this.autoStartup) {
                Assert.state((this.properties != null ? 1 : 0) != 0, (String)"streams configuration properties must not be null");
            }
            StreamsBuilder builder = this.createStreamBuilder();
            this.infrastructureCustomizer.configureBuilder(builder);
            StreamsBuilder streamsBuilder = builder;
            return streamsBuilder;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        block8: {
            this.lifecycleLock.lock();
            try {
                if (this.running) break block8;
                try {
                    Assert.state((this.properties != null ? 1 : 0) != 0, (String)"streams configuration properties must not be null");
                    this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams(this.topology, this.properties, this.clientSupplier);
                    this.kafkaStreams.setStateListener(this.stateListener);
                    this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
                    if (this.streamsUncaughtExceptionHandler != null) {
                        this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler);
                    }
                    this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
                    if (this.cleanupConfig.cleanupOnStart()) {
                        this.kafkaStreams.cleanUp();
                    }
                    this.kafkaStreams.start();
                    for (Listener listener : this.listeners) {
                        listener.streamsAdded(this.beanName, this.kafkaStreams);
                    }
                    this.running = true;
                }
                catch (Exception e) {
                    throw new KafkaException("Could not start stream: ", e);
                }
            }
            finally {
                this.lifecycleLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        block11: {
            this.lifecycleLock.lock();
            try {
                if (!this.running) break block11;
                try {
                    if (this.kafkaStreams != null) {
                        this.kafkaStreams.close(new KafkaStreams.CloseOptions().timeout(this.closeTimeout).leaveGroup(this.leaveGroupOnClose));
                        if (this.cleanupConfig.cleanupOnStop()) {
                            this.kafkaStreams.cleanUp();
                        }
                        for (Listener listener : this.listeners) {
                            listener.streamsRemoved(this.beanName, this.kafkaStreams);
                        }
                        this.kafkaStreams = null;
                    }
                }
                catch (Exception e) {
                    LOGGER.error((Throwable)e, (CharSequence)"Failed to stop streams");
                }
                finally {
                    this.running = false;
                }
            }
            finally {
                this.lifecycleLock.unlock();
            }
        }
    }

    public boolean isRunning() {
        this.lifecycleLock.lock();
        try {
            boolean bl = this.running;
            return bl;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    public void afterSingletonsInstantiated() {
        try {
            this.topology = ((StreamsBuilder)this.getObject()).build(this.properties);
            this.infrastructureCustomizer.configureTopology(this.topology);
            TopologyDescription description = this.topology.describe();
            LOGGER.debug(() -> ((TopologyDescription)description).toString());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private StreamsBuilder createStreamBuilder() {
        if (this.properties == null) {
            return new StreamsBuilder();
        }
        StreamsConfig streamsConfig = new StreamsConfig((Map)this.properties);
        TopologyConfig topologyConfig = new TopologyConfig(streamsConfig);
        return new StreamsBuilder(topologyConfig);
    }

    public static interface Listener {
        default public void streamsAdded(String id, KafkaStreams streams) {
        }

        default public void streamsRemoved(String id, KafkaStreams streams) {
        }
    }
}

