/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class ServiceBusMessageProcessor
extends FluxProcessor<ServiceBusReceivedMessage, ServiceBusReceivedMessage>
implements Subscription {
    private final ClientLogger logger = new ClientLogger(ServiceBusMessageProcessor.class);
    private final boolean isAutoComplete;
    private final String linkName;
    private final AmqpRetryOptions retryOptions;
    private final AmqpErrorContext errorContext;
    private final Deque<ServiceBusReceivedMessage> messageQueue = new ConcurrentLinkedDeque<ServiceBusReceivedMessage>();
    private final boolean isAutoRenewLock;
    private final Duration maxAutoLockRenewal;
    private final MessageManagementOperations managementOperations;
    private volatile boolean isDone;
    private volatile CoreSubscriber<? super ServiceBusReceivedMessage> downstream;
    private volatile boolean isCancelled;
    volatile Subscription upstream;
    private static final AtomicReferenceFieldUpdater<ServiceBusMessageProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, Subscription.class, "upstream");
    volatile int once;
    static final AtomicIntegerFieldUpdater<ServiceBusMessageProcessor> ONCE = AtomicIntegerFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "once");
    volatile int wip;
    static final AtomicIntegerFieldUpdater<ServiceBusMessageProcessor> WIP = AtomicIntegerFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "wip");
    volatile long requested;
    static final AtomicLongFieldUpdater<ServiceBusMessageProcessor> REQUESTED = AtomicLongFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "requested");
    volatile Throwable error;
    static final AtomicReferenceFieldUpdater<ServiceBusMessageProcessor, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, Throwable.class, "error");

    public ServiceBusMessageProcessor(String linkName, boolean isAutoComplete, boolean isAutoRenewLock, Duration maxAutoLockRenewal, AmqpRetryOptions retryOptions, AmqpErrorContext errorContext, MessageManagementOperations managementOperations) {
        this.linkName = linkName;
        this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null.");
        this.errorContext = Objects.requireNonNull(errorContext, "'errorContext' cannot be null.");
        this.managementOperations = managementOperations;
        this.isAutoComplete = isAutoComplete;
        this.isAutoRenewLock = isAutoRenewLock;
        this.maxAutoLockRenewal = maxAutoLockRenewal;
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null.");
        if (Operators.setOnce(UPSTREAM, (Object)((Object)this), (Subscription)subscription)) {
            subscription.request(1L);
        } else {
            RuntimeException error = this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Processor cannot be subscribed to with multiple upstreams."));
            this.onError(Operators.onOperatorError((Subscription)subscription, (Throwable)error, (Context)Context.empty()));
        }
    }

    public boolean isTerminated() {
        return this.isDone || this.isCancelled;
    }

    public void onNext(ServiceBusReceivedMessage message) {
        if (this.isTerminated()) {
            Context context = this.downstream == null ? this.currentContext() : this.downstream.currentContext();
            Operators.onNextDropped((Object)message, (Context)context);
            return;
        }
        this.messageQueue.add(message);
        this.drain();
    }

    public void onError(Throwable throwable) {
        if (this.isDone || this.isCancelled) {
            this.logger.error("Exception occurred from upstream when this is already terminated.", new Object[]{throwable});
            Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
            return;
        }
        if (Exceptions.addThrowable(ERROR, (Object)((Object)this), (Throwable)throwable)) {
            this.isDone = true;
        } else {
            Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
        }
        this.drain();
    }

    public void onComplete() {
        if (this.isDone) {
            return;
        }
        this.isDone = true;
        this.drain();
    }

    public void request(long request) {
        this.logger.info("Back-pressure request: {}", new Object[]{request});
        if (Operators.validate((long)request)) {
            Operators.addCap(REQUESTED, (Object)((Object)this), (long)request);
            if (this.upstream != null) {
                this.upstream.request(request);
            }
            this.drain();
        }
    }

    public void cancel() {
        if (this.isCancelled) {
            return;
        }
        this.logger.verbose("Cancelling subscription.");
        this.isCancelled = true;
        this.drain();
    }

    public void dispose() {
        if (this.isDone) {
            return;
        }
        this.logger.verbose("Disposing subscription.");
        this.isDone = true;
        this.drain();
    }

    public boolean isDisposed() {
        return this.isDone || this.isCancelled;
    }

    public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessage> downstream) {
        Objects.requireNonNull(downstream, "'downstream' cannot be null.");
        if (this.once == 0 && ONCE.compareAndSet(this, 0, 1)) {
            this.downstream = downstream;
            downstream.onSubscribe((Subscription)this);
            if (this.isCancelled) {
                this.downstream = null;
            } else {
                this.drain();
            }
        } else {
            Operators.error(downstream, (Throwable)new IllegalStateException("ServiceBusMessageSubscriber can only have one subscriber."));
        }
    }

    private void drain() {
        if (!WIP.compareAndSet(this, 0, 1)) {
            return;
        }
        try {
            this.drainQueue();
        }
        finally {
            if (WIP.decrementAndGet(this) != 0) {
                this.logger.warning("There is another worker in drainLoop. But there should only be 1 worker.");
            }
        }
    }

    private void drainQueue() {
        long amountRequested;
        long emitted;
        if (this.downstream == null) {
            return;
        }
        while (!this.messageQueue.isEmpty() && REQUESTED.addAndGet(this, -(emitted = this.drainRequested(amountRequested = REQUESTED.get(this)))) != 0L && !this.isDone) {
        }
        if (this.isDone) {
            if (this.error != null) {
                this.downstream.onError(this.error);
            } else if (this.messageQueue.peekLast() == null) {
                this.downstream.onComplete();
            } else {
                Operators.onDiscardQueueWithClear(this.messageQueue, (Context)this.downstream.currentContext(), null);
            }
            this.downstream = null;
        }
    }

    private long drainRequested(long numberRequested) {
        long numberEmitted;
        if (numberRequested == 0L) {
            return numberEmitted;
        }
        for (numberEmitted = 0L; numberEmitted < numberRequested; ++numberEmitted) {
            if (this.isDone) {
                return numberEmitted;
            }
            ServiceBusReceivedMessage message = this.messageQueue.poll();
            if (message == null) break;
            if (this.isCancelled) {
                Operators.onDiscard((Object)message, (Context)this.downstream.currentContext());
                Operators.onDiscardQueueWithClear(this.messageQueue, (Context)this.downstream.currentContext(), null);
                break;
            }
            try {
                this.next(message);
                continue;
            }
            catch (Exception e) {
                this.setInternalError(e);
                break;
            }
        }
        return numberEmitted;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void next(ServiceBusReceivedMessage message) {
        long sequenceNumber = message.getSequenceNumber();
        String lockToken = message.getLockToken();
        Instant initialLockedUntil = message.getLockedUntil();
        if (this.isAutoComplete && CoreUtils.isNullOrEmpty((CharSequence)lockToken)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Cannot auto-complete message without a lock token on message. Sequence number: " + sequenceNumber));
        }
        AtomicBoolean hasError = new AtomicBoolean();
        Disposable renewLockOperation = this.getRenewLockOperation(message, initialLockedUntil, hasError);
        try {
            this.downstream.onNext((Object)message);
        }
        catch (Exception e) {
            hasError.set(true);
            this.logger.error("Exception occurred while handling downstream onNext operation.", new Object[]{e});
            if (this.isAutoComplete) {
                this.logger.info("Abandoning message lock: {}", new Object[]{lockToken});
                DeliveryState deliveryState = MessageUtils.getDeliveryState(DispositionStatus.ABANDONED, null, null, null, null);
                this.managementOperations.updateDisposition(lockToken, deliveryState).onErrorContinue((error, item) -> {
                    this.logger.warning("Could not abandon message with lock: {}", new Object[]{lockToken, error});
                    this.setInternalError((Throwable)error);
                }).doFinally(signal -> this.logger.info("lock[{}]. Abandon status: [{}]", new Object[]{lockToken, signal})).block(this.retryOptions.getTryTimeout());
            } else {
                this.setInternalError(e);
            }
        }
        finally {
            renewLockOperation.dispose();
        }
        if (hasError.get()) {
            return;
        }
        if (this.isAutoComplete) {
            this.logger.info("sequenceNumber[{}]. lock[{}]. Completing message.", new Object[]{sequenceNumber, lockToken});
            DeliveryState deliveryState = MessageUtils.getDeliveryState(DispositionStatus.COMPLETED, null, null, null, null);
            this.managementOperations.updateDisposition(lockToken, deliveryState).onErrorResume(error -> {
                this.logger.warning("Could not complete message with lock: {}", new Object[]{lockToken, error});
                this.setInternalError((Throwable)error);
                return Mono.empty();
            }).doFinally(signal -> this.logger.verbose("lock[{}]. Complete status: [{}]", new Object[]{lockToken, signal})).block(this.retryOptions.getTryTimeout());
        }
    }

    private Disposable getRenewLockOperation(ServiceBusReceivedMessage message, Instant initialLockedUntil, AtomicBoolean hasError) {
        if (!this.isAutoRenewLock) {
            return Disposables.disposed();
        }
        String lockToken = message.getLockToken();
        if (initialLockedUntil == null) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Cannot renew lock token without a value for 'message.getLockedUntil()'"));
        }
        Instant now = Instant.now();
        Duration initialInterval = Duration.between(now, initialLockedUntil);
        if (initialInterval.isNegative()) {
            this.logger.info("Duration was negative. now[{}] lockedUntil[{}]", new Object[]{now, initialLockedUntil});
            initialInterval = Duration.ZERO;
        } else {
            Duration adjusted = MessageUtils.adjustServerTimeout(initialInterval);
            if (adjusted.isNegative()) {
                this.logger.info("Adjusted duration is negative. Adjusted: {}ms", new Object[]{initialInterval.toMillis()});
            } else {
                initialInterval = adjusted;
            }
        }
        this.logger.info("lockToken[{}]. lockedUntil[{}]. firstInterval[{}].", new Object[]{lockToken, initialLockedUntil, initialInterval});
        EmitterProcessor emitterProcessor = EmitterProcessor.create();
        FluxSink sink = emitterProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        sink.next((Object)MessageUtils.adjustServerTimeout(initialInterval));
        Disposable timeoutOperation = Mono.delay((Duration)this.maxAutoLockRenewal).subscribe(l -> {
            if (!sink.isCancelled()) {
                sink.error((Throwable)new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Could not complete within renewal time. Max renewal time: " + this.maxAutoLockRenewal, this.errorContext));
            }
        });
        Disposable renewLockSubscription = Flux.switchOnNext((Publisher)emitterProcessor.map(i -> Flux.interval((Duration)i))).flatMap(delay -> {
            this.logger.info("lockToken[{}]. now[{}]. Starting lock renewal.", new Object[]{lockToken, Instant.now()});
            return this.managementOperations.renewMessageLock(lockToken, this.linkName);
        }).map(instant -> {
            Duration next = Duration.between(Instant.now(), instant);
            this.logger.info("lockToken[{}]. nextExpiration[{}]. Next renewal: [{}]", new Object[]{lockToken, instant, next});
            sink.next((Object)MessageUtils.adjustServerTimeout(next));
            return instant;
        }).subscribe(lockedUntil -> this.logger.verbose("lockToken[{}]. lockedUntil[{}]. Lock renewal successful.", new Object[]{lockToken, lockedUntil}), error -> {
            this.logger.error("Error occurred while renewing lock token.", new Object[]{error});
            hasError.set(true);
            this.setInternalError((Throwable)error);
        }, () -> this.logger.info("Renewing lock token task completed."));
        return Disposables.composite((Disposable[])new Disposable[]{renewLockSubscription, timeoutOperation});
    }

    private void setInternalError(Throwable error) {
        if (Exceptions.addThrowable(ERROR, (Object)((Object)this), (Throwable)error)) {
            this.isDone = true;
        } else {
            Operators.onErrorDropped((Throwable)error, (Context)this.downstream.currentContext());
        }
    }
}

