/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.rabbit.stream.listener;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MicrometerHolder;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.aop.Advisor;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.core.log.LogAccessor;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamMessageListener;
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservation;
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservationConvention;
import org.springframework.rabbit.stream.micrometer.RabbitStreamMessageReceiverContext;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;

/**
 * A listener container that consumes from a RabbitMQ stream (or super stream) using
 * a {@link ConsumerBuilder} obtained from the supplied {@link Environment}.
 *
 * <p>The container is configured either through {@link #setQueueNames(String...)}
 * (exactly one stream) or through one of the {@code superStream(...)} methods; the two
 * are mutually exclusive. Consumers are built in {@link #start()} and closed in
 * {@link #stop()}. Access to the builder and to the consumer collection is guarded by
 * an internal lock.
 */
public class StreamListenerContainer extends ObservableListenerContainer {

    protected LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

    /** Guards the builder configuration and the {@link #consumers} collection. */
    private final Lock lock = new ReentrantLock();

    private final ConsumerBuilder builder;

    /** Consumers created by {@link #start()}; empty while the container is stopped. */
    private final Collection<Consumer> consumers = new ArrayList<>();

    private StreamMessageConverter streamConverter;

    /** Defaults to a no-op customizer. */
    private ConsumerCustomizer consumerCustomizer = (id, con) -> { };

    /** Set once {@link #setQueueNames(String...)} has been applied. */
    private boolean simpleStream;

    /** Set once a {@code superStream(...)} method has been applied. */
    private boolean superStream;

    private int concurrency = 1;

    private boolean autoStartup = true;

    private @Nullable MessageListener messageListener;

    private @Nullable StreamMessageListener streamListener;

    private Advice @Nullable [] adviceChain;

    /** Not assigned until {@code setQueueNames} or {@code superStream} is called. */
    private String streamName;

    private @Nullable RabbitStreamListenerObservationConvention observationConvention;

    /**
     * Create an instance that uses a {@link DefaultStreamMessageConverter} with no
     * explicit codec.
     * @param environment the environment that supplies the consumer builder.
     */
    public StreamListenerContainer(Environment environment) {
        this(environment, null);
    }

    /**
     * Create an instance, optionally configuring the default converter with a codec.
     * @param environment the environment that supplies the consumer builder; required.
     * @param codec the codec handed to the {@link DefaultStreamMessageConverter}, or
     * {@code null} to use the converter's no-arg constructor.
     */
    public StreamListenerContainer(Environment environment, @Nullable Codec codec) {
        Assert.notNull(environment, "'environment' cannot be null");
        this.builder = environment.consumerBuilder();
        this.streamConverter = codec != null
                ? new DefaultStreamMessageConverter(codec)
                : new DefaultStreamMessageConverter();
    }

    /**
     * Return the stream name set by {@link #setQueueNames(String...)} or
     * {@code superStream(...)}.
     * @return the stream name; {@code null} if neither method has been called yet.
     */
    public String getStreamName() {
        return this.streamName;
    }

