/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperatorBuilder;
import io.opentelemetry.instrumentation.reactor.ReactorAsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.reactor.TracingSubscriber;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

public final class ContextPropagationOperator {
    /** Placeholder element passed to the scalar subscription by the scalar-propagating publishers. */
    private static final Object VALUE = new Object();

    /**
     * Key under which the OpenTelemetry context is stored in a Reactor context. The anonymous
     * subclass only exists to give the key a readable {@code toString()}.
     */
    private static final Object TRACE_CONTEXT_KEY =
            new Object() {
                @Override
                public String toString() {
                    return "otel-trace-context";
                }
            };

    /** Monitor held while the operator hook is being registered or reset. */
    private static final Object lock = new Object();

    /**
     * Whether the operator hook is currently registered. Written while holding {@link #lock};
     * read without the lock by the {@code runWithContext} methods, hence {@code volatile}.
     */
    private static volatile boolean enabled = false;

    /** Strategy that is registered and unregistered together with the operator hook. */
    private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

    /**
     * Creates an operator from a fresh builder without applying any further configuration.
     *
     * @return the operator produced by {@code builder().build()}
     */
    public static ContextPropagationOperator create() {
        ContextPropagationOperatorBuilder operatorBuilder = builder();
        return operatorBuilder.build();
    }

    /**
     * Starts the configuration of a new operator.
     *
     * @return a newly constructed {@link ContextPropagationOperatorBuilder}
     */
    public static ContextPropagationOperatorBuilder builder() {
        ContextPropagationOperatorBuilder freshBuilder = new ContextPropagationOperatorBuilder();
        return freshBuilder;
    }

    /**
     * Stores an OpenTelemetry context in a Reactor context under this class's private key.
     *
     * @param context the Reactor context to add the entry to
     * @param traceContext the OpenTelemetry context to store
     * @return the Reactor context returned by {@code context.put(...)}
     */
    public static reactor.util.context.Context storeOpenTelemetryContext(
            reactor.util.context.Context context, Context traceContext) {
        reactor.util.context.Context withTraceContext = context.put(TRACE_CONTEXT_KEY, traceContext);
        return withTraceContext;
    }

    /**
     * Looks up the OpenTelemetry context previously stored in a Reactor context.
     *
     * @param context the Reactor context to read from
     * @param defaultTraceContext value to return when no entry exists under the key; callers in
     *     this class pass {@code null}
     * @return the stored OpenTelemetry context, or {@code defaultTraceContext} if none is present
     */
    public static Context getOpenTelemetryContext(
            reactor.util.context.Context context, Context defaultTraceContext) {
        Object stored = context.getOrDefault(TRACE_CONTEXT_KEY, (Object) defaultTraceContext);
        return (Context) stored;
    }

    /**
     * Package-private constructor; instances are obtained via {@link #create()} or
     * {@link #builder()}.
     *
     * @param captureExperimentalSpanAttributes forwarded unchanged to the builder of the
     *     {@link ReactorAsyncOperationEndStrategy} owned by this operator
     */
    ContextPropagationOperator(boolean captureExperimentalSpanAttributes) {
        ReactorAsyncOperationEndStrategy strategy =
                ReactorAsyncOperationEndStrategy.builder()
                        .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
                        .build();
        this.asyncOperationEndStrategy = strategy;
    }

    /**
     * Registers the tracing lift as a Reactor {@code onEachOperator} hook, keyed by the
     * {@link TracingSubscriber} class name, and registers this operator's async end strategy.
     *
     * <p>Does nothing when the hook is already registered. The check and the registration happen
     * while holding the class-wide lock.
     *
     * <p>NOTE(review): the {@code enabled} flag is static while the strategy is per instance, so
     * if several operator instances exist only the first caller's strategy gets registered.
     * Confirm that a single instance is used per JVM.
     */
    public void registerOnEachOperator() {
        synchronized (lock) {
            if (!enabled) {
                String hookKey = TracingSubscriber.class.getName();
                Hooks.onEachOperator(hookKey, tracingLift(this.asyncOperationEndStrategy));
                AsyncOperationEndStrategies.instance().registerStrategy(this.asyncOperationEndStrategy);
                enabled = true;
            }
        }
    }

    /**
     * Removes the Reactor {@code onEachOperator} hook keyed by the {@link TracingSubscriber}
     * class name and unregisters this operator's async end strategy.
     *
     * <p>Does nothing when the hook is not currently registered. The check and the removal happen
     * while holding the class-wide lock.
     *
     * <p>NOTE(review): the {@code enabled} flag is static while the strategy is per instance, so
     * calling this on a different instance than the one that registered would unregister a
     * different strategy object. Confirm that a single instance is used per JVM.
     */
    public void resetOnEachOperator() {
        synchronized (lock) {
            if (enabled) {
                String hookKey = TracingSubscriber.class.getName();
                Hooks.resetOnEachOperator(hookKey);
                AsyncOperationEndStrategies.instance().unregisterStrategy(this.asyncOperationEndStrategy);
                enabled = false;
            }
        }
    }

