/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ConnectionCacheWrapper;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

final class ServiceBusSessionAcquirer {
    private static final String TRACKING_ID_KEY = "trackingId";
    private final ClientLogger logger;
    private final String identifier;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final Duration sessionActiveTimeout;
    private final ServiceBusReceiveMode receiveMode;
    private final ConnectionCacheWrapper connectionCacheWrapper;
    private final Mono<ServiceBusManagementNode> sessionManagement;

    ServiceBusSessionAcquirer(ClientLogger logger, String identifier, String entityPath, MessagingEntityType entityType, ServiceBusReceiveMode receiveMode, Duration sessionActiveTimeout, ConnectionCacheWrapper connectionCacheWrapper) {
        assert (connectionCacheWrapper.isV2());
        this.logger = logger;
        this.identifier = identifier;
        this.entityPath = entityPath;
        this.entityType = entityType;
        this.sessionActiveTimeout = sessionActiveTimeout;
        this.receiveMode = receiveMode;
        this.connectionCacheWrapper = connectionCacheWrapper;
        this.sessionManagement = connectionCacheWrapper.getConnection().flatMap(connection -> connection.getManagementNode(entityPath, entityType));
    }

    boolean isConnectionClosed() {
        return this.connectionCacheWrapper.isChannelClosed();
    }

    Mono<Session> acquire() {
        return this.acquireIntern(null);
    }

    Mono<Session> acquire(String sessionId) {
        Objects.requireNonNull(sessionId, "sessionId cannot be null.");
        return this.acquireIntern(sessionId);
    }

    private Mono<Session> acquireIntern(String sessionId) {
        return Mono.defer(() -> this.createSessionReceiveLink(sessionId).flatMap(sessionLink -> sessionLink.getSessionProperties().flatMap(sessionProperties -> Mono.just((Object)new Session((ServiceBusReceiveLink)sessionLink, (ServiceBusReceiveLink.SessionProperties)sessionProperties, this.sessionManagement))))).timeout(this.sessionActiveTimeout).retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
            Throwable failure = signal.failure();
            this.logger.atInfo().addKeyValue("entityPath", this.entityPath).addKeyValue("attempt", signal.totalRetriesInARow()).log(sessionId == null ? "Error occurred while getting unnamed session." : "Error occurred while getting session " + sessionId, new Object[]{failure});
            if (failure instanceof TimeoutException) {
                return Mono.delay((Duration)Duration.ZERO);
            }
            if (failure instanceof AmqpException && ((AmqpException)failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                return Mono.delay((Duration)Duration.ZERO);
            }
            long id = System.nanoTime();
            this.logger.atInfo().addKeyValue(TRACKING_ID_KEY, id).log("Unable to acquire a session.", new Object[]{failure});
            return Mono.error((Throwable)failure).publishOn(Schedulers.boundedElastic()).doOnError(e -> this.logger.atInfo().addKeyValue(TRACKING_ID_KEY, id).log("Emitting the error signal received for session acquire attempt.", new Object[]{e}));
        })));
    }

    private Mono<ServiceBusReceiveLink> createSessionReceiveLink(String sessionId) {
        String linkName = sessionId != null ? sessionId : StringUtil.getRandomString((String)"session-");
        return this.connectionCacheWrapper.getConnection().flatMap(connection -> connection.createReceiveLink(linkName, this.entityPath, this.receiveMode, null, this.entityType, this.identifier, sessionId));
    }

    static final class Session {
        private final ServiceBusReceiveLink link;
        private final ServiceBusReceiveLink.SessionProperties properties;
        private final Mono<ServiceBusManagementNode> sessionManagement;

        Session(ServiceBusReceiveLink sessionLink, ServiceBusReceiveLink.SessionProperties sessionProperties, Mono<ServiceBusManagementNode> sessionManagement) {
            this.link = Objects.requireNonNull(sessionLink, "sessionLink cannot be null.");
            this.properties = Objects.requireNonNull(sessionProperties, "sessionProperties cannot be null.");
            this.sessionManagement = Objects.requireNonNull(sessionManagement, "sessionManagement cannot be null.");
        }

        String getId() {
            return this.properties.getId();
        }

        ServiceBusReceiveLink getLink() {
            return this.link;
        }

        Disposable beginLockRenew(ServiceBusTracer tracer, Duration maxSessionLockRenew) {
            String sessionId = this.properties.getId();
            Function<String, Mono<OffsetDateTime>> lockRenewFunc = __ -> this.sessionManagement.flatMap(mgmt -> {
                Mono<OffsetDateTime> renewLock = mgmt.renewSessionLock(sessionId, this.link.getLinkName());
                return tracer.traceMono("ServiceBus.renewSessionLock", renewLock);
            });
            OffsetDateTime initialLockedUntil = this.properties.getLockedUntil();
            LockRenewalOperation recurringLockRenew = new LockRenewalOperation(sessionId, maxSessionLockRenew, true, lockRenewFunc, initialLockedUntil);
            return recurringLockRenew;
        }
    }
}

