/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.listener;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.ConsumerBuilderConfigurationUtil;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.event.ConsumerFailedToStartEvent;
import org.springframework.pulsar.event.ConsumerStartedEvent;
import org.springframework.pulsar.event.ConsumerStartingEvent;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.listener.PulsarAcknowledgingMessageListener;
import org.springframework.pulsar.listener.PulsarBatchAcknowledgingMessageListener;
import org.springframework.pulsar.listener.PulsarBatchListenerFailedException;
import org.springframework.pulsar.listener.PulsarBatchMessageListener;
import org.springframework.pulsar.listener.PulsarConsumerErrorHandler;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarListenerObservation;
import org.springframework.pulsar.observation.PulsarMessageReceiverContext;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class DefaultPulsarMessageListenerContainer<T>
extends AbstractPulsarMessageListenerContainer<T> {
    private volatile CompletableFuture<?> listenerConsumerFuture;
    private volatile Listener listenerConsumer;
    private volatile CountDownLatch startLatch = new CountDownLatch(1);
    private final AbstractPulsarMessageListenerContainer<?> thisOrParentContainer;
    private final AtomicReference<Thread> listenerConsumerThread = new AtomicReference();
    private final AtomicBoolean receiveInProgress = new AtomicBoolean();
    private final Lock lockOnPause = new ReentrantLock();
    private final Condition pausedCondition = this.lockOnPause.newCondition();

    public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) {
        this(pulsarConsumerFactory, pulsarContainerProperties, null);
    }

    public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties, @Nullable ObservationRegistry observationRegistry) {
        super(pulsarConsumerFactory, pulsarContainerProperties, observationRegistry);
        this.thisOrParentContainer = this;
    }

    @Override
    protected void doStart() {
        PulsarContainerProperties containerProperties = this.getContainerProperties();
        Object messageListenerObject = containerProperties.getMessageListener();
        AsyncTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        MessageListener messageListener = (MessageListener)messageListenerObject;
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor((this.getBeanName() == null ? "" : this.getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        this.listenerConsumer = new Listener(messageListener, this.getContainerProperties(), this.getObservationRegistry());
        this.setRunning(true);
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = consumerExecutor.submitCompletable((Runnable)((Object)this.listenerConsumer));
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error((CharSequence)"Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                this.publishConsumerFailedToStart();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void doStop() {
        this.setRunning(false);
        this.logger.info((CharSequence)"Pausing this consumer.");
        this.listenerConsumer.consumer.pause();
        if (this.listenerConsumerThread.get() != null) {
            if (this.receiveInProgress.get()) {
                this.listenerConsumerThread.get().interrupt();
            }
            try {
                this.listenerConsumerThread.get().join();
            }
            catch (InterruptedException e) {
                this.logger.error((Throwable)e, () -> "Interrupting the main thread");
                Thread.currentThread().interrupt();
            }
        }
        try {
            this.logger.info((CharSequence)"Closing this consumer.");
            this.listenerConsumer.consumer.close();
        }
        catch (PulsarClientException e) {
            this.logger.error((Throwable)e, () -> "Error closing Pulsar Client.");
        }
    }

    private void publishConsumerStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ConsumerStartingEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ConsumerStartedEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishConsumerFailedToStart() {
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent)new ConsumerFailedToStartEvent(this, this.thisOrParentContainer));
        }
    }

    @Override
    public void doPause() {
        this.setPaused(true);
        Listener consumer = this.listenerConsumer;
        if (consumer != null) {
            consumer.pause();
        }
    }

    @Override
    public void doResume() {
        Listener consumer = this.listenerConsumer;
        if (consumer != null) {
            consumer.resume();
        }
        this.setPaused(false);
        this.lockOnPause.lock();
        try {
            this.pausedCondition.signal();
        }
        finally {
            this.lockOnPause.unlock();
        }
    }

    private final class Listener
    implements SchedulingAwareRunnable {
        private final PulsarRecordMessageListener<T> listener;
        private final PulsarBatchMessageListener<T> batchMessageListener;
        private final PulsarContainerProperties containerProperties;
        private final ObservationRegistry observationRegistry;
        private Consumer<T> consumer;
        private final Set<MessageId> nackableMessages = new HashSet<MessageId>();
        private final PulsarConsumerErrorHandler<T> pulsarConsumerErrorHandler;
        private final ConsumerBuilderCustomizer<T> consumerBuilderCustomizer;
        private final boolean isBatchListener;
        private final AckMode ackMode;
        private final SubscriptionType subscriptionType;

        Listener(MessageListener<?> messageListener, @Nullable PulsarContainerProperties containerProperties, ObservationRegistry observationRegistry) {
            this.containerProperties = containerProperties;
            this.isBatchListener = this.containerProperties.isBatchListener();
            this.ackMode = this.containerProperties.getAckMode();
            this.subscriptionType = this.containerProperties.getSubscriptionType();
            if (messageListener instanceof PulsarBatchMessageListener) {
                this.batchMessageListener = (PulsarBatchMessageListener)messageListener;
                this.listener = null;
            } else if (messageListener != null) {
                this.listener = (PulsarRecordMessageListener)messageListener;
                this.batchMessageListener = null;
            } else {
                this.listener = null;
                this.batchMessageListener = null;
            }
            this.observationRegistry = observationRegistry;
            this.pulsarConsumerErrorHandler = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerErrorHandler();
            this.consumerBuilderCustomizer = DefaultPulsarMessageListenerContainer.this.getConsumerBuilderCustomizer();
            try {
                Map<String, Object> propertiesToConsumer = this.extractDirectConsumerProperties();
                this.populateAllNecessaryPropertiesIfNeedBe(propertiesToConsumer);
                BatchReceivePolicy batchReceivePolicy = new BatchReceivePolicy.Builder().maxNumMessages(containerProperties.getMaxNumMessages()).maxNumBytes(containerProperties.getMaxNumBytes()).timeout(containerProperties.getBatchTimeoutMillis(), TimeUnit.MILLISECONDS).build();
                Set topicNames = (Set)propertiesToConsumer.remove("topicNames");
                Map properties = (Map)propertiesToConsumer.remove("properties");
                ArrayList customizers = new ArrayList();
                customizers.add(builder -> {
                    ConsumerBuilderConfigurationUtil.loadConf(builder, propertiesToConsumer);
                    builder.batchReceivePolicy(batchReceivePolicy);
                });
                if (this.consumerBuilderCustomizer != null) {
                    customizers.add(this.consumerBuilderCustomizer);
                }
                this.consumer = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerFactory().createConsumer(containerProperties.getSchema(), topicNames, this.containerProperties.getSubscriptionName(), properties, customizers);
                Assert.state((this.consumer != null ? 1 : 0) != 0, (String)"Unable to create a consumer");
            }
            catch (PulsarClientException e) {
                DefaultPulsarMessageListenerContainer.this.logger.error((Throwable)e, () -> "Pulsar client exceptions.");
            }
        }

        private Map<String, Object> extractDirectConsumerProperties() {
            Properties propertyOverrides = this.containerProperties.getPulsarConsumerProperties();
            return propertyOverrides.entrySet().stream().collect(Collectors.toMap(e -> String.valueOf(e.getKey()), Map.Entry::getValue, (prev, next) -> next, HashMap::new));
        }

        private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentProperties) {
            DeadLetterPolicy deadLetterPolicy;
            RedeliveryBackoff ackTimeoutRedeliveryBackoff;
            RedeliveryBackoff negativeAckRedeliveryBackoff;
            String topicsPattern;
            SubscriptionType subscriptionType;
            String topicsFromMap;
            String[] topicNames;
            Set<String> propertiesDefinedTopics;
            if (currentProperties.containsKey("topicNames") && !(propertiesDefinedTopics = Set.of(topicNames = StringUtils.delimitedListToStringArray((String)(topicsFromMap = (String)currentProperties.get("topicNames")), (String)","))).isEmpty()) {
                currentProperties.put("topicNames", propertiesDefinedTopics);
            }
            if (!currentProperties.containsKey("subscriptionType") && (subscriptionType = this.containerProperties.getSubscriptionType()) != null) {
                currentProperties.put("subscriptionType", subscriptionType);
            }
            if (!currentProperties.containsKey("topicNames")) {
                Set<String> listenerDefinedTopics = this.containerProperties.getTopics();
                if (!this.containerProperties.getTopics().isEmpty()) {
                    currentProperties.put("topicNames", listenerDefinedTopics);
                }
            }
            if (!currentProperties.containsKey("topicsPattern") && (topicsPattern = this.containerProperties.getTopicsPattern()) != null) {
                currentProperties.put("topicsPattern", topicsPattern);
            }
            if (!currentProperties.containsKey("subscriptionName") && StringUtils.hasText((String)this.containerProperties.getSubscriptionName())) {
                currentProperties.put("subscriptionName", this.containerProperties.getSubscriptionName());
            }
            if ((negativeAckRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.negativeAckRedeliveryBackoff) != null) {
                currentProperties.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
            }
            if ((ackTimeoutRedeliveryBackoff = DefaultPulsarMessageListenerContainer.this.ackTimeoutRedeliveryBackoff) != null) {
                currentProperties.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
            }
            if ((deadLetterPolicy = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy) != null) {
                currentProperties.put("deadLetterPolicy", deadLetterPolicy);
            }
        }

        public boolean isLongLived() {
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            DefaultPulsarMessageListenerContainer.this.listenerConsumerThread.set(Thread.currentThread());
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartingEvent();
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartedEvent();
            AtomicBoolean inRetryMode = new AtomicBoolean(false);
            AtomicBoolean messagesPendingInBatch = new AtomicBoolean(false);
            Messages messages = null;
            List messageList = null;
            while (DefaultPulsarMessageListenerContainer.this.isRunning()) {
                this.checkIfPausedAndHandleAccordingly();
                try {
                    if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
                        DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true);
                        if (!DefaultPulsarMessageListenerContainer.this.isPaused()) {
                            messages = this.consumer.batchReceive();
                        }
                    }
                }
                catch (PulsarClientException e) {
                    if (e.getCause() instanceof InterruptedException) {
                        DefaultPulsarMessageListenerContainer.this.logger.debug((Throwable)e, () -> "Error receiving messages due to a thread interrupt call from upstream.");
                    } else {
                        DefaultPulsarMessageListenerContainer.this.logger.error((Throwable)e, () -> "Error receiving messages.");
                    }
                    messages = null;
                }
                finally {
                    DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
                }
                if (messages == null) continue;
                if (this.isBatchListener) {
                    if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
                        messageList = new ArrayList();
                        messages.forEach(messageList::add);
                    }
                    try {
                        if (messageList == null || messageList.size() <= 0) continue;
                        if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) {
                            this.batchMessageListener.received(this.consumer, messageList, (Acknowledgement)(this.ackMode.equals((Object)AckMode.MANUAL) ? new ConsumerBatchAcknowledgment(this.consumer) : null));
                        } else {
                            this.batchMessageListener.received(this.consumer, messageList);
                        }
                        if (this.ackMode.equals((Object)AckMode.BATCH)) {
                            try {
                                if (this.isSharedSubscriptionType()) {
                                    this.consumer.acknowledge(messages);
                                } else {
                                    Stream stream = StreamSupport.stream(messages.spliterator(), true);
                                    Message last = stream.reduce((a, b) -> b).orElse(null);
                                    this.consumer.acknowledgeCumulative(last);
                                }
                            }
                            catch (PulsarClientException pce) {
                                this.consumer.negativeAcknowledge(messages);
                            }
                        }
                        if (this.pulsarConsumerErrorHandler == null) continue;
                        this.pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch);
                    }
                    catch (Exception e) {
                        if (this.pulsarConsumerErrorHandler != null) {
                            messageList = this.invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, messageList, e);
                            continue;
                        }
                        this.consumer.negativeAcknowledge(messages);
                    }
                    continue;
                }
                for (Message message : messages) {
                    do {
                        this.newObservation(message).observe(() -> this.dispatchMessageToListener(message, inRetryMode));
                    } while (inRetryMode.get());
                }
                if (!this.ackMode.equals((Object)AckMode.BATCH)) continue;
                this.handleAcks(messages);
            }
        }

        private void checkIfPausedAndHandleAccordingly() {
            if (DefaultPulsarMessageListenerContainer.this.isPaused()) {
                DefaultPulsarMessageListenerContainer.this.lockOnPause.lock();
                try {
                    DefaultPulsarMessageListenerContainer.this.pausedCondition.await();
                }
                catch (InterruptedException e) {
                    throw new IllegalStateException("Exception occurred trying to wake up the paused listener thread.");
                }
                finally {
                    DefaultPulsarMessageListenerContainer.this.lockOnPause.unlock();
                }
            }
        }

        private Observation newObservation(Message<T> message) {
            if (this.observationRegistry == null) {
                return Observation.NOOP;
            }
            return PulsarListenerObservation.LISTENER_OBSERVATION.observation(this.containerProperties.getObservationConvention(), DefaultPulsarListenerObservationConvention.INSTANCE, () -> new PulsarMessageReceiverContext(message, DefaultPulsarMessageListenerContainer.this.getBeanName()), this.observationRegistry);
        }

        private void dispatchMessageToListener(Message<T> message, AtomicBoolean inRetryMode) {
            try {
                if (this.listener instanceof PulsarAcknowledgingMessageListener) {
                    this.listener.received(this.consumer, message, this.ackMode.equals((Object)AckMode.MANUAL) ? new ConsumerAcknowledgment(this.consumer, message) : null);
                } else if (this.listener != null) {
                    this.listener.received(this.consumer, message);
                }
                if (this.ackMode.equals((Object)AckMode.RECORD)) {
                    this.handleAck(message);
                }
                inRetryMode.compareAndSet(true, false);
            }
            catch (Exception e) {
                if (this.pulsarConsumerErrorHandler != null) {
                    this.invokeRecordListenerErrorHandler(inRetryMode, message, e);
                }
                if (this.ackMode.equals((Object)AckMode.RECORD)) {
                    this.consumer.negativeAcknowledge(message);
                }
                if (this.ackMode.equals((Object)AckMode.BATCH)) {
                    this.nackableMessages.add(message.getMessageId());
                }
                throw new IllegalStateException("Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks".formatted(message.getMessageId()), e);
            }
        }

        private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch, List<Message<T>> messageList, Throwable exception) {
            if (!(exception instanceof PulsarBatchListenerFailedException)) {
                exception = exception.getCause();
                Assert.isInstanceOf(PulsarBatchListenerFailedException.class, (Object)exception, (String)"Batch listener should throw PulsarBatchListenerFailedException on errors.");
            }
            PulsarBatchListenerFailedException pulsarBatchListenerFailedException = (PulsarBatchListenerFailedException)((Object)exception);
            Message pulsarMessage = this.getPulsarMessageCausedTheException(pulsarBatchListenerFailedException);
            Message theCurrentPulsarMessageTracked = this.pulsarConsumerErrorHandler.currentMessage();
            if (theCurrentPulsarMessageTracked != null && !theCurrentPulsarMessageTracked.equals(pulsarMessage)) {
                this.pendingMessagesHandledSuccessfully(inRetryMode, messagesPendingInBatch);
            }
            int indexOfFailedMessage = messageList.indexOf(pulsarMessage);
            messageList = messageList.subList(indexOfFailedMessage, messageList.size());
            boolean toBeRetried = this.pulsarConsumerErrorHandler.shouldRetryMessage((Exception)((Object)pulsarBatchListenerFailedException), pulsarMessage);
            if (toBeRetried) {
                inRetryMode.set(true);
            } else {
                inRetryMode.compareAndSet(true, false);
                this.pulsarConsumerErrorHandler.recoverMessage(this.consumer, pulsarMessage, (Exception)((Object)pulsarBatchListenerFailedException));
                this.handleAck(pulsarMessage);
                if (messageList.size() == 1) {
                    messagesPendingInBatch.set(false);
                } else {
                    messageList = messageList.subList(1, messageList.size());
                }
                if (!messageList.isEmpty()) {
                    messagesPendingInBatch.set(true);
                }
                this.pulsarConsumerErrorHandler.clearMessage();
            }
            return messageList;
        }

        private void invokeRecordListenerErrorHandler(AtomicBoolean inRetryMode, Message<T> message, Exception e) {
            boolean toBeRetried = this.pulsarConsumerErrorHandler.shouldRetryMessage(e, message);
            if (toBeRetried) {
                inRetryMode.set(true);
            } else {
                inRetryMode.compareAndSet(true, false);
                this.pulsarConsumerErrorHandler.recoverMessage(this.consumer, message, e);
                if (this.ackMode.equals((Object)AckMode.RECORD)) {
                    this.handleAck(message);
                }
            }
        }

        private void pendingMessagesHandledSuccessfully(AtomicBoolean inRetryMode, AtomicBoolean messagesPendingInBatch) {
            inRetryMode.compareAndSet(true, false);
            messagesPendingInBatch.compareAndSet(true, false);
            this.pulsarConsumerErrorHandler.clearMessage();
        }

        private Message<T> getPulsarMessageCausedTheException(PulsarBatchListenerFailedException exception) {
            return (Message)exception.getMessageInError();
        }

        private boolean isSharedSubscriptionType() {
            return this.subscriptionType.equals((Object)SubscriptionType.Shared) || this.subscriptionType.equals((Object)SubscriptionType.Key_Shared);
        }

        private void handleAcks(Messages<T> messages) {
            block7: {
                if (this.nackableMessages.isEmpty()) {
                    try {
                        if (messages.size() <= 0) break block7;
                        if (this.isSharedSubscriptionType()) {
                            this.consumer.acknowledge(messages);
                            break block7;
                        }
                        Stream stream = StreamSupport.stream(messages.spliterator(), true);
                        Message last = stream.reduce((a, b) -> b).orElse(null);
                        this.consumer.acknowledgeCumulative(last);
                    }
                    catch (PulsarClientException pce) {
                        this.consumer.negativeAcknowledge(messages);
                    }
                } else {
                    for (Message message : messages) {
                        if (this.nackableMessages.contains(message.getMessageId())) {
                            this.consumer.negativeAcknowledge(message);
                            this.nackableMessages.remove(message.getMessageId());
                            continue;
                        }
                        this.handleAck(message);
                    }
                }
            }
        }

        private void handleAck(Message<T> message) {
            AbstractAcknowledgement.handleAckByMessageId(this.consumer, message.getMessageId());
        }

        public void pause() {
            if (this.consumer != null) {
                this.consumer.pause();
            }
        }

        public void resume() {
            if (this.consumer != null) {
                this.consumer.resume();
            }
        }
    }

    private static final class ConsumerBatchAcknowledgment
    extends AbstractAcknowledgement {
        ConsumerBatchAcknowledgment(Consumer<?> consumer) {
            super(consumer);
        }

        @Override
        public void acknowledge() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void nack() {
            throw new UnsupportedOperationException();
        }
    }

    private static final class ConsumerAcknowledgment
    extends AbstractAcknowledgement {
        private final Message<?> message;

        ConsumerAcknowledgment(Consumer<?> consumer, Message<?> message) {
            super(consumer);
            this.message = message;
        }

        @Override
        public void acknowledge() {
            this.acknowledge(this.message.getMessageId());
        }

        @Override
        public void nack() {
            this.consumer.negativeAcknowledge(this.message);
        }
    }

    private static abstract class AbstractAcknowledgement
    implements Acknowledgement {
        private static final LogAccessor logger = new LogAccessor(AbstractAcknowledgement.class);
        protected final Consumer<?> consumer;

        AbstractAcknowledgement(Consumer<?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void acknowledge(MessageId messageId) {
            AbstractAcknowledgement.handleAckByMessageId(this.consumer, messageId);
        }

        private static void handleAckByMessageId(Consumer<?> consumer, MessageId messageId) {
            try {
                consumer.acknowledge(messageId);
            }
            catch (PulsarClientException pce) {
                logger.warn((Throwable)pce, () -> "Acknowledgment failed for message: [%s]".formatted(messageId));
                consumer.negativeAcknowledge(messageId);
            }
        }

        @Override
        public void acknowledge(List<MessageId> messageIds) {
            try {
                this.consumer.acknowledge(messageIds);
            }
            catch (PulsarClientException e) {
                for (MessageId messageId : messageIds) {
                    AbstractAcknowledgement.handleAckByMessageId(this.consumer, messageId);
                }
            }
        }

        @Override
        public void nack(MessageId messageId) {
            this.consumer.negativeAcknowledge(messageId);
        }
    }
}