    /**
     * Builds the publisher-to-publisher function that is installed as the operator hook.
     *
     * @param asyncOperationEndStrategy strategy handed to the {@link Lifter}
     * @param <T> element type of the publishers being lifted
     * @return the function produced by {@code Operators.lift} for a new {@link Lifter}
     */
    private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
            ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
        // Diamond instead of the raw Lifter type, so the lift call is type-checked rather than
        // falling back to an unchecked conversion of the erased return type.
        return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
    }

    /**
     * Returns a {@link Mono} whose subscriber context carries the given tracing context.
     *
     * <p>When the operator hook is not registered, the publisher is returned unchanged.
     *
     * <p>NOTE(review): {@code subscriberContext} is deprecated in newer Reactor releases in favour
     * of {@code contextWrite}; confirm against the Reactor version this module targets.
     *
     * @param publisher the publisher to wrap
     * @param tracingContext the OpenTelemetry context to store in the Reactor context
     * @param <T> element type of the publisher
     * @return the wrapped publisher, or {@code publisher} itself when propagation is disabled
     */
    public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
        if (enabled) {
            Mono<T> propagating = ScalarPropagatingMono.create(publisher);
            return propagating.subscriberContext(
                    reactorContext -> storeOpenTelemetryContext(reactorContext, tracingContext));
        }
        return publisher;
    }

    /**
     * Returns a {@link Flux} whose subscriber context carries the given tracing context.
     *
     * <p>When the operator hook is not registered, the publisher is returned unchanged.
     *
     * <p>NOTE(review): {@code subscriberContext} is deprecated in newer Reactor releases in favour
     * of {@code contextWrite}; confirm against the Reactor version this module targets.
     *
     * @param publisher the publisher to wrap
     * @param tracingContext the OpenTelemetry context to store in the Reactor context
     * @param <T> element type of the publisher
     * @return the wrapped publisher, or {@code publisher} itself when propagation is disabled
     */
    public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
        if (enabled) {
            Flux<T> propagating = ScalarPropagatingFlux.create(publisher);
            return propagating.subscriberContext(
                    reactorContext -> storeOpenTelemetryContext(reactorContext, tracingContext));
        }
        return publisher;
    }

    /**
     * Hands {@code actual} a scalar subscription for {@code value}, making the tracing context
     * found in the subscriber's Reactor context current for the duration of that call.
     *
     * <p>No scope is opened when the Reactor context holds no tracing context, or when the stored
     * context is the very same instance as {@code Context.current()}.
     *
     * @param actual the subscriber to signal
     * @param value the single element backing the scalar subscription
     */
    static void subscribeInActiveSpan(CoreSubscriber<? super Object> actual, Object value) {
        Context reactorTracingContext = getOpenTelemetryContext(actual.currentContext(), null);

        // Identity comparison on purpose: only skip the scope when the very same context object
        // is already current.
        boolean nothingToActivate =
                reactorTracingContext == null || reactorTracingContext == Context.current();
        if (nothingToActivate) {
            actual.onSubscribe(Operators.scalarSubscription(actual, value));
            return;
        }

        // The subscription is created and delivered inside the scope so that both steps run with
        // the stored tracing context current.
        try (Scope ignored = reactorTracingContext.makeCurrent()) {
            actual.onSubscribe(Operators.scalarSubscription(actual, value));
        }
    }

    /**
     * Lift function that decides, per operator, whether the downstream subscriber is wrapped in a
     * {@link TracingSubscriber}.
     *
     * @param <T> element type of the subscribers being lifted
     */
    public static class Lifter<T>
            implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
        // Never read inside this class.
        // NOTE(review): presumably held so the strategy stays strongly reachable for as long as
        // the lifter is - confirm before removing.
        private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

        /**
         * @param asyncOperationEndStrategy strategy to keep a reference to
         */
        public Lifter(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
            this.asyncOperationEndStrategy = asyncOperationEndStrategy;
        }

        /**
         * Wraps {@code sub} in a {@link TracingSubscriber} built from the subscriber's current
         * Reactor context, unless the publisher is a {@code Fuseable.ScalarCallable}, in which
         * case {@code sub} is returned untouched.
         *
         * @param publisher the operator being subscribed to
         * @param sub the downstream subscriber
         * @return {@code sub} itself or a tracing wrapper around it
         */
        @Override
        public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
            boolean scalarSource = publisher instanceof Fuseable.ScalarCallable;
            if (!scalarSource) {
                return new TracingSubscriber<T>(sub, sub.currentContext());
            }
            return sub;
        }
    }

    static class ScalarPropagatingMono
    extends Mono<Object>
    implements Scannable {
        private final Mono<?> source;

        static <T> Mono<T> create(Mono<T> source) {
            return new ScalarPropagatingMono(source).flatMap(unused -> source);
        }

        private ScalarPropagatingMono(Mono<?> source) {
            this.source = source;
        }

        public void subscribe(CoreSubscriber<? super Object> actual) {
            ContextPropagationOperator.subscribeInActiveSpan(actual, VALUE);
        }

        @Nullable
        public Object scanUnsafe(Scannable.Attr attr) {
            if (attr == Scannable.Attr.PARENT) {
                return this.source;
            }
            return null;
        }
    }

    static class ScalarPropagatingFlux
    extends Flux<Object>
    implements Scannable {
        private final Flux<?> source;

        static <T> Flux<T> create(Flux<T> source) {
            return new ScalarPropagatingFlux(source).flatMap(unused -> source);
        }

        private ScalarPropagatingFlux(Flux<?> source) {
            this.source = source;
        }

        public void subscribe(CoreSubscriber<? super Object> actual) {
            ContextPropagationOperator.subscribeInActiveSpan(actual, VALUE);
        }

        @Nullable
        public Object scanUnsafe(Scannable.Attr attr) {
            if (attr == Scannable.Attr.PARENT) {
                return this.source;
            }
            return null;
        }
    }
}

