/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.ClientFactory;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageAndSessionPump;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.IMessageReceiver;
import com.microsoft.azure.servicebus.IMessageSession;
import com.microsoft.azure.servicebus.ISessionHandler;
import com.microsoft.azure.servicebus.InitializableEntity;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SessionHandlerOptions;
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.MessageLockLostException;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.SessionLockLostException;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MessageAndSessionPump
extends InitializableEntity
implements IMessageAndSessionPump {
    /** Logger used by the pump and by its nested renew-lock loop classes. */
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageAndSessionPump.class);
    // NOTE(review): the two lock-timing constants below are not referenced in the visible part of this file;
    // presumably they feed the renew-interval computation of the renew-lock loops - confirm.
    private static final Duration MINIMUM_MESSAGE_LOCK_VALIDITY = Duration.ofSeconds(4L);
    private static final Duration MAXIMUM_RENEW_LOCK_BUFFER = Duration.ofSeconds(10L);
    /** Delay before accept-session is retried after a failed accept (see acceptSessionAndPumpMessages). */
    private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration.ofMinutes(1L);
    /** Sentinel meaning "prefetch count was never set"; the constructor initializes prefetchCount to this value. */
    private static final int UNSET_PREFETCH_COUNT = -1;
    /** Already-completed future used as the starting stage when user callbacks are dispatched to the custom executor. */
    private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
    /** Accepted sessions keyed by session id; filled when a session is accepted and closed in onClose(). */
    private final ConcurrentHashMap<String, IMessageSession> openSessions;
    private final MessagingFactory factory;
    private final String entityPath;
    private final ReceiveMode receiveMode;
    private final MessagingEntityType entityType;
    /** Receiver used by the message (non-session) pump; stays null until a message handler is registered. */
    private IMessageReceiver innerReceiver;
    /** Set once by setHandlerRegistered(); only one message or session handler may be registered per pump. */
    private boolean handlerRegistered = false;
    private IMessageHandler messageHandler;
    private ISessionHandler sessionHandler;
    private MessageHandlerOptions messageHandlerOptions;
    private SessionHandlerOptions sessionHandlerOptions;
    /** Prefetch count applied to the receiver and to accepted sessions; -1 while unset. */
    private int prefetchCount;
    /** Executor on which user callbacks (onMessageAsync, notifyException) are run. */
    private ExecutorService customCodeExecutor;

    /**
     * Creates a pump bound to a single entity. No receiver or session is opened here; that
     * happens when a message handler or session handler is registered.
     *
     * @param factory     messaging factory used later to create the receiver / accept sessions
     * @param entityPath  path of the entity to pump from
     * @param entityType  type of the entity at {@code entityPath}
     * @param receiveMode receive mode used for the receiver and for accepted sessions
     */
    public MessageAndSessionPump(MessagingFactory factory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) {
        super(StringUtil.getShortRandomString());
        this.factory = factory;
        this.entityPath = entityPath;
        this.entityType = entityType;
        this.receiveMode = receiveMode;
        // Diamond instead of a raw type so the map stays type-checked.
        this.openSessions = new ConcurrentHashMap<>();
        // Named sentinel instead of a bare -1; registration code checks against the same value.
        this.prefetchCount = UNSET_PREFETCH_COUNT;
    }

    /**
     * Registers a message handler with default {@link MessageHandlerOptions}.
     *
     * @param handler handler to receive messages and exception notifications
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     * @deprecated this overload ends up running the handler on {@code ForkJoinPool.commonPool()};
     *             prefer an overload that takes an explicit {@link ExecutorService}.
     */
    @Override
    @Deprecated
    public void registerMessageHandler(IMessageHandler handler) throws InterruptedException, ServiceBusException {
        final MessageHandlerOptions defaultOptions = new MessageHandlerOptions();
        this.registerMessageHandler(handler, defaultOptions);
    }

    /**
     * Registers a message handler with default {@link MessageHandlerOptions}, running user
     * callbacks on the supplied executor.
     *
     * @param handler         handler to receive messages and exception notifications
     * @param executorService executor on which the handler's callbacks are invoked
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     */
    @Override
    public void registerMessageHandler(IMessageHandler handler, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        final MessageHandlerOptions defaultOptions = new MessageHandlerOptions();
        this.registerMessageHandler(handler, defaultOptions, executorService);
    }

    /**
     * Registers a message handler with the given options, running user callbacks on
     * {@code ForkJoinPool.commonPool()}.
     *
     * @param handler        handler to receive messages and exception notifications
     * @param handlerOptions options controlling the pump
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     * @deprecated prefer the overload that takes an explicit {@link ExecutorService}.
     */
    @Override
    @Deprecated
    public void registerMessageHandler(IMessageHandler handler, MessageHandlerOptions handlerOptions) throws InterruptedException, ServiceBusException {
        final ExecutorService sharedPool = ForkJoinPool.commonPool();
        this.registerMessageHandler(handler, handlerOptions, sharedPool);
    }

    /**
     * Registers the message handler, creates the inner receiver and starts the receive loops.
     *
     * <p>One receive loop is started per configured concurrent call. Only one handler (message
     * or session) can be registered on a pump.
     *
     * @param handler         handler to receive messages and exception notifications
     * @param handlerOptions  options controlling concurrency, auto-complete and lock renewal
     * @param executorService executor on which the handler's callbacks are invoked
     * @throws IllegalArgumentException      if any argument is null
     * @throws UnsupportedOperationException if a handler was already registered
     * @throws InterruptedException          as declared by receiver creation
     * @throws ServiceBusException           as declared by receiver creation / prefetch setup
     */
    @Override
    public void registerMessageHandler(IMessageHandler handler, MessageHandlerOptions handlerOptions, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        MessageAndSessionPump.assertNonNulls(handler, handlerOptions, executorService);
        TRACE_LOGGER.info("Registering message handler on entity '{}' with '{}'", this.entityPath, handlerOptions);
        this.setHandlerRegistered();

        this.messageHandler = handler;
        this.messageHandlerOptions = handlerOptions;
        this.customCodeExecutor = executorService;

        this.innerReceiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.entityPath, this.entityType, this.receiveMode);
        TRACE_LOGGER.info("Created MessageReceiver to entity '{}'", this.entityPath);

        // Only push a prefetch count to the new receiver if the caller configured one.
        boolean prefetchConfigured = this.prefetchCount != UNSET_PREFETCH_COUNT;
        if (prefetchConfigured) {
            this.innerReceiver.setPrefetchCount(this.prefetchCount);
        }

        // Start one self-rescheduling receive loop per allowed concurrent call.
        int startedLoops = 0;
        while (startedLoops < handlerOptions.getMaxConcurrentCalls()) {
            this.receiveAndPumpMessage();
            ++startedLoops;
        }
    }

    /**
     * Registers a session handler with default {@link SessionHandlerOptions}.
     *
     * @param handler handler to receive session messages and exception notifications
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     * @deprecated this overload ends up running the handler on {@code ForkJoinPool.commonPool()};
     *             prefer an overload that takes an explicit {@link ExecutorService}.
     */
    @Override
    @Deprecated
    public void registerSessionHandler(ISessionHandler handler) throws InterruptedException, ServiceBusException {
        final SessionHandlerOptions defaultOptions = new SessionHandlerOptions();
        this.registerSessionHandler(handler, defaultOptions);
    }

    /**
     * Registers a session handler with default {@link SessionHandlerOptions}, running user
     * callbacks on the supplied executor.
     *
     * @param handler         handler to receive session messages and exception notifications
     * @param executorService executor on which the handler's callbacks are invoked
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     */
    @Override
    public void registerSessionHandler(ISessionHandler handler, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        final SessionHandlerOptions defaultOptions = new SessionHandlerOptions();
        this.registerSessionHandler(handler, defaultOptions, executorService);
    }

    /**
     * Registers a session handler with the given options, running user callbacks on
     * {@code ForkJoinPool.commonPool()}.
     *
     * @param handler        handler to receive session messages and exception notifications
     * @param handlerOptions options controlling the session pump
     * @throws InterruptedException as declared by the delegated registration
     * @throws ServiceBusException  as declared by the delegated registration
     * @deprecated prefer the overload that takes an explicit {@link ExecutorService}.
     */
    @Override
    @Deprecated
    public void registerSessionHandler(ISessionHandler handler, SessionHandlerOptions handlerOptions) throws InterruptedException, ServiceBusException {
        final ExecutorService sharedPool = ForkJoinPool.commonPool();
        this.registerSessionHandler(handler, handlerOptions, sharedPool);
    }

    /**
     * Registers the session handler and starts the accept-session loops.
     *
     * <p>One accept loop is started per configured concurrent session. Only one handler (message
     * or session) can be registered on a pump.
     *
     * @param handler         handler to receive session messages and exception notifications
     * @param handlerOptions  options controlling session concurrency, auto-complete and lock renewal
     * @param executorService executor on which the handler's callbacks are invoked
     * @throws IllegalArgumentException      if any argument is null
     * @throws UnsupportedOperationException if a handler was already registered
     * @throws InterruptedException          declared by the interface signature
     * @throws ServiceBusException           declared by the interface signature
     */
    @Override
    public void registerSessionHandler(ISessionHandler handler, SessionHandlerOptions handlerOptions, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        MessageAndSessionPump.assertNonNulls(handler, handlerOptions, executorService);
        TRACE_LOGGER.info("Registering session handler on entity '{}' with '{}'", this.entityPath, handlerOptions);
        this.setHandlerRegistered();

        this.sessionHandler = handler;
        this.sessionHandlerOptions = handlerOptions;
        this.customCodeExecutor = executorService;

        // Start one accept-session loop per allowed concurrent session.
        int startedLoops = 0;
        while (startedLoops < handlerOptions.getMaxConcurrentSessions()) {
            this.acceptSessionAndPumpMessages();
            ++startedLoops;
        }
    }

    /**
     * Validates the three registration arguments.
     *
     * @param handler         handler being registered
     * @param options         handler options being registered
     * @param executorService executor for user callbacks
     * @throws IllegalArgumentException if any of the three arguments is null
     */
    private static void assertNonNulls(Object handler, Object options, ExecutorService executorService) {
        boolean allPresent = handler != null && options != null && executorService != null;
        if (allPresent) {
            return;
        }
        throw new IllegalArgumentException("None of the arguments can be null.");
    }

    /**
     * Marks this pump as having a handler, enforcing that at most one handler is ever registered.
     * Synchronized so that concurrent registrations cannot both pass the check.
     *
     * @throws UnsupportedOperationException if a message or session handler is already registered
     */
    private synchronized void setHandlerRegistered() {
        this.throwIfClosed(null);
        if (!this.handlerRegistered) {
            this.handlerRegistered = true;
            return;
        }
        throw new UnsupportedOperationException("MessageHandler or SessionHandler already registered.");
    }

    /**
     * Runs one iteration of the message pump and re-arms itself.
     *
     * <p>Issues a single asynchronous receive on the inner receiver. When it completes:
     * a failure is reported to the handler (phase RECEIVE); an empty result is ignored; a message
     * is handed to {@code messageHandler.onMessageAsync} on the custom executor. In PEEKLOCK mode
     * a lock-renew loop runs while the handler works, and with auto-complete enabled the message
     * is completed (handler succeeded) or abandoned (handler failed) afterwards. Every path ends
     * by calling this method again. Does nothing when the pump is closing or closed.
     */
    private void receiveAndPumpMessage() {
        if (!this.getIsClosingOrClosed()) {
            CompletableFuture<IMessage> receiveMessageFuture = this.innerReceiver.receiveAsync(this.messageHandlerOptions.getMessageWaitDuration());
            receiveMessageFuture.handleAsync((message, receiveEx) -> {
                if (receiveEx != null) {
                    // Unwrap the async wrapper exception so the handler sees the real cause.
                    receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
                    TRACE_LOGGER.error("Receiving message from entity '{}' failed.", (Object)this.entityPath, receiveEx);
                    this.notifyExceptionToMessageHandler((Throwable)receiveEx, ExceptionPhase.RECEIVE);
                    // Retry straight away; no delay is applied here.
                    this.receiveAndPumpMessage();
                } else if (message == null) {
                    // Receive returned without a message: just receive again.
                    TRACE_LOGGER.debug("Receive from entity '{}' returned no messages.", (Object)this.entityPath);
                    this.receiveAndPumpMessage();
                } else {
                    CompletionStage onMessageFuture;
                    MessgeRenewLockLoop renewLockLoop;
                    TRACE_LOGGER.trace("Message with sequence number '{}' received from entity '{}'.", (Object)message.getSequenceNumber(), (Object)this.entityPath);
                    if (this.innerReceiver.getReceiveMode() == ReceiveMode.PEEKLOCK) {
                        // Keep the message lock alive while the handler runs, up to the configured max auto-renew duration.
                        Instant stopRenewMessageLockAt = Instant.now().plus(this.messageHandlerOptions.getMaxAutoRenewDuration());
                        renewLockLoop = new MessgeRenewLockLoop(this.innerReceiver, this, (IMessage)message, stopRenewMessageLockAt);
                        renewLockLoop.startLoop();
                        TRACE_LOGGER.trace("Started loop to renew lock on message with sequence number '{}' until '{}'", (Object)message.getSequenceNumber(), (Object)stopRenewMessageLockAt);
                    } else {
                        renewLockLoop = null;
                    }
                    try {
                        TRACE_LOGGER.debug("Invoking onMessage with message containing sequence number '{}'", (Object)message.getSequenceNumber());
                        // Chain off an already-completed future so the user callback executes on customCodeExecutor.
                        onMessageFuture = COMPLETED_FUTURE.thenComposeAsync(v -> this.messageHandler.onMessageAsync((IMessage)message), (Executor)this.customCodeExecutor);
                    }
                    catch (Exception onMessageSyncEx) {
                        // Reached only if scheduling the callback itself throws (presumably e.g. the executor rejecting the task);
                        // the failure is converted into an exceptionally-completed future so the flow below stays uniform.
                        TRACE_LOGGER.error("Invocation of onMessage with message containing sequence number '{}' threw unexpected exception", (Object)message.getSequenceNumber(), (Object)onMessageSyncEx);
                        onMessageFuture = new CompletableFuture();
                        ((CompletableFuture)onMessageFuture).completeExceptionally(onMessageSyncEx);
                    }
                    // NOTE(review): thenComposeAsync does not return null, so this guard looks purely defensive - confirm.
                    if (onMessageFuture == null) {
                        onMessageFuture = COMPLETED_FUTURE;
                    }
                    ((CompletableFuture)onMessageFuture).handleAsync((v, onMessageEx) -> {
                        if (onMessageEx != null) {
                            onMessageEx = ExceptionUtil.extractAsyncCompletionCause(onMessageEx);
                            TRACE_LOGGER.error("onMessage with message containing sequence number '{}' threw exception", (Object)message.getSequenceNumber(), onMessageEx);
                            this.notifyExceptionToMessageHandler((Throwable)onMessageEx, ExceptionPhase.USERCALLBACK);
                        }
                        if (this.innerReceiver.getReceiveMode() == ReceiveMode.PEEKLOCK) {
                            CompletableFuture<Object> updateDispositionFuture;
                            ExceptionPhase dispositionPhase;
                            // Handler is done (either way): stop renewing the message lock.
                            if (renewLockLoop != null) {
                                renewLockLoop.cancelLoop();
                                TRACE_LOGGER.trace("Cancelled loop to renew lock on message with sequence number '{}'", (Object)message.getSequenceNumber());
                            }
                            // Success -> complete, failure -> abandon; with auto-complete off no disposition call is made.
                            if (onMessageEx == null) {
                                dispositionPhase = ExceptionPhase.COMPLETE;
                                if (this.messageHandlerOptions.isAutoComplete()) {
                                    TRACE_LOGGER.debug("Completing message with sequence number '{}'", (Object)message.getSequenceNumber());
                                    updateDispositionFuture = this.innerReceiver.completeAsync(message.getLockToken());
                                } else {
                                    updateDispositionFuture = CompletableFuture.completedFuture(null);
                                }
                            } else {
                                dispositionPhase = ExceptionPhase.ABANDON;
                                if (this.messageHandlerOptions.isAutoComplete()) {
                                    TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", (Object)message.getSequenceNumber());
                                    updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken());
                                } else {
                                    updateDispositionFuture = CompletableFuture.completedFuture(null);
                                }
                            }
                            updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
                                if (updateDispositionEx != null) {
                                    updateDispositionEx = ExceptionUtil.extractAsyncCompletionCause(updateDispositionEx);
                                    TRACE_LOGGER.error("{} message with sequence number '{}' failed", new Object[]{dispositionPhase == ExceptionPhase.COMPLETE ? "Completing" : "Abandoning", message.getSequenceNumber(), updateDispositionEx});
                                    this.notifyExceptionToMessageHandler((Throwable)updateDispositionEx, dispositionPhase);
                                }
                                // Disposition finished (successfully or not): pump the next message.
                                this.receiveAndPumpMessage();
                                return null;
                            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                        } else {
                            // Not PEEKLOCK: nothing to settle, pump the next message.
                            this.receiveAndPumpMessage();
                        }
                        return null;
                    }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                }
                return null;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }
    }

    /**
     * Accepts one session (any session id) from the entity and starts pumping its messages.
     *
     * <p>On failure the session handler is notified (phase ACCEPTSESSION) unless the failure is a
     * timeout, and the accept is retried after {@code SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION}
     * unless the operation was cancelled. On success the session is recorded in
     * {@code openSessions}, a session lock-renew loop is started, and one receive loop is started
     * per configured concurrent call per session. Does nothing when the pump is closing or closed.
     */
    private void acceptSessionAndPumpMessages() {
        if (!this.getIsClosingOrClosed()) {
            TRACE_LOGGER.debug("Accepting a session from entity '{}'", (Object)this.entityPath);
            CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, this.entityType, null, this.receiveMode);
            acceptSessionFuture.handleAsync((session, acceptSessionEx) -> {
                if (acceptSessionEx != null) {
                    // Timeouts are not reported to the handler; every other failure is.
                    if (!((acceptSessionEx = ExceptionUtil.extractAsyncCompletionCause(acceptSessionEx)) instanceof TimeoutException)) {
                        TRACE_LOGGER.error("Accepting a session from entity '{}' failed.", (Object)this.entityPath, acceptSessionEx);
                        this.notifyExceptionToSessionHandler((Throwable)acceptSessionEx, ExceptionPhase.ACCEPTSESSION);
                    }
                    // A cancelled operation ends this accept loop; anything else is retried after a pause.
                    if (!(acceptSessionEx instanceof OperationCancelledException)) {
                        TRACE_LOGGER.debug("AcceptSession from entity '{}' will be retried after '{}'.", (Object)this.entityPath, (Object)SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
                        Timer.schedule(() -> this.acceptSessionAndPumpMessages(), SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION, TimerType.OneTimeRun);
                    }
                } else {
                    TRACE_LOGGER.debug("Accepted a session '{}' from entity '{}'", (Object)session.getSessionId(), (Object)this.entityPath);
                    if (this.prefetchCount != UNSET_PREFETCH_COUNT) {
                        try {
                            session.setPrefetchCount(this.prefetchCount);
                        }
                        catch (ServiceBusException prefetchEx) {
                            // Best effort: the session is still usable without the prefetch setting,
                            // but the failure is logged rather than silently dropped.
                            TRACE_LOGGER.warn("Setting prefetch count on session '{}' failed.", session.getSessionId(), prefetchEx);
                        }
                    }
                    this.openSessions.put(session.getSessionId(), (IMessageSession)session);
                    SessionRenewLockLoop sessionRenewLockLoop = new SessionRenewLockLoop((IMessageSession)session, this);
                    sessionRenewLockLoop.startLoop();
                    TRACE_LOGGER.debug("Started loop to renew lock on session '{}'", (Object)session.getSessionId());
                    SessionTracker sessionTracker = new SessionTracker(this, (IMessageSession)session, sessionRenewLockLoop);
                    // One receive loop per allowed concurrent call on this session.
                    for (int i = 0; i < this.sessionHandlerOptions.getMaxConcurrentCallsPerSession(); ++i) {
                        this.receiveFromSessionAndPumpMessage(sessionTracker);
                    }
                }
                return null;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }
    }

    /**
     * Runs one iteration of the per-session message pump and re-arms itself.
     *
     * <p>Issues a single asynchronous receive on the tracked session. On failure or an empty
     * result, the tracker decides (asynchronously) whether to receive again. For a message, the
     * session handler's {@code onMessageAsync} is invoked on the custom executor; in PEEKLOCK
     * mode with auto-complete enabled the message is then completed or abandoned, and the next
     * receive is started. Does nothing when the pump is closing or closed.
     *
     * @param sessionTracker tracker holding the session to receive from and its lock-renew loop
     */
    private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
        if (!this.getIsClosingOrClosed()) {
            IMessageSession session = sessionTracker.getSession();
            CompletableFuture<IMessage> receiverFuture = session.receiveAsync(this.sessionHandlerOptions.getMessageWaitDuration());
            receiverFuture.handleAsync((message, receiveEx) -> {
                if (receiveEx != null) {
                    // Unwrap the async wrapper exception so the handler sees the real cause.
                    receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
                    TRACE_LOGGER.error("Receiving message from session '{}' on entity '{}' failed.", new Object[]{session.getSessionId(), this.entityPath, receiveEx});
                    this.notifyExceptionToSessionHandler((Throwable)receiveEx, ExceptionPhase.RECEIVE);
                    // The tracker decides whether this session should keep being pumped.
                    sessionTracker.shouldRetryOnNoMessageOrException().thenAcceptAsync(shouldRetry -> {
                        if (shouldRetry.booleanValue()) {
                            this.receiveFromSessionAndPumpMessage(sessionTracker);
                        }
                    }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                } else if (message == null) {
                    TRACE_LOGGER.debug("Receive from from session '{}' on entity '{}' returned no messages.", (Object)session.getSessionId(), (Object)this.entityPath);
                    // Same retry decision as on failure, delegated to the tracker.
                    sessionTracker.shouldRetryOnNoMessageOrException().thenAcceptAsync(shouldRetry -> {
                        if (shouldRetry.booleanValue()) {
                            this.receiveFromSessionAndPumpMessage(sessionTracker);
                        }
                    }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                } else {
                    CompletableFuture<Object> onMessageFuture;
                    TRACE_LOGGER.trace("Message with sequence number '{}' received from session '{}' on entity '{}'.", new Object[]{message.getSequenceNumber(), session.getSessionId(), this.entityPath});
                    sessionTracker.notifyMessageReceived();
                    // If the handler runs longer than the max auto-renew duration, stop renewing the session lock.
                    ScheduledFuture<?> renewCancelTimer = Timer.schedule(() -> {
                        TRACE_LOGGER.warn("onMessage task timed out. Cancelling loop to renew lock on session '{}'", (Object)session.getSessionId());
                        sessionTracker.sessionRenewLockLoop.cancelLoop();
                    }, this.sessionHandlerOptions.getMaxAutoRenewDuration(), TimerType.OneTimeRun);
                    TRACE_LOGGER.debug("Invoking onMessage with message containing sequence number '{}'", (Object)message.getSequenceNumber());
                    try {
                        // Chain off an already-completed future so the user callback executes on customCodeExecutor.
                        onMessageFuture = COMPLETED_FUTURE.thenComposeAsync(v -> this.sessionHandler.onMessageAsync(session, (IMessage)message), (Executor)this.customCodeExecutor);
                    }
                    catch (Exception onMessageSyncEx) {
                        // Reached only if scheduling the callback itself throws (presumably e.g. the executor rejecting the task).
                        TRACE_LOGGER.error("Invocation of onMessage with message containing sequence number '{}' threw unexpected exception", (Object)message.getSequenceNumber(), (Object)onMessageSyncEx);
                        onMessageFuture = new CompletableFuture();
                        onMessageFuture.completeExceptionally(onMessageSyncEx);
                    }
                    // NOTE(review): thenComposeAsync does not return null, so this guard looks purely defensive - confirm.
                    if (onMessageFuture == null) {
                        onMessageFuture = COMPLETED_FUTURE;
                    }
                    onMessageFuture.handleAsync((v, onMessageEx) -> {
                        // Handler finished in time (or failed): the timeout timer is no longer needed.
                        renewCancelTimer.cancel(true);
                        if (onMessageEx != null) {
                            onMessageEx = ExceptionUtil.extractAsyncCompletionCause(onMessageEx);
                            TRACE_LOGGER.error("onMessage with message containing sequence number '{}' threw exception", (Object)message.getSequenceNumber(), onMessageEx);
                            this.notifyExceptionToSessionHandler((Throwable)onMessageEx, ExceptionPhase.USERCALLBACK);
                        }
                        if (this.receiveMode == ReceiveMode.PEEKLOCK) {
                            CompletableFuture<Object> updateDispositionFuture;
                            ExceptionPhase dispositionPhase;
                            // Success -> complete, failure -> abandon; with auto-complete off no disposition call is made.
                            if (onMessageEx == null) {
                                dispositionPhase = ExceptionPhase.COMPLETE;
                                if (this.sessionHandlerOptions.isAutoComplete()) {
                                    TRACE_LOGGER.debug("Completing message with sequence number '{}'", (Object)message.getSequenceNumber());
                                    updateDispositionFuture = session.completeAsync(message.getLockToken());
                                } else {
                                    updateDispositionFuture = CompletableFuture.completedFuture(null);
                                }
                            } else {
                                dispositionPhase = ExceptionPhase.ABANDON;
                                if (this.sessionHandlerOptions.isAutoComplete()) {
                                    TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", (Object)message.getSequenceNumber());
                                    updateDispositionFuture = session.abandonAsync(message.getLockToken());
                                } else {
                                    updateDispositionFuture = CompletableFuture.completedFuture(null);
                                }
                            }
                            updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
                                if (updateDispositionEx != null) {
                                    updateDispositionEx = ExceptionUtil.extractAsyncCompletionCause(updateDispositionEx);
                                    TRACE_LOGGER.error("{} message with sequence number '{}' failed", new Object[]{dispositionPhase == ExceptionPhase.COMPLETE ? "Completing" : "Abandoning", message.getSequenceNumber(), updateDispositionEx});
                                    this.notifyExceptionToSessionHandler((Throwable)updateDispositionEx, dispositionPhase);
                                }
                                // Disposition finished (successfully or not): pump the next message of this session.
                                this.receiveFromSessionAndPumpMessage(sessionTracker);
                                return null;
                            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                        } else {
                            // Not PEEKLOCK: nothing to settle, pump the next message of this session.
                            this.receiveFromSessionAndPumpMessage(sessionTracker);
                        }
                        return null;
                    }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                }
                return null;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }
    }

    /**
     * No asynchronous initialization is performed by the pump itself.
     *
     * @return a new, already-completed future
     */
    @Override
    CompletableFuture<Void> initializeAsync() {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Closes every session currently held in {@code openSessions} and the inner receiver, if one
     * was created.
     *
     * <p>The sessions are first copied into an array. {@code openSessions} can be modified by the
     * accept-session callbacks while this method runs, so sizing an array from {@code size()} and
     * then iterating the live map could overrun the array or leave null slots (which
     * {@code CompletableFuture.allOf} rejects). Working from one snapshot avoids both.
     *
     * @return a future that completes when all the individual close operations have completed
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        TRACE_LOGGER.info("Closing message and session pump on entity '{}'", (Object)this.entityPath);
        // Same snapshot idiom as setPrefetchCount(): one consistent view of the sessions to close.
        IMessageSession[] sessionsToClose = this.openSessions.values().toArray(new IMessageSession[0]);
        CompletableFuture<?>[] closeFutures = new CompletableFuture<?>[sessionsToClose.length + 1];
        int arrayIndex = 0;
        for (IMessageSession session : sessionsToClose) {
            closeFutures[arrayIndex++] = session.closeAsync();
        }
        // Last slot: the receiver, which only exists if a message handler was registered.
        closeFutures[arrayIndex] = this.innerReceiver == null ? CompletableFuture.completedFuture(null) : this.innerReceiver.closeAsync();
        return CompletableFuture.allOf(closeFutures);
    }

    /**
     * Delegates to the inner receiver's {@code abandon(lockToken)}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void abandon(UUID lockToken) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.abandon(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code abandon(lockToken, propertiesToModify)}.
     *
     * @param lockToken          lock token passed through to the inner receiver
     * @param propertiesToModify properties passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void abandon(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.abandon(lockToken, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's {@code abandonAsync(lockToken)}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> abandonAsync(UUID lockToken) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.abandonAsync(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code abandonAsync(lockToken, propertiesToModify)}.
     *
     * @param lockToken          lock token passed through to the inner receiver
     * @param propertiesToModify properties passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> abandonAsync(UUID lockToken, Map<String, Object> propertiesToModify) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.abandonAsync(lockToken, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's {@code complete(lockToken)}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void complete(UUID lockToken) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.complete(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code completeAsync(lockToken)}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> completeAsync(UUID lockToken) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.completeAsync(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code defer(lockToken)}. Package-private and not an
     * {@code @Override}, unlike the neighbouring disposition methods.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    void defer(UUID lockToken) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.defer(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code defer(lockToken, propertiesToModify)}.
     * Package-private and not an {@code @Override}, unlike the neighbouring disposition methods.
     *
     * @param lockToken          lock token passed through to the inner receiver
     * @param propertiesToModify properties passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.defer(lockToken, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's {@code deadLetter(lockToken)}.
     *
     * <p>Checks that a receiver exists first, like every other disposition method in this class,
     * so that calling this before a message handler is registered fails with a descriptive
     * {@link UnsupportedOperationException} instead of a {@link NullPointerException}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void deadLetter(UUID lockToken) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code deadLetter(lockToken, propertiesToModify)}.
     *
     * <p>Checks that a receiver exists first, like every other disposition method in this class,
     * so that calling this before a message handler is registered fails with a descriptive
     * {@link UnsupportedOperationException} instead of a {@link NullPointerException}.
     *
     * @param lockToken          lock token passed through to the inner receiver
     * @param propertiesToModify properties passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void deadLetter(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(lockToken, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's
     * {@code deadLetter(lockToken, deadLetterReason, deadLetterErrorDescription)}.
     *
     * @param lockToken                  lock token passed through to the inner receiver
     * @param deadLetterReason           reason passed through to the inner receiver
     * @param deadLetterErrorDescription error description passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void deadLetter(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(lockToken, deadLetterReason, deadLetterErrorDescription);
    }

    /**
     * Delegates to the inner receiver's four-argument {@code deadLetter}.
     *
     * @param lockToken                  lock token passed through to the inner receiver
     * @param deadLetterReason           reason passed through to the inner receiver
     * @param deadLetterErrorDescription error description passed through to the inner receiver
     * @param propertiesToModify         properties passed through to the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet
     * @throws InterruptedException as declared by the delegated call
     * @throws ServiceBusException  as declared by the delegated call
     */
    @Override
    public void deadLetter(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException {
        this.checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(lockToken, deadLetterReason, deadLetterErrorDescription, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's {@code deadLetterAsync(lockToken)}.
     *
     * @param lockToken lock token passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> deadLetterAsync(UUID lockToken) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(lockToken);
    }

    /**
     * Delegates to the inner receiver's {@code deadLetterAsync(lockToken, propertiesToModify)}.
     *
     * @param lockToken          lock token passed through to the inner receiver
     * @param propertiesToModify properties passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> deadLetterAsync(UUID lockToken, Map<String, Object> propertiesToModify) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(lockToken, propertiesToModify);
    }

    /**
     * Delegates to the inner receiver's
     * {@code deadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription)}.
     *
     * @param lockToken                  lock token passed through to the inner receiver
     * @param deadLetterReason           reason passed through to the inner receiver
     * @param deadLetterErrorDescription error description passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> deadLetterAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription);
    }

    /**
     * Delegates to the inner receiver's four-argument {@code deadLetterAsync}.
     *
     * @param lockToken                  lock token passed through to the inner receiver
     * @param deadLetterReason           reason passed through to the inner receiver
     * @param deadLetterErrorDescription error description passed through to the inner receiver
     * @param propertiesToModify         properties passed through to the inner receiver
     * @return the future returned by the inner receiver
     * @throws UnsupportedOperationException if no inner receiver has been created yet (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> deadLetterAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify) {
        this.checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(lockToken, deadLetterReason, deadLetterErrorDescription, propertiesToModify);
    }

    /**
     * Guard used by the disposition methods: they can only work once a receiver exists.
     *
     * @throws UnsupportedOperationException if the inner receiver is still null
     */
    private void checkInnerReceiveCreated() {
        if (this.innerReceiver != null) {
            return;
        }
        throw new UnsupportedOperationException("Receiver not created. Registering a MessageHandler creates a receiver.");
    }

    /**
     * Reports an exception to the session handler on the custom executor.
     * An {@link IllegalStateException} raised while the pump is closing or closed is not reported.
     *
     * @param ex    exception to report
     * @param phase phase in which the exception occurred
     */
    private void notifyExceptionToSessionHandler(Throwable ex, ExceptionPhase phase) {
        // The closing state is only consulted for IllegalStateException, as in the short-circuit original.
        boolean suppressed = ex instanceof IllegalStateException && this.getIsClosingOrClosed();
        if (suppressed) {
            return;
        }
        Runnable notification = () -> this.sessionHandler.notifyException(ex, phase);
        this.customCodeExecutor.execute(notification);
    }

    /**
     * Reports an exception to the message handler on the custom executor.
     * An {@link IllegalStateException} raised while the pump is closing or closed is not reported.
     *
     * @param ex    exception to report
     * @param phase phase in which the exception occurred
     */
    private void notifyExceptionToMessageHandler(Throwable ex, ExceptionPhase phase) {
        // The closing state is only consulted for IllegalStateException, as in the short-circuit original.
        boolean suppressed = ex instanceof IllegalStateException && this.getIsClosingOrClosed();
        if (suppressed) {
            return;
        }
        Runnable notification = () -> this.messageHandler.notifyException(ex, phase);
        this.customCodeExecutor.execute(notification);
    }

    /**
     * Returns the prefetch count stored on this pump.
     *
     * @return the configured prefetch count, or -1 if {@code setPrefetchCount} has not been
     *         called (the constructor initializes the field to -1 and the setter rejects negatives)
     */
    @Override
    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    /**
     * Stores the prefetch count and applies it to the inner receiver (if created) and, on a
     * best-effort basis, to every session accepted so far.
     *
     * @param prefetchCount new prefetch count; must not be negative
     * @throws IllegalArgumentException if {@code prefetchCount} is negative
     * @throws ServiceBusException      if the receiver or a session rejects the new value
     */
    @Override
    public void setPrefetchCount(int prefetchCount) throws ServiceBusException {
        if (prefetchCount < 0) {
            throw new IllegalArgumentException("Prefetch count cannot be negative.");
        }
        this.prefetchCount = prefetchCount;

        if (this.innerReceiver != null) {
            this.innerReceiver.setPrefetchCount(prefetchCount);
        }

        // Work on a snapshot: sessions may be accepted concurrently while this loop runs.
        IMessageSession[] acceptedSessions = this.openSessions.values().toArray(new IMessageSession[0]);
        for (int index = 0; index < acceptedSessions.length; ++index) {
            try {
                acceptedSessions[index].setPrefetchCount(prefetchCount);
            }
            catch (IllegalStateException ignored) {
                // Deliberately ignored: updating already-accepted sessions is best effort.
            }
        }
    }

    private static class SessionRenewLockLoop
    extends RenewLockLoop {
        private IMessageSession session;
        private MessageAndSessionPump messageAndSessionPump;
        private String sessionIdentifier;
        ScheduledFuture<?> timerFuture;

        SessionRenewLockLoop(IMessageSession session, MessageAndSessionPump messageAndSessionPump) {
            this.session = session;
            this.messageAndSessionPump = messageAndSessionPump;
            this.sessionIdentifier = String.format("session with id:%s", this.session.getSessionId());
        }

        @Override
        protected ScheduledFuture<?> getTimerFuture() {
            return this.timerFuture;
        }

        /**
         * Schedules one renewal of the session lock and re-arms itself from the renewal's
         * completion callback.
         *
         * <p>Nothing is scheduled when the loop has been cancelled or when no usable
         * (non-null, non-negative) renew interval is available. After a renewal attempt the
         * loop continues unless the failure was a {@code SessionLockLostException} or an
         * {@code OperationCancelledException}.
         */
        @Override
        protected void loop() {
            if (this.isCancelled()) {
                return;
            }
            Duration delay = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc(), this.sessionIdentifier);
            if (delay == null || delay.isNegative()) {
                return;
            }
            this.timerFuture = Timer.schedule(() -> {
                TRACE_LOGGER.debug("Renewing lock on '{}'", (Object)this.sessionIdentifier);
                this.session.renewSessionLockAsync().handleAsync((ignored, failure) -> {
                    if (failure == null) {
                        // Renewal succeeded: log it and arm the next round.
                        TRACE_LOGGER.debug("Renewed lock on '{}'", (Object)this.sessionIdentifier);
                        this.loop();
                        return null;
                    }
                    Throwable cause = ExceptionUtil.extractAsyncCompletionCause(failure);
                    TRACE_LOGGER.error("Renewing lock on '{}' failed", (Object)this.sessionIdentifier, cause);
                    this.messageAndSessionPump.notifyExceptionToSessionHandler(cause, ExceptionPhase.RENEWSESSIONLOCK);
                    // These two failure types end the loop; any other failure is retried.
                    boolean stopRenewing = cause instanceof SessionLockLostException || cause instanceof OperationCancelledException;
                    if (!stopRenewing) {
                        this.loop();
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }, delay, TimerType.OneTimeRun);
        }
    }

    /**
     * Renewal loop for the lock of a single received message.
     *
     * <p>Each round schedules a one-shot timer; when it fires the lock is renewed through
     * the receiver and, depending on the outcome, the next round is armed. Renewal stops
     * once the message's lock expiry is no longer before {@code stopRenewalAt}, when the
     * loop is cancelled, or when renewal fails with a {@code MessageLockLostException} or
     * an {@code OperationCancelledException}.
     */
    private static class MessgeRenewLockLoop
    extends RenewLockLoop {
        private final IMessageReceiver innerReceiver;
        private final MessageAndSessionPump messageAndSessionPump;
        private final IMessage message;
        private final Instant stopRenewalAt;
        private final String messageIdentifier;
        // Written by loop() from INTERNAL_THREAD_POOL continuations and read by cancelLoop()
        // (through getTimerFuture()) on the cancelling thread; volatile so the canceller
        // sees the most recently scheduled timer instead of a stale handle.
        volatile ScheduledFuture<?> timerFuture;

        /**
         * @param innerReceiver receiver used to renew the message lock
         * @param messageAndSessionPump pump that is notified when a renewal attempt fails
         * @param message message whose lock is renewed
         * @param stopRenewalAt instant after which no further renewals are scheduled
         */
        MessgeRenewLockLoop(IMessageReceiver innerReceiver, MessageAndSessionPump messageAndSessionPump, IMessage message, Instant stopRenewalAt) {
            this.innerReceiver = innerReceiver;
            this.messageAndSessionPump = messageAndSessionPump;
            this.message = message;
            this.stopRenewalAt = stopRenewalAt;
            // Label reused by every log statement of this loop.
            this.messageIdentifier = String.format("message with locktoken : %s, sequence number : %s", this.message.getLockToken(), this.message.getSequenceNumber());
        }

        /**
         * @return the handle of the most recently scheduled renewal timer, or {@code null}
         *         if none has been scheduled yet
         */
        @Override
        protected ScheduledFuture<?> getTimerFuture() {
            return this.timerFuture;
        }

        /**
         * Schedules one renewal of the message lock and re-arms itself from the renewal's
         * completion callback. Does nothing when cancelled or when no usable (non-null,
         * non-negative) renew interval is available.
         */
        @Override
        protected void loop() {
            if (this.isCancelled()) {
                return;
            }
            Duration renewInterval = this.getNextRenewInterval();
            if (renewInterval == null || renewInterval.isNegative()) {
                return;
            }
            this.timerFuture = Timer.schedule(() -> {
                TRACE_LOGGER.debug("Renewing lock on '{}'", (Object)this.messageIdentifier);
                this.innerReceiver.renewMessageLockAsync(this.message).handleAsync((v, renewLockEx) -> {
                    if (renewLockEx != null) {
                        renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
                        TRACE_LOGGER.error("Renewing lock on '{}' failed", (Object)this.messageIdentifier, renewLockEx);
                        this.messageAndSessionPump.notifyExceptionToMessageHandler(renewLockEx, ExceptionPhase.RENEWMESSAGELOCK);
                        // These two failure types end the loop; any other failure is retried.
                        if (!(renewLockEx instanceof MessageLockLostException) && !(renewLockEx instanceof OperationCancelledException)) {
                            this.loop();
                        }
                    } else {
                        TRACE_LOGGER.debug("Renewed lock on '{}'", (Object)this.messageIdentifier);
                        this.loop();
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }, renewInterval, TimerType.OneTimeRun);
        }

        /**
         * Computes the delay before the next renewal attempt.
         *
         * @return the delay from {@code RenewLockLoop.getNextRenewInterval}, or {@code null}
         *         when the message's current lock expiry is not before {@code stopRenewalAt}
         */
        private Duration getNextRenewInterval() {
            if (this.message.getLockedUntilUtc().isBefore(this.stopRenewalAt)) {
                return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc(), this.messageIdentifier);
            }
            return null;
        }
    }

    /**
     * Base class for self-rescheduling lock-renewal loops.
     *
     * <p>Subclasses implement {@link #loop()} to schedule a single renewal and expose the
     * timer handle through {@link #getTimerFuture()} so that {@link #cancelLoop()} can
     * cancel a renewal that is still pending.
     */
    private static abstract class RenewLockLoop {
        // Set by cancelLoop() and polled by loop() implementations, which are re-entered from
        // asynchronous continuations on other threads; volatile guarantees that a
        // cancellation becomes visible to the next loop() invocation.
        private volatile boolean cancelled = false;

        protected RenewLockLoop() {
        }

        /** Schedules the next renewal; implementations are expected to honour {@link #isCancelled()}. */
        protected abstract void loop();

        /** @return the handle of the currently scheduled renewal timer, possibly {@code null} */
        protected abstract ScheduledFuture<?> getTimerFuture();

        /** @return {@code true} once {@link #cancelLoop()} has been called */
        protected boolean isCancelled() {
            return this.cancelled;
        }

        /** Starts the loop by running the first {@link #loop()} round on the calling thread. */
        public void startLoop() {
            this.loop();
        }

        /**
         * Marks the loop as cancelled and cancels the pending renewal timer, if there is one
         * that has not completed yet. Calling it again after cancellation is a no-op.
         */
        public void cancelLoop() {
            if (this.cancelled) {
                return;
            }
            // Flip the flag before touching the timer so a concurrently running loop()
            // round that checks it afterwards does not re-arm.
            this.cancelled = true;
            ScheduledFuture<?> timerFuture = this.getTimerFuture();
            if (timerFuture != null && !timerFuture.isDone()) {
                timerFuture.cancel(true);
            }
        }

        /**
         * Computes how long to wait before renewing a lock that expires at
         * {@code lockedUntilUtc}.
         *
         * <p>The renewal is placed a buffer ahead of expiry, where the buffer is half of the
         * remaining validity capped at {@code MAXIMUM_RENEW_LOCK_BUFFER}. If the lock already
         * appears expired, {@code MINIMUM_MESSAGE_LOCK_VALIDITY} is assumed as the remaining
         * validity and a warning is logged.
         *
         * @param lockedUntilUtc instant at which the lock expires
         * @param identifier label of the locked entity, used in log messages
         * @return remaining validity minus the buffer
         */
        protected static Duration getNextRenewInterval(Instant lockedUntilUtc, String identifier) {
            Duration remainingTime = Duration.between(Instant.now(), lockedUntilUtc);
            if (remainingTime.isNegative()) {
                remainingTime = MINIMUM_MESSAGE_LOCK_VALIDITY;
                TRACE_LOGGER.warn("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock", (Object)identifier);
            }
            // Half of the remaining validity, computed once and capped below.
            Duration halfRemaining = remainingTime.dividedBy(2L);
            Duration buffer = halfRemaining.compareTo(MAXIMUM_RENEW_LOCK_BUFFER) > 0 ? MAXIMUM_RENEW_LOCK_BUFFER : halfRemaining;
            TRACE_LOGGER.debug("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires.", new Object[]{identifier, remainingTime, buffer});
            return remainingTime.minus(buffer);
        }
    }

    /**
     * Tracks the receive calls that pump one accepted session and closes the session once
     * as many of them as {@code getMaxConcurrentCallsPerSession()} have reported "no message
     * or exception" without a message arriving in between.
     *
     * <p>All state changes happen in {@code synchronized} methods on this tracker.
     */
    private static class SessionTracker {
        // Number of reports needed to close the session; taken from the session handler options.
        private final int numberReceivingThreads;
        private final IMessageSession session;
        private final MessageAndSessionPump messageAndSessionPump;
        private final SessionRenewLockLoop sessionRenewLockLoop;
        // Count of reports received since the last message (reset in notifyMessageReceived).
        private int waitingRetryThreads;
        // Future shared by the waiting callers: completed with true when a message arrives,
        // with false when the session is being closed.
        private CompletableFuture<Boolean> retryFuture;

        /**
         * @param messageAndSessionPump owning pump (source of options, handler, executor and open-session map)
         * @param session session being tracked
         * @param sessionRenewLockLoop lock-renewal loop to cancel when the session is closed
         */
        SessionTracker(MessageAndSessionPump messageAndSessionPump, IMessageSession session, SessionRenewLockLoop sessionRenewLockLoop) {
            this.messageAndSessionPump = messageAndSessionPump;
            this.session = session;
            this.sessionRenewLockLoop = sessionRenewLockLoop;
            this.numberReceivingThreads = messageAndSessionPump.sessionHandlerOptions.getMaxConcurrentCallsPerSession();
            this.waitingRetryThreads = 0;
        }

        /** @return the session tracked by this instance */
        public IMessageSession getSession() {
            return this.session;
        }

        /**
         * Records that a message was received from the session. If a retry future is
         * pending, the waiting counter is reset and the future is completed with {@code true}.
         */
        synchronized void notifyMessageReceived() {
            TRACE_LOGGER.trace("Message received from session '{}'", (Object)this.session.getSessionId());
            if (this.retryFuture != null && !this.retryFuture.isDone()) {
                this.waitingRetryThreads = 0;
                this.retryFuture.complete(true);
            }
        }

        /**
         * Called by a receive call that got no message or an exception.
         *
         * <p>Increments the waiting counter and returns the shared retry future. When the
         * counter reaches {@code numberReceivingThreads}, the future is completed with
         * {@code false} and the close sequence is started: the user's
         * {@code OnCloseSessionAsync} callback, cancellation of the lock-renewal loop,
         * {@code closeAsync()} on the session, removal from {@code openSessions}, and a call to
         * {@code acceptSessionAndPumpMessages()}.
         *
         * @return a future that completes with {@code true} when a message arrives, or with
         *         {@code false} when the session is being closed
         */
        synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
            // Start a fresh future if there is none or the previous one has already been completed.
            if (this.retryFuture == null || this.retryFuture.isDone()) {
                this.retryFuture = new CompletableFuture();
            }
            ++this.waitingRetryThreads;
            if (this.waitingRetryThreads == this.numberReceivingThreads) {
                CompletableFuture onCloseFuture;
                TRACE_LOGGER.info("No messages recevied by any receive call from session '{}'. Closing the session.", (Object)this.session.getSessionId());
                this.retryFuture.complete(false);
                // Safety net: if the close callback has not completed within
                // getMaxAutoRenewDuration(), stop renewing the session lock anyway.
                ScheduledFuture<?> renewCancelTimer = Timer.schedule(() -> {
                    TRACE_LOGGER.warn("Closing session timed out. Cancelling loop to renew lock on session '{}'", (Object)this.session.getSessionId());
                    this.sessionRenewLockLoop.cancelLoop();
                }, this.messageAndSessionPump.sessionHandlerOptions.getMaxAutoRenewDuration(), TimerType.OneTimeRun);
                try {
                    // Run the user's close callback on the custom-code executor.
                    onCloseFuture = COMPLETED_FUTURE.thenComposeAsync(v -> this.messageAndSessionPump.sessionHandler.OnCloseSessionAsync(this.session), (Executor)this.messageAndSessionPump.customCodeExecutor);
                }
                catch (Exception onCloseSyncEx) {
                    // A synchronous failure is turned into an exceptionally completed future so
                    // the close sequence below still runs.
                    TRACE_LOGGER.error("Invocation of onCloseSession on session '{}' threw unexpected exception", (Object)this.session.getSessionId(), (Object)onCloseSyncEx);
                    onCloseFuture = new CompletableFuture();
                    onCloseFuture.completeExceptionally(onCloseSyncEx);
                }
                // NOTE(review): thenComposeAsync is documented to return a new future, so this
                // null guard looks purely defensive - confirm before removing.
                if (onCloseFuture == null) {
                    onCloseFuture = COMPLETED_FUTURE;
                }
                onCloseFuture.handleAsync((v, onCloseEx) -> {
                    // The callback finished (successfully or not), so the timeout timer is no longer needed.
                    renewCancelTimer.cancel(true);
                    if (onCloseEx != null) {
                        onCloseEx = ExceptionUtil.extractAsyncCompletionCause(onCloseEx);
                        TRACE_LOGGER.error("onCloseSession on session '{}' threw exception", (Object)this.session.getSessionId(), onCloseEx);
                        this.messageAndSessionPump.notifyExceptionToSessionHandler(onCloseEx, ExceptionPhase.USERCALLBACK);
                    }
                    // Stop renewing the lock before closing the session itself.
                    this.sessionRenewLockLoop.cancelLoop();
                    TRACE_LOGGER.debug("Cancelled loop to renew lock on session '{}'", (Object)this.session.getSessionId());
                    this.session.closeAsync().handleAsync((z, closeEx) -> {
                        if (closeEx != null) {
                            closeEx = ExceptionUtil.extractAsyncCompletionCause(closeEx);
                            TRACE_LOGGER.info("Closing session '{}' from entity '{}' failed", new Object[]{this.session.getSessionId(), this.messageAndSessionPump.entityPath, closeEx});
                            this.messageAndSessionPump.notifyExceptionToSessionHandler(closeEx, ExceptionPhase.SESSIONCLOSE);
                        } else {
                            TRACE_LOGGER.info("Closed session '{}' from entity '{}'", (Object)this.session.getSessionId(), (Object)this.messageAndSessionPump.entityPath);
                        }
                        // Whether or not the close succeeded, forget the session and hand control
                        // back to the pump (presumably to accept another session - see
                        // acceptSessionAndPumpMessages).
                        this.messageAndSessionPump.openSessions.remove(this.session.getSessionId());
                        this.messageAndSessionPump.acceptSessionAndPumpMessages();
                        return null;
                    }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }
            return this.retryFuture;
        }
    }
}

