/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessagePumpTerminatedException;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusSessionAcquirer;
import com.azure.messaging.servicebus.ServiceBusSessionReactorReceiver;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class SessionsMessagePump {
    private static final AtomicLong COUNTER = new AtomicLong();
    private static final ArrayList<RollingSessionReceiver> EMPTY = new ArrayList(0);
    private static final ArrayList<RollingSessionReceiver> TERMINATED = new ArrayList(0);
    private static final Duration CONNECTION_STATE_POLL_INTERVAL = Duration.ofSeconds(20L);
    private final long pumpId;
    private final ClientLogger logger;
    private final String identifier;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final ServiceBusReceiverInstrumentation instrumentation;
    private final ServiceBusSessionAcquirer sessionAcquirer;
    private final Duration maxSessionLockRenew;
    private final Duration sessionIdleTimeout;
    private final int maxConcurrentSessions;
    private final int concurrencyPerSession;
    private final int prefetch;
    private final boolean enableAutoDisposition;
    private final MessageSerializer serializer;
    private final AmqpRetryPolicy retryPolicy;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final Runnable onTerminate;
    private final AtomicReference<List<RollingSessionReceiver>> rollingReceiversRef = new AtomicReference<ArrayList<RollingSessionReceiver>>(EMPTY);
    private final SessionReceiversTracker receiversTracker;
    private final Mono<ServiceBusSessionAcquirer.Session> nextSession;

    SessionsMessagePump(String identifier, String fullyQualifiedNamespace, String entityPath, ServiceBusReceiveMode receiveMode, ServiceBusReceiverInstrumentation instrumentation, ServiceBusSessionAcquirer sessionAcquirer, Duration maxSessionLockRenew, Duration sessionIdleTimeout, int maxConcurrentSessions, int concurrencyPerSession, int prefetch, boolean enableAutoDisposition, MessageSerializer serializer, AmqpRetryPolicy retryPolicy, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, Runnable onTerminate) {
        this.pumpId = COUNTER.incrementAndGet();
        HashMap<String, Object> loggingContext = new HashMap<String, Object>(3);
        loggingContext.put("pumpId", this.pumpId);
        loggingContext.put("namespace", fullyQualifiedNamespace);
        loggingContext.put("entityPath", entityPath);
        this.logger = new ClientLogger(SessionsMessagePump.class, loggingContext);
        this.identifier = identifier;
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");
        this.instrumentation = Objects.requireNonNull(instrumentation, "'instrumentation' cannot be null");
        this.sessionAcquirer = Objects.requireNonNull(sessionAcquirer, "'sessionAcquirer' cannot be null");
        this.maxSessionLockRenew = Objects.requireNonNull(maxSessionLockRenew, "'maxSessionLockRenew' cannot be null.");
        this.sessionIdleTimeout = sessionIdleTimeout != null ? sessionIdleTimeout : retryPolicy.getRetryOptions().getTryTimeout();
        this.maxConcurrentSessions = maxConcurrentSessions;
        this.concurrencyPerSession = concurrencyPerSession;
        this.prefetch = prefetch;
        this.enableAutoDisposition = enableAutoDisposition;
        this.serializer = Objects.requireNonNull(serializer, "'serializer' cannot be null.");
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null.");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null.");
        this.onTerminate = Objects.requireNonNull(onTerminate, "'onTerminate' cannot be null.");
        this.receiversTracker = new SessionReceiversTracker(this.logger, maxConcurrentSessions, fullyQualifiedNamespace, entityPath, receiveMode, instrumentation);
        this.nextSession = new NextSession(this.pumpId, fullyQualifiedNamespace, entityPath, sessionAcquirer).mono();
    }

    String getIdentifier() {
        return this.identifier;
    }

    Mono<Void> begin() {
        Mono createReceiversMono = Mono.fromSupplier(() -> {
            this.throwIfTerminatedOrInitialized();
            List<RollingSessionReceiver> rollingReceivers = this.createRollingSessionReceivers();
            if (!this.rollingReceiversRef.compareAndSet(EMPTY, rollingReceivers)) {
                rollingReceivers.clear();
                this.throwIfTerminatedOrInitialized();
            }
            return rollingReceivers;
        });
        Function<List, Mono> pumpFromReceiversMono = rollingReceivers -> {
            ArrayList<Mono<Void>> pumpingList = new ArrayList<Mono<Void>>(rollingReceivers.size());
            for (RollingSessionReceiver rollingReceiver : rollingReceivers) {
                pumpingList.add(rollingReceiver.begin());
            }
            Mono<Void> terminatePumping = this.pollConnectionState();
            Mono pumping = Mono.when(pumpingList);
            return Mono.firstWithSignal((Mono[])new Mono[]{terminatePumping, pumping});
        };
        Mono pumpingMessages = Mono.usingWhen((Publisher)createReceiversMono, pumpFromReceiversMono, __ -> this.terminate(TerminalSignalType.COMPLETED), (__, e) -> this.terminate(TerminalSignalType.ERRORED), __ -> this.terminate(TerminalSignalType.CANCELED));
        return pumpingMessages.onErrorMap(e -> {
            if (e instanceof MessagePumpTerminatedException) {
                return e;
            }
            return new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "pumping#error-map", (Throwable)e);
        }).then(Mono.error(() -> MessagePumpTerminatedException.forCompletion(this.pumpId, this.fullyQualifiedNamespace, this.entityPath)));
    }

    private Mono<Void> pollConnectionState() {
        return Flux.interval((Duration)CONNECTION_STATE_POLL_INTERVAL).handle((ignored, sink) -> {
            if (this.sessionAcquirer.isConnectionClosed()) {
                RuntimeException e = this.logger.atInfo().log((RuntimeException)new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#connection-state-poll"));
                sink.error((Throwable)e);
            }
        }).then();
    }

    private Mono<Void> terminate(TerminalSignalType signalType) {
        List rollingReceivers = this.rollingReceiversRef.getAndSet(TERMINATED);
        if (rollingReceivers == TERMINATED) {
            return Mono.empty();
        }
        this.logger.atInfo().log("Pump terminated. signal:" + (Object)((Object)signalType));
        this.receiversTracker.clear();
        this.onTerminate.run();
        return Mono.empty();
    }

    private List<RollingSessionReceiver> createRollingSessionReceivers() {
        ArrayList<RollingSessionReceiver> rollingReceivers = new ArrayList<RollingSessionReceiver>(this.maxConcurrentSessions);
        for (int rollerId = 1; rollerId <= this.maxConcurrentSessions; ++rollerId) {
            RollingSessionReceiver rollingReceiver = new RollingSessionReceiver(this.pumpId, rollerId, this.instrumentation, this.fullyQualifiedNamespace, this.entityPath, this.nextSession, this.maxSessionLockRenew, this.sessionIdleTimeout, this.concurrencyPerSession, this.prefetch, this.enableAutoDisposition, this.serializer, this.retryPolicy, this.processMessage, this.processError, this.receiversTracker);
            rollingReceivers.add(rollingReceiver);
        }
        return rollingReceivers;
    }

    private void throwIfTerminatedOrInitialized() {
        List<RollingSessionReceiver> l = this.rollingReceiversRef.get();
        if (l == TERMINATED) {
            throw this.logger.atVerbose().log((RuntimeException)new IllegalStateException("Cannot invoke begin() once terminated."));
        }
        if (l != EMPTY) {
            throw this.logger.atVerbose().log((RuntimeException)new IllegalStateException("Cannot invoke begin() more than once."));
        }
    }

    static final class SessionReceiversTracker {
        private final ClientLogger logger;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final ServiceBusReceiveMode receiveMode;
        private final ConcurrentHashMap<String, ServiceBusSessionReactorReceiver> receivers;
        private final ServiceBusReceiverInstrumentation instrumentation;

        private SessionReceiversTracker(ClientLogger logger, int size, String fullyQualifiedNamespace, String entityPath, ServiceBusReceiveMode receiveMode, ServiceBusReceiverInstrumentation instrumentation) {
            this.logger = logger;
            this.fullyQualifiedNamespace = fullyQualifiedNamespace;
            this.entityPath = entityPath;
            this.receiveMode = receiveMode;
            this.receivers = new ConcurrentHashMap(size);
            this.instrumentation = instrumentation;
        }

        private void track(ServiceBusSessionReactorReceiver receiver) {
            this.receivers.put(receiver.getSessionId(), receiver);
        }

        private void untrack(ServiceBusSessionReactorReceiver receiver) {
            this.receivers.remove(receiver.getSessionId(), receiver);
        }

        private void clear() {
            this.receivers.clear();
        }

        String getFullyQualifiedNamespace() {
            return this.fullyQualifiedNamespace;
        }

        String getEntityPath() {
            return this.entityPath;
        }

        Mono<Void> abandon(ServiceBusReceivedMessage message) {
            return this.updateDisposition(message, DispositionStatus.ABANDONED, null, null, null, null);
        }

        Mono<Void> complete(ServiceBusReceivedMessage message) {
            return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, null);
        }

        Mono<Void> deadLetter(ServiceBusReceivedMessage message) {
            return this.updateDisposition(message, DispositionStatus.SUSPENDED, null, null, null, null);
        }

        Mono<Void> defer(ServiceBusReceivedMessage message) {
            return this.updateDisposition(message, DispositionStatus.DEFERRED, null, null, null, null);
        }

        Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
            Mono<Void> nullError = this.checkNull(options, options != null ? options.getTransactionContext() : null);
            if (nullError != null) {
                return nullError;
            }
            return this.updateDisposition(message, DispositionStatus.ABANDONED, options.getPropertiesToModify(), null, null, options.getTransactionContext());
        }

        Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options) {
            Mono<Void> nullError = this.checkNull(options, options != null ? options.getTransactionContext() : null);
            if (nullError != null) {
                return nullError;
            }
            return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, options.getTransactionContext());
        }

        Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
            Mono<Void> nullError = this.checkNull(options, options != null ? options.getTransactionContext() : null);
            if (nullError != null) {
                return nullError;
            }
            return this.updateDisposition(message, DispositionStatus.SUSPENDED, options.getPropertiesToModify(), options.getDeadLetterReason(), options.getDeadLetterErrorDescription(), options.getTransactionContext());
        }

        Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options) {
            Mono<Void> nullError = this.checkNull(options, options != null ? options.getTransactionContext() : null);
            if (nullError != null) {
                return nullError;
            }
            return this.updateDisposition(message, DispositionStatus.DEFERRED, options.getPropertiesToModify(), null, null, options.getTransactionContext());
        }

        private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus, Map<String, Object> propertiesToModify, String deadLetterReason, String deadLetterDescription, ServiceBusTransactionContext transactionContext) {
            if (this.receiveMode != ServiceBusReceiveMode.PEEK_LOCK) {
                String m = String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", new Object[]{dispositionStatus});
                return Mono.error((Throwable)new UnsupportedOperationException(m));
            }
            if (message.isSettled()) {
                String m = "The message has either been deleted or already settled.";
                return Mono.error((Throwable)new IllegalArgumentException("The message has either been deleted or already settled."));
            }
            if (message.getLockToken() == null) {
                String m = "This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.";
                return Mono.error((Throwable)new UnsupportedOperationException("This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled."));
            }
            String sessionId = message.getSessionId();
            ServiceBusSessionReactorReceiver receiver = this.receivers.get(sessionId);
            DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterDescription, propertiesToModify, transactionContext);
            Mono updateDispositionMono = receiver != null ? receiver.updateDisposition(message.getLockToken(), deliveryState) : Mono.error((Throwable)DeliveryNotOnLinkException.noMatchingDelivery((String)message.getLockToken(), (DeliveryState)deliveryState));
            return this.instrumentation.instrumentSettlement(updateDispositionMono, message, message.getContext(), dispositionStatus);
        }

        private Mono<Void> checkNull(Object options, ServiceBusTransactionContext transactionContext) {
            if (options == null) {
                return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'options' cannot be null."));
            }
            if (transactionContext != null && transactionContext.getTransactionId() == null) {
                return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
            }
            return null;
        }
    }

    private static final class NextSession
    implements Supplier<Mono<ServiceBusSessionAcquirer.Session>> {
        private final AtomicReference<Boolean> isTerminated = new AtomicReference<Boolean>(false);
        private final long pumpId;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final ServiceBusSessionAcquirer sessionAcquirer;

        NextSession(long pumpId, String fullyQualifiedNamespace, String entityPath, ServiceBusSessionAcquirer sessionAcquirer) {
            this.pumpId = pumpId;
            this.fullyQualifiedNamespace = fullyQualifiedNamespace;
            this.entityPath = entityPath;
            this.sessionAcquirer = sessionAcquirer;
        }

        Mono<ServiceBusSessionAcquirer.Session> mono() {
            NextSession supplier = this;
            return Mono.defer((Supplier)supplier);
        }

        @Override
        public Mono<ServiceBusSessionAcquirer.Session> get() {
            if (this.isTerminated.get().booleanValue()) {
                return Mono.error((Throwable)new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#acquire"));
            }
            return this.sessionAcquirer.acquire().onErrorMap(e -> {
                this.isTerminated.set(true);
                return new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#acquire", (Throwable)e);
            });
        }
    }

    private static final class RollingSessionReceiver
    extends AtomicReference<State<ServiceBusSessionReactorReceiver>> {
        private static final String ROLLER_ID_KEY = "roller-id";
        private static final State<ServiceBusSessionReactorReceiver> INIT = State.init();
        private static final State<ServiceBusSessionReactorReceiver> TERMINATED = State.terminated();
        private final ClientLogger logger;
        private final long pumpId;
        private final int rollerId;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final int concurrency;
        private final Consumer<ServiceBusReceivedMessageContext> processMessage;
        private final Consumer<ServiceBusErrorContext> processError;
        private final boolean enableAutoDisposition;
        private final Duration maxSessionLockRenew;
        private final Duration sessionIdleTimeout;
        private final MessageSerializer serializer;
        private final ServiceBusReceiverInstrumentation instrumentation;
        private final ServiceBusTracer tracer;
        private final SessionReceiversTracker receiversTracker;
        private final NextSessionStream nextSessionStream;
        private final MessageFlux messageFlux;

        RollingSessionReceiver(long pumpId, int rollerId, ServiceBusReceiverInstrumentation instrumentation, String fullyQualifiedNamespace, String entityPath, Mono<ServiceBusSessionAcquirer.Session> nextSession, Duration maxSessionLockRenew, Duration sessionIdleTimeout, int concurrency, int prefetch, boolean enableAutoDisposition, MessageSerializer serializer, AmqpRetryPolicy retryPolicy, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, SessionReceiversTracker receiversTracker) {
            super(INIT);
            this.pumpId = pumpId;
            HashMap<String, Object> loggingContext = new HashMap<String, Object>(3);
            loggingContext.put(ROLLER_ID_KEY, rollerId);
            loggingContext.put("namespace", fullyQualifiedNamespace);
            loggingContext.put("entityPath", entityPath);
            this.logger = new ClientLogger(RollingSessionReceiver.class, loggingContext);
            this.rollerId = rollerId;
            this.fullyQualifiedNamespace = fullyQualifiedNamespace;
            this.entityPath = entityPath;
            this.concurrency = concurrency;
            this.processError = processError;
            this.processMessage = processMessage;
            this.enableAutoDisposition = enableAutoDisposition;
            this.maxSessionLockRenew = maxSessionLockRenew;
            this.sessionIdleTimeout = sessionIdleTimeout;
            this.serializer = serializer;
            this.instrumentation = instrumentation;
            this.tracer = instrumentation.getTracer();
            this.receiversTracker = receiversTracker;
            this.nextSessionStream = new NextSessionStream(pumpId, rollerId, fullyQualifiedNamespace, entityPath, nextSession);
            Flux nextSessionReceiverStream = this.nextSessionStream.flux().map(this::nextSessionReceiver);
            this.messageFlux = new MessageFlux(nextSessionReceiverStream, prefetch, CreditFlowMode.RequestDriven, retryPolicy);
        }

        Mono<Void> begin() {
            return Mono.usingWhen((Publisher)Mono.fromSupplier(() -> {
                Scheduler workerScheduler = this.concurrency > 1 ? Schedulers.newBoundedElastic((int)Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, (int)Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, (String)("rolling-session-receiver-" + this.rollerId)) : Schedulers.immediate();
                return workerScheduler;
            }), workerScheduler -> {
                RunOnWorker handleMessageOnWorker = new RunOnWorker(this::handleMessage, (Scheduler)workerScheduler);
                return this.messageFlux.flatMap((Function)handleMessageOnWorker, this.concurrency, 1).then();
            }, workerScheduler -> this.terminate(TerminalSignalType.COMPLETED, (Scheduler)workerScheduler), (workerScheduler, e) -> this.terminate(TerminalSignalType.ERRORED, (Scheduler)workerScheduler), workerScheduler -> this.terminate(TerminalSignalType.CANCELED, (Scheduler)workerScheduler));
        }

        private Mono<Void> terminate(TerminalSignalType signalType, Scheduler workerScheduler) {
            State<ServiceBusSessionReactorReceiver> state = super.getAndSet(TERMINATED);
            if (state == TERMINATED) {
                return Mono.empty();
            }
            this.logger.atInfo().log("Roller terminated. rollerId:" + this.rollerId + " signal:" + (Object)((Object)signalType));
            this.nextSessionStream.close();
            workerScheduler.dispose();
            return Mono.empty();
        }

        private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAcquirer.Session nextSession) {
            State lastState = (State)super.get();
            if (lastState == TERMINATED) {
                nextSession.getLink().closeAsync().subscribe();
                throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-receiver roller_" + this.rollerId);
            }
            ServiceBusSessionReactorReceiver nextSessionReceiver = new ServiceBusSessionReactorReceiver(this.logger, this.tracer, nextSession, this.sessionIdleTimeout, this.maxSessionLockRenew);
            if (!super.compareAndSet(lastState, new State<ServiceBusSessionReactorReceiver>(nextSessionReceiver))) {
                nextSessionReceiver.closeAsync().subscribe();
                throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-receiver roller_" + this.rollerId);
            }
            if (lastState != INIT) {
                ServiceBusSessionReactorReceiver lastSessionReceiver = (ServiceBusSessionReactorReceiver)lastState.receiver;
                this.receiversTracker.untrack(lastSessionReceiver);
            }
            this.receiversTracker.track(nextSessionReceiver);
            return nextSessionReceiver;
        }

        private void handleMessage(Message qpidMessage) {
            ServiceBusReceivedMessage message = (ServiceBusReceivedMessage)this.serializer.deserialize(qpidMessage, ServiceBusReceivedMessage.class);
            this.instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
                this.logger.atVerbose().addKeyValue("sessionId", message.getSessionId()).addKeyValue("messageId", message.getMessageId()).log("Received message.");
                Throwable error = this.notifyMessage((ServiceBusReceivedMessage)msg);
                if (this.enableAutoDisposition) {
                    if (error == null) {
                        this.complete((ServiceBusReceivedMessage)msg);
                    } else {
                        this.abandon((ServiceBusReceivedMessage)msg);
                    }
                }
                return error;
            });
        }

        private Throwable notifyMessage(ServiceBusReceivedMessage message) {
            try {
                this.processMessage.accept(new ServiceBusReceivedMessageContext(this.receiversTracker, new ServiceBusMessageContext(message)));
            }
            catch (Exception e) {
                this.notifyError((Throwable)((Object)new ServiceBusException(e, ServiceBusErrorSource.USER_CALLBACK)));
                return e;
            }
            return null;
        }

        private void notifyError(Throwable throwable) {
            try {
                this.processError.accept(new ServiceBusErrorContext(throwable, this.fullyQualifiedNamespace, this.entityPath));
            }
            catch (Exception e) {
                this.logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
            }
        }

        private void complete(ServiceBusReceivedMessage message) {
            try {
                this.receiversTracker.complete(message).block();
            }
            catch (Exception e) {
                this.logger.atVerbose().log("Failed to complete message", new Object[]{e});
            }
        }

        private void abandon(ServiceBusReceivedMessage message) {
            try {
                this.receiversTracker.abandon(message).block();
            }
            catch (Exception e) {
                this.logger.atVerbose().log("Failed to abandon message", new Object[]{e});
            }
        }

        private static final class NextSessionStream
        extends AtomicBoolean {
            private final long pumpId;
            private final int rollerId;
            private final String fullyQualifiedNamespace;
            private final String entityPath;
            private final Mono<ServiceBusSessionAcquirer.Session> newSession;

            NextSessionStream(long pumpId, int rollerId, String fullyQualifiedNamespace, String entityPath, Mono<ServiceBusSessionAcquirer.Session> nextSession) {
                super(false);
                this.pumpId = pumpId;
                this.rollerId = rollerId;
                this.fullyQualifiedNamespace = fullyQualifiedNamespace;
                this.entityPath = entityPath;
                this.newSession = Mono.defer(() -> {
                    boolean isTerminated = super.get();
                    if (isTerminated) {
                        return Mono.error((Throwable)new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-link roller_" + this.rollerId));
                    }
                    return nextSession;
                }).map(session -> {
                    boolean isTerminated = super.get();
                    if (isTerminated) {
                        session.getLink().closeAsync().subscribe();
                        throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-link roller_" + this.rollerId);
                    }
                    return session;
                });
            }

            Flux<ServiceBusSessionAcquirer.Session> flux() {
                return NextSessionStream.nonEagerRepeat(this.newSession);
            }

            void close() {
                super.set(true);
            }

            private static Flux<ServiceBusSessionAcquirer.Session> nonEagerRepeat(Mono<ServiceBusSessionAcquirer.Session> source) {
                return source.cacheInvalidateIf(cachedSession -> cachedSession.getLink().isDisposed()).repeat().filter(session -> !session.getLink().isDisposed());
            }
        }

        private static final class RunOnWorker
        implements Function<Message, Publisher<Void>> {
            private final Consumer<Message> handleMessage;
            private final Scheduler workerScheduler;

            RunOnWorker(Consumer<Message> handleMessage, Scheduler workerScheduler) {
                this.handleMessage = handleMessage;
                this.workerScheduler = workerScheduler;
            }

            @Override
            public Mono<Void> apply(Message qpidMessage) {
                return Mono.fromRunnable(() -> this.handleMessage.accept(qpidMessage)).subscribeOn(this.workerScheduler);
            }
        }
    }

    private static enum TerminalSignalType {
        COMPLETED,
        ERRORED,
        CANCELED;

    }

    private static final class State<T extends AsyncCloseable> {
        final T receiver;

        State(T receiver) {
            this.receiver = (AsyncCloseable)Objects.requireNonNull(receiver);
        }

        static <T extends AsyncCloseable> State<T> init() {
            return new State<T>();
        }

        static <T extends AsyncCloseable> State<T> terminated() {
            return new State<T>();
        }

        private State() {
            this.receiver = null;
        }
    }
}

