/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusMessageProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ServiceBusAsyncConsumer
implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(ServiceBusAsyncConsumer.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final String linkName;
    private final ServiceBusReceiveLinkProcessor linkProcessor;
    private final MessageSerializer messageSerializer;
    private final ServiceBusMessageProcessor processor;

    ServiceBusAsyncConsumer(String linkName, ServiceBusReceiveLinkProcessor linkProcessor, MessageSerializer messageSerializer, boolean isAutoComplete, boolean autoLockRenewal, Duration maxAutoLockRenewDuration, AmqpRetryOptions retryOptions, BiFunction<MessageLockToken, String, Mono<Instant>> renewMessageLock) {
        this.linkName = linkName;
        this.linkProcessor = linkProcessor;
        this.messageSerializer = messageSerializer;
        MessageManagement messageManagement = new MessageManagement(linkProcessor, renewMessageLock);
        this.processor = (ServiceBusMessageProcessor)linkProcessor.map(message -> (ServiceBusReceivedMessage)this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).subscribeWith((Subscriber)new ServiceBusMessageProcessor(linkName, isAutoComplete, autoLockRenewal, maxAutoLockRenewDuration, retryOptions, linkProcessor.getErrorContext(), messageManagement));
    }

    String getLinkName() {
        return this.linkName;
    }

    Flux<ServiceBusReceivedMessage> receive() {
        return this.processor;
    }

    Mono<Void> updateDisposition(String lockToken, DispositionStatus dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext);
        if (deliveryState == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new IllegalArgumentException("'dispositionStatus' is not known. status: " + (Object)((Object)dispositionStatus)));
        }
        return this.linkProcessor.updateDisposition(lockToken, deliveryState);
    }

    @Override
    public void close() {
        if (!this.isDisposed.getAndSet(true)) {
            this.processor.onComplete();
            this.linkProcessor.cancel();
        }
    }

    private static final class MessageManagement
    implements MessageManagementOperations {
        private final ServiceBusReceiveLinkProcessor link;
        private final BiFunction<MessageLockToken, String, Mono<Instant>> renewMessageLock;

        private MessageManagement(ServiceBusReceiveLinkProcessor link, BiFunction<MessageLockToken, String, Mono<Instant>> renewMessageLock) {
            this.link = link;
            this.renewMessageLock = renewMessageLock;
        }

        @Override
        public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
            return this.link.updateDisposition(lockToken, deliveryState);
        }

        @Override
        public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkName) {
            return this.renewMessageLock.apply(MessageLockToken.fromString(lockToken), associatedLinkName);
        }
    }
}

