/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.mqtt.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.MqttMessage;
import io.smallrye.reactive.messaging.mqtt.ReceivingMqttMessage;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incoming side of the MQTT connector: connects to a broker, subscribes to one
 * topic filter and exposes the received messages as a reactive stream.
 *
 * <p>The {@code subscribed} flag is set once the broker subscription has been
 * acknowledged and cleared again when the stream is cancelled or fails.
 */
public class MqttSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class);
    private final PublisherBuilder<MqttMessage<?>> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();

    /**
     * Builds the (lazy) message stream for one incoming channel.
     *
     * <p>Configuration read here: host, port (defaults to 8883 when the client
     * options have SSL enabled, 1883 otherwise), optional server name, topic
     * (defaults to the channel name), QoS and the broadcast flag.
     *
     * @param vertx  the Vert.x instance handed to {@code Clients.getHolder}
     * @param config the incoming channel configuration
     */
    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
        MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
        String host = config.getHost();
        int def = options.isSsl() ? 8883 : 1883;
        int port = config.getPort().orElse(def);
        String server = config.getServerName().orElse(null);
        String topic = config.getTopic().orElseGet(config::getChannel);
        int qos = config.getQos();
        boolean broadcast = config.getBroadcast();
        Clients.ClientHolder holder = Clients.getHolder(vertx, host, port, server, options);
        // NOTE(review): the raw (Publisher)/(Multi) casts are kept as found; the generic
        // signatures of the Mutiny API in use are not visible from this file.
        this.source = ReactiveStreams.fromPublisher((Publisher) ((Multi) holder.connect()
                .onItem().produceMulti(client -> client.subscribe(topic, qos)
                        .onItem().produceMulti(x -> {
                            this.subscribed.set(true);
                            // The holder's stream is filtered per source (presumably because the
                            // client is shared between channels - confirm in Clients). The
                            // configured topic is a subscription filter and may contain the
                            // wildcards '+' and '#', so plain equality is not sufficient.
                            return holder.stream()
                                    .transform().byFilteringItemsWith(m -> matchesTopicFilter(topic, m.topicName()))
                                    .onItem().apply(ReceivingMqttMessage::new);
                        }))
                .then(multi -> {
                    if (broadcast) {
                        return multi.broadcast().toAllSubscribers();
                    }
                    return multi;
                }))
                .on().cancellation(() -> this.subscribed.set(false))
                .onFailure().invoke(t -> {
                    // A failure terminates the stream just like a cancellation does, so the
                    // flag must not keep reporting an active subscription.
                    this.subscribed.set(false);
                    LOGGER.error("Unable to establish a connection with the MQTT broker", t);
                }));
    }

    /**
     * Tells whether a concrete topic name is selected by an MQTT topic filter.
     *
     * <p>Filters without wildcards are compared by exact equality. Otherwise the
     * filter and the name are compared level by level (levels are separated by
     * {@code '/'}): {@code "+"} accepts any single level, a trailing {@code "#"}
     * accepts the parent level and everything below it. A filter whose first
     * level is a wildcard never selects a topic name starting with {@code '$'}.
     *
     * @param filter    the configured topic filter
     * @param topicName the topic name of a received message
     * @return {@code true} if the message belongs to this filter
     */
    private static boolean matchesTopicFilter(String filter, String topicName) {
        if (filter.indexOf('#') < 0 && filter.indexOf('+') < 0) {
            return topicName.equals(filter);
        }
        // Limit -1 keeps empty levels ("a//b", trailing '/'), which are significant in MQTT.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topicName.split("/", -1);
        boolean startsWithWildcard = "#".equals(filterLevels[0]) || "+".equals(filterLevels[0]);
        if (startsWithWildcard && topicName.startsWith("$")) {
            return false;
        }
        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if ("#".equals(level)) {
                // Only valid as the last level; it also covers the parent level itself,
                // e.g. "sport/#" selects "sport".
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (!"+".equals(level) && !level.equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    /**
     * @return the stream of received messages built by the constructor
     */
    PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    /**
     * @return {@code true} between a successful broker subscription and the
     *         cancellation or failure of the stream
     */
    boolean isSubscribed() {
        return this.subscribed.get();
    }
}

