/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Clients {
    private static final Map<String, ClientHolder> clients = new ConcurrentHashMap<String, ClientHolder>();

    private Clients() {
    }

    static Uni<MqttClient> getConnectedClient(Vertx vertx, String host, int port, String server, MqttClientOptions options) {
        String id = host + port + "<" + (server == null ? "" : server) + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
        ClientHolder holder = clients.computeIfAbsent(id, key -> {
            MqttClient client = MqttClient.create((Vertx)vertx, (MqttClientOptions)options);
            return new ClientHolder(client, host, port, server);
        });
        return holder.connect();
    }

    static ClientHolder getHolder(Vertx vertx, String host, int port, String server, MqttClientOptions options) {
        String id = host + port + "<" + (server == null ? "" : server) + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
        return clients.computeIfAbsent(id, key -> {
            MqttClient client = MqttClient.create((Vertx)vertx, (MqttClientOptions)options);
            return new ClientHolder(client, host, port, server);
        });
    }

    public static void clear() {
        clients.forEach((name, holder) -> holder.close());
        clients.clear();
    }

    public static class ClientHolder {
        private final MqttClient client;
        private final Uni<MqttConnAckMessage> connection;
        private final BroadcastProcessor<MqttPublishMessage> messages;

        public ClientHolder(MqttClient client, String host, int port, String server) {
            this.client = client;
            this.connection = client.connect(port, host, server).memoize().indefinitely();
            this.messages = BroadcastProcessor.create();
            client.publishHandler(arg_0 -> this.messages.onNext(arg_0));
            client.closeHandler(() -> this.messages.onComplete());
            client.exceptionHandler(arg_0 -> this.messages.onError(arg_0));
        }

        public Uni<MqttClient> connect() {
            return this.connection.map(ignored -> this.client);
        }

        public void close() {
            if (this.client.isConnected()) {
                this.client.disconnectAndAwait();
            }
        }

        public Multi<MqttPublishMessage> stream() {
            return this.messages;
        }
    }
}

