/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.mqtt.v5.intercept;

import io.micronaut.context.BeanContext;
import io.micronaut.mqtt.bind.MqttBinderRegistry;
import io.micronaut.mqtt.bind.MqttBindingContext;
import io.micronaut.mqtt.exception.MqttSubscriberException;
import io.micronaut.mqtt.exception.MqttSubscriberExceptionHandler;
import io.micronaut.mqtt.intercept.AbstractMqttSubscriberAdvice;
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Set;
import java.util.function.Consumer;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MQTT v5 subscriber advice that performs subscribe and unsubscribe calls
 * through a Paho {@link MqttAsyncClient}.
 */
@Singleton
public class MqttSubscriberAdvice
extends AbstractMqttSubscriberAdvice<MqttMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(MqttSubscriberAdvice.class);
    private final MqttAsyncClient mqttAsyncClient;

    /**
     * Creates the advice.
     *
     * @param beanContext      passed through to the parent advice
     * @param binderRegistry   passed through to the parent advice
     * @param exceptionHandler passed through to the parent advice
     * @param mqttAsyncClient  the client used for all subscribe/unsubscribe calls
     */
    public MqttSubscriberAdvice(BeanContext beanContext, MqttBinderRegistry binderRegistry, MqttSubscriberExceptionHandler exceptionHandler, MqttAsyncClient mqttAsyncClient) {
        super(beanContext, binderRegistry, exceptionHandler);
        this.mqttAsyncClient = mqttAsyncClient;
    }

    /**
     * Subscribes to the given topics and forwards every received message to
     * {@code callback}, wrapped in a binding context that carries the topic the
     * message actually arrived on.
     *
     * @param topics   the topic filters to subscribe to
     * @param qos      the quality-of-service level for each topic; {@code qos[i]}
     *                 is paired with {@code topics[i]}
     * @param callback invoked once per received message
     * @throws MqttSubscriberException if the client rejects the subscribe request
     */
    public void subscribe(String[] topics, int[] qos, Consumer<MqttBindingContext<MqttMessage>> callback) {
        try {
            // NOTE(review): a subscription identifier of 0 is attached to the request.
            // The reason is not visible in this file; presumably it works around
            // client behaviour. Confirm before changing or removing it.
            MqttProperties props = new MqttProperties();
            props.setSubscriptionIdentifiers(Arrays.asList(0));

            MqttSubscription[] subscriptions = new MqttSubscription[topics.length];
            for (int i = 0; i < topics.length; ++i) {
                subscriptions[i] = new MqttSubscription(topics[i], qos[i]);
            }

            this.mqttAsyncClient.subscribe(subscriptions, null, null, (actualTopic, message) -> {
                MqttV5BindingContext context = new MqttV5BindingContext(this.mqttAsyncClient, message);
                context.setTopic(actualTopic);
                callback.accept(context);
            }, props);
        }
        catch (MqttException e) {
            // Arrays.toString renders the topic names; formatting the array directly
            // with %s would only print its identity string.
            throw new MqttSubscriberException(String.format("Failed to subscribe to the topics: %s", Arrays.toString(topics)), e);
        }
    }

    /**
     * Unsubscribes from the given topics and waits for the client to complete
     * the request. A failure is logged at warn level and not propagated.
     *
     * @param topics the topics to unsubscribe from
     */
    public void unsubscribe(Set<String> topics) {
        try {
            IMqttToken token = this.mqttAsyncClient.unsubscribe(topics.toArray(new String[0]));
            token.waitForCompletion();
        }
        catch (MqttException e) {
            // Best effort: report the failure but do not interrupt the caller.
            if (LOG.isWarnEnabled()) {
                LOG.warn("Failed to unsubscribe from the subscribed topics", e);
            }
        }
    }
}

