/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.Context;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.util.Objects;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

final class FluxTrace
extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    static final String PROCESS_ERROR_KEY = "process-error";
    private final ServiceBusReceiverInstrumentation instrumentation;

    FluxTrace(Flux<? extends ServiceBusMessageContext> upstream, ServiceBusReceiverInstrumentation instrumentation) {
        super(upstream);
        this.instrumentation = instrumentation;
    }

    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe((CoreSubscriber)new TracingSubscriber(coreSubscriber, this.instrumentation));
    }

    private static class TracingSubscriber
    extends BaseSubscriber<ServiceBusMessageContext> {
        private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
        private final ServiceBusReceiverInstrumentation instrumentation;
        private final ServiceBusTracer tracer;

        TracingSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream, ServiceBusReceiverInstrumentation instrumentation) {
            this.downstream = downstream;
            this.instrumentation = instrumentation;
            this.tracer = instrumentation.getTracer();
        }

        public reactor.util.context.Context currentContext() {
            return this.downstream.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.downstream.onSubscribe((Subscription)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void hookOnNext(ServiceBusMessageContext message) {
            Throwable exception = null;
            Context span = this.instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
            message.getMessage().setContext(span);
            AutoCloseable scope = this.tracer.makeSpanCurrent(span);
            try {
                this.downstream.onNext((Object)message);
            }
            catch (Throwable t) {
                exception = t;
            }
            finally {
                Object processorException;
                Context context = message.getMessage().getContext();
                if (context != null && (processorException = context.getData((Object)FluxTrace.PROCESS_ERROR_KEY).orElse(null)) instanceof Throwable) {
                    exception = processorException;
                }
                this.tracer.endSpan(exception, context, scope);
            }
        }

        protected void hookOnError(Throwable throwable) {
            this.downstream.onError(throwable);
        }

        protected void hookOnComplete() {
            this.downstream.onComplete();
        }
    }
}

