/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;

class ServiceBusSessionReceiver
implements AsyncCloseable,
AutoCloseable {
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionReceiver.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final LockContainer<OffsetDateTime> lockContainer;
    private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference();
    private final AtomicReference<String> sessionId = new AtomicReference();
    private final AtomicReference<LockRenewalOperation> renewalOperation = new AtomicReference();
    private final ServiceBusReceiveLink receiveLink;
    private final Disposable.Composite subscriptions;
    private final Flux<ServiceBusMessageContext> receivedMessages;
    private final MonoProcessor<ServiceBusMessageContext> cancelReceiveProcessor = MonoProcessor.create();
    private final DirectProcessor<String> messageReceivedEmitter = DirectProcessor.create();
    private final FluxSink<String> messageReceivedSink = this.messageReceivedEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
    private final AmqpRetryOptions retryOptions;

    ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler, Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) {
        this.receiveLink = receiveLink;
        this.lockContainer = new LockContainer(ServiceBusConstants.OPERATION_TIMEOUT);
        this.retryOptions = retryOptions;
        receiveLink.setEmptyCreditListener(() -> 0);
        this.receivedMessages = receiveLink.receive().publishOn(scheduler).doOnSubscribe(subscription -> {
            this.withReceiveLinkInformation(LOGGER.atVerbose()).addKeyValue("prefetch", (long)prefetch).log("Adding prefetch to receive link.");
            if (prefetch > 0) {
                receiveLink.addCredits(prefetch).subscribe();
            }
        }).doOnRequest(request -> {
            if (prefetch == 0) {
                receiveLink.addCredits((int)request).subscribe();
            } else {
                receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits())).subscribe();
            }
        }).limitRate(1).takeUntilOther(this.cancelReceiveProcessor).map(message -> {
            ServiceBusReceivedMessage deserialized = (ServiceBusReceivedMessage)messageSerializer.deserialize(message, ServiceBusReceivedMessage.class);
            if (!CoreUtils.isNullOrEmpty((CharSequence)deserialized.getLockToken()) && deserialized.getLockedUntil() != null) {
                this.lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil(), deserialized.getLockedUntil());
            } else {
                LOGGER.atInfo().addKeyValue("sessionId", deserialized.getSessionId()).addKeyValue("messageId", deserialized.getMessageId()).log("There is no lock token.");
            }
            return new ServiceBusMessageContext(deserialized);
        }).onErrorResume(error -> {
            this.withReceiveLinkInformation(LOGGER.atWarning()).log("Error occurred. Ending session.", new Object[]{error});
            return Mono.just((Object)new ServiceBusMessageContext(this.getSessionId(), (Throwable)error));
        }).doOnNext(context -> {
            if (context.hasError()) {
                return;
            }
            ServiceBusReceivedMessage message = context.getMessage();
            String token = !CoreUtils.isNullOrEmpty((CharSequence)message.getLockToken()) ? message.getLockToken() : "";
            LOGGER.atVerbose().addKeyValue("sessionId", context.getSessionId()).addKeyValue("messageId", message.getMessageId()).log("Received message.");
            this.messageReceivedSink.next((Object)token);
        });
        this.subscriptions = Disposables.composite();
        if (sessionIdleTimeout != null) {
            this.subscriptions.add(Flux.switchOnNext((Publisher)this.messageReceivedEmitter.map(lockToken -> Mono.delay((Duration)sessionIdleTimeout))).subscribe(item -> {
                this.withReceiveLinkInformation(LOGGER.atInfo()).addKeyValue("timeout", (Object)sessionIdleTimeout).log("Did not a receive message within timeout.");
                this.cancelReceiveProcessor.onComplete();
            }));
        }
        this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> {
            if (!this.sessionId.compareAndSet((String)null, (String)id)) {
                LOGGER.atWarning().addKeyValue("existingSessionId", this.sessionId.get()).addKeyValue("returnedSessionId", id).log("Another method set sessionId.");
            }
        }));
        this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
            if (!this.sessionLockedUntil.compareAndSet((OffsetDateTime)null, (OffsetDateTime)lockedUntil)) {
                this.withReceiveLinkInformation(LOGGER.atInfo()).addKeyValue("existingLockToken", (Object)this.sessionLockedUntil.get()).addKeyValue("newLockToken", lockedUntil).log("SessionLockedUntil was already set.");
                return;
            }
            this.renewalOperation.compareAndSet(null, new LockRenewalOperation(this.sessionId.get(), maxSessionLockRenewDuration, true, renewSessionLock, (OffsetDateTime)lockedUntil));
        }));
    }

    boolean containsLockToken(String lockToken) {
        if (lockToken == null) {
            throw LOGGER.logExceptionAsError((RuntimeException)new NullPointerException("'lockToken' cannot be null."));
        }
        if (lockToken.isEmpty()) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'lockToken' cannot be an empty string."));
        }
        return this.lockContainer.containsUnexpired(lockToken);
    }

    String getLinkName() {
        return this.receiveLink.getLinkName();
    }

    String getSessionId() {
        return this.sessionId.get();
    }

    Flux<ServiceBusMessageContext> receive() {
        return this.receivedMessages;
    }

    void setSessionLockedUntil(OffsetDateTime lockedUntil) {
        this.sessionLockedUntil.set(lockedUntil);
    }

    Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
        return this.receiveLink.updateDisposition(lockToken, deliveryState).doFinally(ignored -> this.lockContainer.remove(lockToken));
    }

    public Mono<Void> closeAsync() {
        if (this.isDisposed.getAndSet(true)) {
            return this.receiveLink.closeAsync();
        }
        LockRenewalOperation operation = this.renewalOperation.getAndSet(null);
        if (operation != null) {
            operation.close();
        }
        return this.receiveLink.closeAsync().doFinally(signal -> this.subscriptions.dispose());
    }

    @Override
    public void close() {
        this.closeAsync().block(this.retryOptions.getTryTimeout());
    }

    private LoggingEventBuilder withReceiveLinkInformation(LoggingEventBuilder builder) {
        String current = this.sessionId.get();
        return builder.addKeyValue("sessionId", current != null ? current : "n/a").addKeyValue("entityPath", this.receiveLink.getEntityPath()).addKeyValue("linkName", this.receiveLink.getLinkName());
    }
}

