/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.ActiveClientTokenManager;
import com.microsoft.azure.servicebus.ClientConstants;
import com.microsoft.azure.servicebus.ClientEntity;
import com.microsoft.azure.servicebus.ErrorContext;
import com.microsoft.azure.servicebus.ExceptionUtil;
import com.microsoft.azure.servicebus.IErrorContextProvider;
import com.microsoft.azure.servicebus.IReceiverSettingsProvider;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.OperationCancelledException;
import com.microsoft.azure.servicebus.ReceiverContext;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.TimeoutException;
import com.microsoft.azure.servicebus.TimeoutTracker;
import com.microsoft.azure.servicebus.Timer;
import com.microsoft.azure.servicebus.TimerType;
import com.microsoft.azure.servicebus.TrackingUtil;
import com.microsoft.azure.servicebus.WorkItem;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.IOperationResult;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

public final class MessageReceiver
extends ClientEntity
implements IAmqpReceiver,
IErrorContextProvider {
    /** Logger used for all trace output of this receiver. */
    private static final Logger TRACE_LOGGER = Logger.getLogger("servicebus.trace");
    /** Presumably the remaining-time cutoff (ms) at or below which a pending receive counts as timed out - TODO confirm. */
    private static final int MIN_TIMEOUT_DURATION_MILLIS = 20;
    /** Receive requests that have not been completed yet, in the order they were offered. */
    private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
    /** Factory that provides reactor scheduling, sessions, the CBS channel, the token provider and the retry policy. */
    private final MessagingFactory underlyingFactory;
    /** Entity path used as the link source address and as part of the token audience. */
    private final String receivePath;
    /** Timer callback that completes pending receives whose timeout has (nearly) elapsed. */
    private final Runnable onOperationTimedout;
    /** Operation timeout taken from the factory; used for the close timeout. */
    private final Duration operationTimeout;
    /** Completed once closing the link has finished or failed. */
    private final CompletableFuture<Void> linkClose;
    /** Monitor guarding {@link #prefetchCount} in its getter and setter. */
    private final Object prefetchCountSync;
    /** Supplies the source filter, link properties and desired capabilities for new links. */
    private final IReceiverSettingsProvider settingsProvider;
    /** Token audience of the form {@code amqp://<host>/<receivePath>}. */
    private final String tokenAudience;
    /** Periodically re-sends the security token; cancelled when the receiver is closed. */
    private final ActiveClientTokenManager activeClientTokenManager;
    /** Future and timeout tracker for the initial link open. */
    private final WorkItem<MessageReceiver> linkOpen;
    /** Messages already received from the link but not yet handed to a caller. */
    private final ConcurrentLinkedQueue<Message> prefetchedMessages;
    /** Hands prefetched messages to pending receives. */
    private final ReceiveWork receiveWork;
    /** Reactor task dispatched by {@code receive}; serves pending receives and re-creates a closed link. */
    private final CreateAndReceive createAndReceive;
    /** Monitor guarding {@link #lastKnownLinkError} and the publication/snapshot of {@link #receiveLink}. */
    private final Object errorConditionLock;
    /** Configured prefetch count; also used as a credit-flow threshold. */
    private int prefetchCount;
    /** Current Proton receive link; stays null until a session has been opened and the link created. */
    private Receiver receiveLink;
    /** Timeout applied to each receive request. */
    private Duration receiveTimeout;
    /** Last message handed out of the prefetch queue; passed to the settings provider when building the link filter. */
    private Message lastReceivedMessage;
    /** Most recent link error, cleared when a link opens successfully. */
    private Exception lastKnownLinkError;
    /** Credits accumulated but not yet flowed to the link. */
    private int nextCreditToFlow;
    /** True while a link creation attempt is in progress. */
    private boolean creatingLink;
    /** Handle of the scheduled link-open timeout, if any. */
    private ScheduledFuture openTimer;
    /** Handle of the scheduled link-close timeout, if any. */
    private ScheduledFuture closeTimer;

    /**
     * Initializes receiver state; the AMQP link itself is not opened here.
     *
     * <p>Sets up the receive-timeout sweep ({@code onOperationTimedout}) and the periodic
     * token renewal ({@code activeClientTokenManager}).
     *
     * @param factory          factory providing reactor scheduling, timeouts, sessions and tokens
     * @param name             client name passed to the {@link ClientEntity} base class
     * @param recvPath         entity path to receive from
     * @param prefetchCount    initial prefetch count
     * @param settingsProvider supplies filter, properties and capabilities for the link
     */
    private MessageReceiver(MessagingFactory factory, String name, String recvPath, int prefetchCount, IReceiverSettingsProvider settingsProvider) {
        super(name, factory);
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.receivePath = recvPath;
        this.prefetchCount = prefetchCount;
        this.prefetchedMessages = new ConcurrentLinkedQueue<Message>();
        this.linkClose = new CompletableFuture<Void>();
        this.lastKnownLinkError = null;
        this.receiveTimeout = factory.getOperationTimeout();
        this.prefetchCountSync = new Object();
        this.settingsProvider = settingsProvider;
        this.linkOpen = new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), factory.getOperationTimeout());
        this.pendingReceives = new ConcurrentLinkedQueue<ReceiveWorkItem>();
        this.errorConditionLock = new Object();
        this.onOperationTimedout = new Runnable() {

            @Override
            public void run() {
                ReceiveWorkItem topWorkItem;
                while ((topWorkItem = MessageReceiver.this.pendingReceives.peek()) != null) {
                    if (topWorkItem.getTimeoutTracker().remaining().toMillis() > MIN_TIMEOUT_DURATION_MILLIS) {
                        // The oldest receive still has time left: re-arm the timer for it and stop.
                        MessageReceiver.this.scheduleOperationTimer(topWorkItem.getTimeoutTracker());
                        break;
                    }
                    ReceiveWorkItem expiredWorkItem = MessageReceiver.this.pendingReceives.poll();
                    if (expiredWorkItem != null && expiredWorkItem.getWork() != null && !expiredWorkItem.getWork().isDone()) {
                        // A timed-out receive completes with null (no messages), not with an exception.
                        expiredWorkItem.getWork().complete(null);
                    }
                    // Keep sweeping even when the dequeued item was already completed; stopping here
                    // would leave later expired receives without any timer to complete them.
                }
            }
        };
        this.receiveWork = new ReceiveWork();
        this.createAndReceive = new CreateAndReceive();
        this.tokenAudience = String.format("amqp://%s/%s", this.underlyingFactory.getHostName(), this.receivePath);
        this.activeClientTokenManager = new ActiveClientTokenManager(this, new Runnable() {

            @Override
            public void run() {
                try {
                    MessageReceiver.this.underlyingFactory.getCBSChannel().sendToken(MessageReceiver.this.underlyingFactory.getReactorScheduler(), MessageReceiver.this.underlyingFactory.getTokenProvider().getToken(MessageReceiver.this.tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), MessageReceiver.this.tokenAudience, new IOperationResult<Void, Exception>() {

                        @Override
                        public void onComplete(Void result) {
                            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                                TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "path[%s], linkName[%s] - token renewed", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName()));
                            }
                        }

                        @Override
                        public void onError(Exception error) {
                            if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                                TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "path[%s], linkName[%s], tokenRenewalFailure[%s]", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), error.getMessage()));
                            }
                        }
                    });
                }
                catch (IOException | RuntimeException | InvalidKeyException | NoSuchAlgorithmException exception) {
                    // Renewal is best effort: a failure to schedule it is only logged.
                    if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                        TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), exception.getMessage()));
                    }
                }
            }
        }, ClientConstants.TOKEN_REFRESH_INTERVAL);
    }

    /**
     * Creates a receiver and starts opening its link.
     *
     * @param factory          factory the receiver is bound to
     * @param name             client name of the receiver
     * @param recvPath         entity path to receive from
     * @param prefetchCount    initial prefetch count
     * @param settingsProvider supplies filter, properties and capabilities for the link
     * @return the link-open future of the new receiver
     */
    public static CompletableFuture<MessageReceiver> create(MessagingFactory factory, String name, String recvPath, int prefetchCount, IReceiverSettingsProvider settingsProvider) {
        return new MessageReceiver(factory, name, recvPath, prefetchCount, settingsProvider).createLink();
    }

    /**
     * @return the entity path this receiver was created with
     */
    public String getReceivePath() {
        final String path = this.receivePath;
        return path;
    }

    /**
     * Arms the link-open timeout and dispatches link creation to the reactor thread.
     *
     * @return the link-open future; completed exceptionally right away when the dispatch itself fails
     */
    private CompletableFuture<MessageReceiver> createLink() {
        this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());

        final DispatchHandler openLinkTask = new DispatchHandler() {

            @Override
            public void onEvent() {
                MessageReceiver.this.createReceiveLink();
            }
        };

        try {
            this.underlyingFactory.scheduleOnReactorThread(openLinkTask);
        }
        catch (IOException dispatchFailure) {
            final ServiceBusException failure = new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", dispatchFailure);
            this.linkOpen.getWork().completeExceptionally(failure);
        }
        return this.linkOpen.getWork();
    }

    /**
     * Takes messages off the prefetch queue until it is empty or {@code messageCount} were collected.
     * At least one message is taken whenever the queue is non-empty.
     *
     * @param messageCount upper bound on the number of messages to collect
     * @return the collected messages, or {@code null} when the prefetch queue was empty
     */
    private List<Message> receiveCore(int messageCount) {
        Message next = this.pollPrefetchQueue();
        if (next == null) {
            return null;
        }
        final LinkedList<Message> batch = new LinkedList<Message>();
        do {
            batch.add(next);
        } while (batch.size() < messageCount && (next = this.pollPrefetchQueue()) != null);
        return batch;
    }

    /**
     * @return the current prefetch count, read under the prefetch-count monitor
     */
    public int getPrefetchCount() {
        synchronized (this.prefetchCountSync) {
            final int current = this.prefetchCount;
            return current;
        }
    }

    /**
     * Changes the prefetch count and adjusts the link credit by the difference on the reactor thread.
     *
     * @param value the new prefetch count
     * @throws ServiceBusException when the credit adjustment cannot be dispatched to the reactor
     */
    public void setPrefetchCount(int value) throws ServiceBusException {
        final int deltaPrefetchCount;
        synchronized (this.prefetchCountSync) {
            // new - old: raising the prefetch count must grant additional credits,
            // lowering it must reduce the credits granted from here on.
            deltaPrefetchCount = value - this.prefetchCount;
            this.prefetchCount = value;
        }
        try {
            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {

                @Override
                public void onEvent() {
                    MessageReceiver.this.sendFlow(deltaPrefetchCount);
                }
            });
        }
        catch (IOException ioException) {
            throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", ioException);
        }
    }

    /**
     * @return the timeout currently applied to new receive requests
     */
    public Duration getReceiveTimeout() {
        final Duration timeout = this.receiveTimeout;
        return timeout;
    }

    /**
     * Sets the timeout used by receive requests issued after this call.
     *
     * @param value the new receive timeout
     */
    public void setReceiveTimeout(Duration value) {
        final Duration newTimeout = value;
        this.receiveTimeout = newTimeout;
    }

    /**
     * Queues a receive request and dispatches the receive work to the reactor thread.
     *
     * @param maxMessageCount maximum number of messages to return; must be in {@code 1..prefetchCount}
     * @return a future holding the received messages; other code in this class completes it with
     *         {@code null} when the request times out
     * @throws IllegalArgumentException when {@code maxMessageCount} is not in the allowed range
     */
    public CompletableFuture<Collection<Message>> receive(int maxMessageCount) {
        this.throwIfClosed();
        // Read once through the synchronized getter so the check and the message use the same value.
        final int currentPrefetchCount = this.getPrefetchCount();
        if (maxMessageCount <= 0 || maxMessageCount > currentPrefetchCount) {
            throw new IllegalArgumentException(String.format(Locale.US, "parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)", currentPrefetchCount));
        }
        // Only the first pending receive arms the timer; the timeout sweep re-arms it for later ones.
        if (this.pendingReceives.isEmpty()) {
            this.scheduleOperationTimer(TimeoutTracker.create(this.receiveTimeout));
        }
        CompletableFuture<Collection<Message>> onReceive = new CompletableFuture<Collection<Message>>();
        final ReceiveWorkItem workItem = new ReceiveWorkItem(onReceive, this.receiveTimeout, maxMessageCount);
        this.pendingReceives.offer(workItem);
        try {
            this.underlyingFactory.scheduleOnReactorThread(this.createAndReceive);
        }
        catch (IOException ioException) {
            // Drop the failed request from the queue; left there it would keep the queue non-empty
            // (suppressing timer scheduling for later receives) although it is already completed.
            this.pendingReceives.remove(workItem);
            onReceive.completeExceptionally(new OperationCancelledException("Receive failed while dispatching to Reactor, see cause for more details.", (Throwable)ioException));
        }
        return onReceive;
    }

    /**
     * Callback for the outcome of a link open attempt.
     *
     * <p>On failure the error is recorded and, if the initial open is still pending, the receiver is
     * marked closed and the open future fails. On success the open future completes, the recorded
     * error and retry count are reset, and credits are flowed to the link.
     *
     * @param exception the open failure, or {@code null} when the link opened successfully
     */
    @Override
    public void onOpenComplete(Exception exception) {
        this.creatingLink = false;

        if (exception != null) {
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                this.setClosed();
                ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exception, this);
                if (this.openTimer != null) {
                    this.openTimer.cancel(false);
                }
            }
            synchronized (this.errorConditionLock) {
                this.lastKnownLinkError = exception;
            }
            return;
        }

        // The receiver was closed while the link was opening: close the fresh link again.
        if (this.getIsClosingOrClosed()) {
            this.receiveLink.close();
            return;
        }

        if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
            this.linkOpen.getWork().complete(this);
            if (this.openTimer != null) {
                this.openTimer.cancel(false);
            }
        }

        synchronized (this.errorConditionLock) {
            this.lastKnownLinkError = null;
        }

        // NOTE(review): the reset uses the factory's client id while other retry-policy calls in this
        // class use this receiver's client id - confirm which key is intended.
        this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());

        // Start from a clean credit counter and top the link up for the free prefetch slots.
        this.nextCreditToFlow = 0;
        final int freePrefetchSlots = this.prefetchCount - this.prefetchedMessages.size();
        this.sendFlow(freePrefetchSlots);

        if (TRACE_LOGGER.isLoggable(Level.FINE)) {
            TRACE_LOGGER.log(Level.FINE, String.format("receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s]", this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
        }
    }

    /**
     * Reads the delivery from the link, decodes and settles it, buffers the message in the prefetch
     * queue and then runs the receive work to serve pending receives.
     *
     * @param delivery the delivery to read
     */
    @Override
    public void onReceiveComplete(Delivery delivery) {
        final int pendingBytes = delivery.pending();
        final byte[] payload = new byte[pendingBytes];
        final int bytesRead = this.receiveLink.recv(payload, 0, pendingBytes);

        final Message decoded = Proton.message();
        decoded.decode(payload, 0, bytesRead);
        delivery.settle();

        this.prefetchedMessages.add(decoded);
        // A successful receive resets the retry count kept for this receiver's client id.
        this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
        this.receiveWork.onEvent();
    }

    /**
     * Handles a link error or link closure.
     *
     * <p>The prefetch buffer is cleared and the link is deregistered from connection-error
     * notifications. While the receiver is closing or closed, all pending receives are drained
     * and the close future completes. Otherwise the error is recorded, reported through
     * {@link #onOpenComplete(Exception)}, and a link re-creation is scheduled if the retry policy
     * returns an interval; without a scheduled retry all pending receives fail.
     *
     * @param exception the error that occurred, or {@code null} when no error is known
     */
    @Override
    public void onError(Exception exception) {
        // Buffered messages are dropped on every error/close notification.
        this.prefetchedMessages.clear();
        this.underlyingFactory.deregisterForConnectionError((Link)this.receiveLink);
        if (this.getIsClosingOrClosed()) {
            boolean isTransientException;
            if (this.closeTimer != null) {
                this.closeTimer.cancel(false);
            }
            WorkItem workItem = null;
            // A missing error or a transient ServiceBusException is treated as "no messages" for pending receives.
            boolean bl = isTransientException = exception == null || exception instanceof ServiceBusException && ((ServiceBusException)exception).getIsTransient();
            while ((workItem = (WorkItem)this.pendingReceives.poll()) != null) {
                CompletableFuture<Object> future = workItem.getWork();
                if (isTransientException) {
                    future.complete(null);
                    continue;
                }
                ExceptionUtil.completeExceptionally(future, exception, this);
            }
            this.linkClose.complete(null);
        } else {
            Object workItem = this.errorConditionLock;
            synchronized (workItem) {
                // Keep the previously recorded error when this notification carries none.
                this.lastKnownLinkError = exception == null ? this.lastKnownLinkError : exception;
            }
            Exception completionException = exception == null ? new ServiceBusException(true, "Client encountered transient error for unknown reasons, please retry the operation.") : exception;
            this.onOpenComplete(completionException);
            // The retry interval is bounded by the time left on the oldest pending receive; no pending receive means no retry.
            WorkItem workItem2 = this.pendingReceives.peek();
            Duration nextRetryInterval = workItem2 != null && workItem2.getTimeoutTracker() != null ? this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), completionException, workItem2.getTimeoutTracker().remaining()) : null;
            boolean recreateScheduled = true;
            if (nextRetryInterval != null) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread((int)nextRetryInterval.toMillis(), new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            // Re-create only when not closing and the link is closed locally or remotely.
                            if (!(MessageReceiver.this.getIsClosingOrClosed() || MessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED && MessageReceiver.this.receiveLink.getRemoteState() != EndpointState.CLOSED)) {
                                MessageReceiver.this.createReceiveLink();
                                MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId());
                            }
                        }
                    });
                }
                catch (IOException ignore) {
                    // Scheduling failed; fall through to failing the pending receives below.
                    recreateScheduled = false;
                }
            }
            if (nextRetryInterval == null || !recreateScheduled) {
                // No retry will happen: fail every pending receive with the error.
                WorkItem pendingReceive = null;
                while ((pendingReceive = (WorkItem)this.pendingReceives.poll()) != null) {
                    ExceptionUtil.completeExceptionally(pendingReceive.getWork(), completionException, this);
                }
            }
        }
    }

    /**
     * Schedules a one-time run of the receive-timeout sweep after the tracker's remaining time.
     *
     * @param tracker timeout tracker of the receive to watch; {@code null} schedules nothing
     */
    private void scheduleOperationTimer(TimeoutTracker tracker) {
        if (tracker == null) {
            return;
        }
        Timer.schedule(this.onOperationTimedout, tracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Starts creating a new receive link unless a creation attempt is already in progress.
     *
     * <p>A token is sent over the CBS channel first; once that completes a session is requested and
     * the link is configured and opened on it. Every failure is routed to {@link #onError(Exception)}.
     */
    private void createReceiveLink() {
        if (this.creatingLink) {
            return;
        }
        this.creatingLink = true;

        // Configures and opens the receive link once the session is available.
        final Consumer<Session> onSessionOpen = session -> {
            if (this.getIsClosingOrClosed()) {
                session.close();
                return;
            }

            final Source linkSource = new Source();
            linkSource.setAddress(this.receivePath);
            final Map<Symbol, UnknownDescribedType> filters = this.settingsProvider.getFilter(this.lastReceivedMessage);
            if (filters != null) {
                linkSource.setFilter(filters);
            }

            final Receiver newLink = session.receiver(TrackingUtil.getLinkName(session));
            newLink.setSource(linkSource);
            newLink.setTarget(new Target());
            newLink.setSenderSettleMode(SenderSettleMode.UNSETTLED);
            newLink.setReceiverSettleMode(ReceiverSettleMode.SECOND);

            final Map<Symbol, Object> properties = this.settingsProvider.getProperties();
            if (properties != null) {
                newLink.setProperties(properties);
            }

            final Symbol[] capabilities = this.settingsProvider.getDesiredCapabilities();
            if (capabilities != null) {
                newLink.setDesiredCapabilities(capabilities);
            }

            final ReceiveLinkHandler linkHandler = new ReceiveLinkHandler(this);
            BaseHandler.setHandler((Extendable)newLink, (Handler)linkHandler);
            this.underlyingFactory.registerForConnectionError((Link)newLink);
            newLink.open();

            // Publish the link under the lock that readers use when taking a snapshot of it.
            synchronized (this.errorConditionLock) {
                this.receiveLink = newLink;
            }
        };

        // Translates a session-open failure into onError; nothing happens when both arguments are null.
        final BiConsumer<ErrorCondition, Exception> onSessionOpenFailed = (errorCondition, error) -> {
            if (errorCondition != null) {
                this.onError(errorCondition.getCondition() != null ? ExceptionUtil.toException(errorCondition) : null);
            } else if (error != null) {
                this.onError(error);
            }
        };

        final IOperationResult<Void, Exception> onTokenSent = new IOperationResult<Void, Exception>() {

            @Override
            public void onComplete(Void result) {
                if (MessageReceiver.this.getIsClosingOrClosed()) {
                    return;
                }
                MessageReceiver.this.underlyingFactory.getSession(MessageReceiver.this.receivePath, onSessionOpen, onSessionOpenFailed);
            }

            @Override
            public void onError(Exception error) {
                MessageReceiver.this.onError(error);
            }
        };

        try {
            this.underlyingFactory.getCBSChannel().sendToken(this.underlyingFactory.getReactorScheduler(), this.underlyingFactory.getTokenProvider().getToken(this.tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), this.tokenAudience, onTokenSent);
        }
        catch (IOException | RuntimeException | InvalidKeyException | NoSuchAlgorithmException tokenFailure) {
            this.onError(tokenFailure);
        }
    }

    /**
     * Removes the next message from the prefetch queue, remembers it as the last received message
     * and accounts one credit for the freed slot.
     *
     * @return the next prefetched message, or {@code null} when the queue is empty
     */
    private Message pollPrefetchQueue() {
        final Message next = this.prefetchedMessages.poll();
        if (next == null) {
            return null;
        }
        this.lastReceivedMessage = next;
        this.sendFlow(1);
        return next;
    }

    /**
     * Accumulates credits and flows them to the link once the accumulated amount reaches the
     * prefetch count or 100, whichever is hit first.
     *
     * @param credits number of credits to add to the pending amount
     */
    private void sendFlow(int credits) {
        this.nextCreditToFlow += credits;

        final boolean thresholdReached = this.nextCreditToFlow >= this.prefetchCount || this.nextCreditToFlow >= 100;
        if (!thresholdReached) {
            return;
        }

        final int creditsToSend = this.nextCreditToFlow;
        this.receiveLink.flow(creditsToSend);
        this.nextCreditToFlow = 0;

        if (TRACE_LOGGER.isLoggable(Level.FINE)) {
            TRACE_LOGGER.log(Level.FINE, String.format("receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]", this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), creditsToSend, Thread.currentThread().getId()));
        }
    }

    /**
     * Schedules a one-time timer that fails the link-open future with a {@link TimeoutException}
     * if the open has not completed when the timer fires.
     *
     * @param timeout tracker whose remaining time is used as the timer delay
     */
    private void scheduleLinkOpenTimeout(TimeoutTracker timeout) {
        this.openTimer = Timer.schedule(new Runnable() {

            @Override
            public void run() {
                if (MessageReceiver.this.linkOpen.getWork().isDone()) {
                    return;
                }
                final Receiver link;
                final Exception lastReportedLinkError;
                synchronized (MessageReceiver.this.errorConditionLock) {
                    link = MessageReceiver.this.receiveLink;
                    lastReportedLinkError = MessageReceiver.this.lastKnownLinkError;
                }
                // The link is still null when the open timed out before a session was established;
                // dereferencing it here would throw and leave the open future pending forever.
                final String linkName = link != null ? link.getName() : null;
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", linkName, MessageReceiver.this.receivePath, ZonedDateTime.now()), (Throwable)lastReportedLinkError);
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, linkName, "Open"), operationTimedout);
                }
                ExceptionUtil.completeExceptionally(MessageReceiver.this.linkOpen.getWork(), operationTimedout, MessageReceiver.this);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Schedules a one-time timer that fails the link-close future with a {@link TimeoutException}
     * and runs the error handling if the close has not completed when the timer fires.
     *
     * @param timeout tracker whose remaining time is used as the timer delay
     */
    private void scheduleLinkCloseTimeout(TimeoutTracker timeout) {
        this.closeTimer = Timer.schedule(new Runnable() {

            @Override
            public void run() {
                if (MessageReceiver.this.linkClose.isDone()) {
                    return;
                }
                final Receiver link;
                synchronized (MessageReceiver.this.errorConditionLock) {
                    link = MessageReceiver.this.receiveLink;
                }
                // The link may never have been created; a null dereference here would skip
                // completing the close future and the cleanup in onError.
                final String linkName = link != null ? link.getName() : null;
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", linkName, ZonedDateTime.now()));
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, linkName, "Close"), operationTimedout);
                }
                ExceptionUtil.completeExceptionally(MessageReceiver.this.linkClose, operationTimedout, MessageReceiver.this);
                MessageReceiver.this.onError(null);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Link-close callback: converts the error condition, if it carries one, into an exception and
     * forwards it to {@link #onError(Exception)}.
     *
     * @param condition the error condition reported with the close; may be {@code null}
     */
    @Override
    public void onClose(ErrorCondition condition) {
        Exception completionException = null;
        if (condition != null && condition.getCondition() != null) {
            completionException = ExceptionUtil.toException(condition);
        }
        this.onError(completionException);
    }

    /**
     * Builds the error context describing this receiver.
     *
     * <p>The reference id is the tracking id from the link's remote properties when present,
     * otherwise the link name. Prefetch count, link credit and buffered-message count are only
     * reported once the link-open future is done.
     *
     * @return a {@link ReceiverContext} for the current state
     */
    @Override
    public ErrorContext getContext() {
        final Receiver link;
        synchronized (this.errorConditionLock) {
            link = this.receiveLink;
        }
        final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();

        String referenceId = null;
        if (link != null) {
            final Map<?, ?> remoteProperties = link.getRemoteProperties();
            if (remoteProperties != null && remoteProperties.containsKey(ClientConstants.TRACKING_ID_PROPERTY)) {
                referenceId = remoteProperties.get(ClientConstants.TRACKING_ID_PROPERTY).toString();
            } else {
                referenceId = link.getName();
            }
        }

        final String hostName = this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null;

        Integer reportedPrefetchCount = null;
        Integer reportedCredit = null;
        Integer reportedQueueLength = null;
        if (isLinkOpened) {
            reportedPrefetchCount = Integer.valueOf(this.prefetchCount);
            if (link != null) {
                reportedCredit = Integer.valueOf(link.getCredit());
            }
            if (this.prefetchedMessages != null) {
                reportedQueueLength = Integer.valueOf(this.prefetchedMessages.size());
            }
        }

        return new ReceiverContext(hostName, this.receivePath, referenceId, reportedPrefetchCount, reportedCredit, reportedQueueLength);
    }

    /**
     * Starts closing the receiver: cancels token renewal, arms the close timeout and closes the
     * link on the reactor thread.
     *
     * @return the link-close future; failed right away when the close cannot be dispatched
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        if (this.getIsClosed()) {
            return this.linkClose;
        }

        try {
            this.activeClientTokenManager.cancel();
            this.scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {

                @Override
                public void onEvent() {
                    final Receiver link = MessageReceiver.this.receiveLink;
                    if (link != null && link.getLocalState() != EndpointState.CLOSED) {
                        // Not closed locally yet: initiate the close.
                        link.close();
                        return;
                    }
                    if (link == null || link.getRemoteState() == EndpointState.CLOSED) {
                        // Nothing left to wait for: finish the close right here.
                        if (MessageReceiver.this.closeTimer != null) {
                            MessageReceiver.this.closeTimer.cancel(false);
                        }
                        MessageReceiver.this.linkClose.complete(null);
                    }
                }
            });
        }
        catch (IOException dispatchFailure) {
            this.linkClose.completeExceptionally(new ServiceBusException(false, "Scheduling close failed with error. See cause for more details.", dispatchFailure));
        }
        return this.linkClose;
    }

    /**
     * @return the most recently recorded link error, read under the error-condition lock;
     *         {@code null} when none is recorded
     */
    @Override
    protected Exception getLastKnownError() {
        synchronized (this.errorConditionLock) {
            final Exception recorded = this.lastKnownLinkError;
            return recorded;
        }
    }

    /**
     * Reactor task that first serves pending receives from the prefetch queue and then re-creates
     * the link when the receiver is not closing and the link is closed locally or remotely.
     */
    private final class CreateAndReceive
    extends DispatchHandler {
        private CreateAndReceive() {
        }

        @Override
        public void onEvent() {
            MessageReceiver.this.receiveWork.onEvent();

            if (MessageReceiver.this.getIsClosingOrClosed()) {
                return;
            }
            final boolean linkClosed = MessageReceiver.this.receiveLink.getLocalState() == EndpointState.CLOSED
                    || MessageReceiver.this.receiveLink.getRemoteState() == EndpointState.CLOSED;
            if (linkClosed) {
                MessageReceiver.this.createReceiveLink();
            }
        }
    }

    /**
     * Task that hands prefetched messages to pending receives for as long as both are available.
     * Dequeued receives that are already completed are dropped.
     */
    private final class ReceiveWork
    extends DispatchHandler {
        private ReceiveWork() {
        }

        @Override
        public void onEvent() {
            while (!MessageReceiver.this.prefetchedMessages.isEmpty()) {
                final ReceiveWorkItem nextReceive = MessageReceiver.this.pendingReceives.poll();
                if (nextReceive == null) {
                    break;
                }
                if (nextReceive.getWork() != null && !nextReceive.getWork().isDone()) {
                    final List<Message> batch = MessageReceiver.this.receiveCore(nextReceive.maxMessageCount);
                    nextReceive.getWork().complete(batch);
                }
            }
        }
    }

    /**
     * Pending receive request: the future to complete, its timeout, and the maximum number of
     * messages the caller asked for.
     */
    private static class ReceiveWorkItem
    extends WorkItem<Collection<Message>> {
        /** Upper bound on the number of messages delivered to this request. */
        private final int maxMessageCount;

        /**
         * @param completableFuture future completed with the received messages
         * @param timeout           timeout of this request
         * @param maxMessageCount   maximum number of messages to deliver
         */
        public ReceiveWorkItem(CompletableFuture<Collection<Message>> completableFuture, Duration timeout, int maxMessageCount) {
            super(completableFuture, timeout);
            this.maxMessageCount = maxMessageCount;
        }
    }
}

