/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.smallrye.reactive.messaging.mqtt.MqttMessage;
import io.smallrye.reactive.messaging.mqtt.ReceivingMqttMessage;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.mqtt.MqttClient;
import io.vertx.reactivex.mqtt.messages.MqttPublishMessage;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

/**
 * Exposes the MQTT messages received on a configured topic as a reactive stream.
 *
 * <p>The constructor reads the connection settings from a {@link Config}, creates a Vert.x
 * {@link MqttClient} and assembles a pipeline that connects to the broker, registers a publish
 * handler, subscribes to the topic and forwards each incoming message downstream wrapped in a
 * {@link ReceivingMqttMessage}.
 */
public class MqttSource {
    /** Port used when {@code ssl} is disabled and no {@code port} key is configured. */
    private static final int DEFAULT_PLAIN_PORT = 1883;

    /** Port used when {@code ssl} is enabled and no {@code port} key is configured. */
    private static final int DEFAULT_SSL_PORT = 8883;

    /** Stream of received messages handed out by {@link #getSource()}. */
    private final PublisherBuilder<MqttMessage> source;

    /** Set from the outcome of the topic subscription; reset to {@code false} on cancellation. */
    private final AtomicBoolean subscribed = new AtomicBoolean();

    /**
     * Builds the MQTT client and the message stream from the given configuration.
     *
     * <p>Keys read here (in addition to the client options, see
     * {@link #buildClientOptions(Config)}):
     * <ul>
     * <li>{@code host} - mandatory broker host</li>
     * <li>{@code port} - defaults to 8883 when {@code ssl} is enabled, 1883 otherwise</li>
     * <li>{@code server-name} - optional, {@code null} when absent</li>
     * <li>{@code topic} - falls back to {@code channel-name}</li>
     * <li>{@code qos} - defaults to 0</li>
     * <li>{@code broadcast} - defaults to {@code false}; when {@code true} the stream is shared
     * between subscribers through {@code publish().autoConnect()}</li>
     * </ul>
     *
     * @param vertx the Vert.x instance used to create the MQTT client
     * @param config the configuration the settings are read from
     * @throws NoSuchElementException if the {@code host} key is missing
     * @throws IllegalArgumentException if neither {@code topic} nor {@code channel-name} is set
     */
    public MqttSource(Vertx vertx, Config config) {
        MqttClientOptions options = buildClientOptions(config);

        String host = config.getOptionalValue("host", String.class)
                .orElseThrow(() -> new NoSuchElementException(
                        "Invalid configuration - expected key `host` to be present in " + config.getPropertyNames()));
        int defaultPort = options.isSsl() ? DEFAULT_SSL_PORT : DEFAULT_PLAIN_PORT;
        int port = config.getOptionalValue("port", Integer.class).orElse(defaultPort);
        String server = config.getOptionalValue("server-name", String.class).orElse(null);
        String topic = getTopicOrFail(config);

        MqttClient client = MqttClient.create(vertx, options);
        int qos = config.getOptionalValue("qos", Integer.class).orElse(0);
        boolean broadcast = config.getOptionalValue("broadcast", Boolean.class).orElse(false);

        this.source = ReactiveStreams.fromPublisher(
                client.rxConnect(port, host, server)
                        // The explicit type witness keeps the whole chain typed as MqttMessage,
                        // so no raw Publisher cast is needed.
                        .flatMapObservable(ack -> Observable.<MqttMessage>create(emitter -> {
                            // The handler is registered before subscribing to the topic.
                            client.publishHandler(message -> emitter.onNext(new ReceivingMqttMessage(message)));
                            client.subscribe(topic, qos, done -> {
                                if (done.failed()) {
                                    // Report the subscription failure on the stream.
                                    emitter.onError(done.cause());
                                }
                                subscribed.set(done.succeeded());
                            });
                        }))
                        .toFlowable(BackpressureStrategy.BUFFER)
                        .compose(flow -> {
                            if (broadcast) {
                                return flow.publish().autoConnect();
                            }
                            return flow;
                        })
                        .doOnCancel(() -> {
                            subscribed.set(false);
                            client.disconnect();
                        }));
    }

    /**
     * Returns the stream of received MQTT messages.
     *
     * @return the publisher builder assembled by the constructor
     */
    PublisherBuilder<MqttMessage> getSource() {
        return this.source;
    }

    /**
     * Reports whether the topic subscription is currently flagged as established.
     *
     * @return {@code true} once the subscribe call has succeeded and the stream has not been
     *         cancelled since; {@code false} otherwise
     */
    public boolean isSubscribed() {
        return this.subscribed.get();
    }

    /**
     * Maps the client-related configuration keys onto a fresh {@link MqttClientOptions}.
     *
     * <p>Every key is optional; the fallback applied when a key is absent is the value passed
     * to {@code orElse} on the corresponding line. Intervals and timeouts are configured in
     * seconds and converted to milliseconds before being set on the options.
     *
     * @param config the configuration the options are read from
     * @return the populated client options
     */
    private static MqttClientOptions buildClientOptions(Config config) {
        MqttClientOptions options = new MqttClientOptions();
        options.setClientId(config.getOptionalValue("client-id", String.class).orElse(null));
        options.setAutoGeneratedClientId(config.getOptionalValue("auto-generated-client-id", Boolean.class).orElse(false));
        options.setAutoKeepAlive(config.getOptionalValue("auto-keep-alive", Boolean.class).orElse(true));
        options.setSsl(config.getOptionalValue("ssl", Boolean.class).orElse(false));
        options.setWillQoS(config.getOptionalValue("will-qos", Integer.class).orElse(0));
        options.setKeepAliveTimeSeconds(config.getOptionalValue("keep-alive-seconds", Integer.class).orElse(30));
        options.setMaxInflightQueue(config.getOptionalValue("max-inflight-queue", Integer.class).orElse(10));
        // The configuration key is "auto-clean-session" although it drives the clean-session flag.
        options.setCleanSession(config.getOptionalValue("auto-clean-session", Boolean.class).orElse(true));
        options.setWillFlag(config.getOptionalValue("will-flag", Boolean.class).orElse(false));
        options.setWillRetain(config.getOptionalValue("will-retain", Boolean.class).orElse(false));
        options.setMaxMessageSize(config.getOptionalValue("max-message-size", Integer.class).orElse(-1));
        options.setReconnectAttempts(config.getOptionalValue("reconnect-attempts", Integer.class).orElse(5));
        options.setReconnectInterval(
                TimeUnit.SECONDS.toMillis(config.getOptionalValue("reconnect-interval-seconds", Integer.class).orElse(1)));
        options.setUsername(config.getOptionalValue("username", String.class).orElse(null));
        options.setPassword(config.getOptionalValue("password", String.class).orElse(null));
        options.setConnectTimeout(
                (int) TimeUnit.SECONDS.toMillis(config.getOptionalValue("connect-timeout-seconds", Integer.class).orElse(60)));
        options.setTrustAll(config.getOptionalValue("trust-all", Boolean.class).orElse(false));
        return options;
    }

    /**
     * Resolves the topic to subscribe to.
     *
     * @param config the configuration to read from
     * @return the value of {@code topic}, or of {@code channel-name} when {@code topic} is absent
     * @throws IllegalArgumentException if neither key is set
     */
    private static String getTopicOrFail(Config config) {
        return config.getOptionalValue("topic", String.class)
                .orElseGet(() -> config.getOptionalValue("channel-name", String.class)
                        .orElseThrow(() -> new IllegalArgumentException("Topic attribute must be set")));
    }
}

