/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.CreditAccountingStrategy;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.EmissionDrivenCreditAccountingStrategy;
import com.azure.core.amqp.implementation.ReceiversPumpingScheduler;
import com.azure.core.amqp.implementation.RequestDrivenCreditAccountingStrategy;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

public final class MessageFlux
extends FluxOperator<AmqpReceiveLink, Message> {
    public static final AmqpRetryPolicy NULL_RETRY_POLICY = new FixedAmqpRetryPolicy(new AmqpRetryOptions());
    private static final String MESSAGE_FLUX_KEY = "messageFlux";
    private final ClientLogger logger;
    private final int prefetch;
    private final CreditFlowMode creditFlowMode;
    private final AmqpRetryPolicy retryPolicy;
    private volatile DispositionFunction updateDispositionFunc;

    /**
     * Creates a flux that streams messages from the receive-links emitted by {@code source}.
     *
     * @param source the upstream publisher of {@link AmqpReceiveLink} instances.
     * @param prefetch the prefetch value handed to the credit accounting strategy; must not be negative.
     * @param creditFlowMode selects the credit accounting strategy used for each link.
     * @param retryPolicy the policy consulted when a link terminates; pass {@link #NULL_RETRY_POLICY}
     *     (compared by identity) to forward the link's terminal state downstream instead of retrying.
     * @throws IllegalArgumentException if {@code prefetch} is negative.
     * @throws NullPointerException if {@code creditFlowMode} or {@code retryPolicy} is null.
     */
    public MessageFlux(Flux<? extends AmqpReceiveLink> source, int prefetch, CreditFlowMode creditFlowMode, AmqpRetryPolicy retryPolicy) {
        super(source);
        // Validate every argument before allocating anything.
        if (prefetch < 0) {
            throw new IllegalArgumentException("prefetch >= 0 required but it was " + prefetch);
        }
        this.prefetch = prefetch;
        // Fail fast: a null mode would otherwise only surface later, in the mediator's switch statement.
        this.creditFlowMode = Objects.requireNonNull(creditFlowMode, "'creditFlowMode' cannot be null.");
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        // The logger context is declared with Object values; a <String, String> map is not assignable to it.
        HashMap<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put(MESSAGE_FLUX_KEY, StringUtil.getRandomString("mf"));
        this.logger = new ClientLogger(MessageFlux.class, loggingContext);
        this.updateDispositionFunc = DispositionFunction.NO_DISPOSITION;
    }

    /**
     * Wraps the downstream subscriber in a {@code RecoverableReactorReceiver} and subscribes
     * that wrapper to the upstream source of receive-links.
     *
     * @param actual the downstream subscriber that will receive the messages.
     */
    @Override
    public void subscribe(CoreSubscriber<? super Message> actual) {
        final RecoverableReactorReceiver recoverableReceiver
            = new RecoverableReactorReceiver(this, actual, this.prefetch, this.creditFlowMode, this.retryPolicy);
        this.source.subscribe(recoverableReceiver);
    }

    /**
     * Updates the disposition of a delivery through the currently registered disposition function.
     *
     * @param deliveryTag the tag identifying the delivery.
     * @param deliveryState the state to apply to the delivery.
     * @return the {@link Mono} produced by the current disposition function; until a function is
     *     registered via {@code onNextUpdateDispositionFunction} this is
     *     {@code DispositionFunction.NO_DISPOSITION}, which yields an {@link IllegalStateException} error.
     */
    public Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryState) {
        // The field is volatile; it is read exactly once per call.
        return this.updateDispositionFunc.updateDisposition(deliveryTag, deliveryState);
    }

    /**
     * Replaces the function that {@link #updateDisposition(String, DeliveryState)} delegates to.
     * Invoked by {@code RecoverableReactorReceiver.onMediatorReady} when a mediator reports ready.
     *
     * @param updateDispositionFunc the disposition function to use from now on.
     */
    void onNextUpdateDispositionFunction(DispositionFunction updateDispositionFunc) {
        // Plain write to a volatile field; the next updateDisposition call reads this value.
        this.updateDispositionFunc = updateDispositionFunc;
    }

    @FunctionalInterface
    private static interface DispositionFunction {
        public static final DispositionFunction NO_DISPOSITION = (t, s) -> Mono.error((Throwable)new IllegalStateException("Cannot update disposition as no receive-link is established."));

        public Mono<Void> updateDisposition(String var1, DeliveryState var2);
    }

    private static final class RecoverableReactorReceiver
    implements CoreSubscriber<AmqpReceiveLink>,
    Subscription {
        private final MediatorHolder mediatorHolder = new MediatorHolder();
        private final MessageFlux parent;
        private final int prefetch;
        private final CreditFlowMode creditFlowMode;
        private final AmqpRetryPolicy retryPolicy;
        private final ClientLogger logger;
        private final AtomicInteger retryAttempts = new AtomicInteger();
        private final CoreSubscriber<? super Message> messageSubscriber;
        private Subscription upstream;
        private volatile long requested;
        private static final AtomicLongFieldUpdater<RecoverableReactorReceiver> REQUESTED = AtomicLongFieldUpdater.newUpdater(RecoverableReactorReceiver.class, "requested");
        private volatile int wip;
        private static final AtomicIntegerFieldUpdater<RecoverableReactorReceiver> WIP = AtomicIntegerFieldUpdater.newUpdater(RecoverableReactorReceiver.class, "wip");
        private volatile boolean done;
        private volatile boolean cancelled;
        private volatile Throwable error;
        private static final AtomicReferenceFieldUpdater<RecoverableReactorReceiver, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(RecoverableReactorReceiver.class, Throwable.class, "error");

        /**
         * Creates the subscriber that sits between the upstream of receive-links and the downstream
         * message subscriber.
         *
         * @param parent the owning {@code MessageFlux}; its logger is reused by this instance.
         * @param messageSubscriber the downstream subscriber that receives messages.
         * @param prefetch the prefetch passed on to each mediator.
         * @param creditFlowMode the credit flow mode passed on to each mediator.
         * @param retryPolicy the policy consulted when a mediator terminates.
         */
        RecoverableReactorReceiver(MessageFlux parent, CoreSubscriber<? super Message> messageSubscriber, int prefetch, CreditFlowMode creditFlowMode, AmqpRetryPolicy retryPolicy) {
            // Collaborators.
            this.parent = parent;
            this.logger = parent.logger;
            this.messageSubscriber = messageSubscriber;
            // Per-mediator configuration.
            this.prefetch = prefetch;
            this.creditFlowMode = creditFlowMode;
            this.retryPolicy = retryPolicy;
        }

        /**
         * Accepts the upstream subscription, hands this object to the downstream subscriber as its
         * subscription, and requests the first receive-link.
         *
         * @param s the upstream subscription.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(this.upstream, s)) {
                return;
            }
            this.upstream = s;
            this.messageSubscriber.onSubscribe(this);
            // Links are requested one at a time.
            s.request(1L);
        }

        /**
         * Receives the next receive-link from upstream and installs a mediator for it.
         * The link is closed and dropped when this receiver is already done, and closed and
         * discarded when the mediator holder refuses the new mediator.
         *
         * @param receiver the receive-link emitted by upstream.
         */
        @Override
        public void onNext(AmqpReceiveLink receiver) {
            if (this.done) {
                receiver.closeAsync().subscribe();
                Operators.onNextDropped(receiver, this.messageSubscriber.currentContext());
                return;
            }

            final ReactorReceiverMediator nextMediator
                = new ReactorReceiverMediator(this, receiver, this.prefetch, this.creditFlowMode, this.logger);
            if (this.mediatorHolder.trySet(nextMediator)) {
                nextMediator.onParentReady();
                return;
            }

            // trySet only fails once the holder has been frozen.
            this.logger.atWarning()
                .addKeyValue("oldLinkName", this.mediatorHolder.getLinkName())
                .addKeyValue("linkName", receiver.getLinkName())
                .addKeyValue("entityPath", receiver.getEntityPath())
                .log("Got a AmqpReceiveLink when the MessageFlux is already terminated.");
            receiver.closeAsync().subscribe();
            Operators.onDiscard(receiver, this.messageSubscriber.currentContext());
        }

        /**
         * Handles a terminal error from upstream (or one raised internally by this receiver).
         * The error is recorded, the receiver is marked done and the drain loop is triggered so
         * the error is signalled downstream. If the receiver is already done, or the error cannot
         * be recorded, the error is dropped.
         *
         * @param e the terminal error.
         */
        @Override
        public void onError(Throwable e) {
            // The instance is passed without an Object cast so the field updater's owner type can be inferred.
            if (this.done || !Exceptions.addThrowable(ERROR, this, e)) {
                Operators.onErrorDropped(e, this.messageSubscriber.currentContext());
                return;
            }
            this.done = true;

            final String logMessage;
            if (this.retryPolicy == NULL_RETRY_POLICY) {
                logMessage = "Terminal error signal from Upstream|Receiver arrived at MessageFlux.";
            } else {
                logMessage = "Terminal error signal from Upstream|RetryLoop arrived at MessageFlux.";
            }
            this.mediatorHolder.withReceiverInfo(this.logger.atWarning()).log(logMessage, e);
            this.drain(null);
        }

        /**
         * Handles a terminal completion from upstream (or one raised internally by this receiver):
         * marks the receiver done and triggers the drain loop. Does nothing if already done.
         */
        @Override
        public void onComplete() {
            if (!this.done) {
                this.done = true;
                final String logMessage;
                if (this.retryPolicy == NULL_RETRY_POLICY) {
                    logMessage = "Terminal completion signal from Upstream|Receiver arrived at MessageFlux.";
                } else {
                    logMessage = "Terminal completion signal from Upstream arrived at MessageFlux.";
                }
                this.mediatorHolder.withReceiverInfo(this.logger.atWarning()).log(logMessage);
                this.drain(null);
            }
        }

        /**
         * Records downstream demand and triggers the drain loop.
         *
         * @param n the number of additional messages requested; requests that fail
         *     {@code Operators.validate} are ignored.
         */
        @Override
        public void request(long n) {
            if (!Operators.validate(n)) {
                return;
            }
            // The instance is passed without an Object cast so the field updater's owner type can be inferred.
            Operators.addCap(REQUESTED, this, n);
            this.drain(null);
        }

        /**
         * Handles downstream cancellation. Marks the receiver cancelled; if no drain is in
         * progress, cancels upstream and freezes the mediator holder right away. Otherwise the
         * running drain loop observes the flag and performs the same cleanup.
         */
        @Override
        public void cancel() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            this.mediatorHolder.withReceiverInfo(this.logger.atWarning()).log("Downstream cancellation signal arrived at MessageFlux.");

            final boolean drainInProgress = WIP.getAndIncrement(this) != 0;
            if (drainInProgress) {
                return;
            }
            this.upstream.cancel();
            this.mediatorHolder.freeze();
        }

        /**
         * Called by a mediator the first time it observes its link in the active endpoint state.
         *
         * @param updateDispositionFunc the disposition function of the mediator that became ready.
         */
        void onMediatorReady(DispositionFunction updateDispositionFunc) {
            // A link became active, so the retry attempt counter starts over.
            this.retryAttempts.set(0);
            // Route MessageFlux.updateDisposition calls to the mediator that just became ready.
            this.parent.onNextUpdateDispositionFunction(updateDispositionFunc);
            this.drain(null);
        }

        /**
         * Enters the drain loop unless another thread is already running it; in that case the
         * incremented work-in-progress counter tells the running loop to go around again.
         *
         * @param dataSignal the message that triggered this drain, or null for non-data signals.
         *     It is discarded here only when the loop is busy and the receiver is cancelled.
         */
        void drain(Message dataSignal) {
            if (WIP.getAndIncrement(this) == 0) {
                this.drainLoop();
                return;
            }
            if (dataSignal != null && this.cancelled) {
                Operators.onDiscard(dataSignal, this.messageSubscriber.currentContext());
            }
        }

        /**
         * The serialized drain loop. Each pass:
         * <ol>
         *   <li>terminates if cancellation or a terminal signal is pending;</li>
         *   <li>emits queued messages of the current mediator up to the outstanding demand;</li>
         *   <li>reports the remaining demand and emitted count to the mediator;</li>
         *   <li>when the mediator is done and its queue is empty, closes it and (once per
         *       mediator) either terminates or schedules a request for the next mediator.</li>
         * </ol>
         * The loop repeats while {@code drain} calls arrived during the pass (tracked via WIP).
         */
        private void drainLoop() {
            int missed = 1;
            final CoreSubscriber<? super Message> downstream = this.messageSubscriber;
            do {
                final boolean d = this.done;
                final ReactorReceiverMediator mediator = this.mediatorHolder.mediator;
                final boolean hasMediator = mediator != null;
                if (this.terminateIfCancelled(downstream, null)) {
                    return;
                }
                if (this.terminateIfErrorOrCompletionSignaled(d, downstream, null)) {
                    return;
                }

                long r = this.requested;
                boolean mediatorTerminatedAndDrained = false;

                if (r != 0L && hasMediator) {
                    final Queue<Message> q = mediator.queue;
                    long emitted = 0L;
                    while (emitted != r) {
                        final Message message = q.poll();
                        if (this.terminateIfCancelled(downstream, message)) {
                            return;
                        }
                        if (this.terminateIfErrorOrCompletionSignaled(this.done, downstream, message)) {
                            return;
                        }
                        if (message == null) {
                            // NOTE(review): mediator.done is read after the poll; confirm no message can be
                            // enqueued between the two reads, otherwise it would be left behind.
                            if (mediator.done) {
                                mediatorTerminatedAndDrained = true;
                            }
                            break;
                        }
                        // Emit the message as-is; an Object-typed argument does not satisfy
                        // CoreSubscriber<? super Message>.onNext.
                        downstream.onNext(message);
                        emitted++;
                    }
                    if (emitted == r && mediator.queue.isEmpty() && mediator.done) {
                        mediatorTerminatedAndDrained = true;
                    }
                    // Long.MAX_VALUE is treated as unbounded demand and is never decremented.
                    if (emitted != 0L && r != Long.MAX_VALUE) {
                        r = REQUESTED.addAndGet(this, -emitted);
                    }
                    mediator.update(r, emitted);
                }

                if (r == 0L && hasMediator) {
                    if (this.terminateIfCancelled(downstream, null)) {
                        return;
                    }
                    if (this.terminateIfErrorOrCompletionSignaled(this.done, downstream, null)) {
                        return;
                    }
                    if (mediator.queue.isEmpty() && mediator.done) {
                        mediatorTerminatedAndDrained = true;
                    }
                }

                // mediatorTerminatedAndDrained can only be true when a mediator exists.
                if (mediatorTerminatedAndDrained && !mediator.isRetryInitiated) {
                    // The flag ensures the follow-up runs once per mediator.
                    mediator.isRetryInitiated = true;
                    mediator.closeAsync().subscribe();
                    this.setTerminationSignalOrScheduleNextMediatorRequest(mediator.error, downstream, this.mediatorHolder);
                }
            } while ((missed = WIP.addAndGet(this, -missed)) != 0);
        }

        /**
         * Performs cancellation cleanup when the receiver has been cancelled: discards the given
         * message, cancels upstream and freezes the mediator holder.
         *
         * @param downstream the downstream subscriber whose context receives the discard.
         * @param messageDropped the message polled but not emitted, or null.
         * @return true if the receiver was cancelled and the caller must stop draining.
         */
        private boolean terminateIfCancelled(CoreSubscriber<? super Message> downstream, Message messageDropped) {
            if (!this.cancelled) {
                return false;
            }
            Operators.onDiscard(messageDropped, downstream.currentContext());
            this.upstream.cancel();
            this.mediatorHolder.freeze();
            return true;
        }

        /**
         * Terminates the downstream when the receiver is done: discards the given message,
         * cancels upstream, freezes the mediator holder and signals either the recorded error
         * or completion downstream.
         *
         * @param d the done flag as observed by the caller.
         * @param downstream the downstream subscriber to terminate.
         * @param messageDropped the message polled but not emitted, or null.
         * @return true if a terminal signal was delivered and the caller must stop draining.
         */
        private boolean terminateIfErrorOrCompletionSignaled(boolean d, CoreSubscriber<? super Message> downstream, Message messageDropped) {
            if (!d) {
                return false;
            }
            // Receiver details are captured before the holder is frozen below.
            final LoggingEventBuilder logBuilder = this.mediatorHolder.withReceiverInfo(this.logger.atWarning());

            final Throwable recorded = this.error;
            final boolean errored = recorded != null && recorded != Exceptions.TERMINATED;
            // The instance is passed without an Object cast so the field updater's owner type can be inferred.
            final Throwable terminalError = errored ? Exceptions.terminate(ERROR, this) : null;

            // Teardown is the same for the error and completion paths.
            Operators.onDiscard(messageDropped, downstream.currentContext());
            this.upstream.cancel();
            this.mediatorHolder.freeze();

            if (errored) {
                logBuilder.log("MessageFlux reached a terminal error-state, signaling it downstream.", terminalError);
                downstream.onError(terminalError);
            } else {
                logBuilder.log("MessageFlux reached a terminal completion-state, signaling it downstream.");
                downstream.onComplete();
            }
            return true;
        }

        /**
         * Decides what happens after the current mediator has terminated and been drained:
         * nothing if the receiver is already cancelled or done; forward the mediator's terminal
         * state when the retry policy is {@code NULL_RETRY_POLICY}; otherwise schedule a request
         * for a new mediator after a back-off, or signal an error when the policy yields no delay
         * or scheduling is rejected.
         *
         * @param error the error the mediator terminated with, or null if it completed.
         * @param downstream the downstream subscriber (its context is used for rejected scheduling).
         * @param mediatorHolder the holder that tracks the scheduled request.
         */
        private void setTerminationSignalOrScheduleNextMediatorRequest(Throwable error, CoreSubscriber<? super Message> downstream, MediatorHolder mediatorHolder) {
            final LoggingEventBuilder logBuilder = mediatorHolder.withReceiverInfo(this.logger.atWarning());
            if (this.cancelled || this.done) {
                logBuilder.log("MessageFlux reached terminal-state [done:{}, cancelled:{}].", this.done, this.cancelled);
                return;
            }

            if (this.retryPolicy == NULL_RETRY_POLICY) {
                // No retry: the mediator's terminal state becomes this receiver's terminal state.
                if (error != null) {
                    this.onError(error);
                } else {
                    this.onComplete();
                }
                return;
            }

            final Duration delay;
            if (error != null) {
                final int attempt = this.retryAttempts.incrementAndGet();
                delay = this.retryPolicy.calculateRetryDelay(error, attempt);
                if (delay == null) {
                    logBuilder.addKeyValue("attempt", (long) attempt).log("Current mediator reached terminal error-state (retriable:false) Or MessageFlux retries exhausted.", error);
                    this.onError(error);
                    return;
                }
                logBuilder.addKeyValue("attempt", (long) attempt).addKeyValue("retryAfter", delay.toMillis()).log("Current mediator reached terminal error-state (retriable:true).", error);
            } else {
                // A completed mediator is replaced after a fixed one second delay.
                delay = Duration.ofSeconds(1L);
                logBuilder.addKeyValue("retryAfter", delay.toMillis()).log("Current mediator reached terminal completion-state (retriable:true).");
            }

            try {
                this.scheduleNextMediatorRequest(delay, mediatorHolder);
            } catch (RejectedExecutionException ree) {
                final RuntimeException rejection = Operators.onRejectedExecution(ree, downstream.currentContext());
                mediatorHolder.withReceiverInfo(this.logger.atWarning()).log("Unable to schedule a request for a new mediator (retriable:false).", rejection);
                this.onError(rejection);
            }
        }

        /**
         * Schedules, on the parallel scheduler, a task that requests one more receive-link from
         * upstream after the given delay. The task does nothing except log if the receiver was
         * cancelled or became done in the meantime. The resulting disposable is stored in the holder.
         *
         * @param delay how long to wait before requesting the next link.
         * @param mediatorHolder the holder that keeps the disposable of the scheduled task.
         */
        private void scheduleNextMediatorRequest(Duration delay, MediatorHolder mediatorHolder) {
            final Runnable requestNextMediator = () -> {
                final LoggingEventBuilder logBuilder = mediatorHolder.withReceiverInfo(this.logger.atWarning());
                if (!this.cancelled && !this.done) {
                    logBuilder.log("Requesting a new mediator.");
                    this.upstream.request(1L);
                    return;
                }
                logBuilder.log("During the backoff, MessageFlux reached terminal-state [done:{}, cancelled:{}].", this.done, this.cancelled);
            };
            mediatorHolder.nextMediatorRequestDisposable
                = Schedulers.parallel().schedule(requestNextMediator, delay.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Holds the current mediator and the disposable of the pending "request next mediator" task.
     * Once frozen, the holder refuses new mediators.
     */
    private static final class MediatorHolder {
        // Read and written only while holding this object's monitor.
        private boolean isFrozen;
        volatile ReactorReceiverMediator mediator;
        volatile Disposable nextMediatorRequestDisposable;

        private MediatorHolder() {
        }

        /**
         * Installs the given mediator as the current one unless the holder is frozen.
         *
         * @param mediator the mediator to install.
         * @return true if the mediator was installed, false if the holder is frozen.
         */
        boolean trySet(ReactorReceiverMediator mediator) {
            synchronized (this) {
                if (!this.isFrozen) {
                    this.mediator = mediator;
                    return true;
                }
                return false;
            }
        }

        /**
         * Freezes the holder (first call only), then disposes the pending request task and
         * closes the current mediator, if any. The cleanup runs outside the monitor.
         */
        void freeze() {
            final Disposable pendingRequest;
            final ReactorReceiverMediator current;
            synchronized (this) {
                if (this.isFrozen) {
                    return;
                }
                pendingRequest = this.nextMediatorRequestDisposable;
                current = this.mediator;
                this.isFrozen = true;
            }
            if (pendingRequest != null) {
                pendingRequest.dispose();
            }
            if (current != null) {
                current.closeAsync().subscribe();
            }
        }

        /**
         * @return the link name of the current mediator's receiver, or null if no mediator is set.
         */
        String getLinkName() {
            final ReactorReceiverMediator current = this.mediator;
            if (current == null) {
                return null;
            }
            return current.receiver.getLinkName();
        }

        /**
         * Adds the connection id, link name and entity path of the current mediator's receiver
         * to the log builder; returns the builder untouched when no mediator is set.
         *
         * @param builder the log builder to enrich.
         * @return the builder to continue logging with.
         */
        LoggingEventBuilder withReceiverInfo(LoggingEventBuilder builder) {
            final ReactorReceiverMediator current = this.mediator;
            if (current == null) {
                return builder;
            }
            return builder.addKeyValue("connectionId", current.receiver.getConnectionId())
                .addKeyValue("linkName", current.receiver.getLinkName())
                .addKeyValue("entityPath", current.receiver.getEntityPath());
        }
    }

    private static final class ReactorReceiverMediator
    implements AsyncCloseable,
    CoreSubscriber<Message>,
    Subscription {
        private static final Subscription CANCELLED_SUBSCRIPTION = Operators.cancelledSubscription();
        private final RecoverableReactorReceiver parent;
        private final AmqpReceiveLink receiver;
        private final int prefetch;
        private final CreditFlowMode creditFlowMode;
        private final ClientLogger logger;
        private final Disposable.Composite endpointStateDisposable = Disposables.composite();
        private CreditAccountingStrategy creditAccounting;
        private volatile boolean ready;
        private volatile Subscription s;
        private static final AtomicReferenceFieldUpdater<ReactorReceiverMediator, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(ReactorReceiverMediator.class, Subscription.class, "s");
        volatile Throwable error;
        static final AtomicReferenceFieldUpdater<ReactorReceiverMediator, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(ReactorReceiverMediator.class, Throwable.class, "error");
        volatile boolean done;
        volatile boolean isRetryInitiated;
        final Queue<Message> queue;

        /**
         * Creates a mediator between one receive-link and the parent receiver.
         *
         * @param parent the receiver that drains this mediator's queue.
         * @param receiver the receive-link this mediator wraps.
         * @param prefetch the prefetch passed to the credit accounting strategy.
         * @param creditFlowMode selects the credit accounting strategy.
         * @param logger the logger to use.
         */
        ReactorReceiverMediator(RecoverableReactorReceiver parent, AmqpReceiveLink receiver, int prefetch, CreditFlowMode creditFlowMode, ClientLogger logger) {
            // The queue is obtained from Reactor's Queues factory with a capacity hint of Integer.MAX_VALUE.
            this.queue = Queues.<Message>get(Integer.MAX_VALUE).get();
            this.parent = parent;
            this.receiver = receiver;
            this.logger = logger;
            this.prefetch = prefetch;
            this.creditFlowMode = creditFlowMode;
        }

        /**
         * Called by the parent after this mediator has been installed. Subscribes this mediator to
         * the link's message stream, then watches the link's endpoint states: the first ACTIVE
         * state marks the mediator ready and notifies the parent; termination of the endpoint
         * state stream is translated into {@code onLinkError} / {@code onLinkComplete}.
         */
        void onParentReady() {
            this.updateLogWithReceiverId(this.logger.atWarning()).log("Setting next mediator and waiting for activation.");
            this.receiver.receive().subscribe(this);

            final Flux<AmqpEndpointState> activeStates = this.receiver.getEndpointStates()
                .filter(endpointState -> endpointState == AmqpEndpointState.ACTIVE)
                .publishOn(ReceiversPumpingScheduler.instance());

            final Disposable endpointDisposable = activeStates.subscribe(
                state -> {
                    assert (state == AmqpEndpointState.ACTIVE);
                    if (this.ready) {
                        return;
                    }
                    this.updateLogWithReceiverId(this.logger.atWarning()).log("The mediator is active.");
                    this.ready = true;
                    this.parent.onMediatorReady(this::updateDisposition);
                },
                linkError -> {
                    this.updateLogWithReceiverId(this.logger.atWarning()).log("Receiver emitted terminal error.", linkError);
                    this.onLinkError(linkError);
                },
                () -> {
                    this.updateLogWithReceiverId(this.logger.atWarning()).log("Receiver emitted terminal completion.");
                    this.onLinkComplete();
                });
            this.endpointStateDisposable.add(endpointDisposable);
        }

        /**
         * Accepts the subscription to the link's message stream (first subscription only) and
         * creates the credit accounting strategy matching the configured credit flow mode.
         *
         * @param s the subscription to the link's message stream.
         * @throws IllegalArgumentException if the credit flow mode is not a known value.
         */
        @Override
        public void onSubscribe(Subscription s) {
            // The instance is passed without an Object cast so the field updater's owner type can be inferred.
            if (!Operators.setOnce(S, this, s)) {
                return;
            }
            switch (this.creditFlowMode) {
                case RequestDriven:
                    this.creditAccounting = new RequestDrivenCreditAccountingStrategy(this.receiver, s, this.prefetch, this.logger);
                    break;

                case EmissionDriven:
                    this.creditAccounting = new EmissionDrivenCreditAccountingStrategy(this.receiver, s, this.prefetch, this.logger);
                    break;

                default:
                    throw new IllegalArgumentException("Unknown CreditFlowMode " + this.creditFlowMode);
            }
        }

        /**
         * Forwards the parent's demand accounting to the credit accounting strategy, but only
         * while this mediator is ready and not done.
         *
         * @param request the demand value supplied by the parent's drain loop.
         * @param emitted the number of messages the parent emitted in the current drain pass.
         */
        void update(long request, long emitted) {
            if (!this.ready || this.done) {
                return;
            }
            this.creditAccounting.update(request, emitted);
        }

        /**
         * Receives a message from the link, enqueues it and asks the parent to drain.
         * <p>
         * Messages arriving after the mediator is done are dropped; messages arriving after the
         * subscription was cancelled are discarded. Drop and discard hooks are looked up in the
         * downstream subscriber's context, as everywhere else in this class.
         * <p>
         * If the queue rejects the message, the overflow error returned by
         * {@code Operators.onOperatorError} is recorded as this mediator's error so the parent
         * sees an errored mediator rather than a cleanly completed one.
         *
         * @param message the message delivered by the link.
         */
        @Override
        public void onNext(Message message) {
            if (this.done) {
                Operators.onNextDropped(message, this.parent.messageSubscriber.currentContext());
                return;
            }
            if (this.s == CANCELLED_SUBSCRIPTION) {
                Operators.onDiscard(message, this.parent.messageSubscriber.currentContext());
                return;
            }
            if (this.queue.offer(message)) {
                this.parent.drain(message);
                return;
            }

            final Context downstreamContext = this.parent.messageSubscriber.currentContext();
            final Throwable overflow = Operators.onOperatorError(this,
                Exceptions.failWithOverflow("Queue is full: Reactive Streams source doesn't respect backpressure"),
                downstreamContext);
            Operators.onDiscard(message, downstreamContext);
            // Keep the first recorded error if one is already present.
            ERROR.compareAndSet(this, null, overflow);
            this.done = true;
            // The message was discarded above; passing it to drain could discard it a second time.
            this.parent.drain(null);
        }

        /**
         * Intentionally a no-op: an error on the message stream this mediator subscribes to is
         * ignored here. Link failure is instead taken from the endpoint-state subscription set up
         * in {@code onParentReady}, whose error handler calls {@code onLinkError}.
         *
         * @param e the error signalled by the message stream; not used.
         */
        public void onError(Throwable e) {
        }

        /**
         * Marks this mediator as terminated with an error. The first error is recorded and the
         * parent is asked to drain; an error arriving when the mediator is already done, or when
         * another error is already recorded, is dropped.
         *
         * @param e the error reported by the link's endpoint state stream.
         */
        private void onLinkError(Throwable e) {
            if (this.done) {
                Operators.onErrorDropped(e, this.parent.messageSubscriber.currentContext());
                return;
            }
            final boolean recorded = ERROR.compareAndSet(this, null, e);
            // The mediator is marked done whether or not this error was the one recorded.
            this.done = true;
            if (recorded) {
                this.parent.drain(null);
            } else {
                Operators.onErrorDropped(e, this.parent.messageSubscriber.currentContext());
            }
        }

        /**
         * Intentionally a no-op: completion of the message stream this mediator subscribes to is
         * ignored here. Link completion is instead taken from the endpoint-state subscription set
         * up in {@code onParentReady}, whose completion handler calls {@code onLinkComplete}.
         */
        public void onComplete() {
        }

        /**
         * Marks this mediator as completed and asks the parent to drain; does nothing if the
         * mediator is already done.
         */
        private void onLinkComplete() {
            if (!this.done) {
                this.done = true;
                this.parent.drain(null);
            }
        }

        /**
         * Not supported: demand for this mediator is communicated through {@code update(long, long)}.
         *
         * @param n ignored.
         * @throws IllegalStateException always.
         */
        @Override
        public void request(long n) {
            final String reason = "The request accounting must be through update(,).";
            throw new IllegalStateException(reason);
        }

        /**
         * Cancels the subscription to the link's message stream and disposes the endpoint-state
         * subscription. On the first cancellation the queued messages are cleared and discarded;
         * the discard hook is looked up in the downstream subscriber's context, as everywhere
         * else in this class.
         */
        @Override
        public void cancel() {
            // The instance is passed without an Object cast so the field updater's owner type can be inferred.
            if (Operators.terminate(S, this)) {
                Operators.onDiscardQueueWithClear(this.queue, this.parent.messageSubscriber.currentContext(), null);
            }
            this.endpointStateDisposable.dispose();
        }

        /**
         * Cancels this mediator's subscriptions and then closes the underlying receive-link.
         *
         * @return the {@link Mono} returned by the receive-link's {@code closeAsync()}.
         */
        @Override
        public Mono<Void> closeAsync() {
            // Cancellation runs eagerly, before the returned Mono is subscribed.
            this.cancel();
            final Mono<Void> linkClose = this.receiver.closeAsync();
            return linkClose;
        }

        /**
         * Updates the disposition of a delivery on this mediator's link. When the mediator is done
         * or its subscription is cancelled, the result is an error carrying a
         * {@code DeliveryNotOnLinkException}, with the mediator's and the parent's recorded errors
         * (if any) attached as suppressed exceptions.
         *
         * @param deliveryTag the tag identifying the delivery.
         * @param deliveryState the state to apply to the delivery.
         * @return the link's disposition result, or an error Mono when the link is no longer usable.
         */
        private Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryState) {
            if (!this.done && this.s != CANCELLED_SUBSCRIPTION) {
                return this.receiver.updateDisposition(deliveryTag, deliveryState);
            }

            final String state = String.format("[link.done:%b link.cancelled:%b parent.done:%b parent.cancelled:%b]", this.done, this.s == CANCELLED_SUBSCRIPTION, this.parent.done, this.parent.cancelled);
            final DeliveryNotOnLinkException dispositionError = DeliveryNotOnLinkException.linkClosed(deliveryTag, deliveryState);

            final Throwable receiverError = this.error;
            if (receiverError != null) {
                dispositionError.addSuppressed(receiverError);
            }
            final Throwable upstreamError = this.parent.error;
            if (upstreamError != null) {
                dispositionError.addSuppressed(upstreamError);
            }

            final LoggingEventBuilder errorLog = this.logger.atError()
                .addKeyValue("lockToken", deliveryTag)
                .addKeyValue("deliveryState", (Object) deliveryState)
                .addKeyValue("messageFluxState", state);
            return FluxUtil.monoError(errorLog, dispositionError);
        }

        /**
         * Adds this mediator's receiver details to the given log builder under the keys
         * {@code connectionId}, {@code linkName} and {@code entityPath}.
         *
         * @param builder the log builder to enrich.
         * @return the builder returned by the last {@code addKeyValue} call.
         */
        private LoggingEventBuilder updateLogWithReceiverId(LoggingEventBuilder builder) {
            return builder.addKeyValue("connectionId", this.receiver.getConnectionId()).addKeyValue("linkName", this.receiver.getLinkName()).addKeyValue("entityPath", this.receiver.getEntityPath());
        }
    }
}

