/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.mqtt.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessage;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

/**
 * Outgoing side of the MQTT connector: a reactive-streams subscriber that publishes every
 * incoming {@link Message} through an MQTT client session.
 *
 * <p>The client holder is looked up when the first message arrives and released when the
 * upstream completes. {@link #isReady()} reports {@code true} once a client start has
 * succeeded and {@code false} again once the close triggered by completion has finished.
 */
public class MqttSink {
    /** Topic used when a message does not carry its own; may be overridden per message. */
    private final String topic;
    /** Numeric QoS from the configuration, used when a message does not carry its own. */
    private final int qos;
    private final SubscriberBuilder<? extends Message<?>, Void> sink;
    private final AtomicBoolean ready = new AtomicBoolean();

    /**
     * Builds the subscriber pipeline. No client holder is obtained here; that happens when the
     * first message flows through the pipeline.
     *
     * @param vertx  the Vert.x instance handed to {@code Clients.getHolder}
     * @param config the outgoing channel configuration; the topic falls back to the channel
     *               name when no topic is configured
     */
    public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
        MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.qos = config.getQos();
        // Typed reference: a raw AtomicReference makes get()/getAndSet() return Object, which
        // cannot be assigned to Clients.ClientHolder without a cast.
        AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();
        // The explicit <Message<?>> witness is required: without it the builder chain is typed
        // on Object and does not match the declared type of the sink field.
        this.sink = ReactiveStreams.<Message<?>>builder().flatMapCompletionStage(msg -> {
            Clients.ClientHolder client = reference.get();
            if (client == null) {
                client = Clients.getHolder(vertx, options);
                reference.set(client);
            }
            // Wait for the client start to succeed before letting the message continue.
            return client.start().onSuccess(ignore -> this.ready.set(true)).map(ignore -> msg).toCompletionStage();
        }).flatMapCompletionStage(msg -> this.send(reference, msg)).onComplete(() -> {
            // Upstream completed: drop the holder and flip readiness once the close has finished.
            Clients.ClientHolder c = reference.getAndSet(null);
            if (c != null) {
                c.close().onComplete(ignore -> this.ready.set(false));
            }
        }).onError(MqttLogging.log::errorWhileSendingMessageToBroker).ignore();
    }

    /**
     * Publishes one message and acknowledges it according to the outcome.
     *
     * <p>For a {@link SendingMqttMessage}, the topic, QoS and retain flag carried by the message
     * take precedence; a {@code null} topic or QoS falls back to the configured value. Any other
     * message uses the configured topic and QoS with retain disabled.
     *
     * @param reference holder of the client used for publishing; expected to be populated
     * @param msg       the message to publish
     * @return a stage completed with {@code msg} once it has been acked (publish succeeded) or
     *         nacked (publish failed); completes immediately, without ack or nack, when no topic
     *         can be determined
     */
    private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference, Message<?> msg) {
        boolean isRetain;
        MqttQoS actualQoS;
        String actualTopicToBeUsed;
        MqttClientSession client = reference.get().getClient();
        if (msg instanceof SendingMqttMessage) {
            SendingMqttMessage mm = (SendingMqttMessage) msg;
            actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
            actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : mm.getQosLevel();
            isRetain = mm.isRetain();
        } else {
            actualTopicToBeUsed = this.topic;
            isRetain = false;
            actualQoS = MqttQoS.valueOf(this.qos);
        }
        if (actualTopicToBeUsed == null) {
            MqttLogging.log.ignoringNoTopicSet();
            return CompletableFuture.completedFuture(msg);
        }
        // Explicit <Integer> witness so the handler type matches the publish future.
        // NOTE(review): assumes MqttClientSession.publish yields Future<Integer> - confirm.
        return AsyncResultUni.<Integer>toUni(h -> client.publish(actualTopicToBeUsed, this.convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain).onComplete(h)).onItemOrFailure().transformToUni((s, f) -> {
            if (f != null) {
                return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
            }
            OutgoingMessageMetadata.setResultOnMessage(msg, s);
            return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
        }).subscribeAsCompletionStage();
    }

    /**
     * Turns a message payload into a Mutiny {@link Buffer}.
     *
     * <p>{@link JsonObject} and {@link JsonArray} are serialized with their own
     * {@code toBuffer()}; a {@link String} is written as-is; {@code byte[]} and both buffer
     * flavours are wrapped or passed through; anything else is encoded with
     * {@link Json#encodeToBuffer(Object)}.
     *
     * @param payload the payload to convert
     * @return the buffer to publish
     * @throws NullPointerException if {@code payload} is {@code null}
     */
    private Buffer convert(Object payload) {
        if (payload instanceof JsonObject) {
            return new Buffer(((JsonObject) payload).toBuffer());
        }
        if (payload instanceof JsonArray) {
            return new Buffer(((JsonArray) payload).toBuffer());
        }
        // NOTE(review): getClass().isPrimitive() is always false for an object reference, so
        // boxed values (Integer, Boolean, ...) fall through to the JSON encoding at the end.
        // The call is kept because it is also what raises NullPointerException for a null payload.
        if (payload instanceof String || payload.getClass().isPrimitive()) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer(payload.toString()));
        }
        if (payload instanceof byte[]) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) payload));
        }
        if (payload instanceof Buffer) {
            return (Buffer) payload;
        }
        if (payload instanceof io.vertx.core.buffer.Buffer) {
            return new Buffer((io.vertx.core.buffer.Buffer) payload);
        }
        return new Buffer(Json.encodeToBuffer(payload));
    }

    /**
     * @return the subscriber builder that publishes received messages
     */
    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.sink;
    }

    /**
     * @return {@code true} once a client start has succeeded and the close triggered by stream
     *         completion has not yet finished
     */
    public boolean isReady() {
        return this.ready.get();
    }
}

