/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import com.microsoft.azure.servicebus.primitives.AsyncUtil;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.ClientEntity;
import com.microsoft.azure.servicebus.primitives.CommonRequestResponseOperations;
import com.microsoft.azure.servicebus.primitives.ErrorContext;
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.IErrorContextProvider;
import com.microsoft.azure.servicebus.primitives.IteratorUtil;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
import com.microsoft.azure.servicebus.primitives.Pair;
import com.microsoft.azure.servicebus.primitives.PayloadSizeExceededException;
import com.microsoft.azure.servicebus.primitives.RequestResponseLink;
import com.microsoft.azure.servicebus.primitives.RequestResponseUtils;
import com.microsoft.azure.servicebus.primitives.RetryPolicy;
import com.microsoft.azure.servicebus.primitives.SendWorkItem;
import com.microsoft.azure.servicebus.primitives.SenderErrorContext;
import com.microsoft.azure.servicebus.primitives.SenderLinkSettings;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.TimeoutTracker;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import com.microsoft.azure.servicebus.primitives.Util;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoreMessageSender
extends ClientEntity
implements IAmqpSender,
IErrorContextProvider {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageSender.class);
    private static final String SEND_TIMED_OUT = "Send operation timed out";
    private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5L);
    private final Object requestResonseLinkCreationLock = new Object();
    private final MessagingFactory underlyingFactory;
    private final String sendPath;
    private final String sasTokenAudienceURI;
    private final Duration operationTimeout;
    private final RetryPolicy retryPolicy;
    private final CompletableFuture<Void> linkClose;
    private final Object pendingSendLock;
    private final ConcurrentHashMap<String, SendWorkItem<DeliveryState>> pendingSendsData;
    private final PriorityQueue<WeightedDeliveryTag> pendingSends;
    private final DispatchHandler sendWork;
    private final MessagingEntityType entityType;
    private boolean isSendLoopRunning;
    private Sender sendLink;
    private RequestResponseLink requestResponseLink;
    private CompletableFuture<CoreMessageSender> linkFirstOpen;
    private int linkCredit;
    private Exception lastKnownLinkError;
    private Instant lastKnownErrorReportedAt;
    private ScheduledFuture<?> sasTokenRenewTimerFuture;
    private CompletableFuture<Void> requestResponseLinkCreationFuture;
    private CompletableFuture<Void> sendLinkReopenFuture;
    private SenderLinkSettings linkSettings;
    private String transferDestinationPath;
    private String transferSasTokenAudienceURI;
    private boolean isSendVia;
    private int maxMessageSize;
    private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;

    @Deprecated
    public static CompletableFuture<CoreMessageSender> create(MessagingFactory factory, String clientId, String senderPath, String transferDestinationPath) {
        return CoreMessageSender.create(factory, clientId, senderPath, transferDestinationPath, null);
    }

    public static CompletableFuture<CoreMessageSender> create(MessagingFactory factory, String clientId, String senderPath, String transferDestinationPath, MessagingEntityType entityType) {
        return CoreMessageSender.create(factory, clientId, entityType, CoreMessageSender.getDefaultLinkProperties(senderPath, transferDestinationPath, factory, entityType));
    }

    static CompletableFuture<CoreMessageSender> create(MessagingFactory factory, String clientId, MessagingEntityType entityType, SenderLinkSettings linkSettings) {
        TRACE_LOGGER.info("Creating core message sender to '{}'", (Object)linkSettings.linkPath);
        Connection connection = factory.getActiveConnectionCreateIfNecessary();
        String sendLinkNamePrefix = "Sender".concat("_").concat(StringUtil.getShortRandomString());
        linkSettings.linkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? sendLinkNamePrefix.concat("_").concat(connection.getRemoteContainer()) : sendLinkNamePrefix;
        final CoreMessageSender msgSender = new CoreMessageSender(factory, clientId, entityType, linkSettings);
        TimeoutTracker openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
        msgSender.initializeLinkOpen(openLinkTracker);
        CompletableFuture<Object> authenticationFuture = null;
        authenticationFuture = linkSettings.requiresAuthentication ? msgSender.sendTokenAndSetRenewTimer(false) : CompletableFuture.completedFuture(null);
        authenticationFuture.handleAsync((v, sasTokenEx) -> {
            if (sasTokenEx != null) {
                Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
                TRACE_LOGGER.info("Sending SAS Token to '{}' failed.", (Object)msgSender.sendPath, (Object)cause);
                msgSender.linkFirstOpen.completeExceptionally(cause);
            } else {
                try {
                    msgSender.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            msgSender.createSendLink(msgSender.linkSettings);
                        }
                    });
                }
                catch (IOException ioException) {
                    msgSender.cancelSASTokenRenewTimer();
                    msgSender.linkFirstOpen.completeExceptionally(new ServiceBusException(false, "Failed to create Sender, see cause for more details.", ioException));
                }
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        return msgSender.linkFirstOpen;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> createRequestResponseLink() {
        Object object = this.requestResonseLinkCreationLock;
        synchronized (object) {
            if (this.requestResponseLinkCreationFuture == null) {
                this.requestResponseLinkCreationFuture = new CompletableFuture();
                this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((rrlink, ex) -> {
                    if (ex == null) {
                        this.requestResponseLink = rrlink;
                        this.requestResponseLinkCreationFuture.complete(null);
                    } else {
                        Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
                        this.requestResponseLinkCreationFuture.completeExceptionally(cause);
                        Object object = this.requestResonseLinkCreationLock;
                        synchronized (object) {
                            this.requestResponseLinkCreationFuture = null;
                        }
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }
            return this.requestResponseLinkCreationFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeRequestResponseLink() {
        Object object = this.requestResonseLinkCreationLock;
        synchronized (object) {
            if (this.requestResponseLinkCreationFuture != null) {
                this.requestResponseLinkCreationFuture.thenRun(() -> {
                    this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath);
                    this.requestResponseLink = null;
                });
                this.requestResponseLinkCreationFuture = null;
            }
        }
    }

    private CoreMessageSender(MessagingFactory factory, String sendLinkName, MessagingEntityType entityType, SenderLinkSettings linkSettings) {
        super(sendLinkName);
        this.sendPath = linkSettings.linkPath;
        this.entityType = entityType;
        if (linkSettings.linkProperties != null) {
            String transferPath = linkSettings.linkProperties.getOrDefault(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, null);
            if (transferPath != null && !transferPath.isEmpty()) {
                this.transferDestinationPath = transferPath;
                this.isSendVia = true;
                this.transferSasTokenAudienceURI = String.format("amqp://%s/%s", factory.getHostName(), this.transferDestinationPath);
            } else {
                this.transferDestinationPath = null;
            }
        }
        this.sasTokenAudienceURI = String.format("amqp://%s/%s", factory.getHostName(), linkSettings.linkPath);
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.linkSettings = linkSettings;
        this.lastKnownLinkError = null;
        this.lastKnownErrorReportedAt = Instant.EPOCH;
        this.retryPolicy = factory.getRetryPolicy();
        this.pendingSendLock = new Object();
        this.pendingSendsData = new ConcurrentHashMap();
        this.pendingSends = new PriorityQueue<WeightedDeliveryTag>(1000, new DeliveryTagComparator());
        this.linkCredit = 0;
        this.linkClose = new CompletableFuture();
        this.sendLinkReopenFuture = null;
        this.isSendLoopRunning = false;
        this.sendWork = new DispatchHandler(){

            @Override
            public void onEvent() {
                CoreMessageSender.this.processSendWork();
            }
        };
    }

    public String getSendPath() {
        return this.sendPath;
    }

    private static String generateRandomDeliveryTag() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    CompletableFuture<DeliveryState> sendCoreAsync(byte[] bytes, int arrayOffset, int messageFormat, TransactionContext transaction) {
        this.throwIfClosed(this.lastKnownLinkError);
        TRACE_LOGGER.debug("Sending message to '{}'", (Object)this.sendPath);
        String deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
        CompletableFuture<DeliveryState> onSendFuture = new CompletableFuture<DeliveryState>();
        SendWorkItem<DeliveryState> sendWorkItem = new SendWorkItem<DeliveryState>(bytes, arrayOffset, messageFormat, deliveryTag, transaction, onSendFuture, this.operationTimeout);
        this.enlistSendRequest(deliveryTag, sendWorkItem, false);
        this.scheduleSendTimeout(sendWorkItem);
        return onSendFuture;
    }

    private void scheduleSendTimeout(SendWorkItem<DeliveryState> sendWorkItem) {
        ScheduledFuture<?> timeoutTask = Timer.schedule(() -> {
            if (!sendWorkItem.getWork().isDone()) {
                TRACE_LOGGER.info("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", (Object)sendWorkItem.getDeliveryTag(), (Object)this.sendPath);
                this.pendingSendsData.remove(sendWorkItem.getDeliveryTag());
                this.throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException());
            }
        }, sendWorkItem.getTimeoutTracker().remaining(), TimerType.OneTimeRun);
        sendWorkItem.setTimeoutTask(timeoutTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void enlistSendRequest(String deliveryTag, SendWorkItem<DeliveryState> sendWorkItem, boolean isRetrySend) {
        Object object = this.pendingSendLock;
        synchronized (object) {
            this.pendingSendsData.put(deliveryTag, sendWorkItem);
            this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, isRetrySend ? 1 : 0));
            if (!this.isSendLoopRunning) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
                }
                catch (IOException ioException) {
                    AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", ioException));
                }
            }
        }
    }

    private void reSendAsync(String deliveryTag, SendWorkItem<DeliveryState> retryingSendWorkItem, boolean reuseDeliveryTag) {
        Duration remainingTime;
        if (!retryingSendWorkItem.getWork().isDone() && retryingSendWorkItem.cancelTimeoutTask(false) && !(remainingTime = retryingSendWorkItem.getTimeoutTracker().remaining()).isNegative() && !remainingTime.isZero()) {
            if (!reuseDeliveryTag) {
                deliveryTag = CoreMessageSender.generateRandomDeliveryTag();
                retryingSendWorkItem.setDeliveryTag(deliveryTag);
            }
            this.enlistSendRequest(deliveryTag, retryingSendWorkItem, true);
            this.scheduleSendTimeout(retryingSendWorkItem);
        }
    }

    public CompletableFuture<Void> sendAsync(Iterable<Message> messages, TransactionContext transaction) {
        if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
            throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
        }
        TRACE_LOGGER.debug("Sending a batch of messages to '{}'", (Object)this.sendPath);
        Message firstMessage = messages.iterator().next();
        if (IteratorUtil.sizeEquals(messages, 1)) {
            return this.sendAsync(firstMessage, transaction);
        }
        Message batchMessage = Proton.message();
        batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
        byte[] bytes = null;
        int byteArrayOffset = 0;
        try {
            Pair<byte[], Integer> encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage, this.maxMessageSize);
            bytes = encodedPair.getFirstItem();
            byteArrayOffset = encodedPair.getSecondItem();
            for (Message amqpMessage : messages) {
                Message messageWrappedByData = Proton.message();
                encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage, this.maxMessageSize);
                messageWrappedByData.setBody((Section)new Data(new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem().intValue())));
                int encodedSize = Util.encodeMessageToCustomArray(messageWrappedByData, bytes, byteArrayOffset, this.maxMessageSize - byteArrayOffset - 1);
                byteArrayOffset += encodedSize;
            }
        }
        catch (PayloadSizeExceededException ex) {
            TRACE_LOGGER.info("Payload size of batch of messages exceeded limit", (Throwable)ex);
            CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
            sendTask.completeExceptionally(ex);
            return sendTask;
        }
        return this.sendCoreAsync(bytes, byteArrayOffset, -2147404032, transaction).thenAccept(x -> {});
    }

    public CompletableFuture<Void> sendAsync(Message msg, TransactionContext transaction) {
        return this.sendAndReturnDeliveryStateAsync(msg, transaction).thenAccept(x -> {});
    }

    CompletableFuture<DeliveryState> sendAndReturnDeliveryStateAsync(Message msg, TransactionContext transaction) {
        try {
            Pair<byte[], Integer> encodedPair = Util.encodeMessageToOptimalSizeArray(msg, this.maxMessageSize);
            return this.sendCoreAsync(encodedPair.getFirstItem(), encodedPair.getSecondItem(), 0, transaction);
        }
        catch (PayloadSizeExceededException exception) {
            TRACE_LOGGER.info("Payload size of message exceeded limit", (Throwable)exception);
            CompletableFuture<DeliveryState> sendTask = new CompletableFuture<DeliveryState>();
            sendTask.completeExceptionally(exception);
            return sendTask;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onOpenComplete(Exception completionException) {
        this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
        if (completionException == null) {
            this.maxMessageSize = Util.getMaxMessageSizeFromLink((Link)this.sendLink);
            this.lastKnownLinkError = null;
            this.retryPolicy.resetRetryCount(this.getClientId());
            if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
                AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
            }
            if (!this.linkFirstOpen.isDone()) {
                TRACE_LOGGER.info("Opened send link to '{}'", (Object)this.sendPath);
                AsyncUtil.completeFuture(this.linkFirstOpen, this);
            } else {
                Object object = this.pendingSendLock;
                synchronized (object) {
                    if (!this.pendingSendsData.isEmpty()) {
                        LinkedList unacknowledgedSends = new LinkedList();
                        unacknowledgedSends.addAll(this.pendingSendsData.keySet());
                        if (unacknowledgedSends.size() > 0) {
                            for (String unacknowledgedSend : unacknowledgedSends) {
                                if (!this.pendingSendsData.get(unacknowledgedSend).isWaitingForAck()) continue;
                                this.pendingSends.offer(new WeightedDeliveryTag(unacknowledgedSend, 1));
                            }
                        }
                        unacknowledgedSends.clear();
                    }
                }
            }
        } else {
            this.cancelSASTokenRenewTimer();
            if (!this.linkFirstOpen.isDone()) {
                TRACE_LOGGER.info("Opening send link '{}' to '{}' failed", new Object[]{this.sendLink.getName(), this.sendPath, completionException});
                this.setClosed();
                ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this, true);
            }
            if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
                TRACE_LOGGER.info("Opening send link '{}' to '{}' failed", new Object[]{this.sendLink.getName(), this.sendPath, completionException});
                AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException);
            }
        }
    }

    @Override
    public void onClose(ErrorCondition condition) {
        Exception completionException = condition != null ? ExceptionUtil.toException(condition) : new ServiceBusException(true, "The entity has been closed due to transient failures (underlying link closed), please retry the operation.");
        this.onError(completionException);
    }

    @Override
    public void onError(Exception completionException) {
        this.linkCredit = 0;
        if (this.getIsClosingOrClosed()) {
            Exception failureException = completionException == null ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.") : completionException;
            this.clearAllPendingSendsWithException(failureException);
            TRACE_LOGGER.info("Send link to '{}' closed", (Object)this.sendPath);
            AsyncUtil.completeFuture(this.linkClose, null);
            return;
        }
        this.underlyingFactory.deregisterForConnectionError((Link)this.sendLink);
        this.lastKnownLinkError = completionException;
        this.lastKnownErrorReportedAt = Instant.now();
        this.onOpenComplete(completionException);
        if (!(completionException == null || completionException instanceof ServiceBusException && ((ServiceBusException)completionException).getIsTransient())) {
            TRACE_LOGGER.info("Send link '{}' to '{}' closed. Failing all pending send requests.", (Object)this.sendLink.getName(), (Object)this.sendPath);
            this.clearAllPendingSendsWithException(completionException);
        } else {
            Duration nextRetryInterval;
            TimeoutTracker tracker;
            Map.Entry<String, SendWorkItem<DeliveryState>> pendingSendEntry = IteratorUtil.getFirst(this.pendingSendsData.entrySet());
            if (pendingSendEntry != null && pendingSendEntry.getValue() != null && (tracker = pendingSendEntry.getValue().getTimeoutTracker()) != null && (nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, tracker.remaining())) != null) {
                TRACE_LOGGER.info("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", new Object[]{this.sendLink.getName(), this.sendPath, nextRetryInterval});
                Timer.schedule(() -> this.ensureLinkIsOpen(), nextRetryInterval, TimerType.OneTimeRun);
            }
        }
    }

    @Override
    public void onSendComplete(Delivery delivery) {
        DeliveryState outcome = delivery.getRemoteState();
        String deliveryTag = new String(delivery.getTag(), StandardCharsets.UTF_8);
        TRACE_LOGGER.debug("Received ack for delivery. path:{}, linkName:{}, deliveryTag:{}, outcome:{}", new Object[]{this.sendPath, this.sendLink.getName(), deliveryTag, outcome});
        SendWorkItem<DeliveryState> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
        if (pendingSendWorkItem != null) {
            if (outcome instanceof TransactionalState) {
                TRACE_LOGGER.trace("State of delivery is Transactional, retrieving outcome: {}", (Object)outcome);
                Outcome transactionalOutcome = ((TransactionalState)outcome).getOutcome();
                if (transactionalOutcome instanceof DeliveryState) {
                    outcome = (DeliveryState)transactionalOutcome;
                } else {
                    this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, "Unknown delivery state: " + outcome.toString()));
                    return;
                }
            }
            if (outcome instanceof Accepted) {
                this.lastKnownLinkError = null;
                this.retryPolicy.resetRetryCount(this.getClientId());
                pendingSendWorkItem.cancelTimeoutTask(false);
                AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
            } else if (outcome instanceof Declared) {
                AsyncUtil.completeFuture(pendingSendWorkItem.getWork(), outcome);
            } else if (outcome instanceof Rejected) {
                Duration retryInterval;
                Rejected rejected = (Rejected)outcome;
                ErrorCondition error = rejected.getError();
                Exception exception = ExceptionUtil.toException(error);
                if (ExceptionUtil.isGeneralError(error.getCondition())) {
                    this.lastKnownLinkError = exception;
                    this.lastKnownErrorReportedAt = Instant.now();
                }
                if ((retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining())) == null) {
                    this.cleanupFailedSend(pendingSendWorkItem, exception);
                } else {
                    TRACE_LOGGER.info("Send failed for delivery '{}'. Will retry after '{}'", (Object)deliveryTag, (Object)retryInterval);
                    pendingSendWorkItem.setLastKnownException(exception);
                    Timer.schedule(() -> this.reSendAsync(deliveryTag, pendingSendWorkItem, false), retryInterval, TimerType.OneTimeRun);
                }
            } else if (outcome instanceof Released) {
                this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString()));
            } else {
                this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, outcome.toString()));
            }
        } else {
            TRACE_LOGGER.info("Delivery mismatch. path:{}, linkName:{}, delivery:{}", new Object[]{this.sendPath, this.sendLink.getName(), deliveryTag});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearAllPendingSendsWithException(Throwable failureException) {
        Object object = this.pendingSendLock;
        synchronized (object) {
            for (Map.Entry<String, SendWorkItem<DeliveryState>> pendingSend : this.pendingSendsData.entrySet()) {
                this.cleanupFailedSend(pendingSend.getValue(), failureException);
            }
            this.pendingSendsData.clear();
            this.pendingSends.clear();
        }
    }

    private void cleanupFailedSend(SendWorkItem<DeliveryState> failedSend, Throwable exception) {
        failedSend.cancelTimeoutTask(false);
        ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this, true);
    }

    private static SenderLinkSettings getDefaultLinkProperties(String sendPath, String transferDestinationPath, MessagingFactory underlyingFactory, MessagingEntityType entityType) {
        SenderLinkSettings linkSettings = new SenderLinkSettings();
        linkSettings.linkPath = sendPath;
        Target target = new Target();
        target.setAddress(sendPath);
        linkSettings.target = target;
        linkSettings.source = new Source();
        linkSettings.settleMode = SenderSettleMode.UNSETTLED;
        linkSettings.requiresAuthentication = true;
        HashMap<Symbol, Object> linkProperties = new HashMap<Symbol, Object>();
        linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf((long)Util.adjustServerTimeout(underlyingFactory.getOperationTimeout()).toMillis()));
        if (entityType != null) {
            linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, entityType.getIntValue());
        }
        if (transferDestinationPath != null && !transferDestinationPath.isEmpty()) {
            linkProperties.put(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, transferDestinationPath);
        }
        linkSettings.linkProperties = linkProperties;
        return linkSettings;
    }

    private void createSendLink(SenderLinkSettings linkSettings) {
        TRACE_LOGGER.info("Creating send link to '{}'", (Object)this.sendPath);
        Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
        if (connection == null) {
            TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
            ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
            if (this.linkFirstOpen != null && !this.linkFirstOpen.isDone()) {
                AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, exception);
            }
            if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
                AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exception);
                if (this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) {
                    this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false;
                    Timer.schedule(() -> this.ensureLinkIsOpen(), Duration.ZERO, TimerType.OneTimeRun);
                }
            }
            return;
        }
        Session session = connection.session();
        session.setOutgoingWindow(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler((Extendable)session, (Handler)new SessionHandler(this.sendPath));
        Sender sender = session.sender(linkSettings.linkName);
        sender.setTarget(linkSettings.target);
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)linkSettings.source);
        sender.setProperties(linkSettings.linkProperties);
        TRACE_LOGGER.debug("Send link settle mode '{}'", (Object)linkSettings.settleMode);
        sender.setSenderSettleMode(linkSettings.settleMode);
        SendLinkHandler handler = new SendLinkHandler(this);
        BaseHandler.setHandler((Extendable)sender, (Handler)handler);
        sender.open();
        this.sendLink = sender;
        this.underlyingFactory.registerForConnectionError((Link)this.sendLink);
    }

    CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
        if (this.getIsClosingOrClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
        CompletionStage sasTokenFuture = sendTokenFuture.thenAccept(f -> {
            this.sasTokenRenewTimerFuture = f;
        });
        if (this.transferDestinationPath != null && !this.transferDestinationPath.isEmpty()) {
            CompletableFuture<Void> transferSendTokenFuture = this.underlyingFactory.sendSecurityToken(this.transferSasTokenAudienceURI);
            return CompletableFuture.allOf(new CompletableFuture[]{sasTokenFuture, transferSendTokenFuture});
        }
        return sasTokenFuture;
    }

    private void cancelSASTokenRenewTimer() {
        if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) {
            this.sasTokenRenewTimerFuture.cancel(true);
            TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
        }
    }

    private void initializeLinkOpen(TimeoutTracker timeout) {
        this.linkFirstOpen = new CompletableFuture();
        Timer.schedule(() -> {
            if (!this.linkFirstOpen.isDone()) {
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", this.sendLink.getName(), this.getSendPath(), ZonedDateTime.now().toString()), (Throwable)(this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(4L)) ? this.lastKnownLinkError : null));
                TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                ExceptionUtil.completeExceptionally(this.linkFirstOpen, operationTimedout, this, true);
                this.setClosing();
                this.closeInternals(false);
                this.setClosed();
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    @Override
    public ErrorContext getContext() {
        boolean isLinkOpened;
        boolean bl = isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();
        String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() : (this.sendLink != null ? this.sendLink.getName() : null);
        SenderErrorContext errorContext = new SenderErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.sendPath, referenceId, isLinkOpened && this.sendLink != null ? Integer.valueOf(this.sendLink.getCredit()) : null);
        return errorContext;
    }

    @Override
    public void onFlow(int creditIssued) {
        this.lastKnownLinkError = null;
        if (creditIssued <= 0) {
            return;
        }
        TRACE_LOGGER.debug("Received flow frame. path:{}, linkName:{}, remoteLinkCredit:{}, pendingSendsWaitingForCredit:{}, pendingSendsWaitingDelivery:{}", new Object[]{this.sendPath, this.sendLink.getName(), creditIssued, this.pendingSends.size(), this.pendingSendsData.size() - this.pendingSends.size()});
        this.linkCredit += creditIssued;
        this.sendWork.onEvent();
    }

    private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
        if (this.sendLink.getLocalState() != EndpointState.ACTIVE || this.sendLink.getRemoteState() != EndpointState.ACTIVE) {
            if (this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone()) {
                TRACE_LOGGER.info("Recreating send link to '{}'", (Object)this.sendPath);
                this.retryPolicy.incrementRetryCount(this.getClientId());
                CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture = new CompletableFuture();
                Timer.schedule(() -> {
                    if (!linkReopenFutureThatCanBeCancelled.isDone()) {
                        this.cancelSASTokenRenewTimer();
                        TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", this.sendLink.getName(), this.sendPath, ZonedDateTime.now()));
                        TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                        linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout);
                    }
                }, LINK_REOPEN_TIMEOUT, TimerType.OneTimeRun);
                this.cancelSASTokenRenewTimer();
                CompletableFuture<Object> authenticationFuture = null;
                authenticationFuture = this.linkSettings.requiresAuthentication ? this.sendTokenAndSetRenewTimer(false) : CompletableFuture.completedFuture(null);
                authenticationFuture.handleAsync((v, sendTokenEx) -> {
                    if (sendTokenEx != null) {
                        Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
                        TRACE_LOGGER.info("Sending SAS Token to '{}' failed.", (Object)this.sendPath, (Object)cause);
                        this.sendLinkReopenFuture.completeExceptionally((Throwable)sendTokenEx);
                        this.clearAllPendingSendsWithException((Throwable)sendTokenEx);
                    } else {
                        try {
                            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                                @Override
                                public void onEvent() {
                                    CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings);
                                }
                            });
                        }
                        catch (IOException ioEx) {
                            this.sendLinkReopenFuture.completeExceptionally(ioEx);
                        }
                    }
                    return null;
                }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
            }
            return this.sendLinkReopenFuture;
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processSendWork() {
        Object object = this.pendingSendLock;
        synchronized (object) {
            if (this.isSendLoopRunning) {
                return;
            }
            this.isSendLoopRunning = true;
        }
        TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", (Object)this.sendPath, (Object)this.linkCredit);
        try {
            if (!this.ensureLinkIsOpen().isDone()) {
                return;
            }
            Sender sendLinkCurrent = this.sendLink;
            while (sendLinkCurrent != null && sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE && this.linkCredit > 0) {
                SendWorkItem<DeliveryState> sendData;
                WeightedDeliveryTag deliveryTag;
                Object object2 = this.pendingSendLock;
                synchronized (object2) {
                    deliveryTag = this.pendingSends.poll();
                    if (deliveryTag == null) {
                        TRACE_LOGGER.debug("There are no pending sends to '{}'.", (Object)this.sendPath);
                        this.isSendLoopRunning = false;
                        break;
                    }
                    sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag());
                    if (sendData == null) {
                        TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", new Object[]{this.sendPath, this.sendLink.getName(), deliveryTag});
                        continue;
                    }
                }
                if (sendData.getWork() != null && sendData.getWork().isDone()) {
                    this.pendingSendsData.remove(deliveryTag.getDeliveryTag());
                    continue;
                }
                Delivery delivery = null;
                boolean linkAdvance = false;
                int sentMsgSize = 0;
                Exception sendException = null;
                try {
                    delivery = sendLinkCurrent.delivery(deliveryTag.getDeliveryTag().getBytes(StandardCharsets.UTF_8));
                    delivery.setMessageFormat(sendData.getMessageFormat());
                    TransactionContext transaction = sendData.getTransaction();
                    if (transaction != TransactionContext.NULL_TXN) {
                        TransactionalState transactionalState = new TransactionalState();
                        transactionalState.setTxnId(new Binary(transaction.getTransactionId().array()));
                        delivery.disposition((DeliveryState)transactionalState);
                    }
                    TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", (Object)deliveryTag.getDeliveryTag(), (Object)this.sendPath);
                    sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize());
                    assert (sentMsgSize == sendData.getEncodedMessageSize()) : "Contract of the ProtonJ library for Sender.Send API changed";
                    linkAdvance = sendLinkCurrent.advance();
                }
                catch (Exception exception) {
                    sendException = exception;
                }
                if (linkAdvance) {
                    --this.linkCredit;
                    sendData.setWaitingForAck();
                    continue;
                }
                TRACE_LOGGER.info("Sendlink advance failed. path:{}, linkName:{}, deliveryTag:{}, sentMessageSize:{}, payloadActualSiz:{}", new Object[]{this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()});
                if (delivery != null) {
                    delivery.free();
                }
                OperationCancelledException completionException = sendException != null ? new OperationCancelledException("Send operation failed. Please see cause for more details", (Throwable)sendException) : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", this.sendPath, deliveryTag));
                AsyncUtil.completeFutureExceptionally(sendData.getWork(), completionException);
            }
        }
        finally {
            object = this.pendingSendLock;
            synchronized (object) {
                if (this.isSendLoopRunning) {
                    this.isSendLoopRunning = false;
                }
            }
        }
    }

    private void throwSenderTimeout(CompletableFuture<DeliveryState> pendingSendWork, Exception lastKnownException) {
        Exception cause = lastKnownException;
        if (lastKnownException == null && this.lastKnownLinkError != null) {
            cause = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis())) ? this.lastKnownLinkError : null;
        }
        boolean isClientSideTimeout = cause == null || !(cause instanceof ServiceBusException);
        ServiceBusException exception = isClientSideTimeout ? new TimeoutException(String.format(Locale.US, "%s %s %s.", SEND_TIMED_OUT, " at ", ZonedDateTime.now(), cause)) : (ServiceBusException)cause;
        TRACE_LOGGER.info("Send timed out", (Throwable)exception);
        ExceptionUtil.completeExceptionally(pendingSendWork, exception, this, true);
    }

    private void scheduleLinkCloseTimeout(TimeoutTracker timeout) {
        Timer.schedule(() -> {
            if (!this.linkClose.isDone()) {
                TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", this.sendLink.getName(), ZonedDateTime.now()));
                TRACE_LOGGER.info(((Throwable)operationTimedout).getMessage());
                ExceptionUtil.completeExceptionally(this.linkClose, operationTimedout, this, true);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    @Override
    protected CompletableFuture<Void> onClose() {
        this.closeInternals(true);
        return this.linkClose;
    }

    private void closeInternals(final boolean waitForCloseCompletion) {
        if (!this.getIsClosed()) {
            if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            if (CoreMessageSender.this.sendLink != null && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) {
                                TRACE_LOGGER.info("Closing send link to '{}'", (Object)CoreMessageSender.this.sendPath);
                                CoreMessageSender.this.underlyingFactory.deregisterForConnectionError((Link)CoreMessageSender.this.sendLink);
                                CoreMessageSender.this.sendLink.close();
                                if (waitForCloseCompletion) {
                                    CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout));
                                } else {
                                    AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null);
                                }
                            }
                        }
                    });
                }
                catch (IOException e) {
                    AsyncUtil.completeFutureExceptionally(this.linkClose, e);
                }
            } else {
                AsyncUtil.completeFuture(this.linkClose, null);
            }
            this.cancelSASTokenRenewTimer();
            this.closeRequestResponseLink();
        }
    }

    public CompletableFuture<long[]> scheduleMessageAsync(Message[] messages, TransactionContext transaction, Duration timeout) {
        TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", (Object)messages.length, (Object)this.sendPath);
        return this.createRequestResponseLink().thenComposeAsync(v -> {
            HashMap requestBodyMap = new HashMap();
            LinkedList messageList = new LinkedList();
            for (Message message : messages) {
                Object viaPartitionKey;
                Object partitionKey;
                Pair<byte[], Integer> encodedPair;
                HashMap<String, Object> messageEntry = new HashMap<String, Object>();
                try {
                    encodedPair = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
                }
                catch (PayloadSizeExceededException exception) {
                    TRACE_LOGGER.info("Payload size of message exceeded limit", (Throwable)exception);
                    CompletableFuture scheduleMessagesTask = new CompletableFuture();
                    scheduleMessagesTask.completeExceptionally(exception);
                    return scheduleMessagesTask;
                }
                messageEntry.put("message", new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem().intValue()));
                messageEntry.put("message-id", message.getMessageId());
                String sessionId = message.getGroupId();
                if (!StringUtil.isNullOrEmpty(sessionId)) {
                    messageEntry.put("session-id", sessionId);
                }
                if ((partitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf((String)"x-opt-partition-key"))) != null && !((String)partitionKey).isEmpty()) {
                    messageEntry.put("partition-key", partitionKey);
                }
                if ((viaPartitionKey = message.getMessageAnnotations().getValue().get(Symbol.valueOf((String)"x-opt-via-partition-key"))) != null && !((String)viaPartitionKey).isEmpty()) {
                    messageEntry.put("via-partition-key", viaPartitionKey);
                }
                messageList.add(messageEntry);
            }
            requestBodyMap.put("messages", messageList);
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:schedule-message", requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, timeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture<long[]> returningFuture = new CompletableFuture<long[]>();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    long[] sequenceNumbers = (long[])RequestResponseUtils.getResponseBody(responseMessage).get("sequence-numbers");
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", (Object)Arrays.toString(sequenceNumbers));
                    }
                    returningFuture.complete(sequenceNumbers);
                } else {
                    Exception scheduleException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Sending scheduled messages to '{}' failed.", (Object)this.sendPath, (Object)scheduleException);
                    returningFuture.completeExceptionally(scheduleException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> cancelScheduledMessageAsync(Long[] sequenceNumbers, Duration timeout) {
        if (TRACE_LOGGER.isDebugEnabled()) {
            TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", (Object)Arrays.toString((Object[])sequenceNumbers), (Object)this.sendPath);
        }
        return this.createRequestResponseLink().thenComposeAsync(v -> {
            HashMap<String, Long[]> requestBodyMap = new HashMap<String, Long[]>();
            requestBodyMap.put("sequence-numbers", sequenceNumbers);
            Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag("com.microsoft:cancel-scheduled-message", requestBodyMap, Util.adjustServerTimeout(timeout), this.sendLink.getName());
            CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, timeout);
            return responseFuture.thenComposeAsync(responseMessage -> {
                CompletableFuture returningFuture = new CompletableFuture();
                int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
                if (statusCode == 200) {
                    TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", (Object)this.sendPath);
                    returningFuture.complete(null);
                } else {
                    Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
                    TRACE_LOGGER.info("Cancelling scheduled messages in '{}' failed.", (Object)this.sendPath, (Object)failureException);
                    returningFuture.completeExceptionally(failureException);
                }
                return returningFuture;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Collection<Message>> peekMessagesAsync(long fromSequenceNumber, int messageCount) {
        TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", new Object[]{messageCount, this.sendPath, fromSequenceNumber});
        return this.createRequestResponseLink().thenComposeAsync(v -> CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, null, this.sendLink.getName()), (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    private static class DeliveryTagComparator
    implements Comparator<WeightedDeliveryTag>,
    Serializable {
        private static final long serialVersionUID = -7057500582037295636L;

        private DeliveryTagComparator() {
        }

        @Override
        public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) {
            return deliveryTag1.getPriority() - deliveryTag0.getPriority();
        }
    }

    private static class WeightedDeliveryTag {
        private final String deliveryTag;
        private final int priority;

        WeightedDeliveryTag(String deliveryTag, int priority) {
            this.deliveryTag = deliveryTag;
            this.priority = priority;
        }

        public String getDeliveryTag() {
            return this.deliveryTag;
        }

        public int getPriority() {
            return this.priority;
        }
    }
}

