/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusSessionReceiver;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

class ServiceBusSessionManager
implements AutoCloseable {
    private static final String TRACKING_ID_KEY = "trackingId";
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionManager.class);
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusReceiveLink receiveLink;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final Duration operationTimeout;
    private final MessageSerializer messageSerializer;
    private final String identifier;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final List<Scheduler> schedulers;
    private final Deque<Scheduler> availableSchedulers = new ConcurrentLinkedDeque<Scheduler>();
    private final Duration maxSessionLockRenewDuration;
    private final ConcurrentHashMap<String, ServiceBusSessionReceiver> sessionReceivers = new ConcurrentHashMap();
    private final EmitterProcessor<Flux<ServiceBusMessageContext>> processor;
    private final FluxSink<Flux<ServiceBusMessageContext>> sessionReceiveSink;
    private volatile Flux<ServiceBusMessageContext> receiveFlux;

    ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink, String identifier) {
        this.entityPath = entityPath;
        this.entityType = entityType;
        this.receiverOptions = receiverOptions;
        this.connectionProcessor = connectionProcessor;
        this.operationTimeout = connectionProcessor.getRetryOptions().getTryTimeout();
        this.messageSerializer = messageSerializer;
        this.maxSessionLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
        this.identifier = identifier;
        int numberOfSchedulers = receiverOptions.isRollingSessionReceiver() ? receiverOptions.getMaxConcurrentSessions() : 1;
        List schedulerList = IntStream.range(0, numberOfSchedulers).mapToObj(index -> Schedulers.newBoundedElastic((int)Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, (int)Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, (String)("receiver-" + index))).collect(Collectors.toList());
        this.schedulers = Collections.unmodifiableList(schedulerList);
        this.availableSchedulers.addAll(this.schedulers);
        this.processor = EmitterProcessor.create((int)numberOfSchedulers, (boolean)false);
        this.sessionReceiveSink = this.processor.sink();
        this.receiveLink = receiveLink;
    }

    ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, ReceiverOptions receiverOptions, String identifier) {
        this(entityPath, entityType, connectionProcessor, messageSerializer, receiverOptions, null, identifier);
    }

    String getLinkName(String sessionId) {
        ServiceBusSessionReceiver receiver = this.sessionReceivers.get(sessionId);
        return receiver != null ? receiver.getLinkName() : null;
    }

    public String getIdentifier() {
        return this.identifier;
    }

    Mono<byte[]> getSessionState(String sessionId) {
        return this.validateParameter(sessionId, "sessionId", "getSessionState").then(this.getManagementNode().flatMap(channel -> {
            ServiceBusSessionReceiver receiver = this.sessionReceivers.get(sessionId);
            String associatedLinkName = receiver != null ? receiver.getLinkName() : null;
            return channel.getSessionState(sessionId, associatedLinkName);
        }));
    }

    Flux<ServiceBusMessageContext> receive() {
        if (!this.isStarted.getAndSet(true)) {
            this.sessionReceiveSink.onRequest(this::onSessionRequest);
            this.receiveFlux = !this.receiverOptions.isRollingSessionReceiver() ? this.getSession(this.schedulers.get(0), false) : Flux.merge(this.processor, (int)this.receiverOptions.getMaxConcurrentSessions());
        }
        return this.receiveFlux;
    }

    Mono<OffsetDateTime> renewSessionLock(String sessionId) {
        return this.validateParameter(sessionId, "sessionId", "renewSessionLock").then(this.getManagementNode().flatMap(channel -> {
            ServiceBusSessionReceiver receiver = this.sessionReceivers.get(sessionId);
            String associatedLinkName = receiver != null ? receiver.getLinkName() : null;
            return channel.renewSessionLock(sessionId, associatedLinkName).handle((offsetDateTime, sink) -> {
                if (receiver != null) {
                    receiver.setSessionLockedUntil((OffsetDateTime)offsetDateTime);
                }
                sink.next(offsetDateTime);
            });
        }));
    }

    Mono<Boolean> updateDisposition(String lockToken, String sessionId, DispositionStatus dispositionStatus, Map<String, Object> propertiesToModify, String deadLetterReason, String deadLetterDescription, ServiceBusTransactionContext transactionContext) {
        String operation = "updateDisposition";
        return Mono.when((Publisher[])new Publisher[]{this.validateParameter(lockToken, "lockToken", "updateDisposition"), this.validateParameter(lockToken, "lockToken", "updateDisposition"), this.validateParameter(sessionId, "'sessionId'", "updateDisposition")}).then(Mono.defer(() -> {
            ServiceBusSessionReceiver receiver = this.sessionReceivers.get(sessionId);
            if (receiver == null || !receiver.containsLockToken(lockToken)) {
                return Mono.just((Object)false);
            }
            DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterDescription, propertiesToModify, transactionContext);
            return receiver.updateDisposition(lockToken, deliveryState).thenReturn((Object)true);
        }));
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        List closeables = this.sessionReceivers.values().stream().map(receiver -> receiver.closeAsync()).collect(Collectors.toList());
        Mono.when(closeables).block(this.operationTimeout);
        this.sessionReceiveSink.complete();
        for (Scheduler scheduler : this.schedulers) {
            scheduler.dispose();
        }
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.connectionProcessor.getFullyQualifiedNamespace(), this.entityPath);
    }

    private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
        String sessionId = this.receiverOptions.getSessionId();
        String linkName = sessionId != null ? sessionId : StringUtil.getRandomString((String)"session-");
        return this.connectionProcessor.flatMap(connection -> connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, this.identifier, sessionId));
    }

    Mono<ServiceBusReceiveLink> getActiveLink() {
        if (this.receiveLink != null) {
            return Mono.just((Object)this.receiveLink);
        }
        return Mono.defer(() -> this.createSessionReceiveLink().flatMap(link -> link.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next().switchIfEmpty(Mono.error(() -> new AmqpException(true, "Session receive link completed without being active", null))).timeout(this.operationTimeout).then(Mono.just((Object)link)))).retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
            Throwable failure = signal.failure();
            LOGGER.atInfo().addKeyValue("entityPath", this.entityPath).addKeyValue("attempt", signal.totalRetriesInARow()).log("Error occurred while getting unnamed session.", new Object[]{failure});
            if (this.isDisposed.get()) {
                return Mono.error((Throwable)new AmqpException(false, "SessionManager is already disposed.", failure, this.getErrorContext()));
            }
            if (failure instanceof TimeoutException) {
                return Mono.empty();
            }
            if (failure instanceof AmqpException && ((AmqpException)failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                return Mono.empty();
            }
            long id = System.nanoTime();
            LOGGER.atInfo().addKeyValue(TRACKING_ID_KEY, id).log("Unable to acquire new session.", new Object[]{failure});
            return Mono.error((Throwable)failure).publishOn(Schedulers.boundedElastic()).doOnError(e -> LOGGER.atInfo().addKeyValue(TRACKING_ID_KEY, id).log("Emitting the error signal received for session acquire attempt.", new Object[]{e}));
        })));
    }

    private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean disposeOnIdle) {
        return this.getActiveLink().flatMap(link -> link.getSessionId().map(sessionId -> this.sessionReceivers.compute((String)sessionId, (key, existing) -> {
            if (existing != null) {
                return existing;
            }
            return new ServiceBusSessionReceiver((ServiceBusReceiveLink)link, this.messageSerializer, this.connectionProcessor.getRetryOptions(), this.receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, this::renewSessionLock, this.maxSessionLockRenewDuration);
        }))).flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> {
            LOGGER.atVerbose().addKeyValue("sessionId", sessionReceiver.getSessionId()).log("Closing session receiver.");
            this.availableSchedulers.push(scheduler);
            this.sessionReceivers.remove(sessionReceiver.getSessionId());
            sessionReceiver.closeAsync().subscribe();
            if (this.receiverOptions.isRollingSessionReceiver()) {
                this.onSessionRequest(1L);
            }
        }));
    }

    private Mono<ServiceBusManagementNode> getManagementNode() {
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType));
    }

    private void onSessionRequest(long request) {
        if (this.isDisposed.get()) {
            LOGGER.info("Session manager is disposed. Not emitting more unnamed sessions.");
            return;
        }
        LOGGER.atVerbose().addKeyValue("requested", request).log("Requested unnamed sessions.");
        int i = 0;
        while ((long)i < request) {
            Scheduler scheduler = this.availableSchedulers.poll();
            if (scheduler == null) {
                if (request != Long.MAX_VALUE) {
                    LOGGER.atVerbose().addKeyValue("requested", request).log("There are no available schedulers to fetch.");
                }
                return;
            }
            Flux<ServiceBusMessageContext> session = this.getSession(scheduler, true);
            this.sessionReceiveSink.next(session);
            ++i;
        }
    }

    private <T> Mono<Void> validateParameter(T parameter, String parameterName, String operation) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, operation)));
        }
        if (parameter == null) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException(String.format("'%s' cannot be null.", parameterName)));
        }
        if (parameter instanceof String && ((String)parameter).isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException(String.format("'%s' cannot be an empty string.", parameterName)));
        }
        return Mono.empty();
    }
}

