/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mqtt3.internal.connection;

import com.mulesoft.connectors.mqtt3.api.QoS;
import com.mulesoft.connectors.mqtt3.api.Topic;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3ConnectionOptions;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3FilePersistenceOptions;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3PublishActionListener;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3SubscriptionSuccessListener;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3ConnectionExceptionResolver;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3Error;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3InvalidTopicException;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3PersistenceException;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3PublishException;
import com.mulesoft.connectors.mqtt3.internal.routing.DefaultMQTT3Message;
import com.mulesoft.connectors.mqtt3.internal.routing.LWTMessage;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3MessageHandler;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3TopicRouter;
import com.mulesoft.connectors.mqtt3.internal.source.MQTT3ConnectionLostHandler;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.tls.TlsContextFactory;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.extension.api.error.ErrorTypeDefinition;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultMQTT3Connection
implements MQTT3Connection {
    private static final String ROOT_TOPIC = "#";
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMQTT3Connection.class);
    private static final int COMPLETION_WAIT_TIMEOUT_MILLIS = 10000;
    public static final int ADDITIONAL_PROCESSING_TIME = 3;
    private final MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    private MQTT3TopicRouter topicRouter;
    private MqttAsyncClient mqttClient;
    private IMqttToken mqttToken;
    private AtomicBoolean notifiedReconnect = new AtomicBoolean(false);
    private int connectionTimeoutMultiplier = 1;

    public DefaultMQTT3Connection(String url, String clientId, MQTT3ConnectionOptions connectionOptions, MQTT3FilePersistenceOptions filePersistenceOptions, LWTMessage lwtMessage) throws ConnectionException {
        this.topicRouter = new MQTT3TopicRouter((topicFilter, topic) -> MqttTopic.isMatched((String)topicFilter, (String)topic));
        try {
            this.mqttClient = filePersistenceOptions.getEnableFilePersistence() ? (filePersistenceOptions.getDataStorePath() != null && !filePersistenceOptions.getDataStorePath().isEmpty() ? new MqttAsyncClient(url, clientId, (MqttClientPersistence)new MqttDefaultFilePersistence(filePersistenceOptions.getDataStorePath())) : new MqttAsyncClient(url, clientId, (MqttClientPersistence)new MqttDefaultFilePersistence())) : new MqttAsyncClient(url, clientId, (MqttClientPersistence)new MemoryPersistence());
        }
        catch (MqttException mqttException) {
            LOGGER.error("Failed to initialize mqttConnection, check that your connection parameters are correct." + mqttException.getMessage(), (Throwable)mqttException);
            throw new ConnectionException((Throwable)mqttException, (Object)this);
        }
        long keepAliveIntervalSeconds = TimeUnit.SECONDS.convert(connectionOptions.getKeepAliveInterval(), connectionOptions.getKeepAliveIntervalUnit());
        this.mqttConnectOptions.setConnectionTimeout((int)TimeUnit.SECONDS.convert(connectionOptions.getConnectionTimeout(), connectionOptions.getConnectionTimeoutUnit()));
        this.mqttConnectOptions.setCleanSession(connectionOptions.getCleanSession());
        this.mqttConnectOptions.setKeepAliveInterval((int)keepAliveIntervalSeconds);
        this.mqttConnectOptions.setMaxInflight(connectionOptions.getMaxInFlight());
        this.setLastWillAndTestamentMessage(lwtMessage);
    }

    private DefaultMQTT3Connection() {
    }

    public boolean isCleanSessionEnabled() {
        return this.mqttConnectOptions.isCleanSession();
    }

    @Override
    public void setUsernamePassword(String username, String password) {
        this.mqttConnectOptions.setUserName(username);
        if (password != null) {
            this.mqttConnectOptions.setPassword(password.toCharArray());
        }
    }

    @Override
    public void setFailOverServers(String[] failOverServerArray) {
        this.mqttConnectOptions.setServerURIs(failOverServerArray);
        this.connectionTimeoutMultiplier = failOverServerArray.length;
    }

    public void connect() throws ConnectionException {
        Reference throwableReference;
        block9: {
            throwableReference = new Reference();
            try {
                final CountDownLatch latch = new CountDownLatch(1);
                this.mqttToken = this.mqttClient.connect(this.mqttConnectOptions, null, new IMqttActionListener(){

                    public void onSuccess(IMqttToken asyncActionToken) {
                        throwableReference.set(null);
                        LOGGER.debug("Successfully connected to " + DefaultMQTT3Connection.this.mqttClient.getCurrentServerURI());
                        latch.countDown();
                    }

                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LOGGER.error("Error occurred establishing connection to " + DefaultMQTT3Connection.this.mqttClient.getCurrentServerURI() + ":" + exception.getMessage(), exception);
                        throwableReference.set((Object)exception);
                        latch.countDown();
                    }
                });
                if (!latch.await((this.mqttConnectOptions.getConnectionTimeout() + 3) * this.connectionTimeoutMultiplier, TimeUnit.SECONDS)) {
                    throwableReference.set((Object)new ConnectionException("Error occurred attempting to establish connection"));
                }
            }
            catch (MqttSecurityException securityException) {
                throw new ModuleException("Error connecting to mqtt broker: not authorized to connect", (ErrorTypeDefinition)MQTT3Error.UNAUTHORIZED, (Throwable)securityException);
            }
            catch (InterruptedException | MqttException mqttException) {
                if (!this.notifiedReconnect.compareAndSet(false, true)) break block9;
                LOGGER.error("Error occurred attempting to establish connection to mqtt broker " + mqttException, mqttException);
                throw new ConnectionException(mqttException, (Object)this);
            }
        }
        Throwable throwable = (Throwable)throwableReference.get();
        if (throwable != null) {
            Optional<ConnectionException> mqtt3ConnectionException = MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(throwable, this);
            if (!this.isConnected() && mqtt3ConnectionException.isPresent()) {
                if (this.notifiedReconnect.compareAndSet(false, true)) {
                    LOGGER.error("Error occurred attempting to establish connection to mqtt broker " + throwableReference.get(), throwable);
                    throw mqtt3ConnectionException.get();
                }
            } else {
                if (throwable instanceof MqttSecurityException) {
                    throw new ModuleException("Error connecting to mqtt broker: not authorized to connect", (ErrorTypeDefinition)MQTT3Error.UNAUTHORIZED, throwable);
                }
                throw new MuleRuntimeException(throwable);
            }
        }
    }

    @Override
    public void subscribeListenerToTopics(List<Topic> topics, MQTT3MessageHandler messageHandler) throws ConnectionException {
        try {
            List<Topic> newTopicsSubscriptionList = this.topicRouter.registerCallbackForTopics(topics, messageHandler);
            String[] topicsArray = (String[])newTopicsSubscriptionList.stream().map(Topic::getTopicFilter).toArray(String[]::new);
            int[] qosArray = newTopicsSubscriptionList.stream().map(Topic::getQos).map(QoS::getValue).mapToInt(Integer::intValue).toArray();
            LOGGER.debug("Subscribing to topics: {}, with QOS {}", (Object)topicsArray, (Object)qosArray);
            if (topics.stream().anyMatch(topic -> topic.getTopicFilter().equals(ROOT_TOPIC))) {
                LOGGER.warn("Issuing subscription request for the root topic #. This is not advisable, you will receive all messages issued to all topics.");
            }
            this.subscribe(topicsArray, qosArray);
        }
        catch (Exception exception) {
            LOGGER.error("Exception occurred during subscription to topics " + topics, (Throwable)exception);
            throw exception;
        }
    }

    @Override
    public void unsubscribeListenerFromTopics(List<Topic> topics, MQTT3MessageHandler messageHandler) {
        try {
            List<Topic> deletedTopics = this.topicRouter.deregisterCallbackForTopics(topics, messageHandler);
            if (this.isCleanSessionEnabled() && !deletedTopics.isEmpty()) {
                Object[] deletedTopicsArray = (String[])deletedTopics.stream().map(Topic::getTopicFilter).toArray(String[]::new);
                try {
                    LOGGER.debug("Unsubscribing from topics: {}", deletedTopicsArray);
                    this.mqttClient.unsubscribe((String[])deletedTopicsArray);
                }
                catch (MqttException exception) {
                    LOGGER.error("Error occurred unsubscribing from topics {}: {}", (Object)deletedTopicsArray, (Object)exception);
                }
            }
        }
        catch (Exception exception) {
            LOGGER.error("Error unsubscribing callbacks for topics " + topics);
        }
    }

    @Override
    public void setConnectionLostHandler(final MQTT3ConnectionLostHandler mqtt3ConnectionLostHandler) {
        MqttCallbackExtended callbackExtended = new MqttCallbackExtended(){

            public void connectComplete(boolean isReconnecting, String serverURI) {
                DefaultMQTT3Connection.this.notifiedReconnect.set(false);
                if (isReconnecting && DefaultMQTT3Connection.this.mqttConnectOptions.isCleanSession()) {
                    List<Topic> subscriptions = DefaultMQTT3Connection.this.topicRouter.getDistinctTopicFilters();
                    if (subscriptions.isEmpty()) {
                        return;
                    }
                    Object[] topicFilters = new String[subscriptions.size()];
                    int[] qosForTopics = new int[subscriptions.size()];
                    for (int i = 0; i < subscriptions.size(); ++i) {
                        topicFilters[i] = subscriptions.get(i).getTopicFilter();
                        qosForTopics[i] = subscriptions.get(i).getQos().getValue();
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Reconnect to {} complete.", (Object)serverURI);
                        LOGGER.debug("Recovering subscriptions to topics {}", topicFilters);
                    }
                    try {
                        IMqttToken token = DefaultMQTT3Connection.this.mqttClient.subscribe((String[])topicFilters, qosForTopics, null, (IMqttActionListener)new MQTT3SubscriptionSuccessListener());
                        token.waitForCompletion(10000L);
                    }
                    catch (MqttException ex) {
                        LOGGER.error("Re-subscribe after reconnection failed for " + serverURI + " with error " + (Object)((Object)ex));
                    }
                }
            }

            public void connectionLost(Throwable throwable) {
                if (DefaultMQTT3Connection.this.notifiedReconnect.compareAndSet(false, true)) {
                    mqtt3ConnectionLostHandler.onConnectionLost(throwable);
                }
            }

            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                DefaultMQTT3Connection.this.topicRouter.handleMessageArrived(new DefaultMQTT3Message(mqttMessage.getId(), topic, mqttMessage.getPayload(), mqttMessage.getQos(), mqttMessage.isDuplicate(), mqttMessage.isRetained()));
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        };
        this.mqttClient.setCallback((MqttCallback)callbackExtended);
    }

    @Override
    public boolean isConnected() {
        return this.mqttClient.isConnected();
    }

    private void close() {
        try {
            this.mqttClient.close(true);
        }
        catch (MqttException mqttException) {
            LOGGER.error("Error occurred while attempting to close connection.", (Throwable)mqttException);
        }
    }

    @Override
    public CompletableFuture<Integer> publish(String topic, byte[] message, int qos, boolean isRetained) throws MQTT3PublishException, ConnectionException {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        try {
            this.mqttClient.publish(topic, message, qos, isRetained, null, (IMqttActionListener)new MQTT3PublishActionListener(future, this));
        }
        catch (IllegalArgumentException e) {
            LOGGER.error("IllegalArgumentException found performing publish operation: " + e.getMessage());
            throw new MQTT3InvalidTopicException(e);
        }
        catch (MqttPersistenceException persistenceException) {
            LOGGER.error("MqttPersistenceException found performing publish operation: " + persistenceException.getMessage());
            throw new MQTT3PersistenceException(persistenceException);
        }
        catch (Throwable mqttException) {
            Optional<ConnectionException> connException = MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(mqttException, this);
            if (connException.isPresent()) {
                LOGGER.error("MqttConnectionException found performing publish operation: " + mqttException.getMessage());
                throw connException.get();
            }
            LOGGER.error("MqttException found performing publish operation: " + mqttException.getMessage());
            throw new MQTT3PublishException(mqttException);
        }
        return future;
    }

    private void subscribe(String[] topics, int[] subscriptionQoSArray) throws ConnectionException {
        try {
            this.mqttToken = this.mqttClient.subscribe(topics, subscriptionQoSArray, null, (IMqttActionListener)new MQTT3SubscriptionSuccessListener());
        }
        catch (MqttException exception) {
            LOGGER.error("Subscription failed for topics " + topics + " with error:" + exception.getMessage(), (Throwable)exception);
            throw new ConnectionException((Throwable)exception, (Object)this);
        }
    }

    @Override
    public boolean isSessionPresent() {
        return this.mqttToken.getSessionPresent();
    }

    private void setLastWillAndTestamentMessage(LWTMessage lwtMessage) {
        if (lwtMessage.getBody() == null || lwtMessage.getTopic() == null) {
            return;
        }
        this.mqttConnectOptions.setWill(lwtMessage.getTopic(), lwtMessage.getBody().getBytes(), lwtMessage.getQoS().getValue(), lwtMessage.isRetained());
    }

    @Override
    public void setTLSOptions(TlsContextFactory tlsContextFactory) throws ConnectionException {
        try {
            this.mqttConnectOptions.setSocketFactory((SocketFactory)tlsContextFactory.createSocketFactory());
        }
        catch (KeyManagementException | NoSuchAlgorithmException e) {
            throw new ConnectionException((Throwable)e, (Object)this);
        }
    }

    @Override
    public void disconnect() {
        try {
            this.mqttClient.disconnect().waitForCompletion(10000L);
        }
        catch (MqttException mqttException) {
            LOGGER.error(mqttException.getMessage(), (Throwable)mqttException);
            try {
                LOGGER.error("Error occurred while attempting to disconnect client " + this.mqttClient.getClientId());
                LOGGER.error(" Attempting to forcibly disconnect...");
                this.mqttClient.disconnectForcibly(10000L);
            }
            catch (MqttException mqttException2) {
                LOGGER.error("Error occurred while attempting to forcibly disconnect client " + this.mqttClient.getClientId() + ": " + (Object)((Object)mqttException2), (Throwable)mqttException2);
            }
        }
        finally {
            this.close();
        }
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DefaultMQTT3Connection)) {
            return false;
        }
        DefaultMQTT3Connection that = (DefaultMQTT3Connection)o;
        return new EqualsBuilder().append((Object)this.mqttClient, (Object)that.mqttClient).append(this.connectionTimeoutMultiplier, that.connectionTimeoutMultiplier).append((Object)this.mqttConnectOptions, (Object)that.mqttConnectOptions).isEquals();
    }

    public int hashCode() {
        return Objects.hash(this.mqttClient, this.connectionTimeoutMultiplier, this.mqttConnectOptions);
    }
}

