/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mqtt.internal;

import com.mulesoft.connectors.mqtt.api.MQTTAttributes;
import com.mulesoft.connectors.mqtt.api.QoS;
import com.mulesoft.connectors.mqtt.api.Topic;
import com.mulesoft.connectors.mqtt.internal.ConnectionUtills;
import com.mulesoft.connectors.mqtt.internal.MQTTSession;
import java.util.List;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MediaType(value="text/plain")
@Alias(value="listener")
@DisplayName(value="On New Message")
public class MQTTListener
extends Source<byte[], MQTTAttributes> {
    private final Logger LOGGER = LoggerFactory.getLogger(MQTTListener.class);
    @Connection
    private ConnectionProvider<MQTTSession> clientProvider;
    @Parameter
    private List<Topic> topics;
    private IMqttToken subscription;
    private MqttAsyncClient client;
    private LazyValue<String[]> calculatedTopics = new LazyValue(() -> (String[])this.topics.stream().map(Topic::getTopicFilter).toArray(String[]::new));
    private MQTTSession session;
    private Consumer<Throwable> onDisconnectCallback;

    public void onStart(SourceCallback<byte[], MQTTAttributes> sourceCallback) throws MuleException {
        if (this.LOGGER.isDebugEnabled()) {
            this.LOGGER.debug("Initializing Subscription to: " + this.topics);
        }
        this.session = (MQTTSession)this.clientProvider.connect();
        this.client = this.session.getClient();
        this.onDisconnectCallback = t -> sourceCallback.onConnectionException(new ConnectionException(t, (Object)this.client));
        try {
            String[] topics = this.getTopics();
            int[] qos = this.topics.stream().map(Topic::getQos).map(QoS::getValue).mapToInt(Integer::intValue).toArray();
            IMqttMessageListener[] listeners = (IMqttMessageListener[])this.topics.stream().map(t -> this.getMessageListener(sourceCallback)).toArray(IMqttMessageListener[]::new);
            this.subscription = this.client.subscribe(topics, qos, listeners);
            this.session.registerOnDisconnectCallback(this.onDisconnectCallback);
        }
        catch (MqttException e) {
            if (ConnectionUtills.isConnectionException(e)) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                sourceCallback.onConnectionException(new ConnectionException((Throwable)e));
            }
            sourceCallback.onConnectionException(new ConnectionException((Throwable)e));
        }
    }

    private String[] getTopics() {
        return (String[])this.calculatedTopics.get();
    }

    private IMqttMessageListener getMessageListener(SourceCallback<byte[], MQTTAttributes> sourceCallback) {
        return (topic, mqttMessage) -> sourceCallback.handle(Result.builder().output((Object)mqttMessage.getPayload()).attributes((Object)new MQTTAttributes(topic, mqttMessage)).build());
    }

    public void onStop() {
        if (this.subscription != null) {
            try {
                this.subscription.getClient().unsubscribe(this.getTopics());
            }
            catch (MqttException e) {
                this.LOGGER.debug("Error when trying to unsubscribe", (Throwable)e);
            }
        }
        if (this.session != null) {
            this.session.removeCallback(this.onDisconnectCallback);
        }
    }
}

