/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;

/**
 * Push-style processor that receives messages through a {@link ServiceBusReceiverAsyncClient} and hands each one to
 * a user-supplied message callback, routing failures to a user-supplied error callback.
 *
 * <p>{@link #start()} subscribes to the receiver and schedules a periodic connection check, {@link #stop()} makes
 * the subscribers stop requesting further messages, and {@link #close()} cancels the subscriptions, disposes the
 * periodic check and closes the receiver. The lifecycle methods are {@code synchronized} on this instance.</p>
 */
public final class ServiceBusProcessorClient implements AutoCloseable {
    /** Initial delay and period, in seconds, of the task that checks whether the receiver's connection is closed. */
    private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusProcessorClient.class);

    // Exactly one of the two builders is non-null; it is used to (re)create the receiver client.
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final ServiceBusProcessorClientOptions processorOptions;
    // Used as a concurrent set of the live subscriptions (key and value are the same object).
    private final Map<Subscription, Subscription> receiverSubscriptions = new ConcurrentHashMap<>();
    // Current receiver; null after close() until the next start()/getIdentifier() recreates it.
    private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private final String queueName;
    private final String topicName;
    private final String subscriptionName;
    private final ServiceBusTracer tracer;
    private Disposable monitorDisposable;
    private boolean wasStopped = false;

    /**
     * Creates a processor backed by a session receiver builder. A receiver client is built immediately.
     *
     * @param sessionReceiverBuilder builder used to create (and later re-create) the receiver client.
     * @param queueName value returned by {@link #getQueueName()}.
     * @param topicName value returned by {@link #getTopicName()}.
     * @param subscriptionName value returned by {@link #getSubscriptionName()}.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor settings (maximum concurrent calls, auto-complete behaviour).
     * @throws NullPointerException if the builder, either callback or the options are {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder, String queueName, String topicName, String subscriptionName, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
        this.sessionReceiverBuilder = Objects.requireNonNull(sessionReceiverBuilder, "'sessionReceiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        ServiceBusReceiverAsyncClient client = sessionReceiverBuilder.buildAsyncClientForProcessor();
        this.asyncClient.set(client);
        this.receiverBuilder = null;
        this.queueName = queueName;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
        this.tracer = client.getInstrumentation().getTracer();
    }

    /**
     * Creates a processor backed by a (non-session) receiver builder. A receiver client is built immediately.
     *
     * @param receiverBuilder builder used to create (and later re-create) the receiver client.
     * @param queueName value returned by {@link #getQueueName()}.
     * @param topicName value returned by {@link #getTopicName()}.
     * @param subscriptionName value returned by {@link #getSubscriptionName()}.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor settings (maximum concurrent calls, auto-complete behaviour).
     * @throws NullPointerException if the builder, either callback or the options are {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder, String queueName, String topicName, String subscriptionName, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
        this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        ServiceBusReceiverAsyncClient client = receiverBuilder.buildAsyncClient();
        this.asyncClient.set(client);
        this.sessionReceiverBuilder = null;
        this.queueName = queueName;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
        this.tracer = client.getInstrumentation().getTracer();
    }

    /**
     * Starts processing. Does nothing (besides logging) when the processor is already running.
     *
     * <p>Re-creates the receiver client if a previous {@link #close()} discarded it, subscribes to the receiver, and,
     * if not already scheduled, schedules a periodic task that restarts the receiver when its connection is
     * closed.</p>
     */
    public synchronized void start() {
        if (isRunning.getAndSet(true)) {
            LOGGER.info("Processor is already running");
            return;
        }
        if (wasStopped) {
            wasStopped = false;
            LOGGER.warning("Starting Processor that was stopped before is not recommended, and this feature may be deprecated in the future. Please close this processor instance and create a new one to restart processing. Refer to the GitHub issue https://github.com/Azure/azure-sdk-for-java/issues/34464 for more details");
        }
        if (asyncClient.get() == null) {
            asyncClient.set(createNewReceiver());
        }
        receiveMessages();
        if (monitorDisposable == null) {
            monitorDisposable = Schedulers.boundedElastic().schedulePeriodically(() -> {
                // close() nulls the client and dispose() does not wait for an in-flight run, so read once and
                // null-check instead of dereferencing asyncClient.get() directly.
                ServiceBusReceiverAsyncClient client = asyncClient.get();
                if (client != null && client.isConnectionClosed()) {
                    restartMessageReceiver(null);
                }
            }, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Marks the processor as not running and records that it was stopped.
     *
     * <p>Subscriptions are neither cancelled nor is the receiver closed here; the subscribers simply stop requesting
     * further messages because they check the running flag before each request.</p>
     */
    public synchronized void stop() {
        wasStopped = true;
        isRunning.set(false);
    }

    /**
     * Stops processing and releases resources: cancels and forgets all subscriptions, disposes the periodic
     * connection check, and closes and discards the receiver client.
     */
    @Override
    public synchronized void close() {
        isRunning.set(false);
        receiverSubscriptions.keySet().forEach(Subscription::cancel);
        receiverSubscriptions.clear();
        if (monitorDisposable != null) {
            monitorDisposable.dispose();
            monitorDisposable = null;
        }
        ServiceBusReceiverAsyncClient client = asyncClient.get();
        if (client != null) {
            client.close();
            asyncClient.set(null);
        }
    }

    /**
     * Reports whether the processor is currently marked as running.
     *
     * @return {@code true} between a {@link #start()} and the next {@link #stop()} or {@link #close()}.
     */
    public synchronized boolean isRunning() {
        return isRunning.get();
    }

    /**
     * @return the queue name supplied at construction.
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * @return the topic name supplied at construction.
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the subscription name supplied at construction.
     */
    public String getSubscriptionName() {
        return subscriptionName;
    }

    /**
     * Returns the identifier of the underlying receiver client, creating a new receiver client first if none is
     * currently held (for example after {@link #close()}).
     *
     * @return the receiver client's identifier.
     */
    public synchronized String getIdentifier() {
        if (asyncClient.get() == null) {
            asyncClient.set(createNewReceiver());
        }
        return asyncClient.get().getIdentifier();
    }

    /**
     * Builds a fresh receiver client from whichever builder this processor was constructed with.
     */
    private ServiceBusReceiverAsyncClient createNewReceiver() {
        return receiverBuilder == null
            ? sessionReceiverBuilder.buildAsyncClientForProcessor()
            : receiverBuilder.buildAsyncClient();
    }

    /**
     * Subscribes to the current receiver client, or, when subscriptions already exist, requests one more message
     * from each of them.
     *
     * <p>One subscriber is created per configured concurrent call. Each subscriber requests a single message at a
     * time and asks for the next one only after the current message has been handled and while the processor is
     * running.</p>
     */
    private synchronized void receiveMessages() {
        if (!receiverSubscriptions.isEmpty()) {
            // Existing subscriptions are still registered (e.g. start() after stop()); resume them.
            receiverSubscriptions.keySet().forEach(subscription -> subscription.request(1L));
            return;
        }
        final ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
        final int maxConcurrentCalls = processorOptions.getMaxConcurrentCalls();
        @SuppressWarnings({"unchecked", "rawtypes"})
        CoreSubscriber<ServiceBusMessageContext>[] subscribers = new CoreSubscriber[maxConcurrentCalls];
        for (int i = 0; i < maxConcurrentCalls; ++i) {
            subscribers[i] = new CoreSubscriber<ServiceBusMessageContext>() {
                private Subscription subscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    receiverSubscriptions.put(subscription, subscription);
                    subscription.request(1L);
                }

                @Override
                public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
                    // Only open a tracing scope when the context actually carries a message. Dereferencing a
                    // missing message here would throw before the hasError() branch below, skipping both the
                    // error callback and the request for the next message. A null resource is permitted in
                    // try-with-resources and is simply not closed.
                    // NOTE(review): error contexts are presumed to have no message - confirm against
                    // ServiceBusMessageContext.
                    try (AutoCloseable scope = serviceBusMessageContext.getMessage() == null
                        ? null
                        : tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
                        if (serviceBusMessageContext.hasError()) {
                            handleError(serviceBusMessageContext.getThrowable());
                        } else {
                            ServiceBusReceivedMessageContext receivedMessageContext = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);
                            try {
                                processMessage.accept(receivedMessageContext);
                            } catch (Exception ex) {
                                // Attach the callback failure to the message's context before reporting it.
                                serviceBusMessageContext.getMessage().setContext(serviceBusMessageContext.getMessage().getContext().addData("process-error", ex));
                                handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
                                if (!processorOptions.isDisableAutoComplete()) {
                                    LOGGER.warning("Error when processing message. Abandoning message.", ex);
                                    abandonMessage(serviceBusMessageContext, receiverClient);
                                }
                            }
                        }
                        if (isRunning.get()) {
                            LOGGER.verbose("Requesting 1 more message from upstream");
                            this.subscription.request(1L);
                        }
                    } catch (Exception e) {
                        LOGGER.verbose("Error disposing scope", e);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    LOGGER.info("Error receiving messages.", throwable);
                    handleError(throwable);
                    if (isRunning.get()) {
                        restartMessageReceiver(this.subscription);
                    }
                }

                @Override
                public void onComplete() {
                    LOGGER.info("Completed receiving messages.");
                    if (isRunning.get()) {
                        restartMessageReceiver(this.subscription);
                    }
                }
            };
        }
        if (maxConcurrentCalls > 1) {
            receiverClient.receiveMessagesWithContext().parallel(maxConcurrentCalls, 1).runOn(Schedulers.boundedElastic(), 1).subscribe(subscribers);
        } else {
            receiverClient.receiveMessagesWithContext().subscribe(subscribers[0]);
        }
    }

    /**
     * Abandons the message held by the given context, blocking until the operation finishes. Failures are logged
     * and otherwise ignored.
     */
    private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, ServiceBusReceiverAsyncClient receiverClient) {
        try {
            receiverClient.abandon(serviceBusMessageContext.getMessage()).block();
        } catch (Exception exception) {
            LOGGER.verbose("Failed to abandon message", exception);
        }
    }

    /**
     * Passes {@code throwable} to the user's error callback together with the current receiver's namespace and
     * entity path. Any exception raised while doing so is logged and ignored.
     */
    private void handleError(Throwable throwable) {
        try {
            ServiceBusReceiverAsyncClient client = asyncClient.get();
            String fullyQualifiedNamespace = client.getFullyQualifiedNamespace();
            String entityPath = client.getEntityPath();
            processError.accept(new ServiceBusErrorContext(throwable, fullyQualifiedNamespace, entityPath));
        } catch (Exception ex) {
            LOGGER.verbose("Error from error handler. Ignoring error.", ex);
        }
    }

    /**
     * Tears down the current receiver and subscriptions and starts receiving again on a newly built receiver.
     *
     * <p>No-op when the processor is not running, or when {@code requester} is non-null but no longer registered
     * (meaning another subscriber already triggered the restart).</p>
     *
     * @param requester the subscription asking for the restart, or {@code null} to restart unconditionally.
     */
    private synchronized void restartMessageReceiver(Subscription requester) {
        if (!isRunning()) {
            return;
        }
        if (requester != null && !receiverSubscriptions.containsKey(requester)) {
            return;
        }
        receiverSubscriptions.keySet().forEach(Subscription::cancel);
        receiverSubscriptions.clear();
        ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
        receiverClient.close();
        asyncClient.set(createNewReceiver());
        receiveMessages();
    }
}

