/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.mqtt.v3.intercept;

import io.micronaut.context.BeanContext;
import io.micronaut.mqtt.bind.MqttBinderRegistry;
import io.micronaut.mqtt.bind.MqttBindingContext;
import io.micronaut.mqtt.exception.MqttSubscriberException;
import io.micronaut.mqtt.exception.MqttSubscriberExceptionHandler;
import io.micronaut.mqtt.intercept.AbstractMqttSubscriberAdvice;
import io.micronaut.mqtt.v3.bind.MqttV3BindingContext;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Set;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MQTT v3 subscriber advice that registers and removes topic subscriptions
 * on a Paho {@link MqttAsyncClient}.
 *
 * <p>Each incoming message is wrapped in an {@link MqttV3BindingContext}
 * together with the topic it actually arrived on and is then handed to the
 * callback supplied to {@link #subscribe(String[], int[], Consumer)}.</p>
 */
@Singleton
public class MqttSubscriberAdvice extends AbstractMqttSubscriberAdvice<MqttMessage> {

    private static final Logger LOG = LoggerFactory.getLogger(MqttSubscriberAdvice.class);

    private final MqttAsyncClient mqttAsyncClient;

    /**
     * Creates the advice.
     *
     * @param beanContext      bean context, forwarded to the parent class
     * @param binderRegistry   binder registry, forwarded to the parent class
     * @param exceptionHandler subscriber exception handler, forwarded to the parent class
     * @param mqttAsyncClient  client used to subscribe to and unsubscribe from topics
     */
    public MqttSubscriberAdvice(BeanContext beanContext,
                                MqttBinderRegistry binderRegistry,
                                MqttSubscriberExceptionHandler exceptionHandler,
                                MqttAsyncClient mqttAsyncClient) {
        super(beanContext, binderRegistry, exceptionHandler);
        this.mqttAsyncClient = mqttAsyncClient;
    }

    /**
     * Subscribes to the given topics, routing every received message to {@code callback}.
     *
     * <p>The same listener instance is registered for every topic. The call does not
     * wait on the token returned by the client.</p>
     *
     * @param topics   topic filters to subscribe to
     * @param qos      quality-of-service values passed to the client alongside {@code topics}
     * @param callback consumer invoked with a binding context for each received message
     * @throws MqttSubscriberException if the client rejects the subscribe request
     */
    public void subscribe(String[] topics, int[] qos, Consumer<MqttBindingContext<MqttMessage>> callback) {
        IMqttMessageListener messageListener = (actualTopic, message) -> {
            MqttV3BindingContext context = new MqttV3BindingContext(this.mqttAsyncClient, message);
            // Record the concrete topic the message arrived on, not the subscribed filter.
            context.setTopic(actualTopic);
            callback.accept(context);
        };
        IMqttMessageListener[] listeners = new IMqttMessageListener[topics.length];
        Arrays.fill(listeners, messageListener);
        try {
            this.mqttAsyncClient.subscribe(topics, qos, listeners);
        } catch (MqttException e) {
            // Arrays.toString renders the topic names; formatting the array directly
            // with %s would only print its type and identity hash.
            throw new MqttSubscriberException(
                    String.format("Failed to subscribe to the topics: %s", Arrays.toString(topics)), e);
        }
    }

    /**
     * Unsubscribes from the given topics and blocks until the client reports completion.
     *
     * <p>This is best effort: an {@link MqttException} is logged at WARN level and
     * not propagated to the caller.</p>
     *
     * @param topics topics to unsubscribe from
     */
    public void unsubscribe(Set<String> topics) {
        try {
            IMqttToken token = this.mqttAsyncClient.unsubscribe(topics.toArray(new String[0]));
            token.waitForCompletion();
        } catch (MqttException e) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Failed to unsubscribe from the subscribed topics", e);
            }
        }
    }
}

