/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.mqtt.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessage;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.mqtt.MqttClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outgoing side of the MQTT connector: exposes a reactive-streams subscriber that publishes
 * every received {@link Message} to an MQTT broker.
 *
 * <p>The connection to the broker is opened lazily, when the first message reaches the sink,
 * and is closed when the upstream completes.
 */
public class MqttSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSink.class);
    private final String host;
    private final int port;
    private final MqttClient client;
    private final String server;
    private final String topic;
    private final int qos;
    private final SubscriberBuilder<? extends Message<?>, Void> sink;
    /** {@code true} between the first successful connection and the completion of the stream. */
    private final AtomicBoolean connected = new AtomicBoolean();

    /**
     * Builds the MQTT client and the subscriber pipeline from the outgoing channel configuration.
     *
     * <p>No network activity happens here; the client connects when the first message is processed.
     *
     * @param vertx  the Vert.x instance used to create the MQTT client
     * @param config the outgoing channel configuration (host, port, server name, topic, QoS, ...)
     */
    public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
        MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
        this.host = config.getHost();
        // Default port depends on whether the client options enable SSL.
        int def = options.isSsl() ? 8883 : 1883;
        this.port = config.getPort().orElse(def);
        this.server = config.getServerName().orElse(null);
        // Fall back to the channel name when no topic is configured.
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.client = MqttClient.create(vertx, options);
        this.qos = config.getQos();
        this.sink = ReactiveStreams.<Message<?>>builder().flatMapCompletionStage(msg -> {
            // Stage 1: make sure the client is connected before the message goes any further.
            if (this.connected.get()) {
                return CompletableFuture.completedFuture(msg);
            }
            return this.client.connect(this.port, this.host, this.server).subscribeAsCompletionStage().thenApply(x -> {
                this.connected.set(true);
                return msg;
            });
        }).flatMapCompletionStage(msg -> {
            // Stage 2: publish, letting a SendingMqttMessage override topic, QoS and retain flag.
            String actualTopicToBeUsed = this.topic;
            MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
            boolean isRetain = false;
            if (msg instanceof SendingMqttMessage) {
                SendingMqttMessage mm = (SendingMqttMessage) msg;
                actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
                actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
                isRetain = mm.isRetain();
            }
            if (actualTopicToBeUsed == null) {
                // Nothing to publish to: drop the message but keep the stream alive.
                LOGGER.error("Ignoring message - no topic set");
                return CompletableFuture.completedFuture(msg);
            }
            return this.client.publish(actualTopicToBeUsed, this.convert(msg.getPayload()), actualQoS, false, isRetain).subscribeAsCompletionStage();
        }).onComplete(() -> {
            this.connected.set(false);
            // The Mutiny client hands back a lazy Uni: the disconnection is only triggered
            // once it is subscribed to, so merely calling disconnect() would do nothing.
            this.client.disconnect().subscribeAsCompletionStage();
        }).onError(t -> LOGGER.error("An error has been caught while sending a MQTT message to the broker", t)).ignore();
    }

    /**
     * Converts a message payload into the buffer sent to the broker.
     *
     * <ul>
     * <li>{@code null} becomes an empty buffer (a zero-length MQTT payload);</li>
     * <li>{@link JsonObject} and {@link JsonArray} are written through their own {@code toBuffer()};</li>
     * <li>{@link String} is written as text;</li>
     * <li>{@code byte[]} and Vert.x buffers (bare or Mutiny) are passed through as-is;</li>
     * <li>anything else, including boxed numbers and booleans, is JSON-encoded.</li>
     * </ul>
     *
     * @param payload the message payload, may be {@code null}
     * @return the buffer to publish, never {@code null}
     */
    private Buffer convert(Object payload) {
        if (payload == null) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer());
        }
        if (payload instanceof JsonObject) {
            return new Buffer(((JsonObject) payload).toBuffer());
        }
        if (payload instanceof JsonArray) {
            return new Buffer(((JsonArray) payload).toBuffer());
        }
        // The runtime class of an object is never a primitive type (boxed values report their
        // wrapper class), so only String takes the plain-text route; wrappers use the JSON fallback.
        if (payload instanceof String) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((String) payload));
        }
        if (payload instanceof byte[]) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) payload));
        }
        if (payload instanceof Buffer) {
            return (Buffer) payload;
        }
        if (payload instanceof io.vertx.core.buffer.Buffer) {
            return new Buffer((io.vertx.core.buffer.Buffer) payload);
        }
        return new Buffer(Json.encodeToBuffer(payload));
    }

    /**
     * Returns the subscriber builder that publishes incoming messages to the broker.
     *
     * @return the sink built in the constructor
     */
    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.sink;
    }

    /**
     * Tells whether the sink currently holds a connection to the broker.
     *
     * @return {@code true} once the first connection attempt has succeeded and until the stream completes
     */
    public boolean isReady() {
        return this.connected.get();
    }
}