    /**
     * Configure the single stream to consume from.
     * @param queueNames an array containing exactly one stream name.
     * @throws IllegalArgumentException if a super stream has already been configured
     * or the array does not contain exactly one element.
     */
    public void setQueueNames(String... queueNames) {
        Assert.isTrue(queueNames != null && queueNames.length == 1, "Only one stream is supported");
        this.lock.lock();
        try {
            // Checked under the lock because superStream() writes this flag under the same lock.
            Assert.isTrue(!this.superStream, "setQueueNames() and superStream() are mutually exclusive");
            this.builder.stream(queueNames[0]);
            this.simpleStream = true;
            this.streamName = queueNames[0];
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Configure a super stream with a concurrency of one; equivalent to
     * {@code superStream(streamName, name, 1)}.
     * @param streamName the super stream name.
     * @param name the consumer name passed to the builder.
     */
    public void superStream(String streamName, String name) {
        superStream(streamName, name, 1);
    }

    /**
     * Configure a super stream on the builder (as a named single active consumer) and
     * set how many consumers {@link #start()} will build.
     * @param streamName the super stream name; required.
     * @param name the consumer name passed to the builder.
     * @param consumers the number of consumers to build on start; must be positive.
     * @throws IllegalArgumentException if {@code consumers} is not positive, if
     * {@link #setQueueNames(String...)} has already been applied, or if
     * {@code streamName} is {@code null}.
     */
    public void superStream(String streamName, String name, int consumers) {
        this.lock.lock();
        try {
            // Validate everything first so a rejected call leaves the container unchanged.
            Assert.isTrue(consumers > 0, () -> "'concurrency' must be greater than zero, not " + consumers);
            Assert.isTrue(!this.simpleStream, "setQueueNames() and superStream() are mutually exclusive");
            Assert.notNull(streamName, "'superStream' cannot be null");
            this.builder.superStream(streamName).singleActiveConsumer().name(name);
            this.concurrency = consumers;
            this.superStream = true;
            this.streamName = streamName;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Return the converter used to turn native stream messages into Spring AMQP
     * messages.
     * @return the converter.
     */
    public StreamMessageConverter getStreamConverter() {
        return this.streamConverter;
    }

    /**
     * Replace the stream message converter.
     * @param messageConverter the converter; required.
     */
    public void setStreamConverter(StreamMessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.streamConverter = messageConverter;
    }

    /**
     * Set the customizer that {@link #start()} applies to the builder before any
     * consumer is built.
     * @param consumerCustomizer the customizer; required.
     */
    public void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
        this.lock.lock();
        try {
            Assert.notNull(consumerCustomizer, "'consumerCustomizer' cannot be null");
            this.consumerCustomizer = consumerCustomizer;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Set the value reported by {@link #isAutoStartup()}.
     * @param autoStart the auto-startup flag.
     */
    public void setAutoStartup(boolean autoStart) {
        this.autoStartup = autoStart;
    }

    /**
     * Return the auto-startup flag.
     * @return {@code true} by default.
     */
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Set the advices to wrap around the listener. A copy of the array is stored. The
     * advices are applied when {@link #setupMessageListener(MessageListener)} is
     * called, so they must be set before that call to take effect.
     * @param advices the advices; neither the array nor its elements may be null.
     */
    public void setAdviceChain(Advice... advices) {
        Assert.notNull(advices, "'advices' cannot be null");
        Assert.noNullElements(advices, "'advices' cannot have null elements");
        this.adviceChain = Arrays.copyOf(advices, advices.length);
    }

    /**
     * Return the listener stored by {@link #setupMessageListener(MessageListener)}.
     * @return the message listener, or {@code null} if none has been set up.
     */
    public @Nullable Object getMessageListener() {
        return this.messageListener;
    }

    /**
     * Set a custom observation convention; when {@code null} the default convention
     * is used when an observation is created.
     * @param observationConvention the convention.
     */
    public void setObservationConvention(RabbitStreamListenerObservationConvention observationConvention) {
        this.observationConvention = observationConvention;
    }

    /**
     * Run the inherited Micrometer and observation checks.
     */
    public void afterPropertiesSet() {
        checkMicrometer();
        checkObservation();
    }

    /**
     * Report whether the container currently holds any consumers.
     * @return {@code true} if at least one consumer exists.
     */
    public boolean isRunning() {
        this.lock.lock();
        try {
            return !this.consumers.isEmpty();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Apply the consumer customizer and build the consumers. Does nothing when
     * consumers already exist. A simple stream always gets one consumer; otherwise
     * the configured concurrency determines the count.
     */
    public void start() {
        this.lock.lock();
        try {
            if (!this.consumers.isEmpty()) {
                return;
            }
            this.consumerCustomizer.accept(getListenerId(), this.builder);
            int consumerCount = this.simpleStream ? 1 : this.concurrency;
            for (int i = 0; i < consumerCount; i++) {
                this.consumers.add(this.builder.build());
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Close every consumer and clear the collection. A {@link RuntimeException} from
     * closing one consumer is logged so the remaining consumers are still closed.
     */
    public void stop() {
        this.lock.lock();
        try {
            for (Consumer consumer : this.consumers) {
                try {
                    consumer.close();
                }
                catch (RuntimeException ex) {
                    this.logger.error(ex, "Failed to close consumer");
                }
            }
            this.consumers.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Store the listener (wrapping it with the advice chain, if any) and register a
     * message handler on the builder that dispatches each delivery to it inside an
     * {@link Observation}.
     *
     * <p>Dispatch order: a {@link StreamMessageListener} receives the native message
     * and context; otherwise the message is converted and passed to a
     * {@link ChannelAwareMessageListener} (with a {@code null} channel) or to the plain
     * {@link MessageListener}. Exceptions from a {@code ChannelAwareMessageListener}
     * are logged and not propagated; exceptions from the other two listener kinds
     * propagate to the caller of the handler.
     * @param messageListener the listener to invoke.
     */
    public void setupMessageListener(MessageListener messageListener) {
        adviseIfNeeded(messageListener);
        this.builder.messageHandler((context, message) -> {
            ObservationRegistry registry = getObservationRegistry();
            MicrometerHolder micrometerHolder = getMicrometerHolder();
            Object sample = micrometerHolder != null ? micrometerHolder.start() : null;
            Observation observation = RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(
                    this.observationConvention,
                    RabbitStreamListenerObservation.DefaultRabbitStreamListenerObservationConvention.INSTANCE,
                    () -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
                    registry);
            StreamMessageListener streamListenerToUse = this.streamListener;
            if (streamListenerToUse != null) {
                observation.observe(() -> invokeListener(
                        () -> streamListenerToUse.onStreamMessage(message, context), micrometerHolder, sample));
            }
            else {
                Message converted = this.streamConverter.toMessage(message, new StreamMessageProperties(context));
                MessageListener messageListenerToUse = this.messageListener;
                if (messageListenerToUse instanceof ChannelAwareMessageListener) {
                    ChannelAwareMessageListener channelAwareListener =
                            (ChannelAwareMessageListener) messageListenerToUse;
                    try {
                        observation.observe(() -> invokeListener(
                                () -> channelAwareListener.onMessage(converted, null), micrometerHolder, sample));
                    }
                    catch (Exception ex) {
                        // Deliberate best effort: failures of this listener kind are only logged.
                        this.logger.error(ex, "Listener threw an exception");
                    }
                }
                else {
                    Assert.state(messageListenerToUse != null,
                            "'messageListener' or 'streamListener' is required");
                    // Record the Micrometer sample here too, so a started sample is always completed.
                    observation.observe(() -> invokeListener(
                            () -> messageListenerToUse.onMessage(converted), micrometerHolder, sample));
                }
            }
        });
    }

    /**
     * Run a listener invocation and record its outcome on the Micrometer holder when
     * both the holder and a sample are present. Runtime exceptions are rethrown as-is;
     * checked exceptions are converted by {@link RabbitExceptionTranslator}.
     * @param invocation the listener call to perform.
     * @param micrometerHolder the holder, or {@code null} when none is available.
     * @param sample the sample returned by the holder's {@code start()}, or {@code null}.
     */
    private void invokeListener(ListenerInvocation invocation, @Nullable MicrometerHolder micrometerHolder,
            @Nullable Object sample) {

        try {
            invocation.invoke();
            if (micrometerHolder != null && sample != null) {
                micrometerHolder.success(sample, this.streamName);
            }
        }
        catch (RuntimeException rtex) {
            if (micrometerHolder != null && sample != null) {
                micrometerHolder.failure(sample, this.streamName, rtex.getClass().getSimpleName());
            }
            throw rtex;
        }
        catch (Exception ex) {
            if (micrometerHolder != null && sample != null) {
                micrometerHolder.failure(sample, this.streamName, ex.getClass().getSimpleName());
            }
            throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
        }
    }

    /**
     * Store the listener and, when an advice chain is configured, replace the stored
     * reference with a proxy that applies every advice.
     * @param messageListener the listener passed to {@code setupMessageListener}.
     */
    private void adviseIfNeeded(MessageListener messageListener) {
        this.messageListener = messageListener;
        // Recomputed on every call so that a stream listener from an earlier setup is not retained.
        this.streamListener = messageListener instanceof StreamMessageListener
                ? (StreamMessageListener) messageListener
                : null;
        Advice[] advices = this.adviceChain;
        if (advices == null || advices.length == 0) {
            return;
        }
        ProxyFactory factory = new ProxyFactory(messageListener);
        for (Advice advice : advices) {
            factory.addAdvisor(new DefaultPointcutAdvisor(advice));
        }
        factory.setInterfaces(messageListener.getClass().getInterfaces());
        Object proxy = factory.getProxy(getClass().getClassLoader());
        // Only the reference actually used for dispatch is replaced by the proxy.
        if (this.streamListener != null) {
            this.streamListener = (StreamMessageListener) proxy;
        }
        else {
            this.messageListener = (MessageListener) proxy;
        }
    }

    /**
     * A listener call that may throw a checked exception.
     */
    @FunctionalInterface
    private interface ListenerInvocation {

        void invoke() throws Exception;

    }

}

