/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import com.microsoft.azure.servicebus.primitives.AsyncUtil;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.ClientEntity;
import com.microsoft.azure.servicebus.primitives.CommonRequestResponseOperations;
import com.microsoft.azure.servicebus.primitives.ErrorContext;
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.IErrorContextProvider;
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
import com.microsoft.azure.servicebus.primitives.MessageWithLockToken;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
import com.microsoft.azure.servicebus.primitives.ReceiveWorkItem;
import com.microsoft.azure.servicebus.primitives.ReceiverErrorContext;
import com.microsoft.azure.servicebus.primitives.RequestResponseLink;
import com.microsoft.azure.servicebus.primitives.RequestResponseUtils;
import com.microsoft.azure.servicebus.primitives.RetryPolicy;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.SessionCannotBeLockedException;
import com.microsoft.azure.servicebus.primitives.SessionLockLostException;
import com.microsoft.azure.servicebus.primitives.SettleModePair;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.TimeoutTracker;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import com.microsoft.azure.servicebus.primitives.UpdateStateWorkItem;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.primitives.WorkItem;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoreMessageReceiver
extends ClientEntity
implements IAmqpReceiver,
IErrorContextProvider {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
    private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5L);
    private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1L);
    private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500L);
    private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200L);
    private static final int CREDIT_FLOW_BATCH_SIZE = 50;
    private final Object requestResonseLinkCreationLock = new Object();
    private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
    private final ConcurrentHashMap<String, UpdateStateWorkItem> pendingUpdateStateRequests;
    private final ConcurrentHashMap<String, Delivery> tagsToDeliveriesMap;
    private final MessagingFactory underlyingFactory;
    private final String receivePath;
    private final String sasTokenAudienceURI;
    private final Duration operationTimeout;
    private final CompletableFuture<Void> linkClose;
    private final Object prefetchCountSync;
    private final SettleModePair settleModePair;
    private final RetryPolicy retryPolicy;
    private int prefetchCount;
    private String sessionId;
    private boolean isSessionReceiver;
    private boolean isBrowsableSession;
    private Instant sessionLockedUntilUtc;
    private boolean isSessionLockLost;
    private ConcurrentLinkedQueue<MessageWithDeliveryTag> prefetchedMessages;
    private Receiver receiveLink;
    private RequestResponseLink requestResponseLink;
    private WorkItem<CoreMessageReceiver> linkOpen;
    private Exception lastKnownLinkError;
    private Instant lastKnownErrorReportedAt;
    private final AtomicInteger creditToFlow;
    private final AtomicInteger creditNeededtoServePendingReceives;
    private final AtomicInteger currentPrefetechedMessagesCount;
    private ScheduledFuture<?> sasTokenRenewTimerFuture;
    private CompletableFuture<Void> requestResponseLinkCreationFuture;
    private CompletableFuture<Void> receiveLinkReopenFuture;
    private CompletableFuture<Void> ensureLinkReopenFutureToWaitOn;
    private final Runnable timedOutUpdateStateRequestsDaemon;
    private final Runnable returnMesagesLoopDaemon;
    private final MessagingEntityType entityType;
    private boolean shouldRetryLinkReopenOnTransientFailure = true;
    private ScheduledFuture<?> updateStateRequestsTimeoutChecker;
    private ScheduledFuture<?> returnMessagesLoopRunner;

    private CoreMessageReceiver(MessagingFactory factory, String name, String recvPath, String sessionId, int prefetchCount, SettleModePair settleModePair, MessagingEntityType entityType) {
        super(name);
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.receivePath = recvPath;
        this.sasTokenAudienceURI = String.format("amqp://%s/%s", factory.getHostName(), recvPath);
        this.sessionId = sessionId;
        this.isSessionReceiver = false;
        this.isBrowsableSession = false;
        this.prefetchCount = prefetchCount;
        this.settleModePair = settleModePair;
        this.prefetchedMessages = new ConcurrentLinkedQueue();
        this.linkClose = new CompletableFuture();
        this.lastKnownLinkError = null;
        this.prefetchCountSync = new Object();
        this.retryPolicy = factory.getRetryPolicy();
        this.pendingReceives = new ConcurrentLinkedQueue();
        this.pendingUpdateStateRequests = new ConcurrentHashMap();
        this.tagsToDeliveriesMap = new ConcurrentHashMap();
        this.lastKnownErrorReportedAt = Instant.now();
        this.receiveLinkReopenFuture = null;
        this.creditToFlow = new AtomicInteger();
        this.creditNeededtoServePendingReceives = new AtomicInteger();
        this.currentPrefetechedMessagesCount = new AtomicInteger();
        this.entityType = entityType;
        this.timedOutUpdateStateRequestsDaemon = () -> {
            try {
                if (this.getIsClosed()) {
                    this.updateStateRequestsTimeoutChecker.cancel(true);
                    return;
                }
                TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", (Object)this.receivePath);
                for (Map.Entry<String, UpdateStateWorkItem> entry : this.pendingUpdateStateRequests.entrySet()) {
                    Duration remainingTime = entry.getValue().getTimeoutTracker().remaining();
                    if (!remainingTime.isZero() && !remainingTime.isNegative()) continue;
                    this.pendingUpdateStateRequests.remove(entry.getKey());
                    Exception exception = entry.getValue().getLastKnownException();
                    if (exception == null) {
                        exception = new TimeoutException("Request timed out.");
                    }
                    TRACE_LOGGER.info("UpdateState request timed out. Delivery:{}", (Object)entry.getKey(), (Object)exception);
                    AsyncUtil.completeFutureExceptionally(entry.getValue().getWork(), exception);
                }
                TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", (Object)this.receivePath);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        };
        this.returnMesagesLoopDaemon = () -> {
            try {
                ReceiveWorkItem currentReceive;
                if (this.getIsClosed()) {
                    this.returnMessagesLoopRunner.cancel(true);
                    return;
                }
                TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", (Object)this.receivePath);
                while (!this.prefetchedMessages.isEmpty() && (currentReceive = this.pendingReceives.poll()) != null) {
                    if (currentReceive.getWork().isDone()) continue;
                    TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", (Object)this.receivePath);
                    currentReceive.cancelTimeoutTask(false);
                    List<MessageWithDeliveryTag> messages = this.receiveCore(currentReceive.getMaxMessageCount());
                    this.reduceCreditForCompletedReceiveRequest(currentReceive.getMaxMessageCount());
                    AsyncUtil.completeFuture(currentReceive.getWork(), messages);
                }
                TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", (Object)this.receivePath);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        };
    }

    @Deprecated
    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory factory, String name, String recvPath, int prefetchCount, SettleModePair settleModePair) {
        return CoreMessageReceiver.create(factory, name, recvPath, prefetchCount, settleModePair, null);
    }

    @Deprecated
    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory factory, String name, String recvPath, String sessionId, boolean isBrowsableSession, int prefetchCount, SettleModePair settleModePair) {
        return CoreMessageReceiver.create(factory, name, recvPath, sessionId, isBrowsableSession, prefetchCount, settleModePair, null);
    }

    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory factory, String name, String recvPath, int prefetchCount, SettleModePair settleModePair, MessagingEntityType entityType) {
        TRACE_LOGGER.info("Creating core message receiver to '{}'", (Object)recvPath);
        CoreMessageReceiver msgReceiver = new CoreMessageReceiver(factory, name, recvPath, null, prefetchCount, settleModePair, entityType);
        return msgReceiver.createLink();
    }

    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory factory, String name, String recvPath, String sessionId, boolean isBrowsableSession, int prefetchCount, SettleModePair settleModePair, MessagingEntityType entityType) {
        TRACE_LOGGER.info("Creating core session receiver to '{}', sessionId '{}', browseonly session '{}'", new Object[]{recvPath, sessionId, isBrowsableSession});
        CoreMessageReceiver msgReceiver = new CoreMessageReceiver(factory, name, recvPath, sessionId, prefetchCount, settleModePair, entityType);
        msgReceiver.isSessionReceiver = true;
        msgReceiver.isBrowsableSession = isBrowsableSession;
        return msgReceiver.createLink();
    }

    private CompletableFuture<CoreMessageReceiver> createLink() {
        this.linkOpen = new WorkItem(new CompletableFuture(), this.operationTimeout);
        this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
        this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
            if (sasTokenEx != null) {
                Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
                TRACE_LOGGER.info("Sending SAS Token failed. ReceivePath:{}", (Object)this.receivePath, (Object)cause);
                this.linkOpen.getWork().completeExceptionally(cause);
            } else {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            CoreMessageReceiver.this.createReceiveLink();
                        }
                    });
                }
                catch (IOException ioException) {
                    this.cancelSASTokenRenewTimer();
                    this.linkOpen.getWork().completeExceptionally(new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", ioException));
                }
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        return this.linkOpen.getWork();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> createRequestResponseLinkAsync() {
        Object object = this.requestResonseLinkCreationLock;
        synchronized (object) {
            if (this.requestResponseLinkCreationFuture == null) {
                this.requestResponseLinkCreationFuture = new CompletableFuture();
                this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((rrlink, ex) -> {
                    if (ex == null) {
                        this.requestResponseLink = rrlink;
                        this.requestResponseLinkCreationFuture.complete(null);
                    } else {
                        Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
                        this.requestResponseLinkCreationFuture.completeExceptionally(cause);
                        Object object = this.requestResonseLinkCreationLock;
                        synchronized (object) {
                            this.requestResponseLinkCreationFuture = null;
                        }
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }
            return this.requestResponseLinkCreationFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeRequestResponseLink() {
        Object object = this.requestResonseLinkCreationLock;
        synchronized (object) {
            if (this.requestResponseLinkCreationFuture != null) {
                this.requestResponseLinkCreationFuture.thenRun(() -> {
                    this.underlyingFactory.releaseRequestResponseLink(this.receivePath);
                    this.requestResponseLink = null;
                });
                this.requestResponseLinkCreationFuture = null;
            }
        }
    }

    private void createReceiveLink() {
        TRACE_LOGGER.info("Creating receive link to '{}'", (Object)this.receivePath);
        Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
        if (connection == null) {
            TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
            ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), exception);
            }
            if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
                AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
            }
            return;
        }
        Session session = connection.session();
        session.setIncomingCapacity(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler((Extendable)session, (Handler)new SessionHandler(this.receivePath));
        String receiveLinkNamePrefix = "Receiver".concat("_").concat(StringUtil.getShortRandomString());
        String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? receiveLinkNamePrefix.concat("_").concat(connection.getRemoteContainer()) : receiveLinkNamePrefix;
        Receiver receiver = session.receiver(receiveLinkName);
        Source source = new Source();
        source.setAddress(this.receivePath);
        HashMap<Symbol, Object> linkProperties = new HashMap<Symbol, Object>();
        linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf((long)Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
        if (this.entityType != null) {
            linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
        }
        if (this.isSessionReceiver) {
            HashMap<Symbol, String> filterMap = new HashMap<Symbol, String>();
            filterMap.put(ClientConstants.SESSION_FILTER, this.sessionId);
            source.setFilter(filterMap);
            linkProperties.put(ClientConstants.LINK_PEEKMODE_PROPERTY, this.isBrowsableSession);
        }
        receiver.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        receiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)new Target());
        TRACE_LOGGER.debug("Receive link settle mode '{}'", (Object)this.settleModePair);
        receiver.setSenderSettleMode(this.settleModePair.getSenderSettleMode());
        receiver.setReceiverSettleMode(this.settleModePair.getReceiverSettleMode());
        receiver.setProperties(linkProperties);
        ReceiveLinkHandler handler = new ReceiveLinkHandler(this);
        BaseHandler.setHandler((Extendable)receiver, (Handler)handler);
        receiver.open();
        this.receiveLink = receiver;
        this.underlyingFactory.registerForConnectionError((Link)this.receiveLink);
    }

    CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
        if (this.getIsClosingOrClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
        return sendTokenFuture.thenAccept(f -> {
            this.sasTokenRenewTimerFuture = f;
        });
    }

    private void throwIfInUnusableState() {
        if (this.isSessionReceiver && this.isSessionLockLost) {
            throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session.");
        }
        this.throwIfClosed(this.lastKnownLinkError);
    }

    private void cancelSASTokenRenewTimer() {
        if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
            this.sasTokenRenewTimerFuture.cancel(true);
            TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
        }
    }

    private List<MessageWithDeliveryTag> receiveCore(int messageCount) {
        LinkedList<MessageWithDeliveryTag> returnMessages = null;
        MessageWithDeliveryTag currentMessage = this.prefetchedMessages.poll();
        int returnedMessageCount = 0;
        while (currentMessage != null) {
            this.currentPrefetechedMessagesCount.decrementAndGet();
            if (returnMessages == null) {
                returnMessages = new LinkedList<MessageWithDeliveryTag>();
            }
            returnMessages.add(currentMessage);
            if (++returnedMessageCount >= messageCount) break;
            currentMessage = this.prefetchedMessages.poll();
        }
        return returnMessages;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getPrefetchCount() {
        Object object = this.prefetchCountSync;
        synchronized (object) {
            return this.prefetchCount;
        }
    }

    public String getSessionId() {
        return this.sessionId;
    }

    public Instant getSessionLockedUntilUtc() {
        if (this.isSessionReceiver) {
            return this.sessionLockedUntilUtc;
        }
        throw new RuntimeException("Object is not a session receiver");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setPrefetchCount(int value) throws ServiceBusException {
        int deltaPrefetchCount;
        if (value < 0) {
            throw new IllegalArgumentException("Prefetch count cannot be negative.");
        }
        this.throwIfInUnusableState();
        Object object = this.prefetchCountSync;
        synchronized (object) {
            deltaPrefetchCount = value - this.prefetchCount;
            this.prefetchCount = value;
            TRACE_LOGGER.info("Setting prefetch count to '{}' on recieve link to '{}'", (Object)value, (Object)this.receivePath);
        }
        if (deltaPrefetchCount > 0) {
            try {
                this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        CoreMessageReceiver.this.sendFlow(deltaPrefetchCount);
                    }
                });
            }
            catch (IOException ioException) {
                throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", ioException);
            }
        }
    }

    public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(int maxMessageCount, Duration timeout) {
        this.throwIfInUnusableState();
        if (maxMessageCount <= 0) {
            throw new IllegalArgumentException("parameter 'maxMessageCount' should be a positive number");
        }
        TRACE_LOGGER.debug("Receiving maximum of '{}' messages from '{}'", (Object)maxMessageCount, (Object)this.receivePath);
        CompletableFuture<Collection<MessageWithDeliveryTag>> onReceive = new CompletableFuture<Collection<MessageWithDeliveryTag>>();
        ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem(onReceive, timeout, maxMessageCount);
        this.creditNeededtoServePendingReceives.addAndGet(maxMessageCount);
        this.pendingReceives.add(receiveWorkItem);
        if (timeout == Duration.ZERO) {
            timeout = ZERO_TIMEOUT_APPROXIMATION;
        }
        Timer.schedule(() -> {
            if (this.pendingReceives.remove(receiveWorkItem)) {
                this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
                TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", (Object)this.receivePath);
                AsyncUtil.completeFuture(receiveWorkItem.getWork(), null);
            }
        }, timeout, TimerType.OneTimeRun);
        this.ensureLinkIsOpen().thenRun(() -> this.addCredit(receiveWorkItem));
        return onReceive;
    }

    @Override
    public void onOpenComplete(Exception exception) {
        if (exception == null) {
            TRACE_LOGGER.info("Receive link to '{}' opened.", (Object)this.receivePath);
            if (this.isSessionReceiver) {
                Map remoteSourceFilter = ((Source)this.receiveLink.getRemoteSource()).getFilter();
                if (remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) {
                    String remoteSessionId;
                    this.sessionId = remoteSessionId = (String)remoteSourceFilter.get(ClientConstants.SESSION_FILTER);
                    if (this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) {
                        this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant((Long)this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC));
                    } else {
                        TRACE_LOGGER.info("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", new Object[]{this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC});
                        this.sessionLockedUntilUtc = Instant.ofEpochMilli(0L);
                    }
                    TRACE_LOGGER.info("Accepted session with id '{}', lockedUntilUtc '{}' from '{}'.", new Object[]{this.sessionId, this.sessionLockedUntilUtc, this.receivePath});
                } else {
                    exception = new ServiceBusException(false, "SessionId filter not set on the remote source.");
                }
            }
        }
        if (exception == null) {
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
                this.updateStateRequestsTimeoutChecker = Timer.schedule(this.timedOutUpdateStateRequestsDaemon, UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
                this.returnMessagesLoopRunner = Timer.schedule(this.returnMesagesLoopDaemon, RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
            }
            if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
                AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null);
            }
            this.lastKnownLinkError = null;
            this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
            this.sendFlow(this.prefetchCount - this.currentPrefetechedMessagesCount.get());
            TRACE_LOGGER.debug("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", new Object[]{this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount});
        } else {
            this.cancelSASTokenRenewTimer();
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                TRACE_LOGGER.info("Opening receive link '{}' to '{}' failed.", new Object[]{this.receiveLink.getName(), this.receivePath, exception});
                this.setClosed();
                ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exception, this, true);
            }
            if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
                TRACE_LOGGER.info("Opening receive link '{}' to '{}' failed.", new Object[]{this.receiveLink.getName(), this.receivePath, exception});
                AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
            }
            this.lastKnownLinkError = exception;
        }
    }

    @Override
    public void onReceiveComplete(Delivery delivery) {
        this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
        byte[] deliveryTag = delivery.getTag();
        String deliveryTagAsString = StringUtil.convertBytesToString(delivery.getTag());
        TRACE_LOGGER.debug("Received a delivery '{}' from '{}'", (Object)deliveryTagAsString, (Object)this.receivePath);
        if (deliveryTag == null || deliveryTag.length == 0 || !this.tagsToDeliveriesMap.containsKey(deliveryTagAsString)) {
            TRACE_LOGGER.debug("Received a message from '{}'. Adding to prefecthed messages.", (Object)this.receivePath);
            try {
                Message message = Util.readMessageFromDelivery(this.receiveLink, delivery);
                if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) {
                    delivery.disposition((DeliveryState)Accepted.getInstance());
                    delivery.settle();
                } else {
                    this.tagsToDeliveriesMap.put(StringUtil.convertBytesToString(delivery.getTag()), delivery);
                    this.receiveLink.advance();
                }
                this.currentPrefetechedMessagesCount.incrementAndGet();
                this.prefetchedMessages.add(new MessageWithDeliveryTag(message, delivery.getTag()));
            }
            catch (Exception e) {
                TRACE_LOGGER.info("Reading message from delivery '{}' from '{}', session '{}' failed with unexpected exception.", new Object[]{deliveryTagAsString, this.receivePath, this.sessionId, e});
                delivery.disposition((DeliveryState)Released.getInstance());
                delivery.settle();
                return;
            }
        } else {
            UpdateStateWorkItem matchingUpdateStateWorkItem;
            DeliveryState remoteState = delivery.getRemoteState();
            TRACE_LOGGER.debug("Received a delivery '{}' with state '{}' from '{}'", new Object[]{deliveryTagAsString, remoteState, this.receivePath});
            Outcome remoteOutcome = null;
            if (remoteState instanceof Outcome) {
                remoteOutcome = (Outcome)remoteState;
            } else if (remoteState instanceof TransactionalState) {
                remoteOutcome = ((TransactionalState)remoteState).getOutcome();
            }
            if (remoteOutcome != null && (matchingUpdateStateWorkItem = this.pendingUpdateStateRequests.get(deliveryTagAsString)) != null) {
                DeliveryState matchingUpdateWorkItemDeliveryState = matchingUpdateStateWorkItem.getDeliveryState();
                if (matchingUpdateWorkItemDeliveryState instanceof TransactionalState) {
                    matchingUpdateWorkItemDeliveryState = (DeliveryState)((TransactionalState)matchingUpdateWorkItemDeliveryState).getOutcome();
                }
                if (remoteOutcome.getClass().getName().equals(matchingUpdateWorkItemDeliveryState.getClass().getName())) {
                    TRACE_LOGGER.debug("Completing a pending updateState operation for delivery '{}' from '{}'", (Object)deliveryTagAsString, (Object)this.receivePath);
                    this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, null);
                } else {
                    TRACE_LOGGER.info("Received delivery '{}' state '{}' doesn't match expected state '{}'", new Object[]{deliveryTagAsString, remoteState, matchingUpdateStateWorkItem.deliveryState});
                    if (remoteOutcome instanceof Rejected) {
                        Duration retryInterval;
                        Rejected rejected = (Rejected)remoteOutcome;
                        ErrorCondition error = rejected.getError();
                        Exception exception = ExceptionUtil.toException(error);
                        if (ExceptionUtil.isGeneralError(error.getCondition())) {
                            this.lastKnownLinkError = exception;
                            this.lastKnownErrorReportedAt = Instant.now();
                        }
                        if ((retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, matchingUpdateStateWorkItem.getTimeoutTracker().remaining())) == null) {
                            TRACE_LOGGER.info("Completing pending updateState operation for delivery '{}' with exception", (Object)deliveryTagAsString, (Object)exception);
                            this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
                        } else {
                            matchingUpdateStateWorkItem.setLastKnownException(exception);
                            TRACE_LOGGER.debug("Pending updateState operation for delivery '{}' will be retried after '{}'", (Object)deliveryTagAsString, (Object)retryInterval);
                            try {
                                this.underlyingFactory.scheduleOnReactorThread((int)retryInterval.toMillis(), new DeliveryStateDispatchHandler(delivery, matchingUpdateStateWorkItem.getDeliveryState()));
                            }
                            catch (IOException ioException) {
                                this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, new ServiceBusException(false, "Operation failed while scheduling a retry on Reactor, see cause for more details.", ioException));
                            }
                        }
                    } else if (remoteOutcome instanceof Released) {
                        OperationCancelledException exception = new OperationCancelledException(remoteOutcome.toString());
                        TRACE_LOGGER.info("Completing pending updateState operation for delivery '{}' with exception", (Object)deliveryTagAsString, (Object)exception);
                        this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
                    } else {
                        ServiceBusException exception = new ServiceBusException(false, remoteOutcome.toString());
                        TRACE_LOGGER.info("Completing pending updateState operation for delivery '{}' with exception", (Object)deliveryTagAsString, (Object)exception);
                        this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception);
                    }
                }
            }
        }
    }

    @Override
    public void onError(Exception exception) {
        this.creditToFlow.set(0);
        this.cancelSASTokenRenewTimer();
        if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) {
            this.prefetchedMessages.clear();
            this.currentPrefetechedMessagesCount.set(0);
            this.tagsToDeliveriesMap.clear();
        }
        if (this.getIsClosingOrClosed()) {
            TRACE_LOGGER.info("Receive link to '{}', sessionId '{}' closed", (Object)this.receivePath, (Object)this.sessionId);
            AsyncUtil.completeFuture(this.linkClose, null);
            this.clearAllPendingWorkItems(exception);
        } else {
            this.underlyingFactory.deregisterForConnectionError((Link)this.receiveLink);
            TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' closed with error.", new Object[]{this.receiveLink.getName(), this.receivePath, this.sessionId, exception});
            this.lastKnownLinkError = exception;
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone() || this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
                this.onOpenComplete(exception);
            }
            if (!(exception == null || exception instanceof ServiceBusException && ((ServiceBusException)exception).getIsTransient())) {
                this.clearAllPendingWorkItems(exception);
                if (this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) {
                    TRACE_LOGGER.info("SessionId '{}' lock lost. Closing receiver.", (Object)this.sessionId);
                    this.isSessionLockLost = true;
                    this.closeAsync();
                }
            } else {
                Duration nextRetryInterval;
                ReceiveWorkItem workItem = this.pendingReceives.peek();
                if (workItem != null && workItem.getTimeoutTracker() != null && (nextRetryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining())) != null) {
                    TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", new Object[]{this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval});
                    Timer.schedule(() -> this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
                }
            }
        }
    }

    private void reduceCreditForCompletedReceiveRequest(int maxCreditCountOfReceiveRequest) {
        this.creditNeededtoServePendingReceives.updateAndGet(c -> {
            int updatedCredit = c - maxCreditCountOfReceiveRequest;
            return updatedCredit > 0 ? updatedCredit : 0;
        });
    }

    private void addCredit(ReceiveWorkItem receiveWorkItem) {
        int currentTotalCreditToSend;
        int creditToFlowForWorkItem = this.creditNeededtoServePendingReceives.get() - (this.receiveLink.getCredit() + this.currentPrefetechedMessagesCount.get() + this.creditToFlow.get()) + this.prefetchCount;
        if (creditToFlowForWorkItem > 0 && ((currentTotalCreditToSend = this.creditToFlow.addAndGet(creditToFlowForWorkItem)) >= this.prefetchCount || currentTotalCreditToSend >= 50)) {
            try {
                this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        int accumulatedCredit = CoreMessageReceiver.this.creditToFlow.getAndSet(0);
                        CoreMessageReceiver.this.sendFlow(accumulatedCredit);
                    }
                });
            }
            catch (IOException ioException) {
                this.pendingReceives.remove(receiveWorkItem);
                this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
                receiveWorkItem.getWork().completeExceptionally(CoreMessageReceiver.generateDispatacherSchedulingFailedException("completeMessage", ioException));
                receiveWorkItem.cancelTimeoutTask(false);
            }
        }
    }

    private void sendFlow(int credits) {
        if (!this.isBrowsableSession && credits > 0) {
            this.receiveLink.flow(credits);
            TRACE_LOGGER.debug("Sent flow to the service. receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", new Object[]{this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), credits});
        }
    }

    private void scheduleLinkOpenTimeout(TimeoutTracker timeout) {
        Timer.schedule(() -> {
            if (!this.linkOpen.getWork().isDone()) {
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", this.receiveLink.getName(), this.receivePath, ZonedDateTime.now()), (Throwable)this.lastKnownLinkError);
                TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), operationTimedout, this, true);
                this.setClosing();
                this.closeInternals(false);
                this.setClosed();
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    private void scheduleLinkCloseTimeout(TimeoutTracker timeout) {
        Timer.schedule(() -> {
            if (!this.linkClose.isDone()) {
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", this.receiveLink.getName(), ZonedDateTime.now()));
                TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                ExceptionUtil.completeExceptionally(this.linkClose, operationTimedout, this, true);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    @Override
    public void onClose(ErrorCondition condition) {
        if (condition == null) {
            this.onError(new ServiceBusException(true, String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath)));
        } else {
            Exception completionException = ExceptionUtil.toException(condition);
            this.onError(completionException);
        }
    }

    @Override
    public ErrorContext getContext() {
        boolean isLinkOpened;
        boolean bl = isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();
        String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) ? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() : (this.receiveLink != null ? this.receiveLink.getName() : null);
        ReceiverErrorContext errorContext = new ReceiverErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.receivePath, referenceId, isLinkOpened ? Integer.valueOf(this.prefetchCount) : null, isLinkOpened && this.receiveLink != null ? Integer.valueOf(this.receiveLink.getCredit()) : null, this.currentPrefetechedMessagesCount.get());
        return errorContext;
    }

    @Override
    protected CompletableFuture<Void> onClose() {
        this.closeInternals(true);
        return this.linkClose;
    }

    private void closeInternals(final boolean waitForCloseCompletion) {
        if (!this.getIsClosed()) {
            if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            if (CoreMessageReceiver.this.receiveLink != null && CoreMessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) {
                                TRACE_LOGGER.info("Closing receive link to '{}'", (Object)CoreMessageReceiver.this.receivePath);
                                CoreMessageReceiver.this.receiveLink.close();
                                CoreMessageReceiver.this.underlyingFactory.deregisterForConnectionError((Link)CoreMessageReceiver.this.receiveLink);
                                if (waitForCloseCompletion) {
                                    CoreMessageReceiver.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageReceiver.this.operationTimeout));
                                } else {
                                    AsyncUtil.completeFuture(CoreMessageReceiver.this.linkClose, null);
                                }
                            }
                        }
                    });
                }
                catch (IOException e) {
                    AsyncUtil.completeFutureExceptionally(this.linkClose, e);
                }
            } else {
                AsyncUtil.completeFuture(this.linkClose, null);
            }
            this.cancelSASTokenRenewTimer();
            this.closeRequestResponseLink();
            if (this.updateStateRequestsTimeoutChecker != null) {
                this.updateStateRequestsTimeoutChecker.cancel(false);
            }
            if (this.returnMessagesLoopRunner != null) {
                this.returnMessagesLoopRunner.cancel(false);
            }
        }
    }

    public CompletableFuture<Void> completeMessageAsync(byte[] deliveryTag, TransactionContext transaction) {
        Accepted outcome = Accepted.getInstance();
        return this.updateMessageStateAsync(deliveryTag, (Outcome)outcome, transaction);
    }

    public CompletableFuture<Void> completeMessageAsync(UUID lockToken, TransactionContext transaction) {
        return this.updateDispositionAsync(new UUID[]{lockToken}, "completed", null, null, null, transaction);
    }

    public CompletableFuture<Void> abandonMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        Modified outcome = new Modified();
        if (propertiesToModify != null) {
            outcome.setMessageAnnotations(propertiesToModify);
        }
        return this.updateMessageStateAsync(deliveryTag, (Outcome)outcome, transaction);
    }

    public CompletableFuture<Void> abandonMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        return this.updateDispositionAsync(new UUID[]{lockToken}, "abandoned", null, null, propertiesToModify, transaction);
    }

    public CompletableFuture<Void> deferMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        Modified outcome = new Modified();
        outcome.setUndeliverableHere(Boolean.valueOf(true));
        if (propertiesToModify != null) {
            outcome.setMessageAnnotations(propertiesToModify);
        }
        return this.updateMessageStateAsync(deliveryTag, (Outcome)outcome, transaction);
    }

    public CompletableFuture<Void> deferMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        return this.updateDispositionAsync(new UUID[]{lockToken}, "defered", null, null, propertiesToModify, transaction);
    }

    public CompletableFuture<Void> deadLetterMessageAsync(byte[] deliveryTag, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        Rejected outcome = new Rejected();
        ErrorCondition error = new ErrorCondition(ClientConstants.DEADLETTERNAME, null);
        HashMap<String, Object> errorInfo = new HashMap<String, Object>();
        if (!StringUtil.isNullOrEmpty(deadLetterReason)) {
            errorInfo.put("DeadLetterReason", deadLetterReason);
        }
        if (!StringUtil.isNullOrEmpty(deadLetterErrorDescription)) {
            errorInfo.put("DeadLetterErrorDescription", deadLetterErrorDescription);
        }
        if (propertiesToModify != null) {
            errorInfo.putAll(propertiesToModify);
        }
        error.setInfo(errorInfo);
        outcome.setError(error);
        return this.updateMessageStateAsync(deliveryTag, (Outcome)outcome, transaction);
    }

    public CompletableFuture<Void> deadLetterMessageAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        return this.updateDispositionAsync(new UUID[]{lockToken}, "suspended", deadLetterReason, deadLetterErrorDescription, propertiesToModify, transaction);
    }

    private CompletableFuture<Void> updateMessageStateAsync(byte[] deliveryTag, Outcome outcome, TransactionContext transaction) {
        this.throwIfInUnusableState();
        CompletableFuture<Void> completeMessageFuture = new CompletableFuture<Void>();
        String deliveryTagAsString = StringUtil.convertBytesToString(deliveryTag);
        TRACE_LOGGER.debug("Updating message state of delivery '{}' to '{}'", (Object)deliveryTagAsString, (Object)outcome);
        Delivery delivery = this.tagsToDeliveriesMap.get(deliveryTagAsString);
        if (delivery == null) {
            TRACE_LOGGER.info("Delivery not found for delivery tag '{}'. Either receive link to '{}' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.", (Object)deliveryTagAsString, (Object)this.receivePath);
            completeMessageFuture.completeExceptionally(CoreMessageReceiver.generateDeliveryNotFoundException());
        } else {
            DeliveryState state;
            if (transaction != TransactionContext.NULL_TXN) {
                state = new TransactionalState();
                ((TransactionalState)state).setTxnId(new Binary(transaction.getTransactionId().array()));
                ((TransactionalState)state).setOutcome(outcome);
            } else {
                state = (DeliveryState)outcome;
            }
            UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, this.operationTimeout);
            this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem);
            this.ensureLinkIsOpen().thenRun(() -> {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DeliveryStateDispatchHandler(delivery, state));
                }
                catch (IOException ioException) {
                    completeMessageFuture.completeExceptionally(CoreMessageReceiver.generateDispatacherSchedulingFailedException("completeMessage", ioException));
                }
            });
        }
        return completeMessageFuture;
    }

    private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
        if (this.receiveLink.getLocalState() != EndpointState.ACTIVE || this.receiveLink.getRemoteState() != EndpointState.ACTIVE) {
            if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) {
                TRACE_LOGGER.info("Recreating receive link to '{}'", (Object)this.receivePath);
                this.retryPolicy.incrementRetryCount(this.getClientId());
                CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.receiveLinkReopenFuture = new CompletableFuture();
                Timer.schedule(() -> {
                    if (!linkReopenFutureThatCanBeCancelled.isDone()) {
                        this.cancelSASTokenRenewTimer();
                        TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", this.receiveLink.getName(), this.receivePath, ZonedDateTime.now()));
                        TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                        AsyncUtil.completeFutureExceptionally(linkReopenFutureThatCanBeCancelled, operationTimedout);
                    }
                }, LINK_REOPEN_TIMEOUT, TimerType.OneTimeRun);
                this.cancelSASTokenRenewTimer();
                this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
                    if (sendTokenEx != null) {
                        Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
                        TRACE_LOGGER.info("Sending SAS Token to '{}' failed.", (Object)this.receivePath, (Object)cause);
                        this.receiveLinkReopenFuture.completeExceptionally((Throwable)sendTokenEx);
                        this.clearAllPendingWorkItems((Throwable)sendTokenEx);
                    } else {
                        try {
                            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                                @Override
                                public void onEvent() {
                                    CoreMessageReceiver.this.createReceiveLink();
                                }
                            });
                        }
                        catch (IOException ioEx) {
                            this.receiveLinkReopenFuture.completeExceptionally(ioEx);
                        }
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }
            if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) {
                this.ensureLinkReopenFutureToWaitOn = new CompletableFuture();
                this.shouldRetryLinkReopenOnTransientFailure = true;
            }
            this.receiveLinkReopenFuture.handleAsync((v, ex) -> {
                if (ex == null) {
                    this.ensureLinkReopenFutureToWaitOn.complete(null);
                } else if (ex instanceof ServiceBusException && ((ServiceBusException)ex).getIsTransient()) {
                    if (this.shouldRetryLinkReopenOnTransientFailure) {
                        this.shouldRetryLinkReopenOnTransientFailure = false;
                        this.ensureLinkIsOpen();
                    } else {
                        this.ensureLinkReopenFutureToWaitOn.completeExceptionally((Throwable)ex);
                    }
                } else {
                    this.ensureLinkReopenFutureToWaitOn.completeExceptionally((Throwable)ex);
                }
                return null;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            return this.ensureLinkReopenFutureToWaitOn;
        }
        return CompletableFuture.completedFuture(null);
    }

    private void completePendingUpdateStateWorkItem(Delivery delivery, String deliveryTagAsString, UpdateStateWorkItem workItem, Exception exception) {
        boolean isSettled = delivery.remotelySettled();
        if (isSettled) {
            delivery.settle();
        }
        if (exception == null) {
            AsyncUtil.completeFuture(workItem.getWork(), null);
        } else {
            ExceptionUtil.completeExceptionally(workItem.getWork(), exception, this, true);
        }
        if (isSettled) {
            this.tagsToDeliveriesMap.remove(deliveryTagAsString);
            this.pendingUpdateStateRequests.remove(deliveryTagAsString);
        }
    }

    private void clearAllPendingWorkItems(Throwable exception) {
        TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", (Object)this.receivePath);
        boolean isTransientException = exception == null || exception instanceof ServiceBusException && ((ServiceBusException)exception).getIsTransient();
        Iterator<ReceiveWorkItem> pendingRecivesIterator = this.pendingReceives.iterator();
        while (pendingRecivesIterator.hasNext()) {
            ReceiveWorkItem workItem = pendingRecivesIterator.next();
            pendingRecivesIterator.remove();
            CompletableFuture future = workItem.getWork();
            workItem.cancelTimeoutTask(false);
            this.reduceCreditForCompletedReceiveRequest(workItem.getMaxMessageCount());
            if (isTransientException) {
                AsyncUtil.completeFuture(future, null);
                continue;
            }
            ExceptionUtil.completeExceptionally(future, exception, this, true);
        }
        for (Map.Entry<String, UpdateStateWorkItem> pendingUpdate : this.pendingUpdateStateRequests.entrySet()) {
            this.pendingUpdateStateRequests.remove(pendingUpdate.getKey());
            ExceptionUtil.completeExceptionally(pendingUpdate.getValue().getWork(), exception, this, true);
        }
    }

    private static IllegalArgumentException generateDeliveryNotFoundException() {
        return new IllegalArgumentException("Delivery not found on the receive link.");
    }

    private static ServiceBusException generateDispatacherSchedulingFailedException(String operation, Exception cause) {
        return new ServiceBusException(false, operation + " failed while dispatching to Reactor, see cause for more details.", cause);
    }

    public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lockTokens) {
        this.throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            TRACE_LOGGER.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", new Object[]{Arrays.toString(lockTokens), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""});
        }
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, Object> requestBodyMap = new HashMap<String, Object>();
            requestBodyMap.put("lock-tokens", lockTokens);
            if (this.isSessionReceiver) {
                requestBodyMap.put("session-id", this.getSessionId());
            }
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:renew-lock", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture<Collection> returningFuture = new CompletableFuture<Collection>();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Message locks for lock tokens '{}' renewed", (Object)Arrays.toString(lockTokens));
                    }
                    Date[] expirations = (Date[])RequestResponseUtils.getResponseBody(responseMessage).get("expirations");
                    returningFuture.complete(Arrays.stream(expirations).map(d -> d.toInstant()).collect(Collectors.toList()));
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Renewing message locks for lock tokens '{}' on entity '{}' failed", new Object[]{Arrays.toString(lockTokens), this.receivePath, failureException});
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Collection<MessageWithLockToken>> receiveDeferredMessageBatchAsync(Long[] sequenceNumbers) {
        this.throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            TRACE_LOGGER.debug("Receiving messages for sequence numbers '{}' from entity '{}', sesion '{}'", new Object[]{Arrays.toString((Object[])sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""});
        }
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, Object> requestBodyMap = new HashMap<String, Object>();
            requestBodyMap.put("sequence-numbers", sequenceNumbers);
            requestBodyMap.put("receiver-settle-mode", UnsignedInteger.valueOf((int)(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1)));
            if (this.isSessionReceiver) {
                requestBodyMap.put("session-id", this.getSessionId());
            }
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:receive-by-sequence-number", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture returningFuture = new CompletableFuture();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    Object messages;
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Received messges for sequence numbers '{}' from entity '{}', sesion '{}'", new Object[]{Arrays.toString((Object[])sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""});
                    }
                    ArrayList<MessageWithLockToken> receivedMessages = new ArrayList<MessageWithLockToken>();
                    Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue();
                    if (responseBodyMap != null && responseBodyMap instanceof Map && (messages = ((Map)responseBodyMap).get("messages")) != null && messages instanceof Iterable) {
                        for (Object message : (Iterable)messages) {
                            if (!(message instanceof Map)) continue;
                            Message receivedMessage = Message.Factory.create();
                            Binary messagePayLoad = (Binary)((Map)message).get("message");
                            receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
                            UUID lockToken = ClientConstants.ZEROLOCKTOKEN;
                            if (((Map)message).containsKey("lock-token")) {
                                lockToken = (UUID)((Map)message).get("lock-token");
                            }
                            receivedMessages.add(new MessageWithLockToken(receivedMessage, lockToken));
                        }
                    }
                    returningFuture.complete(receivedMessages);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Receiving messages by sequence numbers '{}' from entity '{}' failed", new Object[]{Arrays.toString((Object[])sequenceNumbers), this.receivePath, failureException});
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> updateDispositionAsync(UUID[] lockTokens, String dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, TransactionContext transaction) {
        this.throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", new Object[]{Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""});
        }
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, Object> requestBodyMap = new HashMap<String, Object>();
            requestBodyMap.put("lock-tokens", lockTokens);
            requestBodyMap.put("disposition-status", dispositionStatus);
            if (deadLetterReason != null) {
                requestBodyMap.put("deadletter-reason", deadLetterReason);
            }
            if (deadLetterErrorDescription != null) {
                requestBodyMap.put("deadletter-description", deadLetterErrorDescription);
            }
            if (propertiesToModify != null && propertiesToModify.size() > 0) {
                requestBodyMap.put("properties-to-modify", propertiesToModify);
            }
            if (this.isSessionReceiver) {
                requestBodyMap.put("session-id", this.getSessionId());
            }
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:update-disposition", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture returningFuture = new CompletableFuture();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}' succeeded.", new Object[]{Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""});
                    }
                    returningFuture.complete(null);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Update disposition on entity '{}' failed", (Object)this.receivePath, (Object)failureException);
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> renewSessionLocksAsync() {
        this.throwIfInUnusableState();
        TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", (Object)this.receivePath, (Object)this.getSessionId());
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, String> requestBodyMap = new HashMap<String, String>();
            requestBodyMap.put("session-id", this.getSessionId());
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:renew-session-lock", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture returningFuture = new CompletableFuture();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    Date expiration = (Date)RequestResponseUtils.getResponseBody(responseMessage).get("expiration");
                    this.sessionLockedUntilUtc = expiration.toInstant();
                    TRACE_LOGGER.debug("Session lock on entity '{}' of sesion '{}' renewed until '{}'", new Object[]{this.receivePath, this.getSessionId(), this.sessionLockedUntilUtc});
                    returningFuture.complete(null);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Renewing session lock on entity '{}' of sesion '{}' failed", new Object[]{this.receivePath, this.getSessionId(), failureException});
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<byte[]> getSessionStateAsync() {
        this.throwIfInUnusableState();
        TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", (Object)this.getSessionId(), (Object)this.receivePath);
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, String> requestBodyMap = new HashMap<String, String>();
            requestBodyMap.put("session-id", this.getSessionId());
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:get-session-state", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture<byte[]> returningFuture = new CompletableFuture<byte[]>();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    Object sessionState;
                    TRACE_LOGGER.debug("Got session state of sesion '{}' from entity '{}'", (Object)this.getSessionId(), (Object)this.receivePath);
                    byte[] receivedState = null;
                    Map bodyMap = RequestResponseUtils.getResponseBody(responseMessage);
                    if (bodyMap.containsKey("session-state") && (sessionState = bodyMap.get("session-state")) != null) {
                        receivedState = ((Binary)sessionState).getArray();
                    }
                    returningFuture.complete(receivedState);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Getting session state of sesion '{}' from entity '{}' failed", new Object[]{this.getSessionId(), this.receivePath, failureException});
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> setSessionStateAsync(byte[] sessionState) {
        this.throwIfInUnusableState();
        TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", (Object)this.getSessionId(), (Object)this.receivePath);
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> {
            HashMap<String, String> requestBodyMap = new HashMap<String, String>();
            requestBodyMap.put("session-id", this.getSessionId());
            requestBodyMap.put("session-state", (String)(sessionState == null ? null : new Binary(sessionState)));
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:set-session-state", requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture returningFuture = new CompletableFuture();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}' succeeded", (Object)this.getSessionId(), (Object)this.receivePath);
                    returningFuture.complete(null);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Setting session state of sesion '{}' on entity '{}' failed", new Object[]{this.getSessionId(), this.receivePath, failureException});
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) {
        this.throwIfInUnusableState();
        return this.createRequestResponseLinkAsync().thenComposeAsync(v -> CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId, this.receiveLink.getName()), (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    private static class DeliveryStateDispatchHandler
    extends DispatchHandler {
        final Delivery delivery;
        final DeliveryState deliveryState;

        DeliveryStateDispatchHandler(Delivery delivery, DeliveryState deliveryState) {
            this.delivery = delivery;
            this.deliveryState = deliveryState;
        }

        @Override
        public void onEvent() {
            this.delivery.disposition(this.deliveryState);
        }
    }
}

