/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

/**
 * Push-style receiver that pulls messages from a {@link ServiceBusReceiverAsyncClient} and hands each one to a
 * user-supplied message callback, routing failures to a user-supplied error callback.
 *
 * <p>The processor subscribes {@code maxConcurrentCalls} subscribers to the receiver's message stream, each
 * requesting one message at a time. While the processor is running, a background task periodically checks
 * whether the receiver's connection has been closed and, if so, rebuilds the receiver and re-subscribes.</p>
 *
 * <p>Lifecycle methods ({@link #start()}, {@link #stop()}, {@link #close()}) are {@code synchronized} on this
 * instance.</p>
 */
public final class ServiceBusProcessorClient
implements AutoCloseable {
    /** Initial delay and period, in seconds, of the background connection check. */
    private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;
    private final ClientLogger logger = new ClientLogger(ServiceBusProcessorClient.class);
    /** Builder used to (re)create the receiver for the session-enabled case; {@code null} otherwise. */
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
    /** Builder used to (re)create the receiver for the non-session case; {@code null} otherwise. */
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final ServiceBusProcessorClientOptions processorOptions;
    /** Active upstream subscriptions, used as a concurrent set (key and value are the same object). */
    private final Map<Subscription, Subscription> receiverSubscriptions = new ConcurrentHashMap<>();
    /** Current receiver; reset to {@code null} by {@link #close()} and recreated by {@link #start()}. */
    private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private final TracerProvider tracerProvider;
    private final String queueName;
    private final String topicName;
    private final String subscriptionName;
    /** Runs the periodic connection check; created lazily in {@link #start()}, discarded in {@link #close()}. */
    private ScheduledExecutorService scheduledExecutor;

    /**
     * Creates a processor backed by a session receiver builder.
     *
     * @param sessionReceiverBuilder builder used to create (and later re-create) the receiver client.
     * @param queueName value returned by {@link #getQueueName()}.
     * @param topicName value returned by {@link #getTopicName()}.
     * @param subscriptionName value returned by {@link #getSubscriptionName()}.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor configuration (concurrency, auto-complete, tracer provider).
     * @throws NullPointerException if {@code sessionReceiverBuilder}, {@code processMessage},
     *     {@code processError} or {@code processorOptions} is {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder, String queueName, String topicName, String subscriptionName, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
        this.sessionReceiverBuilder = Objects.requireNonNull(sessionReceiverBuilder, "'sessionReceiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
        this.receiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
        this.queueName = queueName;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    /**
     * Creates a processor backed by a (non-session) receiver builder.
     *
     * @param receiverBuilder builder used to create (and later re-create) the receiver client.
     * @param queueName value returned by {@link #getQueueName()}.
     * @param topicName value returned by {@link #getTopicName()}.
     * @param subscriptionName value returned by {@link #getSubscriptionName()}.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor configuration (concurrency, auto-complete, tracer provider).
     * @throws NullPointerException if {@code receiverBuilder}, {@code processMessage}, {@code processError}
     *     or {@code processorOptions} is {@code null}.
     */
    ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder, String queueName, String topicName, String subscriptionName, Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
        this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(receiverBuilder.buildAsyncClient());
        this.sessionReceiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
        this.queueName = queueName;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    /**
     * Starts message processing. Does nothing (other than logging) when the processor is already running.
     *
     * <p>If the receiver was discarded by a previous {@link #close()}, a new one is built. On first start
     * (or first start after {@code close()}) a single-threaded scheduler is created that periodically checks
     * the receiver's connection.</p>
     */
    public synchronized void start() {
        if (this.isRunning.getAndSet(true)) {
            this.logger.info("Processor is already running");
            return;
        }
        if (this.asyncClient.get() == null) {
            this.asyncClient.set(this.buildReceiverClient());
        }
        this.receiveMessages();
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            this.scheduledExecutor.scheduleWithFixedDelay(this::restartIfConnectionClosed, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Clears the running flag so subscribers stop requesting further messages. Existing subscriptions, the
     * receiver client and the scheduler are left in place; a later {@link #start()} resumes by requesting
     * from the existing subscriptions.
     */
    public synchronized void stop() {
        this.isRunning.set(false);
    }

    /**
     * Stops processing and releases resources: cancels all subscriptions, shuts down the connection-check
     * scheduler and closes the receiver client. A later {@link #start()} builds a fresh receiver.
     */
    @Override
    public synchronized void close() {
        this.isRunning.set(false);
        this.receiverSubscriptions.keySet().forEach(Subscription::cancel);
        this.receiverSubscriptions.clear();
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
            this.scheduledExecutor = null;
        }
        // Swap the reference out atomically so the client that gets closed is the one that was removed.
        ServiceBusReceiverAsyncClient client = this.asyncClient.getAndSet(null);
        if (client != null) {
            client.close();
        }
    }

    /**
     * @return {@code true} if {@link #start()} has been called without a subsequent {@link #stop()} or
     *     {@link #close()}.
     */
    public synchronized boolean isRunning() {
        return this.isRunning.get();
    }

    /**
     * @return the queue name supplied at construction time (may be {@code null}).
     */
    public String getQueueName() {
        return this.queueName;
    }

    /**
     * @return the topic name supplied at construction time (may be {@code null}).
     */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * @return the subscription name supplied at construction time (may be {@code null}).
     */
    public String getSubscriptionName() {
        return this.subscriptionName;
    }

    /**
     * Builds a new receiver client from whichever builder this processor was constructed with.
     */
    private ServiceBusReceiverAsyncClient buildReceiverClient() {
        return this.receiverBuilder == null
            ? this.sessionReceiverBuilder.buildAsyncClientForProcessor()
            : this.receiverBuilder.buildAsyncClient();
    }

    /**
     * Periodic task body: restarts the receiver when its connection reports closed.
     *
     * <p>Exceptions are caught and logged rather than propagated, because a task scheduled with
     * {@code scheduleWithFixedDelay} that throws has all of its subsequent executions suppressed, which
     * would silently disable the connection check.</p>
     */
    private void restartIfConnectionClosed() {
        try {
            // close() sets the reference to null; a check that races with it must not dereference null.
            ServiceBusReceiverAsyncClient client = this.asyncClient.get();
            if (client != null && client.isConnectionClosed()) {
                this.restartMessageReceiver(null);
            }
        }
        catch (Exception ex) {
            this.logger.warning("Error while checking or restarting the message receiver.", ex);
        }
    }

    /**
     * Begins (or resumes) receiving. When subscriptions already exist, each is asked for one more message;
     * otherwise {@code maxConcurrentCalls} subscribers are attached to the current receiver's message stream,
     * split into parallel rails running on the bounded-elastic scheduler with a prefetch of one.
     */
    private synchronized void receiveMessages() {
        if (!this.receiverSubscriptions.isEmpty()) {
            this.receiverSubscriptions.keySet().forEach(subscription -> subscription.request(1L));
            return;
        }
        ServiceBusReceiverAsyncClient receiverClient = this.asyncClient.get();
        int maxConcurrentCalls = this.processorOptions.getMaxConcurrentCalls();
        // Generic array creation is not allowed in Java; the raw array only ever holds subscribers of this type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        CoreSubscriber<ServiceBusMessageContext>[] subscribers = new CoreSubscriber[maxConcurrentCalls];
        for (int i = 0; i < maxConcurrentCalls; ++i) {
            subscribers[i] = this.createSubscriber(receiverClient);
        }
        receiverClient.receiveMessagesWithContext().parallel(maxConcurrentCalls, 1).runOn(Schedulers.boundedElastic(), 1).subscribe(subscribers);
    }

    /**
     * Creates one subscriber that processes a message at a time and requests the next one only while the
     * processor is running.
     *
     * @param receiverClient receiver the subscriber is attached to; used to settle messages and for tracing.
     */
    private CoreSubscriber<ServiceBusMessageContext> createSubscriber(final ServiceBusReceiverAsyncClient receiverClient) {
        return new CoreSubscriber<ServiceBusMessageContext>() {
            private Subscription subscription = null;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                receiverSubscriptions.put(subscription, subscription);
                subscription.request(1L);
            }

            @Override
            public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
                if (serviceBusMessageContext.hasError()) {
                    handleError(serviceBusMessageContext.getThrowable());
                } else {
                    processReceivedMessage(serviceBusMessageContext, receiverClient);
                }
                if (isRunning.get()) {
                    logger.verbose("Requesting 1 more message from upstream");
                    this.subscription.request(1L);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                logger.info("Error receiving messages.", throwable);
                handleError(throwable);
                if (isRunning.get()) {
                    restartMessageReceiver(this.subscription);
                }
            }

            @Override
            public void onComplete() {
                logger.info("Completed receiving messages.");
                if (isRunning.get()) {
                    restartMessageReceiver(this.subscription);
                }
            }
        };
    }

    /**
     * Invokes the user's message callback inside a tracing span. If the callback (or tracing) throws, the
     * error callback is notified with a {@link ServiceBusErrorSource#USER_CALLBACK} error, the span is ended
     * with an error signal and, unless auto-complete is disabled, the message is abandoned.
     */
    private void processReceivedMessage(ServiceBusMessageContext messageContext, ServiceBusReceiverAsyncClient receiverClient) {
        Context processSpanContext = null;
        try {
            ServiceBusReceivedMessageContext receivedMessageContext = new ServiceBusReceivedMessageContext(receiverClient, messageContext);
            processSpanContext = this.startProcessTracingSpan(messageContext.getMessage(), receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
            if (processSpanContext.getData("span-context").isPresent()) {
                messageContext.getMessage().addContext("span-context", processSpanContext);
            }
            this.processMessage.accept(receivedMessageContext);
            this.endProcessTracingSpan(processSpanContext, Signal.complete());
        }
        catch (Exception ex) {
            this.handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
            this.endProcessTracingSpan(processSpanContext, Signal.error(ex));
            if (!this.processorOptions.isDisableAutoComplete()) {
                this.logger.warning("Error when processing message. Abandoning message.", ex);
                this.abandonMessage(messageContext, receiverClient);
            }
        }
    }

    /**
     * Closes the span scope stored under the {@code "scope"} key (when it is {@link AutoCloseable}) and ends
     * the span. Does nothing when the context is {@code null}, carries no scope, or tracing is disabled.
     *
     * @param processSpanContext context returned by {@link #startProcessTracingSpan}; may be {@code null}.
     * @param signal completion or error signal passed on to the tracer when ending the span.
     */
    private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
        if (processSpanContext == null) {
            return;
        }
        Optional<Object> spanScope = processSpanContext.getData("scope");
        if (!spanScope.isPresent() || !this.tracerProvider.isEnabled()) {
            return;
        }
        Object scope = spanScope.get();
        if (scope instanceof AutoCloseable) {
            try {
                ((AutoCloseable) scope).close();
            }
            catch (Exception exception) {
                this.logger.error("endTracingSpan().close() failed with an error {}", exception);
            }
        } else {
            // Report the class of the scope object itself, not of the Optional that wraps it.
            this.logger.warning(String.format(Locale.US, "Process span scope type is not of type AutoCloseable, but type: %s. Not closing the scope and span", scope.getClass()));
        }
        this.tracerProvider.endSpan(processSpanContext, signal);
    }

    /**
     * Starts a {@link ProcessKind#PROCESS} span for the given message.
     *
     * @param receivedMessage message about to be processed; its {@code "Diagnostic-Id"} application property,
     *     when present, is used to extract the parent context.
     * @param entityPath added to the span context under {@code "entity-path"}.
     * @param fullyQualifiedNamespace added to the span context under {@code "hostname"}.
     * @return the context returned by the tracer, or {@link Context#NONE} when there is no tracer provider or
     *     tracing is disabled.
     */
    private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath, String fullyQualifiedNamespace) {
        if (this.tracerProvider == null || !this.tracerProvider.isEnabled()) {
            return Context.NONE;
        }
        // Only looked up once tracing is known to be active.
        Object diagnosticId = receivedMessage.getApplicationProperties().get("Diagnostic-Id");
        Context spanContext = diagnosticId == null ? Context.NONE : this.tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
        spanContext = spanContext.addData("entity-path", entityPath).addData("hostname", fullyQualifiedNamespace).addData("az.namespace", "Microsoft.ServiceBus");
        if (receivedMessage.getEnqueuedTime() != null) {
            spanContext = spanContext.addData("x-opt-enqueued-time", receivedMessage.getEnqueuedTime().toInstant().getEpochSecond());
        }
        return this.tracerProvider.startSpan("ServiceBus.", spanContext, ProcessKind.PROCESS);
    }

    /**
     * Abandons the message, blocking until the operation finishes. Best effort: failures are logged at
     * verbose level and otherwise ignored.
     */
    private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, ServiceBusReceiverAsyncClient receiverClient) {
        try {
            receiverClient.abandon(serviceBusMessageContext.getMessage()).block();
        }
        catch (Exception exception) {
            this.logger.verbose("Failed to abandon message", exception);
        }
    }

    /**
     * Passes the error to the user's error callback together with the current receiver's namespace and
     * entity path. Anything thrown while doing so (including by the callback itself) is logged and ignored.
     */
    private void handleError(Throwable throwable) {
        try {
            ServiceBusReceiverAsyncClient client = this.asyncClient.get();
            String fullyQualifiedNamespace = client.getFullyQualifiedNamespace();
            String entityPath = client.getEntityPath();
            this.processError.accept(new ServiceBusErrorContext(throwable, fullyQualifiedNamespace, entityPath));
        }
        catch (Exception ex) {
            this.logger.verbose("Error from error handler. Ignoring error.", ex);
        }
    }

    /**
     * Tears down the current receiver and subscriptions, builds a new receiver and starts receiving again.
     * No-op when the processor is not running.
     *
     * @param requester the subscription asking for the restart, or {@code null} for an unconditional restart.
     *     A non-null requester that is no longer registered is ignored, so that several subscribers of the
     *     same (already replaced) receiver do not each trigger a restart.
     */
    private synchronized void restartMessageReceiver(Subscription requester) {
        if (!this.isRunning()) {
            return;
        }
        if (requester != null && !this.receiverSubscriptions.containsKey(requester)) {
            return;
        }
        this.receiverSubscriptions.keySet().forEach(Subscription::cancel);
        this.receiverSubscriptions.clear();
        ServiceBusReceiverAsyncClient receiverClient = this.asyncClient.get();
        receiverClient.close();
        this.asyncClient.set(this.buildReceiverClient());
        this.receiveMessages();
    }
}

