/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.mqtt.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.MqttMessage;
import io.smallrye.reactive.messaging.mqtt.ReceivingMqttMessage;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class);
    private final PublisherBuilder<MqttMessage<?>> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();

    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
        MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
        String host = config.getHost();
        int def = options.isSsl() ? 8883 : 1883;
        int port = config.getPort().orElse(def);
        String server = config.getServerName().orElse(null);
        String topic = config.getTopic().orElseGet(config::getChannel);
        int qos = config.getQos();
        MqttClient client = MqttClient.create((Vertx)vertx, (MqttClientOptions)options);
        boolean broadcast = config.getBroadcast();
        this.source = ReactiveStreams.fromPublisher((Publisher)((Multi)client.connect(port, host, server).onItem().produceMulti(a -> Multi.createFrom().emitter(emitter -> {
            client.publishHandler(message -> emitter.emit((Object)new ReceivingMqttMessage((MqttPublishMessage)message)));
            client.subscribe(topic, qos).subscribe().with(i -> this.subscribed.set(true), arg_0 -> ((MultiEmitter)emitter).fail(arg_0));
        }, BackPressureStrategy.BUFFER)).then(multi -> {
            if (broadcast) {
                return multi.broadcast().toAllSubscribers();
            }
            return multi;
        })).on().cancellation(() -> {
            this.subscribed.set(false);
            client.disconnectAndForget();
        }).onFailure().invoke(t -> LOGGER.error("Unable to establish a connection with the MQTT broker", t)));
    }

    PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    boolean isSubscribed() {
        return this.subscribed.get();
    }
}

