/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.SynchronousReceiveWork;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

class SynchronousMessageSubscriber
extends BaseSubscriber<ServiceBusReceivedMessage> {
    private static final ClientLogger LOGGER = new ClientLogger(SynchronousMessageSubscriber.class);
    private static final RuntimeException CLIENT_TERMINATED_ERROR = new RuntimeException("The receiver client is terminated. Re-create the client to continue receive attempt.");
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private volatile Throwable disposalReason;
    private final AtomicInteger wip = new AtomicInteger();
    private final ConcurrentLinkedQueue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedDeque<ServiceBusReceivedMessage> bufferMessages = new ConcurrentLinkedDeque();
    private final Object currentWorkLock = new Object();
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final boolean isPrefetchDisabled;
    private final Duration operationTimeout;
    private final boolean isReceiveDeleteMode;
    private volatile SynchronousReceiveWork currentWork;
    private volatile long requested;
    private static final AtomicLongFieldUpdater<SynchronousMessageSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, "requested");
    private volatile Subscription upstream;
    private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class, "upstream");

    SynchronousMessageSubscriber(ServiceBusReceiverAsyncClient asyncClient, SynchronousReceiveWork initialWork, boolean isPrefetchDisabled, Duration operationTimeout) {
        this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        this.operationTimeout = Objects.requireNonNull(operationTimeout, "'operationTimeout' cannot be null.");
        this.workQueue.add(Objects.requireNonNull(initialWork, "'initialWork' cannot be null."));
        this.isPrefetchDisabled = isPrefetchDisabled;
        boolean bl = this.isReceiveDeleteMode = asyncClient.getReceiverOptions().getReceiveMode() == ServiceBusReceiveMode.RECEIVE_AND_DELETE;
        if (initialWork.getNumberOfEvents() < 1) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'numberOfEvents' cannot be less than 1. Actual: " + initialWork.getNumberOfEvents()));
        }
        Operators.addCap(REQUESTED, (Object)((Object)this), (long)initialWork.getNumberOfEvents());
    }

    protected void hookOnSubscribe(Subscription subscription) {
        if (!Operators.setOnce(UPSTREAM, (Object)((Object)this), (Subscription)subscription)) {
            LOGGER.warning("This should only be subscribed to once. Ignoring subscription.");
            return;
        }
        this.getOrUpdateCurrentWork();
        subscription.request(REQUESTED.get(this));
    }

    protected void hookOnNext(ServiceBusReceivedMessage message) {
        if (this.isTerminated()) {
            Operators.onNextDropped((Object)message, (Context)Context.empty());
        } else {
            this.bufferMessages.add(message);
            this.drain();
        }
    }

    void queueWork(SynchronousReceiveWork work) {
        Objects.requireNonNull(work, "'work' cannot be null");
        if (this.isTerminated()) {
            Throwable reason = this.disposalReason;
            if (reason == null) {
                reason = CLIENT_TERMINATED_ERROR;
            }
            work.complete("The receiver client is terminated. Re-create the client to continue receive attempt.", reason);
            return;
        }
        this.workQueue.add(work);
        LoggingEventBuilder logBuilder = LOGGER.atVerbose().addKeyValue("workId", work.getId()).addKeyValue("numberOfEvents", (long)work.getNumberOfEvents()).addKeyValue("timeout", (Object)work.getTimeout());
        if (this.workQueue.peek() == work && (this.currentWork == null || this.currentWork.isTerminal())) {
            logBuilder.log("First work in queue. Requesting upstream if needed.");
            this.getOrUpdateCurrentWork();
        } else {
            logBuilder.log("Queuing receive work.");
        }
        if (UPSTREAM.get(this) != null) {
            this.drain();
        }
    }

    private void drain() {
        if (this.wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        while (missed != 0) {
            try {
                this.drainQueue();
            }
            finally {
                missed = this.wip.addAndGet(-missed);
            }
        }
    }

    private void drainQueue() {
        if (this.isTerminated()) {
            return;
        }
        long numberRequested = REQUESTED.get(this);
        boolean isEmpty = this.bufferMessages.isEmpty();
        SynchronousReceiveWork currentDownstream = null;
        while (numberRequested != 0L && !isEmpty && !this.isTerminated()) {
            long requestedMessages;
            long numberConsumed;
            for (numberConsumed = 0L; numberRequested != numberConsumed && !isEmpty && !this.isTerminated(); ++numberConsumed) {
                ServiceBusReceivedMessage message = this.bufferMessages.poll();
                boolean isEmitted = false;
                while (!isEmitted) {
                    currentDownstream = this.getOrUpdateCurrentWork();
                    if (currentDownstream == null && this.isReceiveDeleteMode) {
                        this.bufferMessages.addFirst(message);
                        return;
                    }
                    if (currentDownstream == null) break;
                    isEmitted = currentDownstream.emitNext(message);
                }
                if (!isEmitted) {
                    if (this.isPrefetchDisabled) {
                        this.asyncClient.release(message).subscribe(__ -> {}, error -> LOGGER.atWarning().addKeyValue("lockToken", message.getLockToken()).log("Couldn't release the message.", new Object[]{error}), () -> LOGGER.atVerbose().addKeyValue("lockToken", message.getLockToken()).log("Message successfully released."));
                    } else {
                        this.bufferMessages.addFirst(message);
                        break;
                    }
                }
                isEmpty = this.bufferMessages.isEmpty();
            }
            if ((requestedMessages = REQUESTED.get(this)) == Long.MAX_VALUE) continue;
            numberRequested = REQUESTED.addAndGet(this, -numberConsumed);
        }
        if (numberRequested == 0L) {
            LOGGER.atVerbose().log("Current work is completed. Schedule next work.");
            this.getOrUpdateCurrentWork();
        }
    }

    protected void hookOnError(Throwable throwable) {
        this.dispose("Errors occurred upstream", throwable);
    }

    protected void hookOnCancel() {
        this.dispose();
    }

    private boolean isTerminated() {
        if (UPSTREAM.get(this) == Operators.cancelledSubscription()) {
            return true;
        }
        return this.isDisposed.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private SynchronousReceiveWork getOrUpdateCurrentWork() {
        Object object = this.currentWorkLock;
        synchronized (object) {
            if (this.currentWork != null && !this.currentWork.isTerminal()) {
                return this.currentWork;
            }
            this.currentWork = this.workQueue.poll();
            while (this.currentWork != null && this.currentWork.isTerminal()) {
                LOGGER.atVerbose().addKeyValue("workId", this.currentWork.getId()).addKeyValue("numberOfEvents", (long)this.currentWork.getNumberOfEvents()).log("This work from queue is terminal. Skip it.");
                this.currentWork = this.workQueue.poll();
            }
            if (this.currentWork != null) {
                SynchronousReceiveWork work = this.currentWork;
                LOGGER.atVerbose().addKeyValue("workId", work.getId()).addKeyValue("numberOfEvents", (long)work.getNumberOfEvents()).log("Current work updated.");
                work.start();
                this.requestUpstream(work.getNumberOfEvents());
            }
            return this.currentWork;
        }
    }

    private void requestUpstream(long numberOfMessages) {
        if (this.isTerminated()) {
            LOGGER.info("Cannot request more messages upstream. Subscriber is terminated.");
            return;
        }
        Subscription subscription = UPSTREAM.get(this);
        if (subscription == null) {
            LOGGER.info("There is no upstream to request messages from.");
            return;
        }
        long currentRequested = REQUESTED.get(this);
        long difference = numberOfMessages - currentRequested;
        LOGGER.atVerbose().addKeyValue("requested", currentRequested).addKeyValue("numberOfMessages", numberOfMessages).addKeyValue("difference", difference).log("Requesting messages from upstream.");
        if (difference <= 0L) {
            return;
        }
        Operators.addCap(REQUESTED, (Object)((Object)this), (long)difference);
        subscription.request(difference);
    }

    public void dispose() {
        super.dispose();
        this.dispose("Upstream completed the receive work.", null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispose(String message, Throwable throwable) {
        super.dispose();
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.disposalReason = throwable;
        Object object = this.currentWorkLock;
        synchronized (object) {
            if (this.currentWork != null) {
                this.currentWork.complete(message, throwable);
                this.currentWork = null;
            }
            SynchronousReceiveWork w = this.workQueue.poll();
            while (w != null) {
                w.complete(message, throwable);
                w = this.workQueue.poll();
            }
        }
    }

    int getWorkQueueSize() {
        return this.workQueue.size();
    }
}

