/*
 * Decompiled with CFR 0.152.
 */
package com.solace.messaging.receiver;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.receiver.AsyncReceiverSubscriptions;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.receiver.ReceiverBuffers;
import com.solace.messaging.resources.CachedTopicSubscription;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageableReceiver;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExecutableMessageHandler;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessageReceiptFailureNotificationDispatcher;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.ServiceEventImpl;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.common.util.DestinationUtil;
import com.solacesystems.jcsmp.AccessDeniedException;
import com.solacesystems.jcsmp.CapabilityType;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.Endpoint;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowEvent;
import com.solacesystems.jcsmp.FlowEventArgs;
import com.solacesystems.jcsmp.FlowEventHandler;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPErrorResponseException;
import com.solacesystems.jcsmp.JCSMPErrorResponseSubcodeEx;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.PropertyMismatchException;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.ReplayStartLocation;
import com.solacesystems.jcsmp.ReplicationGroupMessageId;
import com.solacesystems.jcsmp.Subscription;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.impl.QueueImpl;
import com.solacesystems.jcsmp.impl.ReplayStartLocationBeginningImpl;
import com.solacesystems.jcsmp.impl.ReplayStartLocationDateImpl;
import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
import com.solacesystems.jcsmp.statistics.StatType;
import java.lang.invoke.LambdaMetafactory;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
public class PersistentMessageReceiverImpl
implements PersistentMessageReceiver {
    private final SolaceQueueHolder queue;
    private final MessagingServiceInternalView serviceInternalView;
    private final TypedProperties receiverConfiguration;
    private final ConsumerFlowProperties consumerProps;
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener;
    private final ClientSession.ClientSessionStateListener closedSessionListener;
    private final ReceiverBuffers.ReceiverBuffer buffer;
    private final FlowControlHandler flowControlHandler;
    private final ExecutorService defaultReceiverExecutorService;
    private final ExecutorService asyncSubscriptionsExecutorService;
    private final List<Task<PersistentMessageReceiverImpl>> preStartTasks = new CopyOnWriteArrayList<Task<PersistentMessageReceiverImpl>>();
    private final List<Task<PersistentMessageReceiverImpl>> postStartAsyncTasks = new CopyOnWriteArrayList<Task<PersistentMessageReceiverImpl>>();
    private final List<TopicSubscription> appliedTopicSubscriptions = new CopyOnWriteArrayList<TopicSubscription>();
    private final MessagingService.ReconnectionListener reconnectionListener;
    private final MessagingService.ReconnectionAttemptListener reconnectionAttemptListener;
    private final ManageableReceiver.PersistentReceiverInfo receiverInfo;
    private final AtomicReference<ExecutableMessageHandler> messageHandlerRef = new AtomicReference();
    private final ReentrantLock messageHandlersLock = new ReentrantLock();
    private final ReentrantLock subscriptionsLock = new ReentrantLock();
    private final Condition notEmpty = this.messageHandlersLock.newCondition();
    private final AsyncConsumerMessageDispatchTask asyncConsumerMessageDispatchTask;
    private final long id;
    private final String instanceName;
    private final MessageReceiptFailureNotificationDispatcher messageReceiptFailureNotificationDispatcher;
    private final boolean autoAckEnabled;
    private volatile FlowReceiver solaceReceiver = null;
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0L);
    private static final Log logger = LogFactory.getLog(PersistentMessageReceiverImpl.class);
    private static final Task<PersistentMessageReceiverImpl> NO_OP_TASK = receiver -> {};
    private final StopSolaceConsumerTask stopSolaceConsumerTask;
    private final TerminationNotificationDispatcher terminationNotificationDispatcher;
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTING = 1;
    static final int STATE_STARTED = 2;
    static final int STATE_TERMINATING = 3;
    static final int STATE_TERMINATED = 4;
    final AtomicStampedReference<CompletableFuture> stateHolder = new AtomicStampedReference<Object>(null, 0);

    public PersistentMessageReceiverImpl(MessagingServiceInternalView serviceInternalView, TypedProperties receiverConfiguration, List<TopicSubscription> initialTopicSubscriptions, com.solace.messaging.resources.Queue queue) {
        this.id = instanceIdGenerator.incrementAndGet();
        this.instanceName = "PersistentMessageReceiver@" + this.id;
        this.receiverConfiguration = receiverConfiguration;
        this.autoAckEnabled = this.isAutoAckConfigured(this.receiverConfiguration);
        this.serviceInternalView = serviceInternalView;
        this.terminationNotificationDispatcher = new TerminationNotificationDispatcher();
        this.queue = SolaceQueueHolder.create(queue, this.serviceInternalView.getClientSession());
        this.flowControlHandler = new FlowControlHandler();
        this.buffer = ReceiverBuffers.createCapacityAwareBuffer(this.receiverConfiguration, this.flowControlHandler);
        this.consumerProps = this.createFlowConfiguration(this.receiverConfiguration, this.queue);
        this.defaultReceiverExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
        this.asyncSubscriptionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-subscription-dispatcher"));
        this.receiverInfo = new PersistentReceiverInfoImpl();
        this.serviceInterruptionListener = new MessagingService.ServiceInterruptionListener(){

            @Override
            public void onServiceInterrupted(MessagingService.ServiceEvent e) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)(PersistentMessageReceiverImpl.this.instanceName + " is shutting down due to service interruption"));
                }
                PersistentMessageReceiverImpl.this.stateHolder.set(null, 4);
                PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(e.getTimestamp(), e.getMessage(), e.getCause()));
                PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
            }
        };
        this.closedSessionListener = new ClientSession.ClientSessionStateListener(){

            @Override
            public void onClientSessionStateChange(ClientSession.ClientSessionStateChangeEvent event) {
                if (logger.isWarnEnabled()) {
                    logger.warn((Object)"Shutting down receiver due to service closure");
                }
                PersistentMessageReceiverImpl.this.stateHolder.set(null, 4);
                PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(event.getTimestamp(), event.getMessage(), event.getCause()));
                PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
            }
        };
        this.reconnectionListener = new MessagingService.ReconnectionListener(){

            @Override
            public void onReconnected(MessagingService.ServiceEvent e) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " is reconnected."));
                }
            }
        };
        this.reconnectionAttemptListener = new MessagingService.ReconnectionAttemptListener(){

            @Override
            public void onReconnecting(MessagingService.ServiceEvent e) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " is reconnecting."));
                }
            }
        };
        this.asyncConsumerMessageDispatchTask = new AsyncConsumerMessageDispatchTask(this.buffer, this, this.messageHandlersLock, this.notEmpty);
        this.preStartTasks.add(this.addSubscriptions(new ArrayList<TopicSubscription>(initialTopicSubscriptions), this.queue));
        this.messageReceiptFailureNotificationDispatcher = new MessageReceiptFailureNotificationDispatcher(this.receiverInfo);
        this.stopSolaceConsumerTask = new StopSolaceConsumerTask();
    }

    @Override
    public ManageableReceiver.PersistentReceiverInfo receiverInfo() {
        return this.receiverInfo;
    }

    @Override
    public PersistentMessageReceiver start() throws PubSubPlusClientException {
        try {
            this.startAsync().get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t != null) {
                if (t instanceof PubSubPlusClientException) {
                    throw (PubSubPlusClientException)t;
                }
                if (t instanceof IllegalStateException) {
                    throw (IllegalStateException)t;
                }
                throw new PubSubPlusClientException(t);
            }
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to start"), (Throwable)e);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        catch (CancellationException e) {
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        }
        return this;
    }

    @Override
    public void terminate(long gracePeriod) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        block7: {
            if (gracePeriod == 0L) {
                this.terminateNow();
                return;
            }
            try {
                this.terminateAsync(gracePeriod).get();
            }
            catch (ExecutionException e) {
                Throwable t = e.getCause();
                if (t != null) {
                    if (t instanceof PubSubPlusClientException) {
                        throw (PubSubPlusClientException)t;
                    }
                    throw new PubSubPlusClientException(t);
                }
                throw new PubSubPlusClientException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new PubSubPlusClientException.RequestInterruptedException("Message receiver termination was interrupted", e);
            }
            catch (Exception e) {
                if (!logger.isWarnEnabled()) break block7;
                logger.warn((Object)(this.instanceName + " encountered problem during termination."), (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        block5: {
            int m;
            this.stateHolder.set(null, 4);
            AtomicInteger lostMessages = new AtomicInteger(0);
            Task<PersistentMessageReceiverImpl> postTerminationTask = receiver -> {
                block4: {
                    try {
                        int bufferSize = receiver.buffer.size();
                        boolean bufferEmpty = bufferSize < 1;
                        receiver.buffer.clear();
                        if (!bufferEmpty) {
                            lostMessages.set(bufferSize);
                            if (logger.isWarnEnabled()) {
                                logger.warn((Object)(receiver.instanceName + " non-gracefully terminated before all buffered messages were processed."));
                            }
                        }
                    }
                    catch (PubSubPlusClientException.RequestInterruptedException e) {
                        if (!logger.isWarnEnabled()) break block4;
                        logger.warn((Object)("Non-graceful termination of " + receiver.instanceName + " was interrupted"));
                    }
                }
            };
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is being non gracefully terminated"));
                }
                this.onTerminate(this.stopSolaceConsumerTask, postTerminationTask);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(this.instanceName + " is terminated"));
                }
                if ((m = lostMessages.get()) <= 0) break block5;
                this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, m);
            }
            catch (Throwable throwable) {
                int m2 = lostMessages.get();
                if (m2 > 0) {
                    this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, m2);
                    throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", m2), m2);
                }
                throw throwable;
            }
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", m), m);
        }
    }

    @Override
    public boolean isRunning() {
        return 2 == this.stateHolder.getStamp();
    }

    @Override
    public boolean isTerminated() {
        return 4 == this.stateHolder.getStamp();
    }

    @Override
    public boolean isTerminating() {
        return 3 == this.stateHolder.getStamp();
    }

    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener listener) {
        this.terminationNotificationDispatcher.setTerminationNotificationListener(listener);
    }

    @Override
    public void setReceiveFailureListener(MessageReceiver.ReceiveFailureListener receiveFailureListener) {
        this.messageReceiptFailureNotificationDispatcher.setReceiveFailureListener(receiveFailureListener);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public <PersistentMessageReceiver> CompletableFuture<PersistentMessageReceiver> startAsync() throws PubSubPlusClientException, IllegalStateException {
        int state = this.stateHolder.getStamp();
        if (3 == state || state == 4) {
            throw new IllegalStateException("Message receiver is already terminated");
        }
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Message receiver can't be started when service is not connected");
        }
        block6: while (true) {
            if (!this.serviceInternalView.isConnected()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver can't be started when service is not connected"));
            }
            int[] currentStateHolder = new int[1];
            CompletableFuture currentFuture = this.stateHolder.get(currentStateHolder);
            int currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    ExtendedCompletableFuture<PersistentMessageReceiverImpl> starting;
                    boolean stateChanged;
                    if (!(stateChanged = this.stateHolder.compareAndSet(null, starting = new ExtendedCompletableFuture<PersistentMessageReceiverImpl>(), 0, 1))) continue block6;
                    if (logger.isDebugEnabled()) {
                        logger.debug((Object)(this.instanceName + " is being started"));
                    }
                    try {
                        boolean startingToStarted = this.stateHolder.compareAndSet(starting, starting, 1, 2);
                        if (!startingToStarted) {
                            int newCurrentState = this.stateHolder.getStamp();
                            if (newCurrentState >= 3) {
                                this.onTerminate(null, null);
                                starting.completeExceptionally(new CancellationException("Starting of message receiver was interrupted"));
                                return starting;
                            }
                        } else {
                            this.onStart();
                        }
                        starting.complete(this);
                        if (!logger.isDebugEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.debug((Object)(this.instanceName + " is started"));
                        return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        this.stateHolder.set(null, 4);
                        this.onTerminate(null, null);
                        starting.completeExceptionally(PubSubPlusClientException.of(e));
                        if (!logger.isErrorEnabled()) return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                            this.stateHolder.set(null, 4);
                            this.onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(this.instanceName + " async start was canceled"));
                            }
                        });
                        logger.error((Object)(this.instanceName + " failed to start and is terminating"), (Throwable)e);
                    }
                    return ExtendedCompletableFuture.onCancellation(starting, (service, throwable) -> {
                        this.stateHolder.set(null, 4);
                        this.onTerminate(null, null);
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(this.instanceName + " async start was canceled"));
                        }
                    });
                }
                case 1: 
                case 2: {
                    return currentFuture;
                }
            }
            break;
        }
        return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver is already terminated"));
    }

    @Override
    public InboundMessage receiveMessage() throws PubSubPlusClientException {
        if (this.isRunning() || this.isTerminating()) {
            InboundMessage message = this.buffer.consume().getMessage();
            if (this.autoAckEnabled && message != null) {
                ((MessageReceiver.InboundMessageImpl)message).doAck();
            }
            return message;
        }
        throw new IllegalStateException("Message receiver is not started");
    }

    @Override
    public InboundMessage receiveMessage(long timeOut) throws PubSubPlusClientException {
        if (this.isRunning() || this.isTerminating()) {
            InboundMessage message;
            ReceiverBuffers.Receivable receivable = this.buffer.consume(timeOut, TimeUnit.MILLISECONDS);
            InboundMessage inboundMessage = message = receivable != null ? receivable.getMessage() : null;
            if (this.autoAckEnabled && message != null) {
                ((MessageReceiver.InboundMessageImpl)message).doAck();
            }
            return message;
        }
        throw new IllegalStateException("Message receiver is not started");
    }

    @Override
    public InboundMessage receiveOrElse(MessageReceiver.InboundMessageSupplier supplierOfAlternativeResponse) {
        if (this.isRunning() || this.isTerminating()) {
            InboundMessage message;
            Validation.nullIllegal(supplierOfAlternativeResponse, "Response supplier can't be null");
            ReceiverBuffers.Receivable receivable = this.buffer.consumeOrNull();
            InboundMessage inboundMessage = message = receivable != null ? receivable.getMessage() : null;
            if (this.autoAckEnabled && message != null) {
                ((MessageReceiver.InboundMessageImpl)message).doAck();
            }
            return receivable != null ? message : (InboundMessage)supplierOfAlternativeResponse.get();
        }
        throw new IllegalStateException("Message receiver is not started");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        ReentrantLock lock = this.messageHandlersLock;
        lock.lock();
        try {
            boolean hasHandler = this.messageHandlerRef.compareAndSet(null, new ExecutableMessageHandler(messageHandler, null, logger, this.instanceName));
            if (!hasHandler) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            this.notEmpty.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler, ExecutorService executorService) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        Validation.nullIllegal(executorService, "Executor service can't be null");
        ReentrantLock lock = this.messageHandlersLock;
        lock.lock();
        try {
            boolean hasHandler = this.messageHandlerRef.compareAndSet(null, new ExecutableMessageHandler(messageHandler, executorService, logger, this.instanceName));
            if (!hasHandler) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            this.notEmpty.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public <PersistentMessageReceiver> void startAsync(CompletionListener<PersistentMessageReceiver> startListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(startListener, "Start listener can't be null");
        CompletableFuture<PersistentMessageReceiver> onceStated = this.startAsync();
        onceStated.whenComplete((receiver, throwable) -> {
            block2: {
                try {
                    startListener.onCompletion(receiver, throwable == null ? null : throwable.getCause());
                }
                catch (Exception e) {
                    if (!logger.isWarnEnabled()) break block2;
                    logger.warn((Object)"Application code throw an unhandled exception by processing async start completion notification", (Throwable)e);
                }
            }
        });
    }

    @Override
    public void ack(InboundMessage message) throws PubSubPlusClientException {
        Validation.nullIllegal(message, "Can't ack null message");
        if (this.autoAckEnabled && logger.isWarnEnabled()) {
            logger.warn((Object)"ack is ignored because the message acknowledgement mode is auto-ack");
        }
        ((MessageReceiver.InboundMessageImpl)message).doAck();
    }

    @Override
    public void addSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener listener) throws PubSubPlusClientException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(listener, "Listener can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        this.addSubscriptionAsync0(topicSubscription, this.queue, listener, this);
    }

    @Override
    public void removeSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener listener) throws PubSubPlusClientException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(listener, "Listener can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        this.removeSubscriptionAsync0(topicSubscription, this.queue, listener, this);
    }

    @Override
    public void pause() throws PubSubPlusClientException {
        if (this.serviceInternalView.isConnected() && this.isRunning()) {
            try {
                this.solaceReceiver.stopSync();
            }
            catch (JCSMPInterruptedException e) {
                throw new PubSubPlusClientException("Failed to pause message receiver", e);
            }
        } else {
            if (!this.serviceInternalView.isConnected()) {
                throw new IllegalStateException("Messaging service not connected");
            }
            if (!this.isRunning()) {
                throw new IllegalStateException("Message receiver is not running");
            }
        }
    }

    @Override
    public void resume() throws PubSubPlusClientException {
        if (this.serviceInternalView.isConnected() && this.isRunning()) {
            try {
                this.solaceReceiver.startSync();
            }
            catch (JCSMPInterruptedException e) {
                throw new PubSubPlusClientException("Failed to restart message receiver", e);
            }
            catch (JCSMPException e) {
                throw new PubSubPlusClientException("Failed to restart message receiver", e);
            }
        } else {
            if (!this.serviceInternalView.isConnected()) {
                throw new IllegalStateException("Messaging service not connected");
            }
            if (!this.isRunning()) {
                throw new IllegalStateException("Message receiver is not running");
            }
        }
    }

    @Override
    public void addSubscription(TopicSubscription anotherSubscription) throws PubSubPlusClientException, InterruptedException {
        if (anotherSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(anotherSubscription, "Topic subscription can't be null");
        Validation.nullOrEmptyIllegal(anotherSubscription.getName(), "Topic subscription name can't be null or empty");
        this.addSubscription0(anotherSubscription, this.queue, this);
    }

    @Override
    public void removeSubscription(TopicSubscription subscription) throws PubSubPlusClientException, InterruptedException {
        if (subscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(subscription, "Topic subscription can't be null");
        Validation.nullOrEmptyIllegal(subscription.getName(), "Topic subscription name can't be null or empty");
        this.removeSubscription0(subscription, this.queue, this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public CompletableFuture<Void> terminateAsync(long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, gracePeriod, "Grace period < 1");
        lostMessages = new AtomicInteger(0);
        block8: while (true) {
            currentStateHolder = new int[1];
            currentFuture = this.stateHolder.get(currentStateHolder);
            currentState = currentStateHolder[0];
            switch (currentState) {
                case 0: {
                    stateChanged = this.stateHolder.compareAndSet(null, null, 0, 4);
                    if (!stateChanged) continue block8;
                    this.onTerminate(null, null);
                    return CompletableFuture.completedFuture(null);
                }
                case 1: {
                    this.stateHolder.set(null, 4);
                    currentFuture.cancel(true);
                    return CompletableFuture.completedFuture(null);
                }
                case 2: {
                    terminating = new ExtendedCompletableFuture<Void>();
                    stateChanged = this.stateHolder.compareAndSet(currentFuture, terminating, 2, 3);
                    if (stateChanged) ** break;
                    continue block8;
                    delayTerminationForGracefulShutdownTask = (Task<PersistentMessageReceiverImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$terminateAsync$4(long java.util.concurrent.atomic.AtomicInteger com.solace.messaging.receiver.PersistentMessageReceiverImpl ), (Lcom/solace/messaging/receiver/PersistentMessageReceiverImpl;)V)((PersistentMessageReceiverImpl)this, (long)gracePeriod, (AtomicInteger)lostMessages);
                    try {
                        this.onTerminate(this.stopSolaceConsumerTask, delayTerminationForGracefulShutdownTask);
                    }
                    finally {
                        this.stateHolder.set(null, 4);
                    }
                    lm = lostMessages.get();
                    if (lm > 0) {
                        this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, lm);
                        terminating.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to expiration of a grace period", new Object[]{lm}), lm));
                    } else {
                        terminating.complete(null);
                    }
                    return terminating;
                }
            }
            break;
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void terminateAsync(CompletionListener<Void> terminationListener, long gracePeriod) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(terminationListener, "Termination listener can't be null");
        CompletableFuture<Void> disconnecting = this.terminateAsync(gracePeriod);
        disconnecting.whenComplete((nothing, throwable) -> {
            block2: {
                try {
                    terminationListener.onCompletion(null, throwable == null ? null : throwable.getCause());
                }
                catch (Exception e) {
                    if (!logger.isWarnEnabled()) break block2;
                    logger.warn((Object)"Application code throw an unhandled exception by processing termination completion notification", (Throwable)e);
                }
            }
        });
    }

    ConsumerFlowProperties createFlowConfiguration(TypedProperties receiverConfiguration, SolaceQueueHolder queue) {
        ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
        consumerFlowProperties.setActiveFlowIndication(true);
        String selector = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.selector-query");
        if (selector != null && !selector.isEmpty()) {
            consumerFlowProperties.setSelector(selector);
        }
        consumerFlowProperties.setEndpoint((Endpoint)queue.getSolaceQueue());
        String replayStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.replay.strategy");
        if (replayStrategy != null) {
            if ("REPLAY_ALL".equals(replayStrategy)) {
                consumerFlowProperties.setReplayStartLocation((ReplayStartLocation)new ReplayStartLocationBeginningImpl());
            } else if ("REPLAY_TIME_BASED".equals(replayStrategy)) {
                ZonedDateTime replayTime = (ZonedDateTime)receiverConfiguration.getObjectProperty("solace.messaging.receiver.persistent.replay.timebased-start-time");
                consumerFlowProperties.setReplayStartLocation((ReplayStartLocation)new ReplayStartLocationDateImpl(Date.from(replayTime.toInstant())));
            } else if ("REPLAY_REPLICATION_GROUP_MESSAGE_ID_BASED".equals(replayStrategy)) {
                String rmidString = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.replay.replication-group-message-id");
                try {
                    ReplicationGroupMessageId startLocation = ReplicationGroupMessageIdImpl.createReplicationGroupMessageId((String)rmidString);
                    consumerFlowProperties.setReplayStartLocation((ReplayStartLocation)startLocation);
                }
                catch (InvalidPropertiesException e) {
                    logger.error((Object)"Malformed group message id bypassed input validation", (Throwable)e);
                }
            }
        }
        consumerFlowProperties.setAckMode(receiverConfiguration.getProperty("message_ack_mode"));
        consumerFlowProperties.setAckThreshold(receiverConfiguration.getIntegerProperty("sub_ack_window_threshold").intValue());
        consumerFlowProperties.setAckTimerInMsecs(receiverConfiguration.getIntegerProperty("sub_ack_time").intValue());
        consumerFlowProperties.setWindowedAckMaxSize(receiverConfiguration.getIntegerProperty("sub_ack_window_size").intValue());
        consumerFlowProperties.setAckTimerInMsecs(receiverConfiguration.getIntegerProperty("sub_ack_time").intValue());
        consumerFlowProperties.setReconnectTries(receiverConfiguration.getIntegerProperty("solace.messaging.receiver.persistent.transport.reconnection-attempts").intValue());
        consumerFlowProperties.setReconnectRetryIntervalInMsecs(receiverConfiguration.getIntegerProperty("solace.messaging.receiver.persistent.transport.reconnection-attempts-wait-interval").intValue());
        return consumerFlowProperties;
    }

    Task<PersistentMessageReceiverImpl> addSubscriptions(List<TopicSubscription> topicSubscriptions, SolaceQueueHolder queue) {
        if (topicSubscriptions.isEmpty()) {
            return NO_OP_TASK;
        }
        return receiver -> {
            if (receiver.isRunning()) {
                for (TopicSubscription subscription : topicSubscriptions) {
                    if (receiver.serviceInternalView.getClientSession().isConnected()) {
                        if (Thread.currentThread().isInterrupted()) break;
                        if (subscription == null || queue == null) continue;
                        this.addSubscription0(subscription, queue, (PersistentMessageReceiverImpl)receiver);
                        continue;
                    }
                    throw new IllegalStateException("Subscriptions can't be added while service is not connected");
                }
            }
        };
    }

    void onGracefulTerminateEmptyRemainingBuffer(ReceiverBuffers.ReceiverBuffer buffer) {
        buffer.clear();
    }

    void addInitialSubscriptions() {
        this.preStartTasks.removeIf(task -> {
            task.run(this);
            return true;
        });
    }

    void testBrokerCapabilities(List<CapabilityType> capabilities) {
        if (capabilities == null || capabilities.isEmpty()) {
            return;
        }
        if (this.serviceInternalView.getClientSession().isConnected()) {
            for (CapabilityType capability : capabilities) {
                if (this.serviceInternalView.getClientSession().isCapable(capability)) continue;
                throw new PubSubPlusClientException.ServiceCapabilityException("Service does not support configured capability " + capability, capability);
            }
        }
    }

    void createMissingResources(SolaceQueueHolder resourceToCreate, TypedProperties receiverConfiguration) {
        String resourceCreationStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.missing-resource-creation-strategy");
        if ("CREATE_ON_START".equals(resourceCreationStrategy) && resourceToCreate.isDurable()) {
            EndpointProperties qProvisionProperties = new EndpointProperties();
            if (resourceToCreate.isExclusivelyAccessible()) {
                qProvisionProperties.setAccessType(Integer.valueOf(1));
            } else {
                qProvisionProperties.setAccessType(Integer.valueOf(0));
            }
            try {
                this.serviceInternalView.getClientSession().provision((Endpoint)resourceToCreate.getSolaceQueue(), qProvisionProperties, 1L);
            }
            catch (PropertyMismatchException e) {
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(this.instanceName + " could not provision queue: " + resourceToCreate.getSolaceQueue().getName()), (Throwable)e);
                }
                String message = "Queue '%s' already provisioned using different property set";
                throw new PubSubPlusClientException.ResourceProvisioningException(String.format("Queue '%s' already provisioned using different property set", resourceToCreate.getSolaceQueue().getName()), e);
            }
            catch (JCSMPErrorResponseException re) {
                if (403 == re.getResponseCode()) {
                    throw new PubSubPlusClientException.AuthorizationException("Queue can't be provisioned, user not authorized", re);
                }
            }
            catch (JCSMPException e) {
                throw new PubSubPlusClientException("Queue can't be provisioned, see cause for more details", e);
            }
        }
    }

    void addSubscription0(TopicSubscription subscription, SolaceQueueHolder queue, PersistentMessageReceiverImpl receiver) throws PubSubPlusClientException {
        if (!receiver.serviceInternalView.getClientSession().isConnected()) {
            throw new IllegalStateException("Subscription can't be added when service is not connected");
        }
        ReentrantLock sLock = this.subscriptionsLock;
        try {
            sLock.lockInterruptibly();
            if (!this.appliedTopicSubscriptions.contains(subscription)) {
                Topic topicToAdd = this.createTopicInstance(subscription);
                try {
                    receiver.serviceInternalView.getClientSession().addSubscription((Endpoint)queue.getSolaceQueue(), (Subscription)topicToAdd, 4);
                    receiver.appliedTopicSubscriptions.add(subscription);
                }
                catch (JCSMPException e) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)(this.instanceName + " failed to apply subscription: " + topicToAdd), (Throwable)e);
                    }
                }
            }
        }
        catch (InterruptedException e) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " failed to apply subscription due to interruption"), (Throwable)e);
            }
            throw new PubSubPlusClientException.RequestInterruptedException("Failed to apply subscription due to interruption", e);
        }
        finally {
            sLock.unlock();
        }
    }

    void addSubscriptionAsync0(TopicSubscription subscription, SolaceQueueHolder queue, AsyncReceiverSubscriptions.SubscriptionChangeListener listener, PersistentMessageReceiverImpl receiver) throws PubSubPlusClientException {
        if (!receiver.serviceInternalView.getClientSession().isConnected()) {
            throw new IllegalStateException("Subscription can't be added when service is not connected");
        }
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter subscriptionListener = new AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter(listener, subscription);
                try {
                    this.addSubscription0(subscription, queue, receiver);
                    subscriptionListener.handleSuccess(subscription);
                }
                catch (Exception e) {
                    subscriptionListener.handleError(subscription, e);
                }
                return null;
            });
        }
        catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not schedule adding of subscription: " + subscription));
            }
            throw new PubSubPlusClientException("Adding subscription task was rejected by internal executor service", e);
        }
    }

    void removeSubscriptionAsync0(TopicSubscription subscription, SolaceQueueHolder queue, AsyncReceiverSubscriptions.SubscriptionChangeListener listener, PersistentMessageReceiverImpl receiver) throws PubSubPlusClientException {
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                AtomicBoolean noSuchSubscription = new AtomicBoolean(true);
                AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter subscriptionListenerAdapter = new AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter(listener, subscription);
                this.appliedTopicSubscriptions.removeIf(appliedSubscription -> {
                    if (subscription.equals(appliedSubscription)) {
                        block8: {
                            noSuchSubscription.set(false);
                            try {
                                this.removeSubscription0(subscription, queue, receiver);
                                try {
                                    subscriptionListenerAdapter.handleSuccess(subscription);
                                }
                                catch (Exception e) {
                                    if (logger.isWarnEnabled()) {
                                        logger.warn((Object)("Application code throw an unhandled exception by processing adding of subscription: " + subscription), (Throwable)e);
                                    }
                                }
                            }
                            catch (Exception e) {
                                try {
                                    subscriptionListenerAdapter.handleError(subscription, e);
                                }
                                catch (Exception ex) {
                                    if (!logger.isWarnEnabled()) break block8;
                                    logger.warn((Object)("Application code throw an unhandled exception by processing removal of subscription: " + subscription), (Throwable)e);
                                }
                            }
                        }
                        return true;
                    }
                    return false;
                });
                if (noSuchSubscription.get()) {
                    subscriptionListenerAdapter.handleSuccess(subscription);
                }
                return null;
            });
        }
        catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not schedule removing of subscription: " + subscription));
            }
            throw new PubSubPlusClientException("Subscription removal task was rejected by internal executor service", e);
        }
    }

    void removeSubscription0(TopicSubscription subscription, SolaceQueueHolder queue, PersistentMessageReceiverImpl receiver) throws PubSubPlusClientException {
        this.appliedTopicSubscriptions.removeIf(appliedSubscription -> {
            if (subscription.equals(appliedSubscription)) {
                try {
                    receiver.serviceInternalView.getClientSession().removeSubscription((Endpoint)queue.getSolaceQueue(), (Subscription)this.createTopicInstance(subscription), 4);
                }
                catch (Exception e) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)(this.instanceName + " failed to remove subscription: " + subscription), (Throwable)e);
                    }
                    throw new PubSubPlusClientException("Un-subscription could not be performed", e);
                }
                return true;
            }
            return false;
        });
    }

    boolean isMessageReplayEnabled(TypedProperties receiverConfiguration) {
        String replayStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.replay.strategy");
        return replayStrategy != null && !replayStrategy.isEmpty();
    }

    void onStart() throws PubSubPlusClientException.ResourceProvisioningException, PubSubPlusClientException.AuthorizationException, PubSubPlusClientException.ServiceCapabilityException, PubSubPlusClientException {
        this.serviceInternalView.addReconnectionListener(this.reconnectionListener);
        this.serviceInternalView.addReconnectionAttemptListener(this.reconnectionAttemptListener);
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        this.testBrokerCapabilities(this.getRequiredCapabilities(this.receiverConfiguration, this.queue));
        this.createMissingResources(this.queue, this.receiverConfiguration);
        boolean messageReplay = this.isMessageReplayEnabled(this.receiverConfiguration);
        if (messageReplay) {
            this.addInitialSubscriptions();
        }
        if (this.solaceReceiver != null) {
            this.solaceReceiver.close(true);
        }
        this.solaceReceiver = this.createSolaceConsumer(this.queue, this);
        if (!messageReplay) {
            this.addInitialSubscriptions();
        }
        try {
            this.defaultReceiverExecutorService.submit(this.asyncConsumerMessageDispatchTask);
        }
        catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error((Object)(this.instanceName + " could not schedule consumer dispatcher task"));
            }
            throw new PubSubPlusClientException("Internal Executor service rejected consumer dispatcher task", e);
        }
        try {
            this.solaceReceiver.startSync();
        }
        catch (JCSMPException e) {
            e.printStackTrace();
        }
    }

    private LifecycleControl.TerminationEvent mapFlowDownException(Exception e) {
        if (e instanceof JCSMPErrorResponseException) {
            switch (((JCSMPErrorResponseException)((Object)e)).getSubcodeEx()) {
                case 78: 
                case 79: 
                case 80: 
                case 81: 
                case 82: 
                case 83: 
                case 84: {
                    return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is down due to replay problem", new PubSubPlusClientException.MessageReplayException(JCSMPErrorResponseSubcodeEx.getSubcodeAsString((int)((JCSMPErrorResponseException)((Object)e)).getSubcodeEx()), e));
                }
            }
            return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", new PubSubPlusClientException(e));
        }
        if (e instanceof PubSubPlusClientException) {
            return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", (PubSubPlusClientException)e);
        }
        return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", new PubSubPlusClientException(e));
    }

    FlowReceiver createSolaceConsumer(SolaceQueueHolder queue, final PersistentMessageReceiverImpl receiver) {
        FlowEventHandler flowEventHandler = new FlowEventHandler(){
            final String instanceReconnected;
            final String instanceReconnecting;
            {
                this.instanceReconnected = PersistentMessageReceiverImpl.this.instanceName + " reconnected";
                this.instanceReconnecting = PersistentMessageReceiverImpl.this.instanceName + " reconnecting";
            }

            public void handleEvent(Object source, FlowEventArgs event) {
                ClientSession session = PersistentMessageReceiverImpl.this.serviceInternalView.getClientSession();
                String uri = "n/a";
                if (session != null) {
                    if (session.getClientChannel() != null && session.getClientChannel().getSmfClient() != null) {
                        uri = session.getClientChannel().getSmfClient().getRemoteHost();
                    }
                    if (event.getEvent() == FlowEvent.FLOW_DOWN) {
                        if (session.isReconnecting()) {
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is temporarily down while reconnecting"));
                            }
                            return;
                        }
                        PersistentMessageReceiverImpl.this.stateHolder.set(null, 4);
                        PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(PersistentMessageReceiverImpl.this.mapFlowDownException(event.getException()));
                        PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
                    } else if (event.getEvent() == FlowEvent.FLOW_UP) {
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is started"));
                        }
                    } else if (event.getEvent() == FlowEvent.FLOW_ACTIVE) {
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is active"));
                        }
                    } else if (event.getEvent() == FlowEvent.FLOW_RECONNECTING) {
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is reconnecting"));
                        }
                        PersistentMessageReceiverImpl.this.reconnectionAttemptListener.onReconnecting(new ServiceEventImpl(uri, event.getException(), this.instanceReconnecting));
                    } else if (event.getEvent() == FlowEvent.FLOW_RECONNECTED) {
                        if (logger.isDebugEnabled()) {
                            logger.debug((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is reconnected"));
                        }
                        PersistentMessageReceiverImpl.this.reconnectionListener.onReconnected(new ServiceEventImpl(uri, event.getException(), this.instanceReconnected));
                    }
                }
            }
        };
        InboundMessage.SolaceMessageListener messageDispatcher = new InboundMessage.SolaceMessageListener(queue.getName()){

            @Override
            public void onException(PubSubPlusClientException e) {
                receiver.messageReceiptFailureNotificationDispatcher.onException(e);
                if (logger.isErrorEnabled()) {
                    logger.error((Object)(receiver.instanceName + " encountered problem during message reception."), (Throwable)e);
                }
            }

            @Override
            public void onReceive(InboundMessage message) {
                try {
                    receiver.buffer.insert(ReceiverBuffers.Receivable.of(message));
                }
                catch (Exception e) {
                    PersistentMessageReceiverImpl.this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                    receiver.messageReceiptFailureNotificationDispatcher.onException(e);
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)(receiver.instanceName + " encountered problem during message reception."), (Throwable)e);
                    }
                    return;
                }
            }
        };
        try {
            if (queue.isDurable()) {
                try {
                    return receiver.serviceInternalView.getClientSession().createFlow(messageDispatcher, this.consumerProps, null, flowEventHandler);
                }
                catch (AccessDeniedException ade) {
                    throw new PubSubPlusClientException.AuthorizationException("User not authorized to bind a flow to or to a queue", ade);
                }
            }
            try {
                EndpointProperties tmpQprovisioningProperties = new EndpointProperties();
                tmpQprovisioningProperties.setAccessType(Integer.valueOf(1));
                tmpQprovisioningProperties.setPermission(Integer.valueOf(4));
                return receiver.serviceInternalView.getClientSession().createFlow(messageDispatcher, this.consumerProps, tmpQprovisioningProperties, flowEventHandler);
            }
            catch (AccessDeniedException ade) {
                throw new PubSubPlusClientException.AuthorizationException("User not authorized to bind a flow to or to provision a temporary queue", ade);
            }
        }
        catch (JCSMPErrorResponseException e) {
            int errorSubcode = e.getSubcodeEx();
            if (73 <= errorSubcode && 84 >= errorSubcode) {
                throw new PubSubPlusClientException.MessageReplayException(e.getMessage(), e);
            }
            if (20 == errorSubcode) {
                throw new PubSubPlusClientException.MissingResourceException(e.getMessage(), e);
            }
            throw new PubSubPlusClientException("Solace consumer flow could not be created", e);
        }
        catch (JCSMPException e) {
            throw new PubSubPlusClientException("Solace consumer flow could not be created", e);
        }
    }

    void onTerminate(Task<PersistentMessageReceiverImpl> preTerminationTask, Task<PersistentMessageReceiverImpl> postTerminationTask) {
        try {
            if (preTerminationTask != null) {
                preTerminationTask.run(this);
            }
            this.appliedTopicSubscriptions.clear();
            this.preStartTasks.clear();
            this.serviceInternalView.removeReconnectionListener(this.reconnectionListener);
            this.serviceInternalView.removeReconnectionAttemptListener(this.reconnectionAttemptListener);
            this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
            this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(this.instanceName + " receiver is shutdown"));
            }
            this.buffer.clearDiscardedHandler();
            if (!this.defaultReceiverExecutorService.isShutdown()) {
                this.defaultReceiverExecutorService.shutdown();
            }
        }
        finally {
            if (postTerminationTask != null) {
                postTerminationTask.run(this);
            }
        }
    }

    void terminateOnUnsolicitedInterruption() throws PubSubPlusClientException {
        this.stateHolder.set(null, 4);
        Task<PersistentMessageReceiverImpl> postTerminationTask = receiver -> {
            block12: {
                if (receiver.solaceReceiver != null) {
                    try {
                        receiver.solaceReceiver.stopSync();
                    }
                    catch (JCSMPInterruptedException e) {
                        if (logger.isWarnEnabled()) {
                            logger.warn((Object)(receiver.instanceName + " could not be stopped after service was interrupted."));
                        }
                    }
                    finally {
                        receiver.solaceReceiver.setMessageListener(null);
                    }
                }
                try {
                    int bufferSize = receiver.buffer.size();
                    boolean bufferEmpty = bufferSize < 1;
                    receiver.buffer.clear();
                    if (!bufferEmpty) {
                        this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, bufferSize);
                        if (logger.isWarnEnabled()) {
                            logger.warn((Object)(receiver.instanceName + " non-gracefully terminated before all buffered messages were processed."));
                        }
                    }
                }
                catch (PubSubPlusClientException.RequestInterruptedException e) {
                    if (!logger.isWarnEnabled()) break block12;
                    logger.warn((Object)("Non-graceful termination of " + receiver.instanceName + " was interrupted"));
                }
            }
        };
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is being non gracefully terminated due to service interruption"));
        }
        this.onTerminate(null, postTerminationTask);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(this.instanceName + " is terminated"));
        }
    }

    List<CapabilityType> getRequiredCapabilities(TypedProperties receiverConfiguration, com.solace.messaging.resources.Queue queue) {
        String replayStrategy;
        LinkedList<CapabilityType> c = new LinkedList<CapabilityType>();
        c.add(CapabilityType.PUB_FLOW_GUARANTEED);
        c.add(CapabilityType.QUEUE_SUBSCRIPTIONS);
        c.add(CapabilityType.ACTIVE_FLOW_INDICATION);
        String resourceCreationStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.missing-resource-creation-strategy");
        if ("CREATE_ON_START".equals(resourceCreationStrategy)) {
            c.add(CapabilityType.ENDPOINT_MANAGEMENT);
            if (!queue.isDurable()) {
                c.add(CapabilityType.TEMP_ENDPOINT);
            }
        }
        if ((replayStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.replay.strategy")) != null) {
            c.add(CapabilityType.MESSAGE_REPLAY);
        }
        return c;
    }

    boolean isAutoAckConfigured(TypedProperties receiverConfiguration) {
        String ackStrategy = receiverConfiguration.getProperty("solace.messaging.receiver.persistent.ack.strategy");
        return "AUTO_ACK".equals(ackStrategy);
    }

    Topic createTopicInstance(TopicSubscription subscription) {
        return JCSMPFactory.onlyInstance().createTopic(subscription.getName());
    }

    private /* synthetic */ void lambda$terminateAsync$4(long gracePeriod, AtomicInteger lostMessages, PersistentMessageReceiverImpl receiver) {
        boolean bufferEmpty = this.buffer.awaitEmpty(gracePeriod, TimeUnit.MILLISECONDS);
        if (!bufferEmpty) {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)(receiver.instanceName + " shutdown gracefully before all buffered messages were processed, grace period was not sufficient: " + gracePeriod));
            }
            lostMessages.set(receiver.buffer.size());
            this.onGracefulTerminateEmptyRemainingBuffer(receiver.buffer);
        }
    }

    @Internal
    @ProviderType
    private static class AsyncConsumerMessageDispatchTask
    implements Callable<Void> {
        private final ReceiverBuffers.ReceiverBuffer buffer;
        private final PersistentMessageReceiverImpl receiver;
        private final ReentrantLock messageHandlersLock;
        private final Condition notEmpty;

        private AsyncConsumerMessageDispatchTask(ReceiverBuffers.ReceiverBuffer buffer, PersistentMessageReceiverImpl receiver, ReentrantLock messageHandlersLock, Condition notEmpty) {
            this.buffer = buffer;
            this.receiver = receiver;
            this.messageHandlersLock = messageHandlersLock;
            this.notEmpty = notEmpty;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            while (this.receiver.isRunning() || this.receiver.isTerminating()) {
                try {
                    MessageReceiver.MessageHandler messageHandler = (MessageReceiver.MessageHandler)this.receiver.messageHandlerRef.get();
                    if (messageHandler != null) {
                        ReceiverBuffers.Receivable nextMessageTask = this.buffer.consume();
                        if (nextMessageTask != null) {
                            InboundMessage message = nextMessageTask.getMessage();
                            if (message != null) {
                                boolean exceptionThrown;
                                block14: {
                                    exceptionThrown = false;
                                    try {
                                        messageHandler.onMessage(message);
                                    }
                                    catch (Exception t) {
                                        exceptionThrown = true;
                                        if (!logger.isErrorEnabled()) break block14;
                                        logger.error((Object)("Application code throw an unhandled exception by message processing: " + message), (Throwable)t);
                                    }
                                }
                                if (!this.receiver.autoAckEnabled || exceptionThrown) continue;
                                ((MessageReceiver.InboundMessageImpl)message).doAck();
                                continue;
                            }
                            if (!logger.isInfoEnabled()) continue;
                            logger.info((Object)(this.receiver.instanceName + " buffer returned null message"));
                            continue;
                        }
                        if (!logger.isInfoEnabled()) continue;
                        logger.info((Object)(this.receiver.instanceName + " buffer returned no message"));
                        continue;
                    }
                    ReentrantLock theLock = this.messageHandlersLock;
                    try {
                        theLock.lockInterruptibly();
                        if (this.receiver.messageHandlerRef.get() != null) continue;
                        this.notEmpty.await();
                    }
                    catch (InterruptedException e) {}
                    continue;
                    finally {
                        theLock.unlock();
                    }
                }
                catch (Exception e) {
                    boolean threadInterrupted = Thread.interrupted();
                    if (threadInterrupted) {
                        if (!logger.isInfoEnabled()) continue;
                        logger.info((Object)(this.receiver.instanceName + " waiting for a message was interrupted"), (Throwable)e);
                        continue;
                    }
                    if (!logger.isErrorEnabled()) continue;
                    logger.error((Object)(this.receiver.instanceName + " Problem during a message reception"), (Throwable)e);
                }
            }
            return null;
        }
    }

    @Internal
    @ProviderType
    private static class StopSolaceConsumerTask
    implements Task<PersistentMessageReceiverImpl> {
        private StopSolaceConsumerTask() {
        }

        @Override
        public void run(PersistentMessageReceiverImpl receiver) {
            block5: {
                try {
                    if (receiver.solaceReceiver == null) break block5;
                    try {
                        receiver.solaceReceiver.closeSync();
                    }
                    finally {
                        receiver.solaceReceiver.setMessageListener(null);
                    }
                }
                catch (JCSMPException e) {
                    if (!logger.isWarnEnabled()) break block5;
                    logger.warn((Object)(receiver.instanceName + " internal Solace receiver failed to stop."), (Throwable)e);
                }
            }
        }
    }

    @Internal
    @ProviderType
    static class SolaceQueueHolder
    implements com.solace.messaging.resources.Queue {
        private final boolean exclusivelyAccessible;
        private final Queue solaceQueue;

        private SolaceQueueHolder(Queue solaceQueue, boolean exclusive) {
            this.solaceQueue = solaceQueue;
            this.exclusivelyAccessible = exclusive;
        }

        @Override
        public String getName() {
            return this.solaceQueue.getName();
        }

        @Override
        public boolean isExclusivelyAccessible() {
            return this.exclusivelyAccessible;
        }

        @Override
        public boolean isDurable() {
            return this.solaceQueue.isDurable();
        }

        public Queue getSolaceQueue() {
            return this.solaceQueue;
        }

        static SolaceQueueHolder create(com.solace.messaging.resources.Queue q, ClientSession session) {
            if (!q.isDurable()) {
                if (q.getName() == null) {
                    Queue sq = QueueImpl.createWithInit((String)DestinationUtil.createNonDurQueueTrbTopic((String)session.getVirtualRouterName(), null), (boolean)false, (String)session.getVirtualRouterName());
                    return new SolaceQueueHolder(sq, q.isExclusivelyAccessible());
                }
                Queue sq = QueueImpl.createWithInit((String)DestinationUtil.createNonDurQueueTrbTopic((String)session.getVirtualRouterName(), (String)q.getName()), (boolean)false, (String)session.getVirtualRouterName());
                return new SolaceQueueHolder(sq, q.isExclusivelyAccessible());
            }
            return new SolaceQueueHolder(QueueImpl.userCreateWithInit((String)q.getName(), (boolean)true, (String)session.getVirtualRouterName()), q.isExclusivelyAccessible());
        }
    }

    @Internal
    @ProviderType
    class FlowControlHandler
    implements ReceiverBuffers.CapacityChangeListener {
        private final ExecutorService defaultFlowControlExecutorService;
        private final FlowControlResumeTask resumeTask;
        private final FlowControlPauseTask pauseTask;

        private FlowControlHandler() {
            this.defaultFlowControlExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(PersistentMessageReceiverImpl.this.instanceName + "-flow-control-dispatcher"));
            this.resumeTask = new FlowControlResumeTask(PersistentMessageReceiverImpl.this);
            this.pauseTask = new FlowControlPauseTask(PersistentMessageReceiverImpl.this);
        }

        @Override
        public void low() {
            block5: {
                FlowReceiver r = PersistentMessageReceiverImpl.this.solaceReceiver;
                if (PersistentMessageReceiverImpl.this.isRunning() && r != null) {
                    try {
                        this.defaultFlowControlExecutorService.submit(this.pauseTask);
                    }
                    catch (RejectedExecutionException e) {
                        if (!logger.isWarnEnabled()) break block5;
                        logger.warn((Object)(PersistentMessageReceiverImpl.this.instanceName + " could not schedule pause on flow control, executing on same thread"));
                        try {
                            this.pauseTask.call();
                        }
                        catch (Exception ex) {
                            if (!logger.isErrorEnabled()) break block5;
                            logger.error((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow control 'pause' operation failed"), (Throwable)ex);
                        }
                    }
                }
            }
        }

        @Override
        public void normal() {
            block5: {
                FlowReceiver r = PersistentMessageReceiverImpl.this.solaceReceiver;
                if (PersistentMessageReceiverImpl.this.isRunning() && r != null) {
                    try {
                        this.defaultFlowControlExecutorService.submit(this.resumeTask);
                    }
                    catch (RejectedExecutionException e) {
                        if (!logger.isWarnEnabled()) break block5;
                        logger.warn((Object)(PersistentMessageReceiverImpl.this.instanceName + " could not schedule pause on flow control, executing on same thread"));
                        try {
                            this.resumeTask.call();
                        }
                        catch (Exception ex) {
                            if (!logger.isErrorEnabled()) break block5;
                            logger.error((Object)(PersistentMessageReceiverImpl.this.instanceName + " flow control 'resume' operation failed"), (Throwable)ex);
                        }
                    }
                }
            }
        }

        @ProviderType
        private final class FlowControlResumeTask
        implements Callable<Void> {
            private final PersistentMessageReceiverImpl messageReceiverParent;

            FlowControlResumeTask(PersistentMessageReceiverImpl messageReceiverParent) {
                this.messageReceiverParent = messageReceiverParent;
            }

            @Override
            public Void call() throws Exception {
                FlowReceiver r = this.messageReceiverParent.solaceReceiver;
                if (this.messageReceiverParent.isRunning() && r != null && !r.isClosed()) {
                    try {
                        r.start();
                    }
                    catch (JCSMPException e) {
                        this.messageReceiverParent.messageReceiptFailureNotificationDispatcher.onException((Exception)((Object)e));
                    }
                }
                return null;
            }
        }

        @ProviderType
        private final class FlowControlPauseTask
        implements Callable<Void> {
            private final PersistentMessageReceiverImpl messageReceiverParent;

            FlowControlPauseTask(PersistentMessageReceiverImpl messageReceiverParent) {
                this.messageReceiverParent = messageReceiverParent;
            }

            @Override
            public Void call() throws Exception {
                FlowReceiver r = this.messageReceiverParent.solaceReceiver;
                if (this.messageReceiverParent.isRunning() && r != null && !r.isClosed()) {
                    try {
                        r.stop();
                    }
                    catch (Exception e) {
                        this.messageReceiverParent.messageReceiptFailureNotificationDispatcher.onException(e);
                    }
                }
                return null;
            }
        }
    }

    @Internal
    @ProviderType
    private class PersistentReceiverInfoImpl
    implements ManageableReceiver.PersistentReceiverInfo {
        private final ManageableReceiver.PersistentReceiverInfo.ResourceInfo resourceInfo = new QueueResourceInfoImpl();

        @Override
        public long getId() {
            return PersistentMessageReceiverImpl.this.id;
        }

        @Override
        public String getInstanceName() {
            return PersistentMessageReceiverImpl.this.instanceName;
        }

        @Override
        public ManageableReceiver.PersistentReceiverInfo.ResourceInfo getResourceInfo() throws IllegalStateException {
            return this.resourceInfo;
        }

        @ProviderType
        private class QueueResourceInfoImpl
        implements ManageableReceiver.PersistentReceiverInfo.ResourceInfo {
            private QueueResourceInfoImpl() {
            }

            @Override
            public boolean isDurable() {
                return PersistentMessageReceiverImpl.this.queue.isDurable();
            }

            @Override
            public String getName() {
                return PersistentMessageReceiverImpl.this.queue.getSolaceQueue().getName();
            }
        }
    }
}

