/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.BinaryData;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.FluxAutoComplete;
import com.azure.messaging.servicebus.FluxAutoLockRenew;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusAsyncConsumer;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusSessionManager;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ServiceClient(builder=ServiceBusClientBuilder.class, isAsync=true)
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable {
    private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClient.class);
    private final LockContainer<LockRenewalOperation> renewalContainer;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final LockContainer<OffsetDateTime> managementNodeLocks;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final Runnable onClientClose;
    private final ServiceBusSessionManager sessionManager;
    private final Semaphore completionLock = new Semaphore(1);
    private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1L);
    private final AtomicReference<ServiceBusAsyncConsumer> consumer = new AtomicReference();

    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval, TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveOptions cannot be null.'");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
        this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        this.managementNodeLocks = new LockContainer(cleanupInterval);
        this.renewalContainer = new LockContainer<LockRenewalOperation>(Duration.ofMinutes(2L), renewal -> {
            LOGGER.atVerbose().addKeyValue("lockToken", renewal.getLockToken()).addKeyValue("status", (Object)renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        });
        this.sessionManager = null;
    }

    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval, TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose, ServiceBusSessionManager sessionManager) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveOptions cannot be null.'");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
        this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        this.sessionManager = Objects.requireNonNull(sessionManager, "'sessionManager' cannot be null.");
        this.managementNodeLocks = new LockContainer(cleanupInterval);
        this.renewalContainer = new LockContainer<LockRenewalOperation>(Duration.ofMinutes(2L), renewal -> {
            LOGGER.atInfo().addKeyValue("sessionId", renewal.getSessionId()).addKeyValue("status", (Object)renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        });
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEntityPath() {
        return this.entityPath;
    }

    public String getSessionId() {
        return this.receiverOptions.getSessionId();
    }

    public Mono<Void> abandon(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.ABANDONED, null, null, null, null);
    }

    public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'settlementOptions' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.ABANDONED, null, null, options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<Void> complete(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, null);
    }

    public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, options.getTransactionContext());
    }

    public Mono<Void> defer(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.DEFERRED, null, null, null, null);
    }

    public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.DEFERRED, null, null, options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<Void> deadLetter(ServiceBusReceivedMessage message) {
        return this.deadLetter(message, DEFAULT_DEAD_LETTER_OPTIONS);
    }

    public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.SUSPENDED, options.getDeadLetterReason(), options.getDeadLetterErrorDescription(), options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<byte[]> getSessionState() {
        return this.getSessionState(this.receiverOptions.getSessionId());
    }

    public Mono<ServiceBusReceivedMessage> peekMessage() {
        return this.peekMessage(this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> peekMessage(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peek")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> {
            long sequence = this.lastPeekedSequenceNumber.get() + 1L;
            LOGGER.atVerbose().addKeyValue("sequenceNumber", sequence).log("Peek message.");
            return channel.peek(sequence, sessionId, this.getLinkName(sessionId));
        }).handle((message, sink) -> {
            long current = this.lastPeekedSequenceNumber.updateAndGet(value -> Math.max(value, message.getSequenceNumber()));
            LOGGER.atVerbose().addKeyValue("sequenceNumber", current).log("Updating last peeked sequence number.");
            sink.next(message);
        });
    }

    public Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber) {
        return this.peekMessage(sequenceNumber, this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.peek(sequenceNumber, sessionId, this.getLinkName(sessionId)));
    }

    public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages) {
        return this.peekMessages(maxMessages, this.receiverOptions.getSessionId());
    }

    Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> {
            long nextSequenceNumber = this.lastPeekedSequenceNumber.get() + 1L;
            LOGGER.atVerbose().addKeyValue("sequenceNumber", nextSequenceNumber).log("Peek batch.");
            Flux<ServiceBusReceivedMessage> messages = node.peek(nextSequenceNumber, sessionId, this.getLinkName(sessionId), maxMessages);
            Mono handle = messages.switchIfEmpty((Publisher)Mono.fromCallable(() -> {
                ServiceBusReceivedMessage emptyMessage = new ServiceBusReceivedMessage(BinaryData.fromBytes((byte[])new byte[0]));
                emptyMessage.setSequenceNumber(this.lastPeekedSequenceNumber.get());
                return emptyMessage;
            })).last().handle((last, sink) -> {
                long current = this.lastPeekedSequenceNumber.updateAndGet(value -> Math.max(value, last.getSequenceNumber()));
                LOGGER.atVerbose().addKeyValue("sequenceNumber", current).log("Last peeked sequence number in batch.");
                sink.complete();
            });
            return Flux.merge((Publisher[])new Publisher[]{messages, handle});
        });
    }

    public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
        return this.peekMessages(maxMessages, sequenceNumber, this.receiverOptions.getSessionId());
    }

    Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> node.peek(sequenceNumber, sessionId, this.getLinkName(sessionId), maxMessages));
    }

    public Flux<ServiceBusReceivedMessage> receiveMessages() {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveMessages")));
        }
        return this.receiveMessagesNoBackPressure().limitRate(1, 0);
    }

    Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
        return this.receiveMessagesWithContext(0).handle((serviceBusMessageContext, sink) -> {
            if (serviceBusMessageContext.hasError()) {
                sink.error(serviceBusMessageContext.getThrowable());
                return;
            }
            sink.next((Object)serviceBusMessageContext.getMessage());
        });
    }

    Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
        return this.receiveMessagesWithContext(1);
    }

    Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
        Flux messageFlux = this.sessionManager != null ? this.sessionManager.receive() : this.getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
        Object withAutoLockRenewal = !this.receiverOptions.isSessionReceiver() && this.receiverOptions.isAutoLockRenewEnabled() ? new FluxAutoLockRenew((Flux<? extends ServiceBusMessageContext>)messageFlux, this.receiverOptions, this.renewalContainer, this::renewMessageLock) : messageFlux;
        Object result = this.receiverOptions.isEnableAutoComplete() ? new FluxAutoComplete((Flux<? extends ServiceBusMessageContext>)withAutoLockRenewal, this.completionLock, context -> context.getMessage() != null ? this.complete(context.getMessage()) : Mono.empty(), context -> context.getMessage() != null ? this.abandon(context.getMessage()) : Mono.empty()) : withAutoLockRenewal;
        if (highTide > 0) {
            result = result.limitRate(highTide, 0);
        }
        return result.onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessage")));
        }
        return this.receiveDeferredMessage(sequenceNumber, this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessage")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.receiveDeferredMessages(this.receiverOptions.getReceiveMode(), sessionId, this.getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()).map(receivedMessage -> {
            if (CoreUtils.isNullOrEmpty((CharSequence)receivedMessage.getLockToken())) {
                return receivedMessage;
            }
            if (this.receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                receivedMessage.setLockedUntil(this.managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil()));
            }
            return receivedMessage;
        });
    }

    public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers) {
        return this.receiveDeferredMessages(sequenceNumbers, this.receiverOptions.getSessionId());
    }

    Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> node.receiveDeferredMessages(this.receiverOptions.getReceiveMode(), sessionId, this.getLinkName(sessionId), sequenceNumbers)).map(receivedMessage -> {
            if (CoreUtils.isNullOrEmpty((CharSequence)receivedMessage.getLockToken())) {
                return receivedMessage;
            }
            if (this.receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                receivedMessage.setLockedUntil(this.managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil()));
            }
            return receivedMessage;
        });
    }

    Mono<Void> release(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.RELEASED, null, null, null, null);
    }

    public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(message.getLockToken())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (message.getLockToken().isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        if (this.receiverOptions.isSessionReceiver()) {
            String errorMessage = "Renewing message lock is an invalid operation when working with sessions.";
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        return this.renewMessageLock(message.getLockToken()).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    Mono<OffsetDateTime> renewMessageLock(String lockToken) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(serviceBusManagementNode -> serviceBusManagementNode.renewMessageLock(lockToken, this.getLinkName(null))).map(offsetDateTime -> this.managementNodeLocks.addOrUpdate(lockToken, (OffsetDateTime)offsetDateTime, (OffsetDateTime)offsetDateTime));
    }

    public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock")));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(message.getLockToken())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (message.getLockToken().isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        if (this.receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format("Cannot renew message lock [%s] for a session receiver.", message.getLockToken())));
        }
        if (maxLockRenewalDuration == null) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        }
        if (maxLockRenewalDuration.isNegative()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }
        LockRenewalOperation operation = new LockRenewalOperation(message.getLockToken(), maxLockRenewalDuration, false, ignored -> this.renewMessageLock(message));
        this.renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
        return operation.getCompletionOperation().onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    public Mono<OffsetDateTime> renewSessionLock() {
        return this.renewSessionLock(this.receiverOptions.getSessionId());
    }

    public Mono<Void> renewSessionLock(Duration maxLockRenewalDuration) {
        return this.renewSessionLock(this.receiverOptions.getSessionId(), maxLockRenewalDuration);
    }

    public Mono<Void> setSessionState(byte[] sessionState) {
        return this.setSessionState(this.receiverOptions.getSessionId(), sessionState);
    }

    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.createTransaction()).map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId()));
    }

    public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "commitTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.commitTransaction(new AmqpTransaction(transactionContext.getTransactionId())));
    }

    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "rollbackTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.rollbackTransaction(new AmqpTransaction(transactionContext.getTransactionId())));
    }

    @Override
    public void close() {
        if (this.isDisposed.get()) {
            return;
        }
        try {
            boolean acquired = this.completionLock.tryAcquire(5L, TimeUnit.SECONDS);
            if (!acquired) {
                LOGGER.info("Unable to obtain completion lock.");
            }
        }
        catch (InterruptedException e) {
            LOGGER.info("Unable to obtain completion lock.", new Object[]{e});
        }
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        LOGGER.info("Removing receiver links.");
        ServiceBusAsyncConsumer disposed = this.consumer.getAndSet(null);
        if (disposed != null) {
            disposed.close();
        }
        if (this.sessionManager != null) {
            this.sessionManager.close();
        }
        this.managementNodeLocks.close();
        this.renewalContainer.close();
        this.onClientClose.run();
    }

    ReceiverOptions getReceiverOptions() {
        return this.receiverOptions;
    }

    private boolean isManagementToken(String lockToken) {
        return this.managementNodeLocks.containsUnexpired(lockToken);
    }

    private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        Mono updateDispositionOperation;
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, dispositionStatus.getValue())));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        String lockToken = message.getLockToken();
        String sessionId = message.getSessionId();
        if (this.receiverOptions.getReceiveMode() != ServiceBusReceiveMode.PEEK_LOCK) {
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new UnsupportedOperationException(String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", new Object[]{dispositionStatus}))));
        }
        if (message.isSettled()) {
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("The message has either been deleted or already settled.")));
        }
        if (message.getLockToken() == null) {
            String errorMessage = "This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.";
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new UnsupportedOperationException("This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.")));
        }
        String sessionIdToUse = sessionId == null && !CoreUtils.isNullOrEmpty((CharSequence)this.receiverOptions.getSessionId()) ? this.receiverOptions.getSessionId() : sessionId;
        LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("sessionId", sessionIdToUse).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Update started.");
        Mono performOnManagement = this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, sessionId, this.getLinkName(sessionId), transactionContext)).then(Mono.fromRunnable(() -> {
            LOGGER.atInfo().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Management node Update completed.");
            message.setIsSettled();
            this.managementNodeLocks.remove(lockToken);
            this.renewalContainer.remove(lockToken);
        }));
        if (this.sessionManager != null) {
            updateDispositionOperation = this.sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext).flatMap(isSuccess -> {
                if (isSuccess.booleanValue()) {
                    message.setIsSettled();
                    this.renewalContainer.remove(lockToken);
                    return Mono.empty();
                }
                LOGGER.info("Could not perform on session manger. Performing on management node.");
                return performOnManagement;
            });
        } else {
            ServiceBusAsyncConsumer existingConsumer = this.consumer.get();
            updateDispositionOperation = this.isManagementToken(lockToken) || existingConsumer == null ? performOnManagement : existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext).then(Mono.fromRunnable(() -> {
                LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Update completed.");
                message.setIsSettled();
                this.renewalContainer.remove(lockToken);
            }));
        }
        return updateDispositionOperation.onErrorMap(throwable -> {
            if (throwable instanceof ServiceBusException) {
                return throwable;
            }
            switch (dispositionStatus) {
                case COMPLETED: {
                    return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.COMPLETE);
                }
                case ABANDONED: {
                    return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.ABANDON);
                }
            }
            return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.UNKNOWN);
        });
    }

    private ServiceBusAsyncConsumer getOrCreateConsumer() {
        ServiceBusAsyncConsumer existing = this.consumer.get();
        if (existing != null) {
            return existing;
        }
        String linkName = StringUtil.getRandomString((String)this.entityPath);
        LOGGER.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", this.entityPath).log("Creating consumer.");
        Mono receiveLinkMono = this.connectionProcessor.flatMap(connection -> {
            if (this.receiverOptions.isSessionReceiver()) {
                return connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, this.receiverOptions.getSessionId());
            }
            return connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType);
        }).doOnNext(next -> LOGGER.atVerbose().addKeyValue("linkName", linkName).addKeyValue("entityPath", next.getEntityPath()).addKeyValue("mode", (Object)this.receiverOptions.getReceiveMode()).addKeyValue("isSessionEnabled", CoreUtils.isNullOrEmpty((CharSequence)this.receiverOptions.getSessionId())).addKeyValue("entityType", (Object)this.entityType).log("Created consumer for Service Bus resource."));
        Mono retryableReceiveLinkMono = RetryUtil.withRetry((Mono)receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> new AmqpException(true, e.getMessage(), (Throwable)e, null)), (AmqpRetryOptions)this.connectionProcessor.getRetryOptions(), (String)("Failed to create receive link " + linkName), (boolean)true);
        Flux receiveLinkFlux = retryableReceiveLinkMono.repeat();
        AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy((AmqpRetryOptions)this.connectionProcessor.getRetryOptions());
        ServiceBusReceiveLinkProcessor linkMessageProcessor = (ServiceBusReceiveLinkProcessor)receiveLinkFlux.subscribeWith((Subscriber)new ServiceBusReceiveLinkProcessor(this.receiverOptions.getPrefetchCount(), retryPolicy));
        ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor, this.messageSerializer, this.receiverOptions);
        if (this.consumer.compareAndSet(null, newConsumer)) {
            return newConsumer;
        }
        newConsumer.close();
        return this.consumer.get();
    }

    private String getLinkName(String sessionId) {
        if (this.sessionManager != null && !CoreUtils.isNullOrEmpty((CharSequence)sessionId)) {
            return this.sessionManager.getLinkName(sessionId);
        }
        if (!CoreUtils.isNullOrEmpty((CharSequence)sessionId) && !this.receiverOptions.isSessionReceiver()) {
            return null;
        }
        ServiceBusAsyncConsumer existing = this.consumer.get();
        return existing != null ? existing.getLinkName() : null;
    }

    Mono<OffsetDateTime> renewSessionLock(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        }
        if (!this.receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }
        String linkName = this.sessionManager != null ? this.sessionManager.getLinkName(sessionId) : null;
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> channel.renewSessionLock(sessionId, linkName)).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        }
        if (!this.receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }
        if (maxLockRenewalDuration == null) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        }
        if (maxLockRenewalDuration.isNegative()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }
        if (Objects.isNull(sessionId)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'sessionId' cannot be null."));
        }
        if (sessionId.isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'sessionId' cannot be empty."));
        }
        LockRenewalOperation operation = new LockRenewalOperation(sessionId, maxLockRenewalDuration, true, this::renewSessionLock);
        this.renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
        return operation.getCompletionOperation().onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState")));
        }
        if (!this.receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot set session state on a non-session receiver."));
        }
        String linkName = this.sessionManager != null ? this.sessionManager.getLinkName(sessionId) : null;
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)).onErrorMap(err -> this.mapError((Throwable)err, ServiceBusErrorSource.RECEIVE));
    }

    Mono<byte[]> getSessionState(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState")));
        }
        if (!this.receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot get session state on a non-session receiver."));
        }
        Mono result = this.sessionManager != null ? this.sessionManager.getSessionState(sessionId) : this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> channel.getSessionState(sessionId, this.getLinkName(sessionId)));
        return result.onErrorMap(err -> this.mapError((Throwable)err, ServiceBusErrorSource.RECEIVE));
    }

    private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) {
        if (!(throwable instanceof ServiceBusException)) {
            return new ServiceBusException(throwable, errorSource);
        }
        return throwable;
    }

    boolean isConnectionClosed() {
        return this.connectionProcessor.isChannelClosed();
    }

    boolean isManagementNodeLocksClosed() {
        return this.managementNodeLocks.isClosed();
    }

    boolean isRenewalContainerClosed() {
        return this.renewalContainer.isClosed();
    }
}

