/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.UnnamedSessionReceiver;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/**
 * Manages receivers for sessions whose identifier is not chosen by the caller ("unnamed" sessions).
 *
 * <p>Each accepted session is wrapped in an {@link UnnamedSessionReceiver} that is tracked by session id in
 * {@code sessionReceivers}. When the receiver options request a rolling session receiver, one scheduler is created
 * per concurrent session and session streams are emitted through an {@link EmitterProcessor} that is merged in
 * {@link #receive()}; otherwise a single scheduler and a single session stream are used.</p>
 *
 * <p>NOTE(review): nothing in this class removes entries from {@code sessionReceivers} once a session stream
 * terminates; confirm whether finished sessions are expected to stay registered until {@link #close()}.</p>
 */
class UnnamedSessionManager
implements AutoCloseable {
    /** Delay applied before retrying to obtain a session link after a timeout. */
    private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration.ofMinutes(1L);
    private final ClientLogger logger = new ClientLogger(UnnamedSessionManager.class);
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final Duration operationTimeout;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean isStarted = new AtomicBoolean();
    /** Every scheduler created by this manager; used to dispose them all on close. */
    private final List<Scheduler> schedulers;
    /** Schedulers that are currently not assigned to a session stream. */
    private final Deque<Scheduler> availableSchedulers = new ConcurrentLinkedDeque<>();
    /** Session receivers keyed by session id. */
    private final ConcurrentHashMap<String, UnnamedSessionReceiver> sessionReceivers = new ConcurrentHashMap<>();
    private final EmitterProcessor<Flux<ServiceBusReceivedMessageContext>> processor;
    private final FluxSink<Flux<ServiceBusReceivedMessageContext>> sessionReceiveSink;
    private volatile Flux<ServiceBusReceivedMessageContext> receiveFlux;

    /**
     * Creates a session manager and the schedulers its session streams are published on.
     *
     * @param entityPath Path of the messaging entity sessions are accepted from.
     * @param entityType Type of the messaging entity.
     * @param connectionProcessor Source of the connection used to create receive links and the management node.
     * @param operationTimeout Maximum time to wait for a newly created session link to become active.
     * @param tracerProvider Tracer provider (stored; not otherwise used by this class).
     * @param messageSerializer Serializer handed to each {@link UnnamedSessionReceiver}.
     * @param receiverOptions Options describing receive mode, prefetch, lock renewal and session concurrency.
     */
    UnnamedSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, Duration operationTimeout, TracerProvider tracerProvider, MessageSerializer messageSerializer, ReceiverOptions receiverOptions) {
        this.entityPath = entityPath;
        this.entityType = entityType;
        this.receiverOptions = receiverOptions;
        this.connectionProcessor = connectionProcessor;
        this.operationTimeout = operationTimeout;
        this.tracerProvider = tracerProvider;
        this.messageSerializer = messageSerializer;

        // A rolling receiver gets one scheduler per concurrent session; otherwise a single scheduler suffices.
        final int numberOfSchedulers = receiverOptions.isRollingSessionReceiver()
            ? receiverOptions.getMaxConcurrentSessions()
            : 1;
        final List<Scheduler> schedulerList = IntStream.range(0, numberOfSchedulers)
            .mapToObj(index -> Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
                Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "receiver-" + index))
            .collect(Collectors.toList());

        this.schedulers = Collections.unmodifiableList(schedulerList);
        this.availableSchedulers.addAll(this.schedulers);
        this.processor = EmitterProcessor.create(numberOfSchedulers, false);
        this.sessionReceiveSink = this.processor.sink();
    }

    /**
     * Gets the name of the link associated with a session.
     *
     * @param sessionId Identifier of the session.
     * @return The link name of the tracked receiver, or {@code null} if no receiver is tracked for the session.
     */
    String getLinkName(String sessionId) {
        final UnnamedSessionReceiver receiver = this.sessionReceivers.get(sessionId);
        return receiver != null ? receiver.getLinkName() : null;
    }

    /**
     * Gets the state of a session through the management node.
     *
     * @param sessionId Identifier of the session; must be neither {@code null} nor empty.
     * @return A Mono emitting the session state, or an error Mono if this manager is disposed or the id is invalid.
     */
    Mono<byte[]> getSessionState(String sessionId) {
        return this.validateParameter(sessionId, "sessionId", "getSessionState")
            .then(this.getManagementNode().flatMap(channel -> {
                // The link name is resolved at subscription time; it is null when the session is not tracked.
                final String associatedLinkName = this.getLinkName(sessionId);
                return channel.getSessionState(sessionId, associatedLinkName);
            }));
    }

    /**
     * Gets the stream of messages received from unnamed sessions.
     *
     * <p>The stream is built on the first call and the same instance is returned afterwards. For a rolling session
     * receiver the emitted session streams are merged with a concurrency of the configured maximum concurrent
     * sessions; otherwise a single session stream on the first scheduler is returned.</p>
     *
     * @return The shared receive stream.
     */
    Flux<ServiceBusReceivedMessageContext> receive() {
        final Flux<ServiceBusReceivedMessageContext> existing = this.receiveFlux;
        if (existing != null) {
            return existing;
        }

        // Initialisation is guarded so that a concurrent caller can never observe the "started" flag while
        // receiveFlux is still unassigned (which previously made this method return null).
        synchronized (this.isStarted) {
            if (!this.isStarted.getAndSet(true)) {
                this.sessionReceiveSink.onRequest(this::onSessionRequest);
                this.receiveFlux = this.receiverOptions.isRollingSessionReceiver()
                    ? Flux.merge(this.processor, this.receiverOptions.getMaxConcurrentSessions())
                    : this.getSession(this.schedulers.get(0), false);
            }
            return this.receiveFlux;
        }
    }

    /**
     * Renews the lock of a session through the management node and, when the session is tracked by this manager,
     * records the new expiry on its receiver.
     *
     * @param sessionId Identifier of the session; must be neither {@code null} nor empty.
     * @return A Mono emitting the instant reported by the management node, or an error Mono if this manager is
     *     disposed or the id is invalid.
     */
    Mono<Instant> renewSessionLock(String sessionId) {
        return this.validateParameter(sessionId, "sessionId", "renewSessionLock")
            .then(this.getManagementNode().flatMap(channel -> {
                final UnnamedSessionReceiver receiver = this.sessionReceivers.get(sessionId);
                final String associatedLinkName = receiver != null ? receiver.getLinkName() : null;

                return channel.renewSessionLock(sessionId, associatedLinkName).doOnNext(lockedUntil -> {
                    if (receiver != null) {
                        receiver.setSessionLockedUntil(lockedUntil);
                    }
                });
            }));
    }

    /**
     * Settles a message on the receiver that owns its session.
     *
     * @param lockToken Lock token of the message; neither it nor its token string may be {@code null}.
     * @param sessionId Identifier of the session the message belongs to; must be neither {@code null} nor empty.
     * @param dispositionStatus Disposition to apply.
     * @param propertiesToModify Message properties to modify, passed through to the delivery state.
     * @param deadLetterReason Dead-letter reason, passed through to the delivery state.
     * @param deadLetterDescription Dead-letter description, passed through to the delivery state.
     * @param transactionContext Transaction the disposition takes part in, passed through to the delivery state.
     * @return A Mono emitting {@code true} when a tracked receiver held the lock token and the disposition was
     *     updated, {@code false} when no tracked receiver holds it, or an error Mono when validation fails.
     */
    Mono<Boolean> updateDisposition(MessageLockToken lockToken, String sessionId, DispositionStatus dispositionStatus, Map<String, Object> propertiesToModify, String deadLetterReason, String deadLetterDescription, ServiceBusTransactionContext transactionContext) {
        final String operation = "updateDisposition";

        // The token string may only be read once lockToken itself is known to be non-null; dereferencing it while
        // assembling the validation list would throw synchronously instead of surfacing an error Mono.
        final Mono<Void> lockTokenValidation = lockToken == null
            ? this.validateParameter(lockToken, "lockToken", operation)
            : this.validateParameter(lockToken.getLockToken(), "lockToken.getLockToken()", operation);
        // validateParameter adds the quotes around the parameter name itself.
        final Mono<Void> sessionIdValidation = this.validateParameter(sessionId, "sessionId", operation);

        return Mono.when(lockTokenValidation, sessionIdValidation).then(Mono.defer(() -> {
            final String lock = lockToken.getLockToken();
            final UnnamedSessionReceiver receiver = this.sessionReceivers.get(sessionId);
            if (receiver == null || !receiver.containsLockToken(lock)) {
                return Mono.just(false);
            }

            final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
                deadLetterDescription, propertiesToModify, transactionContext);
            return receiver.updateDisposition(lock, deliveryState).thenReturn(true);
        }));
    }

    /**
     * Disposes every scheduler, closes every tracked session receiver and completes the session sink.
     * Calls after the first one do nothing.
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        for (Scheduler scheduler : this.schedulers) {
            scheduler.dispose();
        }
        this.sessionReceivers.values().forEach(UnnamedSessionReceiver::close);
        this.sessionReceiveSink.complete();
    }

    /** Builds the error context (namespace and entity path) attached to exceptions raised by this manager. */
    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.connectionProcessor.getFullyQualifiedNamespace(), this.entityPath);
    }

    /** Creates a receive link with a random "session-" prefixed name and no explicit session id. */
    private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
        final String linkName = StringUtil.getRandomString("session-");
        return this.connectionProcessor.flatMap(connection -> connection.createReceiveLink(linkName,
            this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, null));
    }

    /**
     * Creates a session receive link and waits for it to become active, retrying after a delay when the attempt
     * times out. Any other failure, or a failure after this manager was disposed, terminates the Mono with an error.
     */
    private Mono<ServiceBusReceiveLink> getActiveLink() {
        final Mono<ServiceBusReceiveLink> activeLink =
            Mono.defer(() -> this.createSessionReceiveLink().flatMap(this::awaitActive));

        return activeLink.retryWhen(
            Retry.from(retrySignals -> retrySignals.flatMap(this::onAcceptSessionFailure)));
    }

    /** Emits the link once its endpoint reports ACTIVE; errors if that does not happen within the timeout. */
    private Mono<ServiceBusReceiveLink> awaitActive(ServiceBusReceiveLink link) {
        return link.getEndpointStates()
            .takeUntil(state -> state == AmqpEndpointState.ACTIVE)
            .timeout(this.operationTimeout)
            .then(Mono.just(link));
    }

    /** Decides whether a failed attempt to obtain a session link is retried (after a delay) or propagated. */
    private Mono<Long> onAcceptSessionFailure(Retry.RetrySignal signal) {
        final Throwable failure = signal.failure();
        this.logger.info("entityPath[{}] attempt[{}]. Error occurred while getting unnamed session.",
            this.entityPath, signal.totalRetriesInARow(), failure);

        if (this.isDisposed.get()) {
            return Mono.error(new AmqpException(false, "SessionManager is already disposed.", failure,
                this.getErrorContext()));
        }

        final boolean isTimeout = failure instanceof TimeoutException
            || (failure instanceof AmqpException
                && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR);

        return isTimeout
            ? Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION)
            : Mono.error(failure);
    }

    /**
     * Obtains an active session link, registers a receiver for its session (reusing one already registered under
     * the same session id) and returns the messages of that session published on the given scheduler.
     *
     * <p>When the session stream terminates for any reason the scheduler is returned to the pool and one more
     * session is requested.</p>
     *
     * @param scheduler Scheduler the session's messages are published on.
     * @param disposeOnIdle Passed through to the {@link UnnamedSessionReceiver} constructor.
     */
    private Flux<ServiceBusReceivedMessageContext> getSession(Scheduler scheduler, boolean disposeOnIdle) {
        return this.getActiveLink()
            .flatMap(link -> link.getSessionId()
                .map(sessionId -> this.sessionReceivers.computeIfAbsent(sessionId,
                    key -> new UnnamedSessionReceiver(link, this.messageSerializer,
                        this.connectionProcessor.getRetryOptions(), this.receiverOptions.getPrefetchCount(),
                        disposeOnIdle, scheduler, this.receiverOptions.autoLockRenewalEnabled(),
                        this.receiverOptions.getMaxAutoLockRenewalDuration(), this::renewSessionLock))))
            .flatMapMany(session -> session.receive().doFinally(signalType -> {
                this.logger.verbose("Adding scheduler back to pool.");
                this.availableSchedulers.push(scheduler);
                this.onSessionRequest(1L);
            }))
            .publishOn(scheduler);
    }

    /** Gets the management node for this manager's entity from the current connection. */
    private Mono<ServiceBusManagementNode> getManagementNode() {
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType));
    }

    /**
     * Emits up to {@code request} new session streams into the sink, one per available scheduler. Stops early when
     * no scheduler is available and does nothing once this manager is disposed.
     *
     * @param request Number of session streams requested downstream.
     */
    private void onSessionRequest(long request) {
        if (this.isDisposed.get()) {
            this.logger.info("Session manager is disposed. Not emitting more unnamed sessions.");
            return;
        }

        this.logger.verbose("Requested {} unnamed sessions.", request);

        // The counter is a long because the requested amount can be as large as Long.MAX_VALUE.
        for (long emitted = 0; emitted < request; emitted++) {
            final Scheduler scheduler = this.availableSchedulers.poll();
            if (scheduler == null) {
                // Running out of schedulers is only worth logging for a bounded request.
                if (request != Long.MAX_VALUE) {
                    this.logger.verbose("request[{}]: There are no available schedulers to fetch.", request);
                }
                return;
            }

            this.sessionReceiveSink.next(this.getSession(scheduler, true));
        }
    }

    /**
     * Checks that this manager is not disposed and that a parameter is non-null and, for strings, non-empty.
     *
     * @param parameter Value to check.
     * @param parameterName Name used in the error message; quotes are added by this method.
     * @param operation Name of the calling operation, used in the disposed error message.
     * @return An empty Mono when the checks pass; otherwise a Mono that errors with
     *     {@link IllegalStateException} (disposed), {@link NullPointerException} (null parameter) or
     *     {@link IllegalArgumentException} (empty string).
     */
    private <T> Mono<Void> validateParameter(T parameter, String parameterName, String operation) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(this.logger,
                new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, operation)));
        }
        if (parameter == null) {
            return FluxUtil.monoError(this.logger,
                new NullPointerException(String.format("'%s' cannot be null.", parameterName)));
        }
        if (parameter instanceof String && ((String) parameter).isEmpty()) {
            return FluxUtil.monoError(this.logger,
                new IllegalArgumentException(String.format("'%s' cannot be an empty string.", parameterName)));
        }
        return Mono.empty();
    }
}

