/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.implementation.MessageLockContainer;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusMessageProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;

class UnnamedSessionReceiver
implements AutoCloseable {
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final MessageLockContainer lockContainer;
    private final AtomicReference<Instant> sessionLockedUntil = new AtomicReference();
    private final AtomicReference<String> sessionId = new AtomicReference();
    private final ClientLogger logger = new ClientLogger(UnnamedSessionReceiver.class);
    private final ServiceBusReceiveLink receiveLink;
    private final boolean enableSessionLockRenewal;
    private final Duration maxSessionLockRenewDuration;
    private final Function<String, Mono<Instant>> renewSessionLock;
    private final Disposable.Composite subscriptions;
    private final Flux<ServiceBusReceivedMessageContext> receivedMessages;
    private final MonoProcessor<ServiceBusReceivedMessageContext> cancelReceiveProcessor = MonoProcessor.create();
    private final DirectProcessor<String> messageReceivedEmitter = DirectProcessor.create();
    private final FluxSink<String> messageReceivedSink = this.messageReceivedEmitter.sink(FluxSink.OverflowStrategy.BUFFER);

    UnnamedSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions, int prefetch, boolean disposeOnIdle, Scheduler scheduler, boolean enableSessionLockRenewal, Duration maxSessionLockRenewDuration, Function<String, Mono<Instant>> renewSessionLock) {
        this.receiveLink = receiveLink;
        this.enableSessionLockRenewal = enableSessionLockRenewal;
        this.maxSessionLockRenewDuration = maxSessionLockRenewDuration;
        this.renewSessionLock = renewSessionLock;
        this.lockContainer = new MessageLockContainer(ServiceBusConstants.OPERATION_TIMEOUT);
        LinkErrorContext errorContext = new LinkErrorContext(receiveLink.getHostname(), receiveLink.getEntityPath(), null, null);
        SessionMessageManagement messageManagement = new SessionMessageManagement(receiveLink);
        receiveLink.setEmptyCreditListener(() -> 1);
        Flux receivedMessagesFlux = ((ServiceBusMessageProcessor)receiveLink.receive().publishOn(scheduler).doOnSubscribe(subscription -> {
            this.logger.verbose("Adding prefetch to receive link.");
            receiveLink.addCredits(prefetch);
        }).takeUntilOther(this.cancelReceiveProcessor).map(message -> (ServiceBusReceivedMessage)messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).subscribeWith((Subscriber)new ServiceBusMessageProcessor(receiveLink.getLinkName(), false, false, Duration.ZERO, retryOptions, (AmqpErrorContext)errorContext, messageManagement))).map(message -> {
            if (!CoreUtils.isNullOrEmpty((CharSequence)message.getLockToken())) {
                this.lockContainer.addOrUpdate(message.getLockToken(), message.getLockedUntil());
            }
            return new ServiceBusReceivedMessageContext((ServiceBusReceivedMessage)message);
        }).onErrorResume(error -> {
            this.logger.warning("sessionId[{}]. Error occurred. Ending session.", new Object[]{this.sessionId, error});
            return Mono.just((Object)new ServiceBusReceivedMessageContext(this.getSessionId(), (Throwable)error));
        }).doOnNext(context -> {
            if (context.hasError()) {
                return;
            }
            String token = CoreUtils.isNullOrEmpty((CharSequence)context.getMessage().getLockToken()) ? context.getMessage().getLockToken() : "";
            this.messageReceivedSink.next((Object)token);
        });
        this.receivedMessages = Flux.concat((Publisher[])new Publisher[]{receivedMessagesFlux, this.cancelReceiveProcessor});
        this.subscriptions = Disposables.composite();
        if (disposeOnIdle) {
            this.subscriptions.add(Flux.switchOnNext((Publisher)this.messageReceivedEmitter.flatMap(lockToken -> Mono.delay((Duration)retryOptions.getTryTimeout())).handle((l, sink) -> {
                this.logger.info("entityPath[{}]. sessionId[{}]. Did not a receive message within timeout {}.", new Object[]{receiveLink.getEntityPath(), this.sessionId.get(), retryOptions.getTryTimeout()});
                this.cancelReceiveProcessor.onComplete();
                sink.complete();
            })).subscribe());
        }
        this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> {
            if (!this.sessionId.compareAndSet((String)null, (String)id)) {
                this.logger.warning("Another method set sessionId. Existing: {}. Returned: {}.", new Object[]{this.sessionId.get(), id});
            }
        }));
        this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
            if (!this.sessionLockedUntil.compareAndSet((Instant)null, (Instant)lockedUntil)) {
                this.logger.info("SessionLockedUntil was already set: {}", new Object[]{this.sessionLockedUntil});
            } else {
                this.subscriptions.add(this.getRenewLockOperation((Instant)lockedUntil));
            }
        }));
    }

    boolean containsLockToken(String lockToken) {
        if (lockToken == null) {
            throw this.logger.logExceptionAsError((RuntimeException)new NullPointerException("'lockToken' cannot be null."));
        }
        if (lockToken.isEmpty()) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'lockToken' cannot be an empty string."));
        }
        return this.lockContainer.contains(lockToken);
    }

    String getLinkName() {
        return this.receiveLink.getLinkName();
    }

    String getSessionId() {
        return this.sessionId.get();
    }

    Flux<ServiceBusReceivedMessageContext> receive() {
        return this.receivedMessages;
    }

    void setSessionLockedUntil(Instant lockedUntil) {
        this.sessionLockedUntil.set(lockedUntil);
    }

    Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
        return this.receiveLink.updateDisposition(lockToken, deliveryState);
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.receiveLink.dispose();
        this.subscriptions.dispose();
    }

    private Disposable getRenewLockOperation(Instant initialLockedUntil) {
        Instant now = Instant.now();
        Duration initialInterval = Duration.between(now, initialLockedUntil);
        if (initialInterval.isNegative()) {
            this.logger.info("Duration was negative. now[{}] lockedUntil[{}]", new Object[]{now, initialLockedUntil});
            initialInterval = Duration.ZERO;
        } else {
            Duration adjusted = MessageUtils.adjustServerTimeout(initialInterval);
            if (adjusted.isNegative()) {
                this.logger.info("Adjusted duration is negative. Adjusted: {}ms", new Object[]{initialInterval.toMillis()});
            } else {
                initialInterval = adjusted;
            }
        }
        EmitterProcessor emitterProcessor = EmitterProcessor.create();
        FluxSink sink = emitterProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        sink.next((Object)MessageUtils.adjustServerTimeout(initialInterval));
        Flux cancellationSignals = this.enableSessionLockRenewal ? Flux.first((Publisher[])new Publisher[]{this.cancelReceiveProcessor, Mono.delay((Duration)this.maxSessionLockRenewDuration)}) : Flux.first((Publisher[])new Publisher[]{this.cancelReceiveProcessor});
        return Flux.switchOnNext((Publisher)emitterProcessor.map(Flux::interval)).takeUntilOther((Publisher)cancellationSignals).flatMap(delay -> {
            String id = this.sessionId.get();
            this.logger.info("sessionId[{}]. now[{}]. Starting lock renewal.", new Object[]{id, Instant.now()});
            if (CoreUtils.isNullOrEmpty((CharSequence)id)) {
                return Mono.error((Throwable)new IllegalStateException("Cannot renew session lock without session id."));
            }
            return (Publisher)this.renewSessionLock.apply(this.sessionId.get());
        }).map(instant -> {
            Duration next = Duration.between(Instant.now(), instant);
            this.logger.info("sessionId[{}]. nextExpiration[{}]. Next renewal: [{}]", new Object[]{this.sessionId, instant, next});
            sink.next((Object)MessageUtils.adjustServerTimeout(next));
            return instant;
        }).subscribe(lockedUntil -> {
            this.logger.verbose("lockToken[{}]. lockedUntil[{}]. Lock renewal successful.", new Object[]{this.sessionId, lockedUntil});
            this.sessionLockedUntil.set((Instant)lockedUntil);
        }, error -> {
            this.logger.error("Error occurred while renewing lock token.", new Object[]{error});
            this.cancelReceiveProcessor.onNext((Object)new ServiceBusReceivedMessageContext(this.sessionId.get(), (Throwable)error));
        }, () -> {
            this.logger.verbose("Renewing session lock task completed.");
            this.cancelReceiveProcessor.onComplete();
        });
    }

    private static final class SessionMessageManagement
    implements MessageManagementOperations {
        private final ServiceBusReceiveLink link;

        private SessionMessageManagement(ServiceBusReceiveLink link) {
            this.link = link;
        }

        @Override
        public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
            return this.link.updateDisposition(lockToken, deliveryState);
        }

        @Override
        public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkName) {
            return Mono.just((Object)Instant.now().plusSeconds(60L));
        }
    }
}

