/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.runtime;

import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni;
import io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter;
import io.smallrye.mutiny.subscription.Cancellable;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.providers.extension.AbstractEmitter;
import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;

public class ContextualEmitterImpl<T>
extends AbstractEmitter<T>
implements ContextualEmitter<T> {
    public ContextualEmitterImpl(EmitterConfiguration configuration, long defaultBufferSize) {
        super(configuration, defaultBufferSize);
    }

    @Override
    public void sendAndAwait(T payload) {
        this.sendMessage(Message.of(payload)).await().indefinitely();
    }

    @Override
    public Cancellable sendAndForget(T payload) {
        return this.send(payload).subscribe().with(x -> {}, arg_0 -> ((ProviderLogging)ProviderLogging.log).failureEmittingMessage(arg_0));
    }

    @Override
    public Uni<Void> send(T payload) {
        return this.sendMessage(Message.of(payload));
    }

    @Override
    public <M extends Message<? extends T>> void sendMessageAndAwait(M msg) {
        this.sendMessage(msg).await().indefinitely();
    }

    @Override
    public <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg) {
        return this.sendMessage(msg).subscribe().with(x -> {}, arg_0 -> ((ProviderLogging)ProviderLogging.log).failureEmittingMessage(arg_0));
    }

    @Override
    @CheckReturnValue
    public <M extends Message<? extends T>> Uni<Void> sendMessage(M msg) {
        if (msg == null) {
            throw ProviderExceptions.ex.illegalArgumentForNullValue();
        }
        Context context = Vertx.currentContext();
        Uni msgUni = Uni.createFrom().item(() -> ContextualEmitterImpl.createContextualMessage(msg, context));
        if (context != null) {
            msgUni = msgUni.emitOn(r -> context.runOnContext(x -> r.run()));
        }
        Uni uni = ContextualEmitterImpl.transformToUni(msgUni, message -> ContextualEmitterImpl.emitter(e -> {
            try {
                this.emit(message.withAck(() -> {
                    e.complete(null);
                    return msg.ack();
                }).withNack(t -> {
                    e.fail(t);
                    return msg.nack(t);
                }));
            }
            catch (Exception t2) {
                msg.nack((Throwable)t2);
                throw t2;
            }
        }));
        if (context != null) {
            return uni.emitOn(r -> context.runOnContext(x -> r.run()));
        }
        return uni;
    }

    private static <T, M extends Message<T>> Message<T> createContextualMessage(M msg, Context context) {
        if (context == null) {
            return ContextAwareMessage.withContextMetadata(msg);
        }
        ContextInternal internal = (ContextInternal)context;
        ContextInternal newCtx = internal.duplicate();
        newCtx.localContextData().putAll(internal.localContextData());
        return msg.addMetadata((Object)new LocalContextMetadata((Context)newCtx));
    }

    public static <T> Uni<T> emitter(Consumer<UniEmitter<? super T>> emitter) {
        return Infrastructure.onUniCreation((Uni)new UniCreateWithEmitter(emitter));
    }

    public static <T, R> Uni<R> transformToUni(Uni<T> upstream, Function<? super T, Uni<? extends R>> mapper) {
        return Infrastructure.onUniCreation((Uni)new UniOnItemTransformToUni(upstream, mapper));
    }
}

