/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.mqtt.MqttFailStop;
import io.smallrye.reactive.messaging.mqtt.MqttFailureHandler;
import io.smallrye.reactive.messaging.mqtt.MqttIgnoreFailure;
import io.smallrye.reactive.messaging.mqtt.ReceivingMqttMessage;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.vertx.core.Context;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import jakarta.enterprise.inject.Instance;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.eclipse.microprofile.config.Config;

public class MqttSource {
    private final Flow.Publisher<ReceivingMqttMessage> source;
    private final AtomicBoolean ready = new AtomicBoolean();
    private final String channel;
    private final Pattern pattern;
    private final boolean healthEnabled;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean alive = new AtomicBoolean();
    private final Clients.ClientHolder holder;

    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, Instance<MqttClientSessionOptions> instances) {
        MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances);
        this.channel = config.getChannel();
        String topic = config.getTopic().orElse(this.channel);
        int qos = config.getQos();
        boolean broadcast = config.getBroadcast();
        this.healthEnabled = config.getHealthEnabled();
        MqttFailureHandler.Strategy strategy = MqttFailureHandler.Strategy.from(config.getFailureStrategy());
        MqttFailureHandler onNack = this.createFailureHandler(strategy, config.getChannel());
        if (topic.contains("#") || topic.contains("+")) {
            String replace = MqttTopicHelper.escapeTopicSpecialWord(MqttHelpers.rebuildMatchesWithSharedSubscription(topic)).replace("+", "[^/]+").replace("#", ".+");
            this.pattern = Pattern.compile(replace);
        } else {
            this.pattern = null;
        }
        io.vertx.mutiny.core.Context root = ConcurrencyConnectorConfig.getConcurrency((Config)config.config).filter(i -> i > 1).map(__ -> io.vertx.mutiny.core.Context.newInstance((Context)((VertxInternal)vertx.getDelegate()).createEventLoopContext())).orElse(null);
        this.holder = Clients.getHolder(vertx, options);
        this.holder.start().onSuccess(ignore -> this.started.set(true));
        this.holder.getClient().subscribe(topic, RequestedQoS.valueOf(qos)).onFailure(outcome -> MqttLogging.log.info("Subscription failed!")).onSuccess(outcome -> {
            MqttLogging.log.info("Subscription success on topic " + topic + ", Max QoS " + outcome + ".");
            this.alive.set(true);
        });
        this.source = ((Multi)this.holder.stream().select().where(m -> MqttTopicHelper.matches(topic, this.pattern, m)).plug(m -> root != null ? m.emitOn(c -> VertxContext.runOnContext((Context)root.getDelegate(), (Runnable)c)) : m).onItem().transform(m -> new ReceivingMqttMessage((MqttPublishMessage)m, onNack)).stage(multi -> {
            if (broadcast) {
                return multi.broadcast().toAllSubscribers();
            }
            return multi;
        })).onOverflow().buffer(config.getBufferSize().intValue()).onCancellation().call(() -> {
            this.alive.set(false);
            if (config.getUnsubscribeOnDisconnection().booleanValue()) {
                return Uni.createFrom().completionStage(this.holder.getClient().unsubscribe(topic).toCompletionStage());
            }
            return Uni.createFrom().voidItem();
        }).onFailure().invoke(e -> {
            this.alive.set(false);
            MqttLogging.log.unableToConnectToBroker((Throwable)e);
        });
    }

    private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String channel) {
        switch (strategy) {
            case IGNORE: {
                return new MqttIgnoreFailure(channel);
            }
            case FAIL: {
                return new MqttFailStop(channel);
            }
        }
        throw MqttExceptions.ex.illegalArgumentUnknownStrategy(strategy.toString());
    }

    Flow.Publisher<ReceivingMqttMessage> getSource() {
        return this.source;
    }

    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.started.get());
        }
    }

    public void isReady(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.holder.getClient().isConnected());
        }
    }

    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.alive.get());
        }
    }
}

