/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

public class ServiceBusReceiveLinkProcessor
extends FluxProcessor<ServiceBusReceiveLink, Message>
implements Subscription {
    private final ClientLogger logger = new ClientLogger(ServiceBusReceiveLinkProcessor.class);
    private final Object lock = new Object();
    private final Object queueLock = new Object();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final AtomicReference<String> linkName = new AtomicReference();
    private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<Message>();
    private final AtomicInteger pendingMessages = new AtomicInteger();
    private final int minimumNumberOfMessages;
    private final int prefetch;
    private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference();
    private final AtomicInteger wip = new AtomicInteger();
    private final AmqpRetryPolicy retryPolicy;
    private volatile Throwable lastError;
    private volatile boolean isCancelled;
    private volatile ServiceBusReceiveLink currentLink;
    private volatile Disposable currentLinkSubscriptions;
    private volatile Disposable retrySubscription;
    private volatile long requested;
    private static final AtomicLongFieldUpdater<ServiceBusReceiveLinkProcessor> REQUESTED = AtomicLongFieldUpdater.newUpdater(ServiceBusReceiveLinkProcessor.class, "requested");
    private volatile Subscription upstream;
    private static final AtomicReferenceFieldUpdater<ServiceBusReceiveLinkProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(ServiceBusReceiveLinkProcessor.class, Subscription.class, "upstream");

    public ServiceBusReceiveLinkProcessor(int prefetch, AmqpRetryPolicy retryPolicy) {
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        if (prefetch < 0) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'prefetch' cannot be less than 0."));
        }
        this.prefetch = prefetch;
        this.minimumNumberOfMessages = Math.floorDiv(prefetch, 3);
    }

    public String getLinkName() {
        return this.linkName.get();
    }

    public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
        if (this.isDisposed()) {
            return FluxUtil.monoError((LoggingEventBuilder)this.logger.atError().addKeyValue("lockToken", lockToken).addKeyValue("deliveryState", (Object)deliveryState), (RuntimeException)new IllegalStateException(String.format("lockToken[%s]. state[%s]. Cannot update disposition on closed processor.", lockToken, deliveryState)));
        }
        ServiceBusReceiveLink link = this.currentLink;
        if (link == null) {
            return FluxUtil.monoError((LoggingEventBuilder)this.logger.atError().addKeyValue("lockToken", lockToken).addKeyValue("deliveryState", (Object)deliveryState), (RuntimeException)new IllegalStateException(String.format("lockToken[%s]. state[%s]. Cannot update disposition with no link.", lockToken, deliveryState)));
        }
        return link.updateDisposition(lockToken, deliveryState).onErrorResume(error -> {
            AmqpException amqpException;
            if (error instanceof AmqpException && AmqpErrorCondition.TIMEOUT_ERROR.equals((Object)(amqpException = (AmqpException)error).getErrorCondition())) {
                return link.closeAsync().then(Mono.error((Throwable)error));
            }
            return Mono.error((Throwable)error);
        });
    }

    public Throwable getError() {
        return this.lastError;
    }

    public boolean isTerminated() {
        return this.isTerminated.get() || this.isCancelled;
    }

    public int getPrefetch() {
        return this.prefetch;
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null");
        if (!Operators.setOnce(UPSTREAM, (Object)((Object)this), (Subscription)subscription)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Cannot set upstream twice."));
        }
        this.requestUpstream();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(ServiceBusReceiveLink next) {
        Disposable oldSubscription;
        ServiceBusReceiveLink oldChannel;
        Objects.requireNonNull(next, "'next' cannot be null.");
        if (this.isTerminated()) {
            this.logger.atWarning().addKeyValue("linkName", next.getLinkName()).addKeyValue("entityPath", next.getEntityPath()).log("Got another link when we have already terminated processor.");
            Operators.onNextDropped((Object)next, (Context)this.currentContext());
            return;
        }
        String linkName = next.getLinkName();
        String entityPath = next.getEntityPath();
        this.logger.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("Setting next AMQP receive link.");
        Object object = this.lock;
        synchronized (object) {
            oldChannel = this.currentLink;
            oldSubscription = this.currentLinkSubscriptions;
            this.currentLink = next;
            next.setEmptyCreditListener(() -> 0);
            this.currentLinkSubscriptions = Disposables.composite((Disposable[])new Disposable[]{next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
                Object object = this.queueLock;
                synchronized (object) {
                    this.messageQueue.add((Message)message);
                    this.pendingMessages.incrementAndGet();
                }
                this.drain();
            }, error -> this.logger.atVerbose().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("Receiver is terminated.", new Object[]{error})), next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(state -> {
                if (state == AmqpEndpointState.ACTIVE) {
                    this.retryAttempts.set(0);
                }
            }, error -> {
                this.currentLink = null;
                this.onError((Throwable)error);
            }, () -> {
                if (this.isTerminated()) {
                    this.logger.info("Processor is terminated. Disposing of link processor.");
                    this.dispose();
                } else if (this.upstream == Operators.cancelledSubscription()) {
                    this.logger.info("Upstream has completed. Disposing of link processor.");
                    this.dispose();
                } else {
                    this.logger.info("Receive link endpoint states are closed. Requesting another.");
                    ServiceBusReceiveLink existing = this.currentLink;
                    this.currentLink = null;
                    this.disposeReceiver(existing);
                    this.requestUpstream();
                }
            })});
        }
        this.checkAndAddCredits(next);
        this.disposeReceiver(oldChannel);
        if (oldSubscription != null) {
            oldSubscription.dispose();
        }
    }

    public void subscribe(CoreSubscriber<? super Message> actual) {
        boolean terminateSubscriber;
        Objects.requireNonNull(actual, "'actual' cannot be null.");
        boolean bl = terminateSubscriber = this.isTerminated() || this.currentLink == null && this.upstream == Operators.cancelledSubscription();
        if (this.isTerminated()) {
            ServiceBusReceiveLink link = this.currentLink;
            String linkName = link != null ? link.getLinkName() : "n/a";
            String entityPath = link != null ? link.getEntityPath() : "n/a";
            this.logger.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("AmqpReceiveLink is already terminated.");
        } else if (this.currentLink == null && this.upstream == Operators.cancelledSubscription()) {
            this.logger.info("There is no current link and upstream is terminated.");
        }
        if (terminateSubscriber) {
            actual.onSubscribe(Operators.emptySubscription());
            if (this.hasError()) {
                actual.onError(this.lastError);
            } else {
                actual.onComplete();
            }
            return;
        }
        if (this.downstream.compareAndSet(null, actual)) {
            actual.onSubscribe((Subscription)this);
            this.drain();
        } else {
            Operators.error(actual, (Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("There is already one downstream subscriber.'")));
        }
    }

    public void onError(Throwable throwable) {
        String entityPath;
        Objects.requireNonNull(throwable, "'throwable' is required.");
        if (this.isTerminated()) {
            this.logger.info("AmqpReceiveLinkProcessor is terminated. Not reopening on error.");
            return;
        }
        int attempt = this.retryAttempts.incrementAndGet();
        Duration retryInterval = this.retryPolicy.calculateRetryDelay(throwable, attempt);
        ServiceBusReceiveLink link = this.currentLink;
        String linkName = link != null ? link.getLinkName() : "n/a";
        String string = entityPath = link != null ? link.getEntityPath() : "n/a";
        if (retryInterval != null && this.upstream != Operators.cancelledSubscription()) {
            this.logger.atWarning().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).addKeyValue("attempt", (long)attempt).addKeyValue("retryAfter", retryInterval.toMillis()).log("Transient error occurred.", new Object[]{throwable});
            this.retrySubscription = Mono.delay((Duration)retryInterval).subscribe(i -> this.requestUpstream());
            return;
        }
        this.logger.atWarning().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("Non-retryable error occurred in AMQP receive link.", new Object[]{throwable});
        this.lastError = throwable;
        this.isTerminated.set(true);
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        if (subscriber != null) {
            subscriber.onError(throwable);
        }
        this.onDispose();
    }

    public void dispose() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        this.drain();
        this.onDispose();
    }

    public void onComplete() {
        this.upstream = Operators.cancelledSubscription();
    }

    public void request(long request) {
        if (!Operators.validate((long)request)) {
            this.logger.atWarning().addKeyValue("requested", request).log("Invalid request");
            return;
        }
        Operators.addCap(REQUESTED, (Object)((Object)this), (long)request);
        ServiceBusReceiveLink link = this.currentLink;
        if (link == null) {
            return;
        }
        this.checkAndAddCredits(link);
        this.drain();
    }

    public void cancel() {
        if (this.isCancelled) {
            return;
        }
        this.isCancelled = true;
        this.drain();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestUpstream() {
        if (this.isTerminated()) {
            this.logger.info("Processor is terminated. Not requesting another link.");
            return;
        }
        if (this.upstream == null) {
            this.logger.info("There is no upstream. Not requesting another link.");
            return;
        }
        if (this.upstream == Operators.cancelledSubscription()) {
            this.logger.info("Upstream is cancelled or complete. Not requesting another link.");
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.currentLink != null) {
                this.logger.info("Current link exists. Not requesting another link.");
                return;
            }
        }
        this.logger.info("Requesting a new AmqpReceiveLink from upstream.");
        this.upstream.request(1L);
    }

    private void onDispose() {
        if (this.retrySubscription != null && !this.retrySubscription.isDisposed()) {
            this.retrySubscription.dispose();
        }
        this.disposeReceiver(this.currentLink);
        this.currentLink = null;
        if (this.currentLinkSubscriptions != null) {
            this.currentLinkSubscriptions.dispose();
        }
    }

    private void drain() {
        if (this.wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        while (missed != 0) {
            this.drainQueue();
            missed = this.wip.addAndGet(-missed);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainQueue() {
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        if (subscriber == null || this.checkAndSetTerminated()) {
            return;
        }
        long numberRequested = REQUESTED.get(this);
        boolean isEmpty = this.messageQueue.isEmpty();
        while (numberRequested != 0L && !isEmpty && !this.checkAndSetTerminated()) {
            Message message;
            long numberEmitted;
            for (numberEmitted = 0L; !(numberRequested == numberEmitted || isEmpty && this.checkAndSetTerminated() || (message = this.messageQueue.poll()) == null); ++numberEmitted) {
                if (this.isCancelled) {
                    Operators.onDiscard((Object)message, (Context)subscriber.currentContext());
                    Object object = this.queueLock;
                    synchronized (object) {
                        Operators.onDiscardQueueWithClear(this.messageQueue, (Context)subscriber.currentContext(), null);
                        this.pendingMessages.set(0);
                    }
                    return;
                }
                try {
                    subscriber.onNext((Object)message);
                    this.pendingMessages.decrementAndGet();
                    if (this.prefetch > 0) {
                        this.checkAndAddCredits(this.currentLink);
                    }
                }
                catch (Exception e) {
                    this.logger.error("Exception occurred while handling downstream onNext operation.", new Object[]{e});
                    throw this.logger.logExceptionAsError(Exceptions.propagate((Throwable)Operators.onOperatorError((Subscription)this.upstream, (Throwable)e, (Object)message, (Context)subscriber.currentContext())));
                }
                isEmpty = this.messageQueue.isEmpty();
            }
            if (REQUESTED.get(this) == Long.MAX_VALUE) continue;
            numberRequested = REQUESTED.addAndGet(this, -numberEmitted);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkAndSetTerminated() {
        if (!this.isTerminated()) {
            return false;
        }
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        Throwable error = this.lastError;
        if (error != null) {
            subscriber.onError(error);
        } else {
            subscriber.onComplete();
        }
        this.disposeReceiver(this.currentLink);
        Object object = this.queueLock;
        synchronized (object) {
            this.messageQueue.clear();
            this.pendingMessages.set(0);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkAndAddCredits(AmqpReceiveLink link) {
        if (link == null) {
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            int linkCredits = link.getCredits();
            int credits = this.getCreditsToAdd(linkCredits);
            if (credits > 0) {
                link.addCredits(credits).subscribe();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getCreditsToAdd(int linkCredits) {
        int creditsToAdd;
        boolean hasBackpressure;
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        long r = REQUESTED.get(this);
        boolean bl = hasBackpressure = r != Long.MAX_VALUE;
        if (subscriber == null || r == 0L) {
            this.logger.info("Not adding credits. No downstream subscribers or items requested.");
            return 0;
        }
        int expectedTotalCredit = this.prefetch == 0 ? (r <= Integer.MAX_VALUE ? (int)r : Integer.MAX_VALUE) : this.prefetch;
        Object object = this.queueLock;
        synchronized (object) {
            int queuedMessages = this.pendingMessages.get();
            int pending = queuedMessages + linkCredits;
            creditsToAdd = hasBackpressure ? Math.max(expectedTotalCredit - pending, 0) : (this.minimumNumberOfMessages >= queuedMessages ? Math.max(expectedTotalCredit - pending, 0) : 0);
            this.logger.atInfo().addKeyValue("prefetch", (long)this.getPrefetch()).addKeyValue("requested", r).addKeyValue("linkCredits", (long)linkCredits).addKeyValue("expectedTotalCredit", (long)expectedTotalCredit).addKeyValue("queuedMessages", (long)queuedMessages).addKeyValue("creditsToAdd", (long)creditsToAdd).addKeyValue("messageQueueSize", (long)this.messageQueue.size()).log("Adding credits.");
        }
        return creditsToAdd;
    }

    private void disposeReceiver(AmqpReceiveLink link) {
        if (link == null) {
            return;
        }
        try {
            link.closeAsync().subscribe();
        }
        catch (Exception error) {
            this.logger.atWarning().addKeyValue("linkName", link.getLinkName()).addKeyValue("entityPath", link.getEntityPath()).log("Unable to dispose of link.", new Object[]{error});
        }
    }
}

