/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mqtt3.internal.source;

import com.mulesoft.connectors.mqtt3.api.MQTT3MessageAttributes;
import com.mulesoft.connectors.mqtt3.api.Topic;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3ForwardingMessageHandler;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3Message;
import java.util.List;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MediaType(value="*/*")
@Alias(value="listener")
@DisplayName(value="On New Message")
@ClusterSupport(value=SourceClusterSupport.DEFAULT_PRIMARY_NODE_ONLY)
public class MQTT3MessageListener
extends Source<byte[], MQTT3MessageAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTT3MessageListener.class);
    @Parameter
    private List<Topic> topics;
    @Connection
    private ConnectionProvider<MQTT3Connection> connectionProvider;
    private MQTT3Connection connection;
    private MQTT3ForwardingMessageHandler messageHandler;

    public void onStart(SourceCallback<byte[], MQTT3MessageAttributes> sourceCallback) {
        try {
            LOGGER.debug("Starting MQTT3 Source");
            this.connection = (MQTT3Connection)this.connectionProvider.connect();
            this.connection.setConnectionLostHandler(throwable -> sourceCallback.onConnectionException(new ConnectionException(throwable, (Object)this.connection)));
            this.messageHandler = new MQTT3ForwardingMessageHandler(message -> sourceCallback.handle(this.buildResult(message)));
            this.connection.subscribeListenerToTopics(this.topics, this.messageHandler);
        }
        catch (Throwable e) {
            if (ExceptionUtils.extractConnectionException((Throwable)e).isPresent()) {
                throw new MuleRuntimeException((Throwable)ExceptionUtils.extractConnectionException((Throwable)e).get());
            }
            throw new MuleRuntimeException(e);
        }
    }

    public void onStop() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.unsubscribeListenerFromTopics(this.topics, this.messageHandler);
            this.connectionProvider.disconnect((Object)this.connection);
        }
        catch (Exception e) {
            LOGGER.error("Error occurred unsubscribing listeners from topics:" + this.topics.toString() + " " + e.getMessage(), (Throwable)e);
        }
    }

    private Result<byte[], MQTT3MessageAttributes> buildResult(MQTT3Message mqttMessage) {
        return Result.builder().attributes((Object)MQTT3MessageAttributes.Builder.newInstance().withTopic(mqttMessage.getTopic()).withMessageId(mqttMessage.getId()).withDuplicate(mqttMessage.getIsDuplicate()).withRetained(mqttMessage.getIsRetained()).withQoS(mqttMessage.getQoS()).build()).output((Object)mqttMessage.getContent()).build();
    }
}

