/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusAsyncConsumer;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.UnnamedSessionManager;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageLockContainer;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable {
    /** Options used by the {@code deadLetter} overloads that do not take explicit {@link DeadLetterOptions}. */
    private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();
    /** Name passed to {@code connection.createSession(...)} by the create/commit/rollback transaction operations. */
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    /** Flipped to {@code true} by {@link #close()}; most operations check it and fail once it is set. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    /** Lock tokens recorded for deferred messages and renewed locks; consulted to route settlement to the management node. */
    private final MessageLockContainer managementNodeLocks;
    private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClient.class);
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    /** Callback run at the end of {@link #close()}. */
    private final Runnable onClientClose;
    /** {@code null} when the client was created through the constructor that takes no session manager. */
    private final UnnamedSessionManager unnamedSessionManager;
    /** Highest sequence number seen by {@code peek}/{@code peekBatch}; starts at -1 so the first peek asks for sequence 0. */
    private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1L);
    /** Currently active consumer, if any; cleared and closed by {@link #close()}. */
    private final AtomicReference<ServiceBusAsyncConsumer> consumer = new AtomicReference();

    /**
     * Creates a receiver that is not backed by an {@link UnnamedSessionManager}.
     *
     * @param fullyQualifiedNamespace namespace reported by {@link #getFullyQualifiedNamespace()}.
     * @param entityPath path of the entity this receiver works against.
     * @param entityType type of the entity, passed when requesting the management node.
     * @param receiverOptions receive mode and session settings for this receiver.
     * @param connectionProcessor source of the connection used for every operation.
     * @param cleanupInterval interval handed to the {@link MessageLockContainer} that tracks management-node locks.
     * @param tracerProvider tracing provider.
     * @param messageSerializer serializer for messages.
     * @param onClientClose callback run when this client is closed.
     * @throws NullPointerException if any argument validated below is {@code null}.
     */
    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval, TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        // Message previously read "'receiveOptions cannot be null.'" (wrong name, misplaced quote).
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
        this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        this.managementNodeLocks = new MessageLockContainer(cleanupInterval);
        // No session manager: operations fall back to the consumer or the management node.
        this.unnamedSessionManager = null;
    }

    /**
     * Creates a receiver whose receive and settlement operations are first offered to an
     * {@link UnnamedSessionManager}.
     *
     * @param fullyQualifiedNamespace namespace reported by {@link #getFullyQualifiedNamespace()}.
     * @param entityPath path of the entity this receiver works against.
     * @param entityType type of the entity, passed when requesting the management node.
     * @param receiverOptions receive mode and session settings for this receiver.
     * @param connectionProcessor source of the connection used for every operation.
     * @param cleanupInterval interval handed to the {@link MessageLockContainer} that tracks management-node locks.
     * @param tracerProvider tracing provider.
     * @param messageSerializer serializer for messages.
     * @param onClientClose callback run when this client is closed.
     * @param unnamedSessionManager session manager that receive/settle operations delegate to.
     * @throws NullPointerException if any argument validated below is {@code null}.
     */
    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor connectionProcessor, Duration cleanupInterval, TracerProvider tracerProvider, MessageSerializer messageSerializer, Runnable onClientClose, UnnamedSessionManager unnamedSessionManager) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        // Message previously read "'receiveOptions cannot be null.'" (wrong name, misplaced quote).
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
        this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        // Message now names the actual parameter rather than 'sessionManager'.
        this.unnamedSessionManager = Objects.requireNonNull(unnamedSessionManager, "'unnamedSessionManager' cannot be null.");
        this.managementNodeLocks = new MessageLockContainer(cleanupInterval);
    }

    /**
     * Gets the fully qualified namespace this client was constructed with.
     *
     * @return the fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return fullyQualifiedNamespace;
    }

    /**
     * Gets the entity path this client was constructed with.
     *
     * @return the entity path.
     */
    public String getEntityPath() {
        return entityPath;
    }

    /**
     * Abandons the message identified by {@code lockToken}, using the session id configured in the
     * receiver options.
     *
     * @param lockToken lock token of the message to abandon.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken lockToken) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return abandon(lockToken, configuredSessionId);
    }

    /**
     * Abandons the message identified by {@code lockToken} for the given session, without modifying
     * any message properties.
     *
     * @param lockToken lock token of the message to abandon.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken lockToken, String sessionId) {
        final Map<String, Object> noPropertyChanges = null;
        return abandon(lockToken, noPropertyChanges, sessionId);
    }

    /**
     * Abandons the message identified by {@code lockToken}, applying the supplied property
     * modifications and using the session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to abandon.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken lockToken, Map<String, Object> propertiesToModify) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return abandon(lockToken, propertiesToModify, configuredSessionId);
    }

    /**
     * Abandons the message identified by {@code lockToken} as part of a transaction, using the
     * session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to abandon.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param transactionContext transaction the update belongs to.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken lockToken, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return abandon(lockToken, propertiesToModify, configuredSessionId, transactionContext);
    }

    /**
     * Abandons the message identified by {@code lockToken} for the given session, outside of any
     * transaction.
     *
     * @param lockToken lock token of the message to abandon.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken lockToken, Map<String, Object> propertiesToModify, String sessionId) {
        // No dead-letter reason/description and no transaction apply to a plain abandon.
        return updateDisposition(lockToken, DispositionStatus.ABANDONED, null, null,
            propertiesToModify, sessionId, null);
    }

    /**
     * Abandons the message identified by {@code lockToken} for the given session as part of a
     * transaction.
     *
     * @param lockToken lock token of the message to abandon.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param sessionId session id to use for the update.
     * @param transactionContext transaction the update belongs to; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the update finishes, or errors with
     *     {@link NullPointerException} when the transaction context or its id is {@code null}.
     */
    public Mono<Void> abandon(MessageLockToken lockToken, Map<String, Object> propertiesToModify, String sessionId, ServiceBusTransactionContext transactionContext) {
        if (transactionContext == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext' cannot be null."));
        }
        if (transactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return updateDisposition(lockToken, DispositionStatus.ABANDONED, null, null,
            propertiesToModify, sessionId, transactionContext);
    }

    /**
     * Completes the message identified by {@code lockToken}. When the token is a
     * {@link ServiceBusReceivedMessage}, the message's own session id is used; otherwise no session
     * id is supplied.
     *
     * @param lockToken lock token of the message to complete.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> complete(MessageLockToken lockToken) {
        if (!(lockToken instanceof ServiceBusReceivedMessage)) {
            return updateDisposition(lockToken, DispositionStatus.COMPLETED, null, null, null, null, null);
        }

        final ServiceBusReceivedMessage receivedMessage = (ServiceBusReceivedMessage) lockToken;
        return complete(lockToken, receivedMessage.getSessionId());
    }

    /**
     * Completes the message identified by {@code lockToken} as part of a transaction, using the
     * session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to complete.
     * @param transactionContext transaction the update belongs to.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> complete(MessageLockToken lockToken, ServiceBusTransactionContext transactionContext) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return complete(lockToken, configuredSessionId, transactionContext);
    }

    /**
     * Completes the message identified by {@code lockToken} for the given session, outside of any
     * transaction.
     *
     * @param lockToken lock token of the message to complete.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> complete(MessageLockToken lockToken, String sessionId) {
        // Completion carries no dead-letter details, property changes, or transaction.
        return updateDisposition(lockToken, DispositionStatus.COMPLETED, null, null, null,
            sessionId, null);
    }

    /**
     * Completes the message identified by {@code lockToken} for the given session as part of a
     * transaction.
     *
     * @param lockToken lock token of the message to complete.
     * @param sessionId session id to use for the update.
     * @param transactionContext transaction the update belongs to; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the update finishes, or errors with
     *     {@link NullPointerException} when the transaction context or its id is {@code null}.
     */
    public Mono<Void> complete(MessageLockToken lockToken, String sessionId, ServiceBusTransactionContext transactionContext) {
        if (transactionContext == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext' cannot be null."));
        }
        if (transactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return updateDisposition(lockToken, DispositionStatus.COMPLETED, null, null, null,
            sessionId, transactionContext);
    }

    /**
     * Defers the message identified by {@code lockToken}, using the session id configured in the
     * receiver options.
     *
     * @param lockToken lock token of the message to defer.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken lockToken) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return defer(lockToken, configuredSessionId);
    }

    /**
     * Defers the message identified by {@code lockToken} for the given session, without modifying
     * any message properties.
     *
     * @param lockToken lock token of the message to defer.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken lockToken, String sessionId) {
        final Map<String, Object> noPropertyChanges = null;
        return defer(lockToken, noPropertyChanges, sessionId);
    }

    /**
     * Defers the message identified by {@code lockToken}, applying the supplied property
     * modifications and using the session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to defer.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken lockToken, Map<String, Object> propertiesToModify) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return defer(lockToken, propertiesToModify, configuredSessionId);
    }

    /**
     * Defers the message identified by {@code lockToken} as part of a transaction, using the
     * session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to defer.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param transactionContext transaction the update belongs to.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken lockToken, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return defer(lockToken, propertiesToModify, configuredSessionId, transactionContext);
    }

    /**
     * Defers the message identified by {@code lockToken} for the given session, outside of any
     * transaction.
     *
     * @param lockToken lock token of the message to defer.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken lockToken, Map<String, Object> propertiesToModify, String sessionId) {
        // No dead-letter reason/description and no transaction apply to a plain defer.
        return updateDisposition(lockToken, DispositionStatus.DEFERRED, null, null,
            propertiesToModify, sessionId, null);
    }

    /**
     * Defers the message identified by {@code lockToken} for the given session as part of a
     * transaction.
     *
     * @param lockToken lock token of the message to defer.
     * @param propertiesToModify properties to modify on the message; may be {@code null}.
     * @param sessionId session id to use for the update.
     * @param transactionContext transaction the update belongs to; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the update finishes, or errors with
     *     {@link NullPointerException} when the transaction context or its id is {@code null}.
     */
    public Mono<Void> defer(MessageLockToken lockToken, Map<String, Object> propertiesToModify, String sessionId, ServiceBusTransactionContext transactionContext) {
        if (transactionContext == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext' cannot be null."));
        }
        if (transactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return updateDisposition(lockToken, DispositionStatus.DEFERRED, null, null,
            propertiesToModify, sessionId, transactionContext);
    }

    /**
     * Dead-letters the message identified by {@code lockToken}, using the session id configured in
     * the receiver options.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return deadLetter(lockToken, configuredSessionId);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} for the given session, using the
     * default dead-letter options.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, String sessionId) {
        final DeadLetterOptions defaults = DEFAULT_DEAD_LETTER_OPTIONS;
        return deadLetter(lockToken, defaults, sessionId);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} for the given session as part of a
     * transaction, using the default dead-letter options.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param sessionId session id to use for the update.
     * @param transactionContext transaction the update belongs to.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, String sessionId, ServiceBusTransactionContext transactionContext) {
        final DeadLetterOptions defaults = DEFAULT_DEAD_LETTER_OPTIONS;
        return deadLetter(lockToken, defaults, sessionId, transactionContext);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} with the supplied options, using the
     * session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param deadLetterOptions reason, description and property modifications to apply.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return deadLetter(lockToken, deadLetterOptions, configuredSessionId);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} with the supplied options as part of
     * a transaction, using the session id configured in the receiver options.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param deadLetterOptions reason, description and property modifications to apply.
     * @param transactionContext transaction the update belongs to.
     * @return a {@link Mono} that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions, ServiceBusTransactionContext transactionContext) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return deadLetter(lockToken, deadLetterOptions, configuredSessionId, transactionContext);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} for the given session, outside of
     * any transaction.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param deadLetterOptions reason, description and property modifications to apply; must be non-null.
     * @param sessionId session id to use for the update.
     * @return a {@link Mono} that completes when the update finishes, or errors with
     *     {@link NullPointerException} when {@code deadLetterOptions} is {@code null}.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions, String sessionId) {
        if (deadLetterOptions == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'deadLetterOptions' cannot be null."));
        }

        // The SUSPENDED disposition status is what this client sends for a dead-letter request.
        return updateDisposition(lockToken, DispositionStatus.SUSPENDED,
            deadLetterOptions.getDeadLetterReason(),
            deadLetterOptions.getDeadLetterErrorDescription(),
            deadLetterOptions.getPropertiesToModify(),
            sessionId, null);
    }

    /**
     * Dead-letters the message identified by {@code lockToken} for the given session as part of a
     * transaction.
     *
     * @param lockToken lock token of the message to dead-letter.
     * @param deadLetterOptions reason, description and property modifications to apply; must be non-null.
     * @param sessionId session id to use for the update.
     * @param transactionContext transaction the update belongs to; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the update finishes, or errors with
     *     {@link NullPointerException} when the transaction context, its id, or
     *     {@code deadLetterOptions} is {@code null}.
     */
    public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions, String sessionId, ServiceBusTransactionContext transactionContext) {
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        // Report null options through the Mono, as the non-transactional overload does, instead of
        // throwing synchronously when the options are dereferenced below.
        if (Objects.isNull(deadLetterOptions)) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'deadLetterOptions' cannot be null."));
        }
        return this.updateDisposition(lockToken, DispositionStatus.SUSPENDED, deadLetterOptions.getDeadLetterReason(), deadLetterOptions.getDeadLetterErrorDescription(), deadLetterOptions.getPropertiesToModify(), sessionId, transactionContext);
    }

    /**
     * Gets the state of the given session. Delegates to the session manager when one is present,
     * otherwise queries the entity's management node.
     *
     * @param sessionId id of the session whose state is requested.
     * @return a {@link Mono} emitting the session state, or erroring with
     *     {@link IllegalStateException} when the receiver is closed or is not a session receiver.
     */
    public Mono<byte[]> getSessionState(String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (!receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError(logger,
                new IllegalStateException("Cannot get session state on a non-session receiver."));
        }

        if (unnamedSessionManager == null) {
            return connectionProcessor
                .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
                .flatMap(managementNode -> managementNode.getSessionState(sessionId, getLinkName(sessionId)));
        }
        return unnamedSessionManager.getSessionState(sessionId);
    }

    /**
     * Peeks the next message, using the session id configured in the receiver options.
     *
     * @return a {@link Mono} emitting the peeked message.
     */
    public Mono<ServiceBusReceivedMessage> peek() {
        final String configuredSessionId = receiverOptions.getSessionId();
        return peek(configuredSessionId);
    }

    /**
     * Peeks the message following the last peeked sequence number, then advances the remembered
     * sequence number to that of the returned message if it is larger.
     *
     * @param sessionId session id to peek from.
     * @return a {@link Mono} emitting the peeked message, or erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Mono<ServiceBusReceivedMessage> peek(String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peek");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> {
                // Read the counter at subscription time so each peek continues from the latest position.
                final long nextSequence = lastPeekedSequenceNumber.get() + 1L;
                logger.verbose("Peek message from sequence number: {}", nextSequence);
                return managementNode.peek(nextSequence, sessionId, getLinkName(sessionId));
            })
            .handle((peeked, sink) -> {
                final long latest = lastPeekedSequenceNumber
                    .updateAndGet(previous -> Math.max(previous, peeked.getSequenceNumber()));
                logger.verbose("Updating last peeked sequence number: {}", latest);
                sink.next(peeked);
            });
    }

    /**
     * Peeks starting at the given sequence number, using the session id configured in the receiver
     * options.
     *
     * @param sequenceNumber sequence number to peek from.
     * @return a {@link Mono} emitting the peeked message.
     */
    public Mono<ServiceBusReceivedMessage> peekAt(long sequenceNumber) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return peekAt(sequenceNumber, configuredSessionId);
    }

    /**
     * Peeks starting at the given sequence number for the given session. Unlike {@code peek}, this
     * does not read or update the remembered last-peeked sequence number.
     *
     * @param sequenceNumber sequence number to peek from.
     * @param sessionId session id to peek from.
     * @return a {@link Mono} emitting the peeked message, or erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Mono<ServiceBusReceivedMessage> peekAt(long sequenceNumber, String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.peek(sequenceNumber, sessionId, getLinkName(sessionId)));
    }

    /**
     * Peeks a batch of messages, using the session id configured in the receiver options.
     *
     * @param maxMessages maximum number of messages to peek.
     * @return a {@link Flux} of the peeked messages.
     */
    public Flux<ServiceBusReceivedMessage> peekBatch(int maxMessages) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return peekBatch(maxMessages, configuredSessionId);
    }

    /**
     * Peeks a batch of messages following the last peeked sequence number and advances the
     * remembered sequence number to that of the last message in the batch if it is larger.
     *
     * @param maxMessages maximum number of messages to peek.
     * @param sessionId session id to peek from.
     * @return a {@link Flux} of the peeked messages, or one erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Flux<ServiceBusReceivedMessage> peekBatch(int maxMessages, String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch");
            return FluxUtil.fluxError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode -> {
                final long fromSequence = lastPeekedSequenceNumber.get() + 1L;
                logger.verbose("Peek batch from sequence number: {}", fromSequence);

                final Flux<ServiceBusReceivedMessage> messages =
                    managementNode.peek(fromSequence, sessionId, getLinkName(sessionId), maxMessages);

                // Substitute a placeholder carrying the current sequence number when the batch is
                // empty, so that last() always has an element to emit.
                // NOTE(review): 'messages' is subscribed both here and in the merge below; confirm
                // the management node's peek Flux tolerates two subscriptions.
                final Mono<ServiceBusReceivedMessage> sequenceTracker = messages
                    .switchIfEmpty(Mono.fromCallable(() -> {
                        final ServiceBusReceivedMessage placeholder = new ServiceBusReceivedMessage(new byte[0]);
                        placeholder.setSequenceNumber(lastPeekedSequenceNumber.get());
                        return placeholder;
                    }))
                    .last()
                    .handle((lastInBatch, sink) -> {
                        final long latest = lastPeekedSequenceNumber
                            .updateAndGet(previous -> Math.max(previous, lastInBatch.getSequenceNumber()));
                        logger.verbose("Last peeked sequence number in batch: {}", latest);
                        // Complete without emitting: the tracker contributes no elements to the merge.
                        sink.complete();
                    });

                return Flux.merge(messages, sequenceTracker);
            });
    }

    /**
     * Peeks a batch of messages starting at the given sequence number, using the session id
     * configured in the receiver options.
     *
     * @param maxMessages maximum number of messages to peek.
     * @param sequenceNumber sequence number to peek from.
     * @return a {@link Flux} of the peeked messages.
     */
    public Flux<ServiceBusReceivedMessage> peekBatchAt(int maxMessages, long sequenceNumber) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return peekBatchAt(maxMessages, sequenceNumber, configuredSessionId);
    }

    /**
     * Peeks a batch of messages starting at the given sequence number for the given session. This
     * does not read or update the remembered last-peeked sequence number.
     *
     * @param maxMessages maximum number of messages to peek.
     * @param sequenceNumber sequence number to peek from.
     * @param sessionId session id to peek from.
     * @return a {@link Flux} of the peeked messages, or one erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Flux<ServiceBusReceivedMessage> peekBatchAt(int maxMessages, long sequenceNumber, String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt");
            return FluxUtil.fluxError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode ->
                managementNode.peek(sequenceNumber, sessionId, getLinkName(sessionId), maxMessages));
    }

    /**
     * Receives a stream of messages. Delegates to the session manager when one is present;
     * otherwise receives from the consumer and wraps each message in a
     * {@link ServiceBusReceivedMessageContext}.
     *
     * @return a {@link Flux} of received message contexts.
     */
    public Flux<ServiceBusReceivedMessageContext> receive() {
        if (unnamedSessionManager == null) {
            final ServiceBusAsyncConsumer activeConsumer = getOrCreateConsumer();
            return activeConsumer.receive().map(ServiceBusReceivedMessageContext::new);
        }
        return unnamedSessionManager.receive();
    }

    /**
     * Receives at most {@code maxNumberOfMessages} messages, stopping early once
     * {@code maxWaitTime} has elapsed.
     *
     * @param maxNumberOfMessages maximum number of messages to emit; must be at least 1.
     * @param maxWaitTime maximum time to keep receiving; must be non-null and positive.
     * @return a {@link Flux} of received message contexts, or one erroring with
     *     {@link IllegalArgumentException} when {@code maxNumberOfMessages} is less than 1 or
     *     {@code maxWaitTime} is negative or zero, or with {@link NullPointerException} when
     *     {@code maxWaitTime} is {@code null}.
     */
    public Flux<ServiceBusReceivedMessageContext> receive(int maxNumberOfMessages, Duration maxWaitTime) {
        if (maxNumberOfMessages < 1) {
            return FluxUtil.fluxError(this.logger, new IllegalArgumentException("'maxNumberOfMessages' cannot be less than 1."));
        }
        if (maxWaitTime == null) {
            return FluxUtil.fluxError(this.logger, new NullPointerException("'maxWaitTime' cannot be null."));
        }
        if (maxWaitTime.isNegative() || maxWaitTime.isZero()) {
            // An out-of-range (but non-null) duration is an illegal argument, not a null pointer.
            return FluxUtil.fluxError(this.logger, new IllegalArgumentException("'maxWaitTime' cannot be negative or zero."));
        }
        return this.receive().take((long) maxNumberOfMessages).take(maxWaitTime);
    }

    /**
     * Receives a deferred message by sequence number, using the session id configured in the
     * receiver options.
     *
     * @param sequenceNumber sequence number of the deferred message.
     * @return a {@link Mono} emitting the deferred message.
     */
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return receiveDeferredMessage(sequenceNumber, configuredSessionId);
    }

    /**
     * Receives a deferred message by sequence number for the given session through the management
     * node. In {@link ReceiveMode#PEEK_LOCK} mode, a message that carries a lock token has its lock
     * recorded in the management-node lock container.
     *
     * @param sequenceNumber sequence number of the deferred message.
     * @param sessionId session id to receive from.
     * @return a {@link Mono} emitting the deferred message, or erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber, String sessionId) {
        // Fail fast on a closed receiver, as receiveDeferredMessageBatch and the other
        // management-node operations do.
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(this.logger, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessage")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.receiveDeferredMessages(this.receiverOptions.getReceiveMode(), sessionId, this.getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()).map(receivedMessage -> {
            if (CoreUtils.isNullOrEmpty(receivedMessage.getLockToken())) {
                return receivedMessage;
            }
            if (this.receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) {
                // Remember the lock so that later settlement is routed to the management node.
                receivedMessage.setLockedUntil(this.managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil()));
            }
            return receivedMessage;
        });
    }

    /**
     * Receives a batch of deferred messages by sequence number, using the session id configured in
     * the receiver options.
     *
     * @param sequenceNumbers sequence numbers of the deferred messages.
     * @return a {@link Flux} of the deferred messages.
     */
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers) {
        final String configuredSessionId = receiverOptions.getSessionId();
        return receiveDeferredMessageBatch(sequenceNumbers, configuredSessionId);
    }

    /**
     * Receives a batch of deferred messages by sequence number for the given session through the
     * management node. In {@link ReceiveMode#PEEK_LOCK} mode, each message that carries a lock
     * token has its lock recorded in the management-node lock container.
     *
     * @param sequenceNumbers sequence numbers of the deferred messages.
     * @param sessionId session id to receive from.
     * @return a {@link Flux} of the deferred messages, or one erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers, String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch");
            return FluxUtil.fluxError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode -> managementNode.receiveDeferredMessages(
                receiverOptions.getReceiveMode(), sessionId, getLinkName(sessionId), sequenceNumbers))
            .map(deferred -> {
                final boolean hasLockToken = !CoreUtils.isNullOrEmpty(deferred.getLockToken());
                if (hasLockToken && receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) {
                    // Remember the lock so that later settlement is routed to the management node.
                    deferred.setLockedUntil(
                        managementNodeLocks.addOrUpdate(deferred.getLockToken(), deferred.getLockedUntil()));
                }
                return deferred;
            });
    }

    /**
     * Renews the lock on the message identified by {@code lockToken} through the management node
     * and records the renewed expiry in the management-node lock container. When the token is a
     * {@link ServiceBusReceivedMessage}, its locked-until time is updated as well.
     *
     * @param lockToken lock token of the message whose lock is renewed.
     * @return a {@link Mono} emitting the value returned by the lock container for the renewed
     *     lock. It errors with {@link IllegalStateException} when the receiver is closed or is a
     *     session receiver, {@link NullPointerException} when the token or its value is
     *     {@code null}, or {@link IllegalArgumentException} when the token value is empty.
     */
    public Mono<Instant> renewMessageLock(MessageLockToken lockToken) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (lockToken == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'receivedMessage' cannot be null."));
        }
        if (lockToken.getLockToken() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'receivedMessage.lockToken' cannot be null."));
        }
        if (lockToken.getLockToken().isEmpty()) {
            return FluxUtil.monoError(logger,
                new IllegalArgumentException("'message.lockToken' cannot be empty."));
        }
        if (receiverOptions.isSessionReceiver()) {
            final String error = String.format("Cannot renew message lock [%s] for a session receiver.",
                lockToken.getLockToken());
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.renewMessageLock(lockToken.getLockToken(), getLinkName(null)))
            .map(renewedUntil -> {
                if (lockToken instanceof ServiceBusReceivedMessage) {
                    ((ServiceBusReceivedMessage) lockToken).setLockedUntil(renewedUntil);
                }
                return managementNodeLocks.addOrUpdate(lockToken.getLockToken(), renewedUntil);
            });
    }

    /**
     * Renews the lock on the given session through the management node.
     *
     * @param sessionId id of the session whose lock is renewed.
     * @return a {@link Mono} emitting the result of the renewal, or erroring with
     *     {@link IllegalStateException} when the receiver is closed or is not a session receiver.
     */
    public Mono<Instant> renewSessionLock(String sessionId) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (!receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError(logger,
                new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }

        // Only the session manager knows the link name for a session; without one, pass null.
        final String linkName;
        if (unnamedSessionManager == null) {
            linkName = null;
        } else {
            linkName = unnamedSessionManager.getLinkName(sessionId);
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.renewSessionLock(sessionId, linkName));
    }

    /**
     * Sets the state of the given session through the management node.
     *
     * @param sessionId id of the session whose state is set.
     * @param sessionState state bytes to store.
     * @return a {@link Mono} that completes when the state is set, or errors with
     *     {@link IllegalStateException} when the receiver is closed or is not a session receiver.
     */
    public Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (!receiverOptions.isSessionReceiver()) {
            return FluxUtil.monoError(logger,
                new IllegalStateException("Cannot set session state on a non-session receiver."));
        }

        // Only the session manager knows the link name for a session; without one, pass null.
        final String linkName;
        if (unnamedSessionManager == null) {
            linkName = null;
        } else {
            linkName = unnamedSessionManager.getLinkName(sessionId);
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.setSessionState(sessionId, sessionState, linkName));
    }

    /**
     * Starts a new transaction on the coordinator session and wraps its id in a
     * {@link ServiceBusTransactionContext}.
     *
     * @return a {@link Mono} emitting the new transaction context, or erroring with
     *     {@link IllegalStateException} when the receiver is closed.
     */
    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }

        return connectionProcessor
            .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
            .flatMap(coordinatorSession -> coordinatorSession.createTransaction())
            .map(created -> new ServiceBusTransactionContext(created.getTransactionId()));
    }

    /**
     * Commits the given transaction on the coordinator session.
     *
     * @param transactionContext transaction to commit; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the commit finishes. It errors with
     *     {@link IllegalStateException} when the receiver is closed, or
     *     {@link NullPointerException} when the context or its id is {@code null}.
     */
    public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "commitTransaction");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (transactionContext == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext' cannot be null."));
        }
        if (transactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return connectionProcessor
            .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
            .flatMap(coordinatorSession -> coordinatorSession.commitTransaction(
                new AmqpTransaction(transactionContext.getTransactionId())));
    }

    /**
     * Rolls back the given transaction on the coordinator session.
     *
     * @param transactionContext transaction to roll back; it and its transaction id must be non-null.
     * @return a {@link Mono} that completes when the rollback finishes. It errors with
     *     {@link IllegalStateException} when the receiver is closed, or
     *     {@link NullPointerException} when the context or its id is {@code null}.
     */
    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        if (isDisposed.get()) {
            final String error = String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "rollbackTransaction");
            return FluxUtil.monoError(logger, new IllegalStateException(error));
        }
        if (transactionContext == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext' cannot be null."));
        }
        if (transactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return connectionProcessor
            .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
            .flatMap(coordinatorSession -> coordinatorSession.rollbackTransaction(
                new AmqpTransaction(transactionContext.getTransactionId())));
    }

    /**
     * Closes this receiver: closes the active consumer and the session manager, if present, then
     * runs the close callback. Calls after the first are no-ops.
     */
    @Override
    public void close() {
        final boolean alreadyClosed = isDisposed.getAndSet(true);
        if (alreadyClosed) {
            return;
        }

        logger.info("Removing receiver links.");

        final ServiceBusAsyncConsumer activeConsumer = consumer.getAndSet(null);
        if (activeConsumer != null) {
            activeConsumer.close();
        }
        if (unnamedSessionManager != null) {
            unnamedSessionManager.close();
        }

        onClientClose.run();
    }

    /**
     * Tells whether the lock token is tracked in the management-node lock container.
     *
     * @param lockToken lock token to look up.
     * @return {@code true} when the container holds the token.
     */
    private boolean isManagementToken(String lockToken) {
        final boolean tracked = managementNodeLocks.contains(lockToken);
        return tracked;
    }

    /**
     * Settles a message with the given disposition status.
     *
     * <p>Routing: when a session manager is present it is tried first and the management node is
     * the fallback; otherwise the management node is used when the lock token is tracked in the
     * management-node lock container or no consumer exists, and the existing consumer is used in
     * every other case.</p>
     *
     * @param message lock token holder of the message to settle.
     * @param dispositionStatus disposition to apply.
     * @param deadLetterReason dead-letter reason, or {@code null}.
     * @param deadLetterErrorDescription dead-letter error description, or {@code null}.
     * @param propertiesToModify properties to modify on the message, or {@code null}.
     * @param sessionId session id supplied by the caller, or {@code null}.
     * @param transactionContext transaction the update belongs to, or {@code null}.
     * @return a {@link Mono} that completes when the update finishes. It errors with
     *     {@link IllegalStateException} when the receiver is closed, {@link NullPointerException}
     *     when the message or its lock token is {@code null}, {@link IllegalArgumentException} when
     *     the lock token is empty, or {@link UnsupportedOperationException} when the receiver is
     *     not in {@link ReceiveMode#PEEK_LOCK} mode.
     */
    private Mono<Void> updateDisposition(MessageLockToken message, DispositionStatus dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, String sessionId, ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(this.logger, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, dispositionStatus.getValue())));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'receivedMessage' cannot be null."));
        }
        if (Objects.isNull(message.getLockToken())) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'receivedMessage.lockToken' cannot be null."));
        }
        if (message.getLockToken().isEmpty()) {
            return FluxUtil.monoError(this.logger, new IllegalArgumentException("'message.lockToken' cannot be empty."));
        }
        if (this.receiverOptions.getReceiveMode() != ReceiveMode.PEEK_LOCK) {
            return Mono.error(this.logger.logExceptionAsError(new UnsupportedOperationException(String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", dispositionStatus))));
        }
        // The lock token was validated above; the second, unreachable copy of those checks that
        // used to follow the receive-mode check has been removed.
        final String lockToken = message.getLockToken();

        final String sessionIdToUse;
        if (message instanceof ServiceBusReceivedMessage) {
            sessionIdToUse = ((ServiceBusReceivedMessage) message).getSessionId();
            if (!(CoreUtils.isNullOrEmpty(sessionIdToUse) || CoreUtils.isNullOrEmpty(sessionId) || sessionIdToUse.equals(sessionId))) {
                this.logger.warning("Given sessionId '{}' does not match message's sessionId '{}'", sessionId, sessionIdToUse);
            }
        } else {
            sessionIdToUse = sessionId == null && !CoreUtils.isNullOrEmpty(this.receiverOptions.getSessionId()) ? this.receiverOptions.getSessionId() : sessionId;
        }
        // NOTE(review): sessionIdToUse is only logged; the calls below still pass the caller's
        // sessionId. Confirm whether the management-node update should use sessionIdToUse.
        this.logger.info("{}: Update started. Disposition: {}. Lock: {}. SessionId {}.", this.entityPath, dispositionStatus, lockToken, sessionIdToUse);

        // Assembled eagerly but only subscribed on the paths that return it.
        final Mono<Void> performOnManagement = this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, sessionId, this.getLinkName(sessionId), transactionContext)).then(Mono.fromRunnable(() -> {
            this.logger.info("{}: Management node Update completed. Disposition: {}. Lock: {}.", this.entityPath, dispositionStatus, lockToken);
            this.managementNodeLocks.remove(lockToken);
        }));

        if (this.unnamedSessionManager != null) {
            return this.unnamedSessionManager.updateDisposition(message, sessionId, dispositionStatus, propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext).flatMap(isSuccess -> {
                if (isSuccess.booleanValue()) {
                    return Mono.empty();
                }
                this.logger.info("Could not perform on session manger. Performing on management node.");
                return performOnManagement;
            });
        }

        final ServiceBusAsyncConsumer existingConsumer = this.consumer.get();
        if (this.isManagementToken(lockToken) || existingConsumer == null) {
            return performOnManagement;
        }
        return existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext).then(Mono.fromRunnable(() -> this.logger.info("{}: Update completed. Disposition: {}. Lock: {}.", this.entityPath, dispositionStatus, lockToken)));
    }

    /**
     * Returns the consumer held by this receiver, building one if none has been stored yet.
     *
     * <p>When no consumer exists, a receive link is requested from the connection (the session-aware overload is
     * used when the receiver options mark this as a session receiver), fed into a
     * {@link ServiceBusReceiveLinkProcessor}, and wrapped in a new {@link ServiceBusAsyncConsumer}. The new consumer
     * is published with a compare-and-set; if another caller stored one first, the freshly built consumer is closed
     * and the stored one is returned instead.</p>
     *
     * @return the consumer currently stored in {@code consumer}.
     */
    private ServiceBusAsyncConsumer getOrCreateConsumer() {
        ServiceBusAsyncConsumer existing = this.consumer.get();
        if (existing != null) {
            return existing;
        }
        String linkName = StringUtil.getRandomString((String)this.entityPath);
        this.logger.info("{}: Creating consumer for link '{}'", new Object[]{this.entityPath, linkName});
        Flux receiveLink = this.connectionProcessor.flatMap(connection -> {
            if (this.receiverOptions.isSessionReceiver()) {
                return connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, this.receiverOptions.getSessionId());
            }
            return connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType);
        }).doOnNext(next -> {
            String format = "Created consumer for Service Bus resource: [{}] mode: [{}] sessionEnabled? {} transferEntityPath: [{}], entityType: [{}]";
            // Log the same predicate that selected the link overload above. The previous argument,
            // isNullOrEmpty(sessionId), is true when NO session is configured, so the flag was reported inverted.
            this.logger.verbose(format, new Object[]{next.getEntityPath(), this.receiverOptions.getReceiveMode(), this.receiverOptions.isSessionReceiver(), "N/A", this.entityType});
        }).repeat(); // resubscribe to the link source whenever it completes, so the processor can obtain a new link
        LinkErrorContext context = new LinkErrorContext(this.fullyQualifiedNamespace, this.entityPath, linkName, null);
        AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy((AmqpRetryOptions)this.connectionProcessor.getRetryOptions());
        ServiceBusReceiveLinkProcessor linkMessageProcessor = (ServiceBusReceiveLinkProcessor)receiveLink.subscribeWith((Subscriber)new ServiceBusReceiveLinkProcessor(this.receiverOptions.getPrefetchCount(), retryPolicy, (Disposable)this.connectionProcessor, (AmqpErrorContext)context));
        ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor, this.messageSerializer, false, this.receiverOptions.autoLockRenewalEnabled(), this.receiverOptions.getMaxAutoLockRenewalDuration(), this.connectionProcessor.getRetryOptions(), (token, associatedLinkName) -> this.renewMessageLock((MessageLockToken)token, (String)associatedLinkName));
        if (this.consumer.compareAndSet(null, newConsumer)) {
            return newConsumer;
        }
        // Lost the race: another caller already published a consumer, so discard ours and use theirs.
        newConsumer.close();
        return this.consumer.get();
    }

    /**
     * Gets the options this receiver was configured with.
     *
     * @return the {@link ReceiverOptions} instance held by this receiver.
     */
    ReceiverOptions getReceiverOptions() {
        return this.receiverOptions;
    }

    /**
     * Renews the lock identified by {@code lockToken} through the entity's management node.
     *
     * <p>When the token is itself a {@link ServiceBusReceivedMessage}, its locked-until value is updated with the
     * instant emitted by the renewal before that instant is passed downstream.</p>
     *
     * @param lockToken holder of the lock token to renew.
     * @param linkName link name forwarded to the management node's renew call.
     * @return a {@link Mono} emitting the instant produced by the management node's renew call.
     */
    private Mono<Instant> renewMessageLock(MessageLockToken lockToken, String linkName) {
        // The runtime type of the token cannot change, so decide once whether it needs updating.
        final boolean tokenIsReceivedMessage = lockToken instanceof ServiceBusReceivedMessage;

        return this.connectionProcessor
            .flatMap(amqpConnection -> amqpConnection.getManagementNode(this.entityPath, this.entityType))
            .flatMap(managementNode -> managementNode.renewMessageLock(lockToken.getLockToken(), linkName))
            .map(lockedUntil -> {
                if (tokenIsReceivedMessage) {
                    ((ServiceBusReceivedMessage)lockToken).setLockedUntil((Instant)lockedUntil);
                }
                return lockedUntil;
            });
    }

    /**
     * Resolves the link name to associate with an operation on the given session.
     *
     * <ul>
     *   <li>Non-empty session id and an unnamed session manager is present: the manager's link name for that
     *       session.</li>
     *   <li>Non-empty session id, no unnamed session manager, and this is not a session receiver:
     *       {@code null}.</li>
     *   <li>Otherwise: the link name of the current consumer, or {@code null} when no consumer exists.</li>
     * </ul>
     *
     * @param sessionId session id of interest; may be {@code null} or empty.
     * @return the resolved link name, or {@code null} when none applies.
     */
    private String getLinkName(String sessionId) {
        final boolean hasSessionId = !CoreUtils.isNullOrEmpty((CharSequence)sessionId);

        if (hasSessionId) {
            if (this.unnamedSessionManager != null) {
                return this.unnamedSessionManager.getLinkName(sessionId);
            }
            if (!this.receiverOptions.isSessionReceiver()) {
                return null;
            }
        }

        final ServiceBusAsyncConsumer current = this.consumer.get();
        if (current == null) {
            return null;
        }
        return current.getLinkName();
    }
}

