/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.listener;

import io.micrometer.observation.Observation;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.BeanUtils;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.ConsumerBuilderConfigurationUtil;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.event.ConsumerFailedToStartEvent;
import org.springframework.pulsar.event.ConsumerStartedEvent;
import org.springframework.pulsar.event.ConsumerStartingEvent;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.listener.PulsarAcknowledgingMessageListener;
import org.springframework.pulsar.listener.PulsarBatchAcknowledgingMessageListener;
import org.springframework.pulsar.listener.PulsarBatchListenerFailedException;
import org.springframework.pulsar.listener.PulsarBatchMessageListener;
import org.springframework.pulsar.listener.PulsarConsumerErrorHandler;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarListenerObservation;
import org.springframework.pulsar.observation.PulsarMessageReceiverContext;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.pulsar.transaction.PulsarResourceHolder;
import org.springframework.pulsar.transaction.PulsarTransactionUtils;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

public class DefaultPulsarMessageListenerContainer<T>
extends AbstractPulsarMessageListenerContainer<T> {
    // Future returned by the consumer task executor for the task submitted in doStart()
    // (either the listener itself or the asynchronous startup-retry task).
    private volatile @Nullable CompletableFuture<?> listenerConsumerFuture;
    // The active listener; remains null until a Listener has been constructed successfully.
    private volatile @Nullable Listener listenerConsumer;
    // Container reported as the second argument of the consumer lifecycle events; set to this instance by the constructor.
    private final AbstractPulsarMessageListenerContainer<?> thisOrParentContainer;
    // Thread executing Listener.run(); recorded there and interrupted/joined by doStop().
    private final AtomicReference<Thread> listenerConsumerThread = new AtomicReference();
    // Counted down when the listener thread begins running; awaited by waitForStartup().
    private volatile CountDownLatch startLatch = new CountDownLatch(1);
    // Raised by the listener loop before it attempts a batch receive and cleared afterwards; doStop() only
    // interrupts the listener thread while this flag is set.
    private final AtomicBoolean receiveInProgress = new AtomicBoolean();
    // Lock/condition pair on which the listener thread waits while the container is paused; signalled by doResume().
    private final Lock lockOnPause = new ReentrantLock();
    private final Condition pausedCondition = this.lockOnPause.newCondition();
    // Dedicated logger (category "<class name>-ListenerErrors") used for exceptions thrown by user listeners.
    private final LogAccessor listenerErrorLogger = new LogAccessor("%s-ListenerErrors".formatted(DefaultPulsarMessageListenerContainer.class.getName()));

    /**
     * Creates a container that obtains its consumer from the given factory and is configured by
     * the given container properties.
     *
     * @param pulsarConsumerFactory the factory passed on to the superclass
     * @param pulsarContainerProperties the container properties passed on to the superclass
     */
    public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) {
        super(pulsarConsumerFactory, pulsarContainerProperties);
        // A standalone container reports itself as the "parent" in lifecycle events.
        this.thisOrParentContainer = this;
    }

    /**
     * Builds the {@link Listener} and submits it to the consumer task executor.
     * <p>
     * If no executor is configured, a {@link SimpleAsyncTaskExecutor} named after the bean is
     * created and stored back into the container properties. When the listener cannot be
     * created the configured {@link StartupFailurePolicy} decides what happens next:
     * {@code STOP} rethrows as {@link IllegalStateException}, {@code RETRY} re-attempts the
     * creation asynchronously with the configured (or default) retry template, and any other
     * policy only logs the error and publishes the failed-to-start event.
     *
     * @throws IllegalStateException if listener creation fails and the policy is {@code STOP}
     */
    @Override
    protected void doStart() {
        PulsarContainerProperties containerProperties = this.getContainerProperties();
        AsyncTaskExecutor configuredExecutor = containerProperties.getConsumerTaskExecutor();
        if (configuredExecutor == null) {
            String threadNamePrefix = (this.getBeanName() == null ? "" : this.getBeanName()) + "-C-";
            configuredExecutor = new SimpleAsyncTaskExecutor(threadNamePrefix);
            containerProperties.setConsumerTaskExecutor(configuredExecutor);
        }
        AsyncTaskExecutor consumerExecutor = configuredExecutor;
        MessageListener<?> messageListener = (MessageListener<?>) containerProperties.getMessageListener();

        try {
            this.listenerConsumer = new Listener(messageListener, containerProperties);
        }
        catch (Exception e) {
            String msg = "Error starting listener container [%s]".formatted(this.getBeanName());
            this.logger.error(e, () -> msg);
            // When retrying, the failed event is only published once the retries are exhausted.
            if (containerProperties.getStartupFailurePolicy() != StartupFailurePolicy.RETRY) {
                this.publishConsumerFailedToStart();
            }
            if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
                this.logger.info(() -> "Configured to stop on startup failures - exiting");
                throw new IllegalStateException(msg, e);
            }
        }

        if (this.listenerConsumer != null) {
            this.logger.debug(() -> "Successfully created completable - submitting to executor");
            this.listenerConsumerFuture = consumerExecutor.submitCompletable((Runnable) this.listenerConsumer);
            this.waitForStartup(containerProperties.determineConsumerStartTimeout());
            return;
        }
        if (containerProperties.getStartupFailurePolicy() != StartupFailurePolicy.RETRY) {
            return;
        }

        this.logger.info(() -> "Configured to retry on startup failure - retrying asynchronously");
        Runnable retryingStart = () -> {
            RetryTemplate configuredTemplate = containerProperties.getStartupFailureRetryTemplate();
            RetryTemplate retryTemplate = configuredTemplate != null
                    ? configuredTemplate
                    : containerProperties.getDefaultStartupFailureRetryTemplate();
            this.listenerConsumer = (Listener) retryTemplate.execute(__ -> new Listener(messageListener, containerProperties));
            this.listenerConsumer.run();
        };
        this.listenerConsumerFuture = consumerExecutor.submitCompletable(retryingStart).whenComplete((__, ex) -> {
            if (ex != null) {
                this.logger.error(ex, () -> "Unable to re-start listener container [%s] - retries exhausted".formatted(this.getBeanName()));
                this.publishConsumerFailedToStart();
                return;
            }
            this.logger.info(() -> "Successfully re-started listener container [%s]".formatted(this.getBeanName()));
        });
    }

    /**
     * Blocks until the listener thread has signalled that it is running or the wait time
     * elapses. On timeout an error is logged and the failed-to-start event is published. If the
     * waiting thread is interrupted, its interrupt flag is restored and the method returns.
     *
     * @param waitTime the maximum time to wait for the listener thread to start
     */
    private void waitForStartup(Duration waitTime) {
        boolean started;
        try {
            started = this.startLatch.await(waitTime.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ignored) {
            // Preserve the interrupt for the caller; nothing else to do here.
            Thread.currentThread().interrupt();
            return;
        }
        if (started) {
            return;
        }
        this.logger.error((CharSequence) "Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
        this.publishConsumerFailedToStart();
    }

    /**
     * Stops the container: marks it as not running, pauses the consumer, waits for the listener
     * thread to finish (interrupting it first if a receive is in progress) and finally closes
     * the consumer.
     */
    @Override
    public void doStop() {
        this.setRunning(false);

        this.logger.info((CharSequence) "Pausing consumer");
        Listener listenerToPause = this.listenerConsumer;
        if (listenerToPause != null && listenerToPause.consumer != null) {
            listenerToPause.consumer.pause();
        }

        Thread listenerThread = this.listenerConsumerThread.get();
        if (listenerThread != null) {
            // Only a receive call needs to be interrupted; otherwise the loop notices isRunning() == false.
            if (this.receiveInProgress.get()) {
                listenerThread.interrupt();
            }
            try {
                listenerThread.join();
            }
            catch (InterruptedException e) {
                this.logger.error(e, () -> "Interrupting the main thread");
                Thread.currentThread().interrupt();
            }
        }

        try {
            this.logger.info((CharSequence) "Closing consumer");
            Listener listenerToClose = this.listenerConsumer;
            if (listenerToClose != null && listenerToClose.consumer != null) {
                listenerToClose.consumer.close();
            }
        }
        catch (PulsarClientException e) {
            this.logger.error(e, () -> "Error closing Pulsar Client.");
        }
    }

    /**
     * Marks the container as running, releases any thread blocked in
     * {@link #waitForStartup(Duration)} and publishes a {@link ConsumerStartingEvent} if an
     * event publisher is available.
     */
    private void publishConsumerStartingEvent() {
        this.setRunning(true);
        this.startLatch.countDown();
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher == null) {
            return;
        }
        ConsumerStartingEvent startingEvent = new ConsumerStartingEvent(this, this.thisOrParentContainer);
        eventPublisher.publishEvent((ApplicationEvent) startingEvent);
    }

    /**
     * Publishes a {@link ConsumerStartedEvent} if an event publisher is available.
     */
    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher == null) {
            return;
        }
        ConsumerStartedEvent startedEvent = new ConsumerStartedEvent(this, this.thisOrParentContainer);
        eventPublisher.publishEvent((ApplicationEvent) startedEvent);
    }

    /**
     * Publishes a {@link ConsumerFailedToStartEvent} if an event publisher is available.
     */
    private void publishConsumerFailedToStart() {
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher == null) {
            return;
        }
        ConsumerFailedToStartEvent failedEvent = new ConsumerFailedToStartEvent(this, this.thisOrParentContainer);
        eventPublisher.publishEvent((ApplicationEvent) failedEvent);
    }

    /**
     * Flags the container as paused and, when a listener exists, pauses its consumer.
     */
    @Override
    public void doPause() {
        this.setPaused(true);
        Listener activeListener = this.listenerConsumer;
        if (activeListener == null) {
            return;
        }
        activeListener.pause();
    }

    /**
     * Resumes the consumer (when a listener exists), clears the paused flag and wakes up the
     * listener thread waiting on the pause condition.
     */
    @Override
    public void doResume() {
        Listener activeListener = this.listenerConsumer;
        if (activeListener != null) {
            activeListener.resume();
        }
        this.setPaused(false);

        Lock pauseLock = this.lockOnPause;
        pauseLock.lock();
        try {
            // Signalling requires holding the lock that owns the condition.
            this.pausedCondition.signal();
        }
        finally {
            pauseLock.unlock();
        }
    }

    private final class Listener
    implements SchedulingAwareRunnable {
        // Record-at-a-time listener; null when a batch listener (or no listener) was supplied.
        private final @Nullable PulsarRecordMessageListener<T> listener;
        // Batch listener; null when a record listener (or no listener) was supplied.
        private final @Nullable PulsarBatchMessageListener<T> batchMessageListener;
        private final PulsarContainerProperties containerProperties;
        // Consumer created by the constructor through the container's consumer factory.
        private Consumer<T> consumer;
        // Ids of messages whose record-listener invocation failed while in AckMode.BATCH with no error handler.
        private final Set<MessageId> nackableMessages = new HashSet<MessageId>();
        private final @Nullable PulsarConsumerErrorHandler<T> pulsarConsumerErrorHandler;
        private final @Nullable ConsumerBuilderCustomizer<T> consumerBuilderCustomizer;
        private final boolean isBatchListener;
        private final AckMode ackMode;
        // Initialised from the container properties, then overwritten with the value read back from the created consumer.
        private @Nullable SubscriptionType subscriptionType;
        private @Nullable PulsarAwareTransactionManager transactionManager;
        // Null when no transaction manager is configured, which makes the listener non-transactional.
        private @Nullable TransactionTemplate transactionTemplate;

        /**
         * Captures the listener configuration from the container properties, validates the
         * transaction settings and creates the Pulsar consumer through the container's consumer
         * factory.
         *
         * @param messageListener the user listener; a {@link PulsarBatchMessageListener} is
         * stored as the batch listener, any other non-null listener as the record listener
         * @param containerProperties the container properties to read the configuration from
         * @throws IllegalArgumentException if the ack mode or the schema is not set
         * @throws IllegalStateException if the transaction settings are invalid or no consumer
         * could be created
         */
        Listener(MessageListener<?> messageListener, PulsarContainerProperties containerProperties) {
            this.containerProperties = containerProperties;
            this.isBatchListener = this.containerProperties.isBatchListener();
            Assert.notNull((Object)((Object)this.containerProperties.getAckMode()), (String)"containerProperties.ackMode must not be null");
            this.ackMode = this.containerProperties.getAckMode();
            this.subscriptionType = this.containerProperties.getSubscriptionType();
            // The error handler must be assigned before validation, which inspects it.
            this.pulsarConsumerErrorHandler = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerErrorHandler();
            this.validateTransactionSettings(this.containerProperties.transactions());
            // The transaction manager must be assigned before the template is derived from it.
            this.transactionManager = this.containerProperties.transactions().getTransactionManager();
            this.transactionTemplate = this.determineTransactionTemplate();
            // Exactly one of the two listener fields is populated (or neither when no listener was given).
            if (messageListener instanceof PulsarBatchMessageListener) {
                this.batchMessageListener = (PulsarBatchMessageListener)messageListener;
                this.listener = null;
            } else if (messageListener != null) {
                this.listener = (PulsarRecordMessageListener)messageListener;
                this.batchMessageListener = null;
            } else {
                this.listener = null;
                this.batchMessageListener = null;
            }
            this.consumerBuilderCustomizer = DefaultPulsarMessageListenerContainer.this.getConsumerBuilderCustomizer();
            // Start from the user-supplied consumer properties and fill in container-level settings.
            Map<String, Object> propertiesToConsumer = this.extractDirectConsumerProperties();
            this.populateAllNecessaryPropertiesIfNeedBe(propertiesToConsumer);
            BatchReceivePolicy batchReceivePolicy = new BatchReceivePolicy.Builder().maxNumMessages(containerProperties.getMaxNumMessages()).maxNumBytes(containerProperties.getMaxNumBytes()).timeout(containerProperties.getBatchTimeoutMillis(), TimeUnit.MILLISECONDS).build();
            // Topics and metadata properties are passed to the factory separately, so they are
            // removed from the map that is loaded into the consumer builder.
            Set topicNames = (Set)propertiesToConsumer.remove("topicNames");
            Map properties = (Map)propertiesToConsumer.remove("properties");
            ArrayList customizers = new ArrayList();
            // The container's own customizer is added first; the user-supplied one (if any) follows it.
            customizers.add(builder -> {
                ConsumerBuilderConfigurationUtil.loadConf(builder, propertiesToConsumer);
                builder.batchReceivePolicy(batchReceivePolicy);
            });
            if (this.consumerBuilderCustomizer != null) {
                customizers.add(this.consumerBuilderCustomizer);
            }
            Assert.notNull(containerProperties.getSchema(), (String)"containerProperties.schema must not be null");
            this.consumer = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerFactory().createConsumer(containerProperties.getSchema(), topicNames, this.containerProperties.getSubscriptionName(), properties, customizers);
            Assert.state((this.consumer != null ? 1 : 0) != 0, (String)"Unable to create a consumer");
            // Read back the subscription type the consumer actually ended up with.
            this.updateSubscriptionTypeFromConsumer(this.consumer);
        }

        /**
         * Verifies that the listener configuration is compatible with transactions. Does
         * nothing when transactions are disabled.
         *
         * @param txnProps the transaction settings of the container
         * @throws IllegalStateException if transactions are enabled without a transaction
         * manager, a record listener uses {@code AckMode.BATCH}, or a batch listener uses
         * {@code AckMode.RECORD} or a custom error handler
         */
        private void validateTransactionSettings(PulsarContainerProperties.TransactionSettings txnProps) {
            if (!txnProps.isEnabled()) {
                return;
            }
            Assert.state(txnProps.getTransactionManager() != null, "Transactions are enabled but txn manager is not set");
            if (!this.isBatchListener) {
                // Record listeners only have the ack-mode restriction.
                boolean usesBatchAck = this.containerProperties.getAckMode() == AckMode.BATCH;
                Assert.state(!usesBatchAck, "Transactional record listeners can not use batch ack mode");
                return;
            }
            boolean usesRecordAck = this.containerProperties.getAckMode() == AckMode.RECORD;
            Assert.state(!usesRecordAck, "Transactional batch listeners do not support AckMode.RECORD");
            boolean hasErrorHandler = this.pulsarConsumerErrorHandler != null;
            Assert.state(!hasErrorHandler, "Transactional batch listeners do not support custom error handlers");
        }

        /**
         * Builds the transaction template used to run listener invocations in a transaction.
         *
         * @return a template bound to the configured transaction manager with the properties of
         * the configured transaction definition copied onto it, or {@code null} when no
         * transaction manager is set
         * @throws IllegalStateException if the configured definition uses a propagation
         * behavior other than REQUIRED or REQUIRES_NEW
         */
        private @Nullable TransactionTemplate determineTransactionTemplate() {
            if (this.transactionManager == null) {
                return null;
            }
            TransactionTemplate txnTemplate = new TransactionTemplate((PlatformTransactionManager) this.transactionManager);
            TransactionDefinition txnDefinition = this.containerProperties.transactions().determineTransactionDefinition();
            if (txnDefinition == null) {
                // No explicit definition: keep the template defaults.
                return txnTemplate;
            }
            int propagation = txnDefinition.getPropagationBehavior();
            boolean supportedPropagation = propagation == TransactionDefinition.PROPAGATION_REQUIRED
                    || propagation == TransactionDefinition.PROPAGATION_REQUIRES_NEW;
            Assert.state(supportedPropagation, "Transaction propagation behavior must be REQUIRED or REQUIRES_NEW");
            BeanUtils.copyProperties(txnDefinition, txnTemplate);
            return txnTemplate;
        }

        /**
         * Reads the {@code conf} field of the consumer reflectively and, when it holds a
         * {@link ConsumerConfigurationData}, stores its subscription type in this listener.
         * Any failure is logged and otherwise ignored.
         *
         * @param consumer the consumer to inspect
         */
        private void updateSubscriptionTypeFromConsumer(Consumer<T> consumer) {
            try {
                Field configurationField = ReflectionUtils.findField(ConsumerImpl.class, "conf");
                Assert.notNull(configurationField, "Could not find field 'conf' on ConsumerImpl.class");
                ReflectionUtils.makeAccessible(configurationField);
                Object configuration = ReflectionUtils.getField(configurationField, consumer);
                if (!(configuration instanceof ConsumerConfigurationData)) {
                    return;
                }
                this.subscriptionType = ((ConsumerConfigurationData<?>) configuration).getSubscriptionType();
            }
            catch (Exception ex) {
                DefaultPulsarMessageListenerContainer.this.logger.error(ex, () -> "Unable to determine default subscription type from consumer due to: " + ex.getMessage());
            }
        }

        /**
         * Copies the consumer property overrides of the container into a new mutable map keyed
         * by the string form of each property key. If two keys share a string form, the entry
         * encountered later wins.
         *
         * @return a new {@link HashMap} holding the overrides
         */
        private Map<String, Object> extractDirectConsumerProperties() {
            Properties overrides = this.containerProperties.getPulsarConsumerProperties();
            Map<String, Object> directProperties = new HashMap<>();
            for (Map.Entry<Object, Object> override : overrides.entrySet()) {
                directProperties.put(String.valueOf(override.getKey()), override.getValue());
            }
            return directProperties;
        }

        /**
         * Completes the given consumer properties with the settings held by the container.
         * <ul>
         * <li>A comma-delimited {@code topicNames} override is converted into a {@link Set}. An
         * override that yields no topics is removed, so the container-defined topics apply and
         * no raw string is left behind for callers that expect a set.</li>
         * <li>{@code subscriptionType}, {@code topicNames}, {@code topicsPattern} and
         * {@code subscriptionName} are filled in from the container properties only when not
         * already present.</li>
         * <li>The redelivery backoffs and the dead letter policy configured on the container
         * always replace any existing entry.</li>
         * </ul>
         *
         * @param currentProperties the mutable property map to complete in place
         * @throws ClassCastException if the {@code topicNames} override is not a string
         * @throws IllegalArgumentException if the {@code topicNames} override lists a topic twice
         */
        private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentProperties) {
            if (currentProperties.containsKey("topicNames")) {
                String topicsFromMap = (String) currentProperties.get("topicNames");
                Set<String> propertiesDefinedTopics = Set.of(StringUtils.delimitedListToStringArray(topicsFromMap, ","));
                if (propertiesDefinedTopics.isEmpty()) {
                    // Leaving the raw (empty) string in place would both suppress the
                    // container-defined topics below and break consumers of this map that
                    // expect "topicNames" to hold a Set.
                    currentProperties.remove("topicNames");
                } else {
                    currentProperties.put("topicNames", propertiesDefinedTopics);
                }
            }
            if (!currentProperties.containsKey("subscriptionType")) {
                SubscriptionType containerSubscriptionType = this.containerProperties.getSubscriptionType();
                if (containerSubscriptionType != null) {
                    currentProperties.put("subscriptionType", containerSubscriptionType);
                }
            }
            if (!currentProperties.containsKey("topicNames")) {
                Set<String> listenerDefinedTopics = this.containerProperties.getTopics();
                if (!CollectionUtils.isEmpty(listenerDefinedTopics)) {
                    currentProperties.put("topicNames", listenerDefinedTopics);
                }
            }
            if (!currentProperties.containsKey("topicsPattern")) {
                String topicsPattern = this.containerProperties.getTopicsPattern();
                if (topicsPattern != null) {
                    currentProperties.put("topicsPattern", topicsPattern);
                }
            }
            if (!currentProperties.containsKey("subscriptionName") && StringUtils.hasText(this.containerProperties.getSubscriptionName())) {
                currentProperties.put("subscriptionName", this.containerProperties.getSubscriptionName());
            }
            // Container-level policies take precedence over anything supplied through the properties.
            RedeliveryBackoff negativeAckRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.negativeAckRedeliveryBackoff;
            if (negativeAckRedeliveryBackoff != null) {
                currentProperties.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
            }
            RedeliveryBackoff ackTimeoutRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.ackTimeoutRedeliveryBackoff;
            if (ackTimeoutRedeliveryBackoff != null) {
                currentProperties.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
            }
            DeadLetterPolicy deadLetterPolicy = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy;
            if (deadLetterPolicy != null) {
                currentProperties.put("deadLetterPolicy", deadLetterPolicy);
            }
        }

        /**
         * Reports this runnable as long-lived.
         *
         * @return always {@code true}
         */
        @Override
        public boolean isLongLived() {
            return true;
        }

        /**
         * Main loop of the listener thread.
         * <p>
         * Records the current thread, publishes the starting/started events and then, while the
         * container is running, receives messages in batches and hands them to the record or
         * batch listener. A new batch is only received when the previous one is neither being
         * retried nor partially pending; in those cases the batch held from the previous
         * iteration is dispatched again.
         */
        @Override
        public void run() {
            DefaultPulsarMessageListenerContainer.this.listenerConsumerThread.set(Thread.currentThread());
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartingEvent();
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartedEvent();
            AtomicBoolean inRetryMode = new AtomicBoolean(false);
            AtomicBoolean messagesPendingInBatch = new AtomicBoolean(false);
            Messages<T> messages = null;
            List<Message<T>> messageList = new ArrayList<>();
            while (DefaultPulsarMessageListenerContainer.this.isRunning()) {
                this.checkIfPausedAndHandleAccordingly();
                try {
                    if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
                        // The previous batch has been fully handled. Forget it so that it is not
                        // dispatched a second time when the receive below is skipped because the
                        // container was paused after the pause check above.
                        messages = null;
                        DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true);
                        if (!DefaultPulsarMessageListenerContainer.this.isPaused()) {
                            messages = this.consumer.batchReceive();
                        }
                    }
                }
                catch (PulsarClientException e) {
                    if (e.getCause() instanceof InterruptedException) {
                        // Expected while stopping: doStop() interrupts an in-flight receive.
                        DefaultPulsarMessageListenerContainer.this.logger.debug((Throwable)e, () -> "Error receiving messages due to a thread interrupt call from upstream.");
                    } else {
                        DefaultPulsarMessageListenerContainer.this.logger.error((Throwable)e, () -> "Error receiving messages.");
                    }
                    messages = null;
                }
                finally {
                    DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
                }
                if (messages == null) {
                    continue;
                }
                if (this.isBatchListener) {
                    if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
                        // Fresh batch: rebuild the working list from the received messages.
                        messageList = new ArrayList<>();
                        messages.forEach(messageList::add);
                    }
                    messageList = this.invokeBatchListener(messages, messageList, inRetryMode, messagesPendingInBatch);
                    continue;
                }
                this.invokeRecordListener(messages, inRetryMode);
            }
        }

        /**
         * Pauses the underlying consumer, if there is one.
         */
        public void pause() {
            if (this.consumer == null) {
                return;
            }
            this.consumer.pause();
        }

        /**
         * Resumes the underlying consumer, if there is one.
         */
        public void resume() {
            if (this.consumer == null) {
                return;
            }
            this.consumer.resume();
        }

        /**
         * Parks the listener thread for as long as the container is paused.
         * <p>
         * The paused flag is re-checked while holding the pause lock and after every wakeup, so
         * a resume that happens between the initial check and the wait is not missed and a
         * spurious wakeup does not end the pause early.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting; the
         * interrupt flag is restored and the {@link InterruptedException} is kept as the cause
         */
        private void checkIfPausedAndHandleAccordingly() {
            if (!DefaultPulsarMessageListenerContainer.this.isPaused()) {
                return;
            }
            DefaultPulsarMessageListenerContainer.this.lockOnPause.lock();
            try {
                // doResume() clears the flag before signalling under this lock, so checking the
                // flag here (with the lock held) cannot race with the signal.
                while (DefaultPulsarMessageListenerContainer.this.isPaused()) {
                    DefaultPulsarMessageListenerContainer.this.pausedCondition.await();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Exception occurred trying to wake up the paused listener thread.", e);
            }
            finally {
                DefaultPulsarMessageListenerContainer.this.lockOnPause.unlock();
            }
        }

        /**
         * Tells whether listener invocations run inside a transaction.
         *
         * @return {@code true} when both a transaction template and a transaction manager are set
         */
        private boolean transactional() {
            return !(this.transactionTemplate == null || this.transactionManager == null);
        }

        /**
         * Dispatches the received messages to the record listener, inside a transaction when
         * the listener is transactional.
         *
         * @param messages the received messages
         * @param inRetryMode flag shared with the dispatch logic to request a retry
         */
        private void invokeRecordListener(Messages<T> messages, AtomicBoolean inRetryMode) {
            if (this.transactional()) {
                this.invokeRecordListenerInTx(messages, inRetryMode);
                return;
            }
            this.doInvokeRecordListener(messages, inRetryMode);
        }

        /**
         * Dispatches each message to the record listener outside of a transaction, repeating
         * the dispatch of a message for as long as the retry flag stays set. In
         * {@code AckMode.BATCH} the acknowledgements for the whole batch are handled afterwards.
         *
         * @param messages the received messages
         * @param inRetryMode flag set by the dispatch logic when the current message must be retried
         */
        private void doInvokeRecordListener(Messages<T> messages, AtomicBoolean inRetryMode) {
            for (Message<T> message : messages) {
                boolean retryRequested;
                do {
                    this.newObservation(message).observe(() -> this.dispatchMessageToListener(message, inRetryMode, null));
                    retryRequested = inRetryMode.get();
                } while (retryRequested);
            }
            if (this.ackMode == AckMode.BATCH) {
                this.handleBatchAcksForRecordListener(messages, null);
            }
        }

        /**
         * Dispatches each message to the record listener in its own transaction, repeating the
         * dispatch of a message for as long as the retry flag stays set.
         *
         * @param messages the received messages
         * @param inRetryMode flag set by the dispatch logic when the current message must be retried
         */
        private void invokeRecordListenerInTx(Messages<T> messages, AtomicBoolean inRetryMode) {
            for (Message<T> message : messages) {
                boolean retryRequested;
                do {
                    this.newObservation(message).observe(() -> this.dispatchMessageToListenerInTxn(message, inRetryMode));
                    retryRequested = inRetryMode.get();
                } while (retryRequested);
            }
        }

        /**
         * Creates the observation that wraps the processing of a single message.
         *
         * @param message the message about to be processed
         * @return {@link Observation#NOOP} when no observation registry is configured,
         * otherwise a listener observation created against the configured registry
         */
        private Observation newObservation(Message<T> message) {
            var observationRegistry = this.containerProperties.getObservationRegistry();
            if (observationRegistry == null) {
                return Observation.NOOP;
            }
            return PulsarListenerObservation.LISTENER_OBSERVATION.observation(
                    this.containerProperties.getObservationConvention(),
                    DefaultPulsarListenerObservationConvention.INSTANCE,
                    () -> new PulsarMessageReceiverContext(message, DefaultPulsarMessageListenerContainer.this.requireNonNullBeanName()),
                    this.containerProperties.getObservationRegistry());
        }

        /**
         * Dispatches a single message to the record listener inside a transaction. An exception
         * reported by the dispatch is rethrown inside the transaction callback; whatever escapes
         * the transaction template is logged here and not propagated.
         *
         * @param message the message to dispatch
         * @param inRetryMode flag shared with the dispatch logic to request a retry
         */
        private void dispatchMessageToListenerInTxn(final Message<T> message, final AtomicBoolean inRetryMode) {
            try {
                TransactionCallbackWithoutResult dispatchInTransaction = new TransactionCallbackWithoutResult() {

                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        Transaction currentTxn = Listener.this.getTransaction();
                        RuntimeException failure = Listener.this.dispatchMessageToListener(message, inRetryMode, currentTxn);
                        if (failure != null) {
                            // Propagate the failure out of the callback.
                            throw failure;
                        }
                    }
                };
                Objects.requireNonNull(this.transactionTemplate, "transactionTemplate must not be null").execute(dispatchInTransaction);
            }
            catch (Throwable ex) {
                DefaultPulsarMessageListenerContainer.this.logger.error(ex, (CharSequence) "Transaction rolled back");
            }
        }

        /**
         * Looks up the Pulsar transaction bound to the client of the configured transaction
         * manager.
         *
         * @return the transaction of the resource holder, or {@code null} when there is no
         * transaction manager or no resource holder
         */
        private @Nullable Transaction getTransaction() {
            PulsarAwareTransactionManager manager = this.transactionManager;
            if (manager == null) {
                return null;
            }
            PulsarResourceHolder holder = PulsarTransactionUtils.getResourceHolder(manager.getPulsarClient());
            if (holder == null) {
                return null;
            }
            return holder.getTransaction();
        }

        /**
         * Invokes the record listener for a single message and applies the acknowledgement
         * policy.
         * <p>
         * On success the message is acknowledged in {@code AckMode.RECORD} and the retry flag is
         * cleared. When the listener throws, the configured error handler decides what to do; if
         * there is none, the message is negatively acknowledged ({@code RECORD}) or remembered
         * for a later negative acknowledgement ({@code BATCH}).
         *
         * @param message the message to dispatch
         * @param inRetryMode flag that is cleared on success and may be set by the error handler
         * @param txn the transaction to acknowledge in, or {@code null} when not transactional
         * @return the exception thrown by the listener, or {@code null} when it succeeded
         * @throws IllegalStateException if the listener fails, no error handler is configured
         * and the ack mode does not auto-nack; the listener's exception is the cause
         */
        private @Nullable RuntimeException dispatchMessageToListener(Message<T> message, AtomicBoolean inRetryMode, @Nullable Transaction txn) {
            try {
                if (this.listener instanceof PulsarAcknowledgingMessageListener) {
                    // Only manual ack mode hands an acknowledgment callback to the listener.
                    ((PulsarAcknowledgingMessageListener<T>) this.listener).received(this.consumer, message, this.ackMode.equals(AckMode.MANUAL) ? new ConsumerAcknowledgment(this.consumer, message, txn) : null);
                } else if (this.listener != null) {
                    this.listener.received(this.consumer, message);
                }
                if (this.ackMode.equals(AckMode.RECORD)) {
                    this.handleAck(message, txn);
                }
                inRetryMode.compareAndSet(true, false);
            }
            catch (RuntimeException e) {
                DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(e, () -> "Error dispatching the message to the listener.");
                if (this.pulsarConsumerErrorHandler != null) {
                    this.invokeRecordListenerErrorHandler(this.pulsarConsumerErrorHandler, inRetryMode, message, e, txn);
                } else if (this.ackMode.equals(AckMode.RECORD)) {
                    this.consumer.negativeAcknowledge(message);
                } else if (this.ackMode.equals(AckMode.BATCH)) {
                    this.nackableMessages.add(message.getMessageId());
                } else {
                    // A single, well-formed format string: a trailing bare '%' would make
                    // formatted() throw and hide the listener's exception.
                    throw new IllegalStateException("Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks".formatted(message.getMessageId()), e);
                }
                return e;
            }
            return null;
        }

        /**
         * Asks the error handler whether the failed message should be retried. If so, the retry
         * flag is raised; otherwise the flag is cleared, the handler recovers the message and,
         * in {@code AckMode.RECORD}, the message is acknowledged.
         *
         * @param consumerErrorHandler the error handler to consult
         * @param inRetryMode the retry flag to update
         * @param message the message whose processing failed
         * @param e the exception thrown by the listener
         * @param txn the transaction to acknowledge in, or {@code null} when not transactional
         */
        private void invokeRecordListenerErrorHandler(PulsarConsumerErrorHandler<T> consumerErrorHandler, AtomicBoolean inRetryMode, Message<T> message, Exception e, @Nullable Transaction txn) {
            if (consumerErrorHandler.shouldRetryMessage(e, message)) {
                inRetryMode.set(true);
                return;
            }
            // Retries are over for this message: recover it and move on.
            inRetryMode.compareAndSet(true, false);
            consumerErrorHandler.recoverMessage(this.consumer, message, e);
            if (this.ackMode == AckMode.RECORD) {
                this.handleAck(message, txn);
            }
        }

        /**
         * Dispatches the batch to the batch listener, inside a transaction when the listener is
         * transactional.
         *
         * @param messages the batch as received from the consumer
         * @param messageList the messages to hand to the listener
         * @param inRetryMode retry flag shared with the error handling
         * @param messagesPendingInBatch pending flag shared with the error handling
         * @return the list produced by the underlying invocation
         */
        private List<Message<T>> invokeBatchListener(Messages<T> messages, List<Message<T>> messageList, AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch) {
            if (this.transactional()) {
                return this.invokeBatchListenerInTxn(messages, messageList, inRetryMode, messagesPendingInBatch);
            }
            return this.doInvokeBatchListener(messages, messageList, inRetryMode, messagesPendingInBatch, null);
        }

        /**
         * Runs the batch listener invocation inside a transaction. Anything thrown while doing
         * so is logged and an empty list is returned.
         *
         * @param messages the batch as received from the consumer
         * @param messageList the messages to hand to the listener
         * @param inRetryMode retry flag shared with the error handling
         * @param messagesPendingInBatch pending flag shared with the error handling
         * @return the result of the invocation, or an empty list when it failed
         */
        private List<Message<T>> invokeBatchListenerInTxn(Messages<T> messages, List<Message<T>> messageList, AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch) {
            try {
                TransactionTemplate txnTemplate = Objects.requireNonNull(this.transactionTemplate, "transactionTemplate must not be null");
                return (List) txnTemplate.execute(status -> {
                    Transaction currentTxn = this.getTransaction();
                    return this.doInvokeBatchListener(messages, messageList, inRetryMode, messagesPendingInBatch, currentTxn);
                });
            }
            catch (Throwable e) {
                DefaultPulsarMessageListenerContainer.this.logger.error(e, (CharSequence) "Transaction rolled back");
                return Collections.emptyList();
            }
        }

        /**
         * Hands the given messages to the batch listener and applies the acknowledgement policy.
         * <p>
         * An empty message list is a no-op. When the listener throws, the configured error
         * handler decides which messages remain; without one the whole batch is negatively
         * acknowledged and, when running in a transaction, the exception is rethrown.
         *
         * @param messages the batch as received from the consumer
         * @param messageList the messages to hand to the listener
         * @param inRetryMode retry flag shared with the error handling
         * @param messagesPendingInBatch pending flag shared with the error handling
         * @param txn the transaction to acknowledge in, or {@code null} when not transactional
         * @return an empty list, or the list returned by the error handler invocation
         */
        private List<Message<T>> doInvokeBatchListener(Messages<T> messages, List<Message<T>> messageList, AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch, @Nullable Transaction txn) {
            try {
                if (CollectionUtils.isEmpty(messageList)) {
                    return Collections.emptyList();
                }
                if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) {
                    // Only manual ack mode hands an acknowledgment callback to the listener.
                    Acknowledgement acknowledgement = (Acknowledgement) (this.ackMode == AckMode.MANUAL ? new ConsumerBatchAcknowledgment(this.consumer) : null);
                    ((PulsarBatchAcknowledgingMessageListener<T>) this.batchMessageListener).received(this.consumer, messageList, acknowledgement);
                } else if (this.batchMessageListener != null) {
                    this.batchMessageListener.received(this.consumer, messageList);
                }
                if (this.ackMode == AckMode.BATCH) {
                    this.handleBatchAcks(messages, txn);
                }
                if (this.pulsarConsumerErrorHandler != null) {
                    this.pendingMessagesHandledSuccessfully(this.pulsarConsumerErrorHandler, inRetryMode, messagesPendingInBatch);
                }
                return Collections.emptyList();
            }
            catch (RuntimeException ex) {
                DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(ex, () -> "Error dispatching the messages to the batch listener.");
                if (this.pulsarConsumerErrorHandler != null) {
                    return this.invokeBatchListenerErrorHandler(this.pulsarConsumerErrorHandler, inRetryMode, messagesPendingInBatch, messageList, ex, txn);
                }
                this.consumer.negativeAcknowledge(messages);
                if (txn == null) {
                    return Collections.emptyList();
                }
                // Propagate the failure to the transactional caller.
                throw ex;
            }
        }

        /**
         * Handles a batch listener failure through the consumer error handler and returns
         * the portion of the batch that still has to be processed.
         * <p>The failure must be (or wrap, as its direct cause) a
         * {@code PulsarBatchListenerFailedException}; the message it reports is treated as
         * the failed message. The returned list starts at the failed message when it is to
         * be retried, or just after it when it was recovered.
         *
         * @param consumerErrorHandler the error handler deciding between retry and recovery
         * @param inRetryMode set to {@code true} when the failed message is to be retried,
         * reset otherwise
         * @param messagesPendingInBatch set to reflect whether messages remain after a
         * recovered failure
         * @param messageList the messages that were passed to the listener
         * @param exception the failure raised by the listener
         * @param txn the transaction to acknowledge in, or {@code null}
         * @return the remaining messages (a sub-list view of {@code messageList})
         */
        private List<Message<T>> invokeBatchListenerErrorHandler(PulsarConsumerErrorHandler<T> consumerErrorHandler, AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch, List<Message<T>> messageList, Throwable exception, @Nullable Transaction txn) {
            // Accept the expected exception either directly or one level down as the cause.
            if (!(exception instanceof PulsarBatchListenerFailedException)) {
                exception = exception.getCause();
                Assert.isInstanceOf(PulsarBatchListenerFailedException.class, (Object)exception, (String)"Batch listener should throw PulsarBatchListenerFailedException on errors.");
            }
            PulsarBatchListenerFailedException pulsarBatchListenerFailedException = (PulsarBatchListenerFailedException)((Object)exception);
            Message pulsarMessage = this.getPulsarMessageCausedTheException(pulsarBatchListenerFailedException);
            Message theCurrentPulsarMessageTracked = consumerErrorHandler.currentMessage();
            // The handler is tracking a different message than the one failing now:
            // reset the retry/pending state before handling the new failure.
            if (theCurrentPulsarMessageTracked != null && !theCurrentPulsarMessageTracked.equals(pulsarMessage)) {
                this.pendingMessagesHandledSuccessfully(consumerErrorHandler, inRetryMode, messagesPendingInBatch);
            }
            // Drop everything before the failed message.
            // NOTE(review): indexOf returns -1 when the failed message is not in messageList,
            // which makes subList throw - confirm callers guarantee membership.
            int indexOfFailedMessage = messageList.indexOf(pulsarMessage);
            messageList = messageList.subList(indexOfFailedMessage, messageList.size());
            boolean toBeRetried = consumerErrorHandler.shouldRetryMessage((Exception)((Object)pulsarBatchListenerFailedException), pulsarMessage);
            if (toBeRetried) {
                inRetryMode.set(true);
            } else {
                inRetryMode.compareAndSet(true, false);
                // Not retrying: recover the failed message, then acknowledge it
                // (done here without checking the ack mode).
                consumerErrorHandler.recoverMessage(this.consumer, pulsarMessage, (Exception)((Object)pulsarBatchListenerFailedException));
                this.handleAck(pulsarMessage, txn);
                if (messageList.size() == 1) {
                    // NOTE(review): messageList is a subList view at this point, so this removal
                    // writes through to the list passed in by the caller - confirm intended.
                    messageList.remove(0);
                    messagesPendingInBatch.set(false);
                } else {
                    // Skip the recovered message; the rest of the batch is still pending.
                    messageList = messageList.subList(1, messageList.size());
                    messagesPendingInBatch.set(true);
                }
                consumerErrorHandler.clearMessage();
            }
            return messageList;
        }

        /**
         * Resets the error-handling state: switches both flags from {@code true} to
         * {@code false} (leaving them untouched when already {@code false}) and asks the
         * error handler to clear its tracked message.
         *
         * @param consumerErrorHandler the error handler whose tracked message is cleared
         * @param inRetryMode the retry flag to reset
         * @param messagesPendingInBatch the pending-messages flag to reset
         */
        private void pendingMessagesHandledSuccessfully(PulsarConsumerErrorHandler<T> consumerErrorHandler, AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch) {
            inRetryMode.compareAndSet(true, false);
            messagesPendingInBatch.compareAndSet(true, false);
            consumerErrorHandler.clearMessage();
        }

        /**
         * Extracts the message reported by the given batch listener failure.
         *
         * @param exception the failure raised by the batch listener
         * @return the value of {@code exception.getMessageInError()} cast to a message
         */
        @SuppressWarnings("unchecked")
        private Message<T> getPulsarMessageCausedTheException(PulsarBatchListenerFailedException exception) {
            Object messageInError = exception.getMessageInError();
            // Unchecked: the element type cannot be verified at runtime.
            return (Message<T>)messageInError;
        }

        /**
         * Tells whether the configured subscription type is {@code Shared} or
         * {@code Key_Shared}.
         *
         * @return {@code true} for the two shared types, {@code false} otherwise
         * (including when no subscription type is set)
         */
        private boolean isSharedSubscriptionType() {
            if (this.subscriptionType == null) {
                return false;
            }
            // Enum constants are singletons, so identity comparison matches equals().
            return this.subscriptionType == SubscriptionType.Shared || this.subscriptionType == SubscriptionType.Key_Shared;
        }

        /**
         * Acknowledges a batch that was processed message-by-message.
         * <p>When no message ids are recorded in {@code nackableMessages} the whole batch
         * is acknowledged via {@code handleBatchAcks}. Otherwise each message is handled
         * individually: recorded ids are negatively acknowledged and removed from the
         * set, all others are acknowledged.
         *
         * @param messages the batch to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        private void handleBatchAcksForRecordListener(Messages<T> messages, @Nullable Transaction txn) {
            if (this.nackableMessages.isEmpty()) {
                this.handleBatchAcks(messages, txn);
                return;
            }
            for (Message<T> message : messages) {
                MessageId messageId = message.getMessageId();
                if (!this.nackableMessages.contains(messageId)) {
                    this.handleAck(message, txn);
                    continue;
                }
                // Recorded for nack: request redelivery, then forget the id.
                this.consumer.negativeAcknowledge(message);
                this.nackableMessages.remove(messageId);
            }
        }

        /**
         * Acknowledges a whole batch.
         * <p>For shared subscription types the batch is acknowledged as a group. For all
         * other types only the last message of the batch is acknowledged cumulatively.
         * When the acknowledgment fails with a {@code PulsarException} the failure is
         * logged and the batch is negatively acknowledged instead.
         * <p>The last message is found by iterating the batch sequentially. A parallel
         * stream reduction over {@code messages.spliterator()} is deliberately avoided:
         * the default {@code Iterable} spliterator does not report an encounter order,
         * so "last element" is not guaranteed by the Stream contract, and acknowledging
         * cumulatively up to the wrong message would leave later messages unacknowledged.
         *
         * @param messages the batch to acknowledge; nothing happens when it is empty
         * @param txn the transaction to acknowledge in, or {@code null}
         * @throws RuntimeException if the batch reports a positive size but yields no
         * message during iteration
         */
        private void handleBatchAcks(Messages<T> messages, @Nullable Transaction txn) {
            if (messages.size() <= 0) {
                return;
            }
            try {
                if (this.isSharedSubscriptionType()) {
                    AckUtils.handleAck(this.consumer, messages, txn);
                    return;
                }
                // Walk the batch in iteration order; the final element is the cumulative ack target.
                Message<T> last = null;
                for (Message<T> message : messages) {
                    last = message;
                }
                if (last == null) {
                    throw new RuntimeException("Failed to determine last message");
                }
                AckUtils.handleAckCumulative(this.consumer, last, txn);
            }
            catch (PulsarException pe) {
                DefaultPulsarMessageListenerContainer.this.logger.warn((Throwable)((Object)pe), () -> "Batch acknowledgment failed: " + pe.getMessage());
                this.consumer.negativeAcknowledge(messages);
            }
        }

        /**
         * Acknowledges a single message by its id, falling back to a negative
         * acknowledgment when the acknowledgment fails.
         *
         * @param message the message to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        private void handleAck(Message<T> message, @Nullable Transaction txn) {
            MessageId messageId = message.getMessageId();
            AckUtils.handleAckWithNackOnFailure(this.consumer, messageId, txn);
        }
    }

    /**
     * Static helpers for acknowledging messages on a consumer, with or without a
     * transaction.
     * <p>When a transaction is given, the asynchronous transactional acknowledgment is
     * used and waited on; otherwise the plain synchronous acknowledgment is used.
     * Except for {@link #handleAckWithNackOnFailure}, every failure is logged at trace
     * level and rethrown as the exception produced by {@code PulsarException.unwrap}.
     * If the wait was interrupted, the thread's interrupt status is restored before
     * rethrowing so that callers further up can still observe the interruption.
     */
    static final class AckUtils {
        private static final LogAccessor LOG = new LogAccessor(AckUtils.class);

        private AckUtils() {
        }

        /**
         * Acknowledges a single message id.
         *
         * @param consumer the consumer to acknowledge on
         * @param messageId the id of the message to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        static void handleAck(Consumer<?> consumer, MessageId messageId, @Nullable Transaction txn) {
            try {
                if (txn != null) {
                    consumer.acknowledgeAsync(messageId, txn).get();
                } else {
                    consumer.acknowledge(messageId);
                }
            }
            catch (Exception ex) {
                AckUtils.restoreInterruptIfNeeded(ex);
                LOG.trace((Throwable)ex, () -> "Ack for msg w/ id [%s] failed due to: %s".formatted(messageId, ex.getMessage()));
                throw PulsarException.unwrap(ex);
            }
        }

        /**
         * Acknowledges a single message id and, if that fails, logs a warning and
         * negatively acknowledges the message instead of propagating the failure.
         *
         * @param consumer the consumer to acknowledge on
         * @param messageId the id of the message to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        static void handleAckWithNackOnFailure(Consumer<?> consumer, MessageId messageId, @Nullable Transaction txn) {
            try {
                AckUtils.handleAck(consumer, messageId, txn);
            }
            catch (Exception ex) {
                LOG.warn((Throwable)ex, () -> "Ack for msg w/ id [%s] failed due to: %s".formatted(messageId, ex.getMessage()));
                consumer.negativeAcknowledge(messageId);
            }
        }

        /**
         * Acknowledges a list of message ids in one call.
         *
         * @param consumer the consumer to acknowledge on
         * @param messageIds the ids of the messages to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        static void handleAck(Consumer<?> consumer, List<MessageId> messageIds, @Nullable Transaction txn) {
            try {
                if (txn != null) {
                    consumer.acknowledgeAsync(messageIds, txn).get();
                } else {
                    consumer.acknowledge(messageIds);
                }
            }
            catch (Exception ex) {
                AckUtils.restoreInterruptIfNeeded(ex);
                LOG.trace((Throwable)ex, () -> "Batch ack failed due to: %s".formatted(ex.getMessage()));
                throw PulsarException.unwrap(ex);
            }
        }

        /**
         * Acknowledges a received batch in one call.
         *
         * @param consumer the consumer to acknowledge on
         * @param messages the batch to acknowledge
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        static void handleAck(Consumer<?> consumer, Messages<?> messages, @Nullable Transaction txn) {
            try {
                if (txn != null) {
                    consumer.acknowledgeAsync(messages, txn).get();
                } else {
                    consumer.acknowledge(messages);
                }
            }
            catch (Exception ex) {
                AckUtils.restoreInterruptIfNeeded(ex);
                LOG.trace((Throwable)ex, () -> "Batch ack failed due to: %s".formatted(ex.getMessage()));
                throw PulsarException.unwrap(ex);
            }
        }

        /**
         * Cumulatively acknowledges up to and including the given message.
         *
         * @param consumer the consumer to acknowledge on
         * @param last the message to acknowledge cumulatively
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        static void handleAckCumulative(Consumer<?> consumer, Message<?> last, @Nullable Transaction txn) {
            try {
                if (txn != null) {
                    consumer.acknowledgeCumulativeAsync(last.getMessageId(), txn).get();
                } else {
                    consumer.acknowledgeCumulative(last);
                }
            }
            catch (Exception ex) {
                AckUtils.restoreInterruptIfNeeded(ex);
                LOG.trace((Throwable)ex, () -> "Cumulative ack failed w/ last msg id [%s] due to: %s".formatted(last.getMessageId(), ex.getMessage()));
                throw PulsarException.unwrap(ex);
            }
        }

        /**
         * Re-asserts the current thread's interrupt status when the given failure is an
         * {@link InterruptedException}, since catching it clears the status.
         */
        private static void restoreInterruptIfNeeded(Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Acknowledgment handle given to batch listeners. It is created without a
     * transaction and is not bound to a single message, so the no-argument
     * {@link #acknowledge()} and {@link #nack()} are not supported; the id-based
     * operations inherited from {@code AbstractAcknowledgement} are the ones to use.
     */
    private static final class ConsumerBatchAcknowledgment
    extends AbstractAcknowledgement {
        ConsumerBatchAcknowledgment(Consumer<?> consumer) {
            super(consumer);
        }

        /**
         * Not supported for batches.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void acknowledge() {
            throw new UnsupportedOperationException("acknowledge() is not supported for batch acknowledgment; use acknowledge(MessageId) or acknowledge(List<MessageId>)");
        }

        /**
         * Not supported for batches.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void nack() {
            throw new UnsupportedOperationException("nack() is not supported for batch acknowledgment; use nack(MessageId)");
        }
    }

    /**
     * Acknowledgment handle bound to one specific message, so the no-argument
     * {@link #acknowledge()} and {@link #nack()} act on that message.
     */
    private static final class ConsumerAcknowledgment
    extends AbstractAcknowledgement {
        private final Message<?> message;

        /**
         * @param consumer the consumer to acknowledge on
         * @param message the message this handle acts on
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        ConsumerAcknowledgment(Consumer<?> consumer, Message<?> message, @Nullable Transaction txn) {
            super(consumer, txn);
            this.message = message;
        }

        /**
         * Acknowledges the bound message by id; a failed acknowledgment results in a
         * negative acknowledgment rather than an exception.
         */
        @Override
        public void acknowledge() {
            MessageId boundMessageId = this.message.getMessageId();
            AckUtils.handleAckWithNackOnFailure(this.consumer, boundMessageId, this.getTransaction());
        }

        /** Negatively acknowledges the bound message. */
        @Override
        public void nack() {
            this.consumer.negativeAcknowledge(this.message);
        }
    }

    /**
     * Base for the acknowledgment handles passed to listeners. Holds the consumer and
     * an optional transaction and implements the id-based operations on top of
     * {@code AckUtils}; subclasses supply the no-argument variants.
     */
    private static abstract class AbstractAcknowledgement
    implements Acknowledgement {
        protected final Consumer<?> consumer;
        private final @Nullable Transaction txn;

        /** Creates a handle that acknowledges outside of any transaction. */
        AbstractAcknowledgement(Consumer<?> consumer) {
            this(consumer, null);
        }

        /**
         * @param consumer the consumer to acknowledge on
         * @param txn the transaction to acknowledge in, or {@code null}
         */
        AbstractAcknowledgement(Consumer<?> consumer, @Nullable Transaction txn) {
            this.consumer = consumer;
            this.txn = txn;
        }

        /** Returns the transaction this handle acknowledges in, or {@code null}. */
        protected @Nullable Transaction getTransaction() {
            return this.txn;
        }

        /**
         * Acknowledges one message id; a failed acknowledgment results in a negative
         * acknowledgment rather than an exception.
         */
        @Override
        public void acknowledge(MessageId messageId) {
            this.handleAckByMessageId(messageId);
        }

        /**
         * Acknowledges the given ids in a single call. If that call fails with a
         * {@code PulsarException}, each id is acknowledged individually instead (with
         * a negative acknowledgment for every id that fails again).
         */
        @Override
        public void acknowledge(List<MessageId> messageIds) {
            try {
                AckUtils.handleAck(this.consumer, messageIds, this.txn);
            }
            catch (PulsarException ignored) {
                // Fall back to one-by-one acknowledgment; the group failure itself is not reported here.
                messageIds.forEach(this::handleAckByMessageId);
            }
        }

        /** Negatively acknowledges one message id. */
        @Override
        public void nack(MessageId messageId) {
            this.consumer.negativeAcknowledge(messageId);
        }

        /** Acknowledges by id, negatively acknowledging when the acknowledgment fails. */
        protected void handleAckByMessageId(MessageId messageId) {
            AckUtils.handleAckWithNackOnFailure(this.consumer, messageId, this.txn);
        }
    }
}

