/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessageWithLockToken;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorSession;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ServiceBusReactorReceiver
extends ReactorReceiver
implements ServiceBusReceiveLink {
    private static final Message EMPTY_MESSAGE = Proton.message();
    private final ClientLogger logger;
    private final ReceiverUnsettledDeliveries receiverUnsettledDeliveries;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Receiver receiver;
    private final boolean isSettled;
    private final ReceiveLinkHandler handler;
    private final Mono<String> sessionIdMono;
    private final Mono<OffsetDateTime> sessionLockedUntil;

    public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, Receiver receiver, ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, AmqpRetryPolicy retryPolicy) {
        super(connection, entityPath, receiver, handler, tokenManager, provider.getReactorDispatcher(), retryPolicy.getRetryOptions());
        this.receiver = receiver;
        this.handler = handler;
        this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED;
        HashMap<String, String> loggingContext = new HashMap<String, String>(2);
        loggingContext.put("linkName", this.handler.getLinkName());
        loggingContext.put("entityPath", entityPath);
        this.logger = new ClientLogger(ServiceBusReactorReceiver.class, loggingContext);
        this.receiverUnsettledDeliveries = new ReceiverUnsettledDeliveries(handler.getHostname(), entityPath, handler.getLinkName(), provider.getReactorDispatcher(), retryPolicy.getRetryOptions(), MessageUtils.ZERO_LOCK_TOKEN, this.logger);
        this.sessionIdMono = this.getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE).next().flatMap(state -> {
            Map remoteSource = ((Source)receiver.getRemoteSource()).getFilter();
            Object value = remoteSource.get(ServiceBusReactorSession.SESSION_FILTER);
            if (value == null) {
                this.logger.info("There is no session id.");
                return Mono.empty();
            }
            String actualSessionId = String.valueOf(value);
            return Mono.just((Object)actualSessionId);
        }).cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);
        this.sessionLockedUntil = this.getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE).next().map(state -> {
            if (receiver.getRemoteProperties() != null && receiver.getRemoteProperties().containsKey(ServiceBusReactorSession.LOCKED_UNTIL_UTC)) {
                long ticks = (Long)receiver.getRemoteProperties().get(ServiceBusReactorSession.LOCKED_UNTIL_UTC);
                return MessageUtils.convertDotNetTicksToOffsetDateTime(ticks);
            }
            this.logger.info("Locked until not set.");
            return Instant.EPOCH.atOffset(ZoneOffset.UTC);
        }).cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);
    }

    @Override
    public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalStateException("Cannot perform operations on a disposed receiver."));
        }
        return this.receiverUnsettledDeliveries.sendDisposition(lockToken, deliveryState);
    }

    public Flux<Message> receive() {
        return super.receive().filter(message -> message != EMPTY_MESSAGE).publishOn(Schedulers.boundedElastic());
    }

    @Override
    public Mono<String> getSessionId() {
        return this.sessionIdMono;
    }

    @Override
    public Mono<OffsetDateTime> getSessionLockedUntil() {
        return this.sessionLockedUntil;
    }

    public Mono<Void> closeAsync() {
        return this.closeAsync("User invoked close operation.", null);
    }

    protected Mono<Void> closeAsync(String message, ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return super.getIsClosedMono();
        }
        return this.receiverUnsettledDeliveries.terminateAndAwaitForDispositionsInProgressToComplete().then(super.closeAsync(message, errorCondition));
    }

    protected Message decodeDelivery(Delivery delivery) {
        byte[] deliveryTag = delivery.getTag();
        UUID lockToken = deliveryTag != null && deliveryTag.length == 16 ? MessageUtils.convertDotNetBytesToUUID(deliveryTag) : MessageUtils.ZERO_LOCK_TOKEN;
        if (this.receiverUnsettledDeliveries.containsDelivery(lockToken)) {
            this.receiverUnsettledDeliveries.onDispositionAck(lockToken, delivery);
            return EMPTY_MESSAGE;
        }
        int messageSize = delivery.pending();
        byte[] buffer = new byte[messageSize];
        int read = this.receiver.recv(buffer, 0, messageSize);
        Message message = Proton.message();
        message.decode(buffer, 0, read);
        if (this.isSettled) {
            delivery.disposition((DeliveryState)Accepted.getInstance());
            delivery.settle();
        } else {
            this.receiverUnsettledDeliveries.onDelivery(lockToken, delivery);
            this.receiver.advance();
        }
        return new MessageWithLockToken(message, lockToken);
    }

    protected void onHandlerClose() {
        this.receiverUnsettledDeliveries.close();
    }
}

