/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers.locals;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.MultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class ContextDecorator
implements PublisherDecorator {
    public int getPriority() {
        return 0;
    }

    public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName, boolean isConnector) {
        return publisher.plug(upstream -> new ContextMulti((Multi<Message<?>>)upstream));
    }

    static class ContextMulti
    extends MultiOperator<Message<?>, Message<?>> {
        public ContextMulti(Multi<Message<?>> upstream) {
            super(upstream);
        }

        public void subscribe(MultiSubscriber<? super Message<?>> subscriber) {
            ContextProcessor operator = new ContextProcessor(subscriber);
            this.upstream().subscribe().withSubscriber((MultiSubscriber)operator);
        }

        static class ContextProcessor
        extends MultiOperatorProcessor<Message<?>, Message<?>> {
            private volatile Context rootContext;
            private static final AtomicReferenceFieldUpdater<ContextProcessor, Context> ROOT_CONTEXT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ContextProcessor.class, Context.class, "rootContext");

            public ContextProcessor(MultiSubscriber<? super Message<?>> downstream) {
                super(downstream);
            }

            public void onFailure(Throwable throwable) {
                Context root = ROOT_CONTEXT_UPDATER.getAndSet(this, null);
                if (root == null) {
                    super.onFailure(throwable);
                } else {
                    root.runOnContext(ignored -> super.onFailure(throwable));
                }
            }

            public void onItem(Message<?> item) {
                Optional metadata = item.getMetadata().get(LocalContextMetadata.class);
                if (metadata.isPresent()) {
                    Context context = ((LocalContextMetadata)metadata.get()).context();
                    ROOT_CONTEXT_UPDATER.compareAndSet(this, null, VertxContext.getRootContext(context));
                    VertxContext.runOnContext(context, () -> super.onItem((Object)item));
                } else {
                    super.onItem(item);
                }
            }

            public void request(long numberOfItems) {
                Context context = Vertx.currentContext();
                if (context != null) {
                    super.request(numberOfItems);
                } else {
                    Context root = ROOT_CONTEXT_UPDATER.get(this);
                    if (root != null) {
                        root.runOnContext(x -> super.request(numberOfItems));
                    } else {
                        super.request(numberOfItems);
                    }
                }
            }

            public void onCompletion() {
                Context root = ROOT_CONTEXT_UPDATER.getAndSet(this, null);
                if (root == null) {
                    super.onCompletion();
                } else {
                    root.runOnContext(ignored -> super.onCompletion());
                }
            }
        }
    }
}

