/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessagePumpTerminatedException;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class MessagePump {
    private static final AtomicLong COUNTER = new AtomicLong();
    private static final Duration CONNECTION_STATE_POLL_INTERVAL = Duration.ofSeconds(20L);
    private final long pumpId = COUNTER.incrementAndGet();
    private final ServiceBusReceiverAsyncClient client;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final ClientLogger logger;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final int concurrency;
    private final boolean enableAutoDisposition;
    private final boolean enableAutoLockRenew;
    private final Scheduler workerScheduler;
    private final ServiceBusReceiverInstrumentation instrumentation;

    MessagePump(ServiceBusReceiverAsyncClient client, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, int concurrency, boolean enableAutoDisposition) {
        this.client = client;
        this.fullyQualifiedNamespace = this.client.getFullyQualifiedNamespace();
        this.entityPath = this.client.getEntityPath();
        HashMap<String, Object> loggingContext = new HashMap<String, Object>(3);
        loggingContext.put("pumpId", this.pumpId);
        loggingContext.put("namespace", this.fullyQualifiedNamespace);
        loggingContext.put("entityPath", this.entityPath);
        this.logger = new ClientLogger(MessagePump.class, loggingContext);
        this.processMessage = processMessage;
        this.processError = processError;
        this.concurrency = concurrency;
        this.enableAutoDisposition = enableAutoDisposition;
        this.enableAutoLockRenew = client.isAutoLockRenewRequested();
        this.workerScheduler = concurrency > 1 ? Schedulers.boundedElastic() : Schedulers.immediate();
        this.instrumentation = this.client.getInstrumentation();
    }

    Mono<Void> begin() {
        Mono<Void> terminatePumping = this.pollConnectionState();
        Mono pumping = this.client.nonSessionProcessorReceiveV2().flatMap((Function)new RunOnWorker(this::handleMessage, this.workerScheduler), this.concurrency, 1).then();
        Mono pumpingMessages = Mono.firstWithSignal((Mono[])new Mono[]{pumping, terminatePumping});
        return pumpingMessages.onErrorMap(e -> {
            if (e instanceof MessagePumpTerminatedException) {
                return e;
            }
            return new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "pumping#error-map", (Throwable)e);
        }).then(Mono.error(() -> MessagePumpTerminatedException.forCompletion(this.pumpId, this.fullyQualifiedNamespace, this.entityPath)));
    }

    private Mono<Void> pollConnectionState() {
        return Flux.interval((Duration)CONNECTION_STATE_POLL_INTERVAL).handle((ignored, sink) -> {
            if (this.client.isConnectionClosed()) {
                RuntimeException e = this.logger.atInfo().log((RuntimeException)new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "non-session#connection-state-poll"));
                sink.error((Throwable)e);
            }
        }).then();
    }

    private void handleMessage(ServiceBusReceivedMessage message) {
        this.instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
            Disposable lockRenewDisposable = this.enableAutoLockRenew ? this.client.beginLockRenewal(message) : Disposables.disposed();
            Throwable error = this.notifyMessage(message);
            if (this.enableAutoDisposition) {
                if (error == null) {
                    this.complete(message);
                } else {
                    this.abandon(message);
                }
            }
            lockRenewDisposable.dispose();
            return error;
        });
    }

    private Throwable notifyMessage(ServiceBusReceivedMessage message) {
        try {
            this.processMessage.accept(new ServiceBusReceivedMessageContext(this.client, new ServiceBusMessageContext(message)));
        }
        catch (Exception e) {
            this.notifyError((Throwable)((Object)new ServiceBusException(e, ServiceBusErrorSource.USER_CALLBACK)));
            return e;
        }
        return null;
    }

    private void notifyError(Throwable throwable) {
        try {
            this.processError.accept(new ServiceBusErrorContext(throwable, this.fullyQualifiedNamespace, this.entityPath));
        }
        catch (Exception e) {
            this.logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
        }
    }

    private void complete(ServiceBusReceivedMessage message) {
        try {
            this.client.complete(message).block();
        }
        catch (Exception e) {
            this.logger.atVerbose().log("Failed to complete message", new Object[]{e});
        }
    }

    private void abandon(ServiceBusReceivedMessage message) {
        try {
            this.client.abandon(message).block();
        }
        catch (Exception e) {
            this.logger.atVerbose().log("Failed to abandon message", new Object[]{e});
        }
    }

    private static final class RunOnWorker
    implements Function<ServiceBusReceivedMessage, Publisher<Void>> {
        private final Consumer<ServiceBusReceivedMessage> handleMessage;
        private final Scheduler workerScheduler;

        RunOnWorker(Consumer<ServiceBusReceivedMessage> handleMessage, Scheduler workerScheduler) {
            this.handleMessage = handleMessage;
            this.workerScheduler = workerScheduler;
        }

        @Override
        public Mono<Void> apply(ServiceBusReceivedMessage message) {
            return Mono.fromRunnable(() -> this.handleMessage.accept(message)).subscribeOn(this.workerScheduler);
        }
    }
}

