/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessage;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.mqtt.MqttClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

/**
 * Reactive Messaging sink that publishes every incoming {@link Message} to an MQTT broker.
 *
 * <p>The MQTT client is created in the constructor but the connection is only opened when the
 * first message reaches the sink. The client is disconnected when the upstream completes.
 *
 * <p>Configuration keys read from the supplied {@link Config}:
 * <ul>
 *   <li>{@code host} (required, read with {@code getValue}), {@code port} (defaults to 8883 when
 *       {@code ssl} is enabled, 1883 otherwise), {@code server-name};</li>
 *   <li>{@code topic}, falling back to {@code channel-name}; {@code qos} (default 0);</li>
 *   <li>the client options listed in {@link #createOptions(Config)}.</li>
 * </ul>
 */
public class MqttSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSink.class);
    private final String host;
    private final int port;
    private final MqttClient client;
    private final String server;
    private final String topic;
    private final int qos;
    // Set to true once a connect attempt has succeeded.
    // NOTE(review): nothing in this class resets the flag, so a connection dropped later is not
    // re-opened by this code - confirm whether the client's own reconnect settings cover that.
    private final AtomicBoolean connected = new AtomicBoolean();
    private final SubscriberBuilder<? extends Message, Void> sink;

    /**
     * Builds the MQTT client and the subscriber pipeline (connect if needed, publish, disconnect
     * on completion).
     *
     * @param vertx the Vert.x instance used to create the MQTT client
     * @param config the channel configuration
     */
    public MqttSink(Vertx vertx, Config config) {
        MqttClientOptions options = createOptions(config);
        this.host = config.getValue("host", String.class);
        int defaultPort = options.isSsl() ? 8883 : 1883;
        this.port = config.getOptionalValue("port", Integer.class).orElse(defaultPort);
        this.server = config.getOptionalValue("server-name", String.class).orElse(null);
        this.topic = getTopicOrNull(config);
        this.client = MqttClient.create(vertx, options);
        this.qos = config.getOptionalValue("qos", Integer.class).orElse(0);
        // The explicit type witness is required: without it the element type is inferred as
        // Object and the message accessors used downstream are not available.
        this.sink = ReactiveStreams.<Message<?>>builder()
                .flatMapCompletionStage(message -> connectIfNeeded(message))
                .flatMapCompletionStage(message -> publish(message))
                .onComplete(() -> this.client.disconnect())
                .ignore();
    }

    /**
     * Reads the MQTT client options from the configuration, applying the defaults visible below
     * for every key that is absent.
     *
     * @param config the channel configuration
     * @return the populated client options
     */
    private static MqttClientOptions createOptions(Config config) {
        MqttClientOptions options = new MqttClientOptions();
        options.setClientId(config.getOptionalValue("client-id", String.class).orElse(null));
        options.setAutoGeneratedClientId(config.getOptionalValue("auto-generated-client-id", Boolean.class).orElse(false));
        options.setAutoKeepAlive(config.getOptionalValue("auto-keep-alive", Boolean.class).orElse(true));
        options.setSsl(config.getOptionalValue("ssl", Boolean.class).orElse(false));
        options.setWillQoS(config.getOptionalValue("will-qos", Integer.class).orElse(0));
        options.setKeepAliveTimeSeconds(config.getOptionalValue("keep-alive-seconds", Integer.class).orElse(30));
        options.setMaxInflightQueue(config.getOptionalValue("max-inflight-queue", Integer.class).orElse(10));
        options.setCleanSession(config.getOptionalValue("auto-clean-session", Boolean.class).orElse(true));
        options.setWillFlag(config.getOptionalValue("will-flag", Boolean.class).orElse(false));
        options.setWillRetain(config.getOptionalValue("will-retain", Boolean.class).orElse(false));
        options.setMaxMessageSize(config.getOptionalValue("max-message-size", Integer.class).orElse(-1));
        options.setReconnectAttempts(config.getOptionalValue("reconnect-attempts", Integer.class).orElse(5));
        // Intervals and timeouts are configured in seconds but the options expect milliseconds.
        options.setReconnectInterval(TimeUnit.SECONDS.toMillis(config.getOptionalValue("reconnect-interval-seconds", Integer.class).orElse(1)));
        options.setUsername(config.getOptionalValue("username", String.class).orElse(null));
        options.setPassword(config.getOptionalValue("password", String.class).orElse(null));
        options.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(config.getOptionalValue("connect-timeout-seconds", Integer.class).orElse(60)));
        options.setTrustAll(config.getOptionalValue("trust-all", Boolean.class).orElse(false));
        return options;
    }

    /**
     * Passes the message through once the client is connected, opening the connection first if
     * no earlier attempt has succeeded.
     *
     * @param message the message waiting to be published
     * @return a future completed with {@code message} when connected, or failed with the
     *         connection error
     */
    private CompletableFuture<Message<?>> connectIfNeeded(Message<?> message) {
        CompletableFuture<Message<?>> future = new CompletableFuture<>();
        if (this.connected.get()) {
            future.complete(message);
            return future;
        }
        this.client.connect(this.port, this.host, this.server, ar -> {
            if (ar.failed()) {
                future.completeExceptionally(ar.cause());
            } else {
                this.connected.set(true);
                future.complete(message);
            }
        });
        return future;
    }

    /**
     * Publishes one message. Topic, QoS and retain flag come from the message itself when it is
     * a {@link SendingMqttMessage} that provides them, otherwise from the sink configuration.
     *
     * @param message the message to publish
     * @return a future completed with the publish result, or with {@code message} itself when it
     *         was skipped because no topic is known; failed if publishing fails
     */
    private CompletableFuture<Object> publish(Message<?> message) {
        String targetTopic = this.topic;
        MqttQoS effectiveQos = MqttQoS.valueOf(this.qos);
        boolean retain = false;
        if (message instanceof SendingMqttMessage) {
            SendingMqttMessage<?> mqttMessage = (SendingMqttMessage<?>) message;
            if (mqttMessage.getTopic() != null) {
                targetTopic = mqttMessage.getTopic();
            }
            if (mqttMessage.getQosLevel() != null) {
                effectiveQos = mqttMessage.getQosLevel();
            }
            retain = mqttMessage.isRetain();
        }
        if (targetTopic == null) {
            // Without a topic the message cannot be sent; it is dropped, not failed.
            LOGGER.error("Ignoring message - no topic set");
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<Object> done = new CompletableFuture<>();
        this.client.publish(targetTopic, convert(message.getPayload()), effectiveQos, false, retain, res -> {
            if (res.failed()) {
                done.completeExceptionally(res.cause());
            } else {
                done.complete(res.result());
            }
        });
        return done;
    }

    /**
     * Converts a message payload into the buffer handed to the MQTT client.
     *
     * <p>JSON objects/arrays use their own buffer form; strings and boxed primitive values
     * ({@link Number}, {@link Boolean}, {@link Character}) are sent as their {@code toString()}
     * text; byte arrays and buffers are sent as-is; anything else, including {@code null}, is
     * handed to the JSON encoder.
     *
     * @param payload the message payload, may be {@code null}
     * @return the buffer to publish
     */
    private static io.vertx.reactivex.core.buffer.Buffer convert(Object payload) {
        if (payload instanceof JsonObject) {
            return new io.vertx.reactivex.core.buffer.Buffer(((JsonObject) payload).toBuffer());
        }
        if (payload instanceof JsonArray) {
            return new io.vertx.reactivex.core.buffer.Buffer(((JsonArray) payload).toBuffer());
        }
        // A boxed value never reports Class.isPrimitive(), so the wrapper types are tested
        // explicitly. instanceof is also null-safe, unlike payload.getClass().
        if (payload instanceof String || payload instanceof Number
                || payload instanceof Boolean || payload instanceof Character) {
            return new io.vertx.reactivex.core.buffer.Buffer(Buffer.buffer(payload.toString()));
        }
        if (payload instanceof byte[]) {
            return new io.vertx.reactivex.core.buffer.Buffer(Buffer.buffer((byte[]) payload));
        }
        if (payload instanceof io.vertx.reactivex.core.buffer.Buffer) {
            return (io.vertx.reactivex.core.buffer.Buffer) payload;
        }
        if (payload instanceof Buffer) {
            return new io.vertx.reactivex.core.buffer.Buffer((Buffer) payload);
        }
        return new io.vertx.reactivex.core.buffer.Buffer(Json.encodeToBuffer(payload));
    }

    /**
     * Returns the subscriber builder that consumes messages and publishes them to the broker.
     *
     * @return the sink built in the constructor
     */
    public SubscriberBuilder<? extends Message, Void> getSink() {
        return this.sink;
    }

    /**
     * Resolves the default topic: the {@code topic} key, else the {@code channel-name} key.
     *
     * @param config the channel configuration
     * @return the configured topic, or {@code null} when neither key is set
     */
    private static String getTopicOrNull(Config config) {
        return config.getOptionalValue("topic", String.class)
                .orElseGet(() -> config.getOptionalValue("channel-name", String.class).orElse(null));
    }
}

