/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt.paho.client;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrIllegalStateException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.mqtt.IMqttMessagingSkeleton;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.messaging.mqtt.statusmetrics.MqttStatusReceiver;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;
import joynr.system.RoutingTypes.MqttAddress;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttPahoClient
implements JoynrMqttClient,
MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPahoClient.class);
    // Current Paho client; set to null by shutdown() and while reconnect(int) swaps in a new instance.
    private MqttClient mqttClient;
    // Guards mqttClient: methods using the client take the read lock, shutdown()/reconnect(int) the write lock.
    private StampedLock mqttClientLock;
    // Only read by the assertion in publishMessage (together with separateConnections).
    private boolean isReceiver;
    // Receiver of incoming payloads; null until setMessageListener has been called.
    private IMqttMessagingSkeleton messagingSkeleton;
    // Delay passed to reconnect(int) by start() before a new client is created.
    private int reconnectSleepMs;
    // Values copied into MqttConnectOptions by getConnectOptions().
    private int keepAliveTimerSec;
    private int connectionTimeoutSec;
    // Passed to MqttClient.setTimeToWait in start().
    private int timeToWaitMs;
    private int maxMsgsInflight;
    // Upper bound for published payloads in bytes; 0 disables the size check in publishMessage.
    private int maxMsgSizeBytes;
    private boolean cleanSession;
    // Key/trust store settings; only used by getConnectOptions() when isSecureConnection is true.
    private String keyStorePath;
    private String trustStorePath;
    private String keyStoreType;
    private String trustStoreType;
    private String keyStorePWD;
    private String trustStorePWD;
    // Broker credentials; only applied when username is non-empty.
    private String username;
    private String password;
    // Notified with CONNECTED / NOT_CONNECTED when the connection state changes.
    private MqttStatusReceiver mqttStatusReceiver;
    private boolean separateConnections;
    // True when the scheme of the client's server URI is "ssl" (determined in the constructor).
    private boolean isSecureConnection;
    // Topics currently subscribed; also used as the monitor in subscribe()/unsubscribe().
    private Set<String> subscribedTopics = new HashSet<String>();
    // Set once by shutdown(); checked by the retry loops to stop reconnecting.
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    // Inputs for createMqttClient().
    private MqttAddress ownAddress;
    private String clientId;
    private ScheduledExecutorService scheduledExecutorService;

    public MqttPahoClient(MqttAddress ownAddress, String clientId, ScheduledExecutorService scheduledExecutorService, int reconnectSleepMS, int keepAliveTimerSec, int connectionTimeoutSec, int timeToWaitMs, int maxMsgsInflight, int maxMsgSizeBytes, boolean cleanSession, boolean isReceiver, boolean separateConnections, String keyStorePath, String trustStorePath, String keyStoreType, String trustStoreType, String keyStorePWD, String trustStorePWD, String username, String password, MqttStatusReceiver mqttStatusReceiver) throws MqttException {
        this.ownAddress = ownAddress;
        this.clientId = clientId;
        this.scheduledExecutorService = scheduledExecutorService;
        this.reconnectSleepMs = reconnectSleepMS;
        this.keepAliveTimerSec = keepAliveTimerSec;
        this.connectionTimeoutSec = connectionTimeoutSec;
        this.timeToWaitMs = timeToWaitMs;
        this.maxMsgsInflight = maxMsgsInflight;
        this.maxMsgSizeBytes = maxMsgSizeBytes;
        this.cleanSession = cleanSession;
        this.isReceiver = isReceiver;
        this.keyStorePath = keyStorePath;
        this.trustStorePath = trustStorePath;
        this.keyStoreType = keyStoreType;
        this.trustStoreType = trustStoreType;
        this.keyStorePWD = keyStorePWD;
        this.trustStorePWD = trustStorePWD;
        this.username = username;
        this.password = password;
        this.mqttStatusReceiver = mqttStatusReceiver;
        this.separateConnections = separateConnections;
        this.mqttClientLock = new StampedLock();
        this.mqttClient = this.createMqttClient();
        String srvURI = this.mqttClient.getServerURI();
        try {
            URI vURI = new URI(srvURI);
            this.isSecureConnection = vURI.getScheme().equals("ssl");
        }
        catch (URISyntaxException e) {
            logger.error("Failed to read srvURI, error: ", (Throwable)e);
            throw new JoynrIllegalStateException("Fail to parse URI server: " + srvURI + " , error: " + e);
        }
    }

    /**
     * Checks, while holding the client read lock, whether a client instance is present
     * but currently not connected.
     *
     * @return {@code true} if {@code mqttClient} is non-null and reports that it is not connected
     */
    private boolean clientExistsAndIsNotConnected() {
        final long readStamp = this.mqttClientLock.readLock();
        try {
            if (this.mqttClient == null) {
                return false;
            }
            return !this.mqttClient.isConnected();
        } finally {
            this.mqttClientLock.unlockRead(readStamp);
        }
    }

    /**
     * Connects the Paho client to the broker, retrying for as long as a client exists, it is not
     * connected and {@link #shutdown()} has not been called.
     *
     * <p>Each attempt runs under the client read lock: the callback and time-to-wait are set, the
     * client is connected, the status receiver is notified with {@code CONNECTED} and the previously
     * subscribed topics are re-subscribed. For the reason codes listed in the switch below a failed
     * attempt replaces the client via {@code reconnect} and the loop tries again.
     *
     * <p>NOTE(review): the numeric case labels are presumably Paho {@code MqttException.REASON_CODE_*}
     * values (5 is logged here as "missing authorization"); confirm against the Paho constants.
     *
     * <p>NOTE(review): the switch has no {@code default}. A reason code that is not listed leaves the
     * catch block, the {@code finally} releases the lock and the loop condition is evaluated again
     * without any delay.
     *
     * @throws JoynrIllegalStateException if connecting fails with reason code 5, if a replacement
     *         client cannot be created, or if any other exception occurs during the attempt
     */
    @Override
    @SuppressFBWarnings(value={"SF_SWITCH_FALLTHROUGH"}, justification="extra error log for TLS errors")
    public void start() {
        block23: while (!this.shutdown.get() && this.clientExistsAndIsNotConnected()) {
            // NOTE(review): this local is never read; the messages below repeat the literal instead.
            String unableToCreateClientErrorMessage = "Unable to create MqttClient: ";
            long stamp = this.mqttClientLock.readLock();
            try {
                logger.info("Attempting to connect client");
                // NOTE(review): mqttClient is dereferenced without a null check; reconnect(int) and
                // shutdown() set it to null under the write lock, so a null here would surface as
                // the generic "Unable to start MqttPahoClient" error below. Confirm this is intended.
                if (this.mqttClient.isConnected()) {
                    logger.trace("Client connected while waiting for lock. Returning.");
                    return;
                }
                logger.debug("Started MqttPahoClient");
                this.mqttClient.setCallback((MqttCallback)this);
                this.mqttClient.setTimeToWait((long)this.timeToWaitMs);
                this.mqttClient.connect(this.getConnectOptions());
                logger.info("Connected client");
                this.mqttStatusReceiver.notifyConnectionStatusChanged(MqttStatusReceiver.ConnectionStatus.CONNECTED);
                this.reestablishSubscriptions();
            }
            catch (MqttException mqttError) {
                logger.error("Connect failed. Error code {}", (Object)mqttError.getReasonCode(), (Object)mqttError);
                switch (mqttError.getReasonCode()) {
                    case 5: {
                        // Not retried: authorization failures abort start().
                        logger.error("Failed to establish connection because of missing authorization , error: ", (Throwable)mqttError);
                        throw new JoynrIllegalStateException("Unable to create MqttPahoClient: " + (Object)((Object)mqttError));
                    }
                    case 0: {
                        // Extra log line for TLS connections, then deliberate fall-through into the
                        // retry branch (see the SuppressFBWarnings justification).
                        if (this.isSecureConnection) {
                            logger.error("Failed to establish TLS connection, error: ", (Throwable)mqttError);
                        }
                    }
                    case 3: 
                    case 6: 
                    case 128: 
                    case 32000: 
                    case 32001: 
                    case 32002: 
                    case 32102: 
                    case 32103: 
                    case 32104: 
                    case 32109: 
                    case 32110: 
                    case 32202: {
                        // reconnect(int) takes the write lock, so the read lock has to be released first.
                        this.mqttClientLock.unlockRead(stamp);
                        try {
                            this.reconnect(this.reconnectSleepMs);
                        }
                        catch (Exception e) {
                            // Re-acquire so that the finally block below releases a valid stamp.
                            stamp = this.mqttClientLock.readLock();
                            logger.error("Unable to create MqttClient: ", (Throwable)e);
                            throw new JoynrIllegalStateException("Unable to create MqttClient: " + e.getMessage(), e);
                        }
                        // Re-acquire so that the finally block below releases a valid stamp, then retry.
                        stamp = this.mqttClientLock.readLock();
                        continue block23;
                    }
                    case 32100: {
                        // Same lock hand-over as above, but the client is replaced without a delay.
                        this.mqttClientLock.unlockRead(stamp);
                        try {
                            this.reconnect();
                        }
                        catch (Exception e) {
                            stamp = this.mqttClientLock.readLock();
                            logger.error("Unable to create MqttClient: ", (Throwable)e);
                            throw new JoynrIllegalStateException("Unable to create MqttClient: " + e.getMessage(), e);
                        }
                        stamp = this.mqttClientLock.readLock();
                        continue block23;
                    }
                }
            }
            catch (JoynrIllegalStateException e) {
                // Raised inside the try block, e.g. by subscribe(), which attaches the MqttException
                // as cause only for the reason codes it treats as retryable, or by getConnectOptions(),
                // which attaches none. Only the former leads to another attempt.
                Throwable cause = e.getCause();
                if (!(cause instanceof MqttException)) throw new JoynrIllegalStateException("Unable to start MqttPahoClient: " + e.getMessage(), (Exception)((Object)e));
                this.mqttClientLock.unlockRead(stamp);
                try {
                    this.reconnect(this.reconnectSleepMs);
                }
                catch (MqttException mqttException) {
                    stamp = this.mqttClientLock.readLock();
                    logger.error("Unable to create MqttClient: ", (Throwable)mqttException);
                    throw new JoynrIllegalStateException("Unable to create MqttClient: " + mqttException.getMessage(), (Exception)((Object)mqttException));
                }
                // Re-acquired for the finally block; the loop condition then decides about the next attempt.
                stamp = this.mqttClientLock.readLock();
            }
            catch (Exception e) {
                throw new JoynrIllegalStateException("Unable to start MqttPahoClient: " + e.getMessage(), e);
            }
            finally {
                // Every path above arrives here holding the read lock identified by stamp.
                this.mqttClientLock.unlockRead(stamp);
            }
        }
        logger.info("Leaving start");
    }

    /**
     * Builds a new, not yet connected Paho client for the broker URI of {@code ownAddress},
     * using in-memory persistence and the configured executor.
     *
     * @return the newly created client
     * @throws MqttException if the Paho client cannot be constructed
     */
    private MqttClient createMqttClient() throws MqttException {
        logger.info("Create Mqtt Client. Address: {}", this.ownAddress);
        final String brokerUri = this.ownAddress.getBrokerUri();
        final MqttClientPersistence persistence = new MemoryPersistence();
        return new MqttClient(brokerUri, this.clientId, persistence, this.scheduledExecutorService);
    }

    /**
     * Re-subscribes to every topic that was subscribed before the client was (re)connected.
     *
     * <p>The topic set is replaced by an empty one first so that {@code subscribe} does not skip the
     * topics as already subscribed. If any subscription fails, the previous set is put back and
     * the failure is rethrown unchanged.
     */
    private void reestablishSubscriptions() {
        logger.debug("Reestablishing {} subscriptions after restart", this.subscribedTopics.size());
        final Set<String> previousTopics = this.subscribedTopics;
        this.subscribedTopics = new HashSet<>();
        for (final String previousTopic : previousTopics) {
            try {
                this.subscribe(previousTopic);
            } catch (Exception subscribeFailure) {
                // Restore the full list so a later attempt retries all topics.
                this.subscribedTopics = previousTopics;
                throw subscribeFailure;
            }
        }
    }

    /**
     * Assembles the Paho connect options from the configured fields.
     *
     * <p>Credentials are only applied when a non-empty user name is configured. The SSL properties
     * are only applied when the broker URI uses the {@code ssl} scheme. Automatic reconnect is
     * always switched off.
     *
     * @return the populated connect options
     * @throws JoynrIllegalStateException if a user name is configured without a non-empty password
     */
    private MqttConnectOptions getConnectOptions() {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();

        final boolean hasUsername = this.username != null && !this.username.isEmpty();
        if (hasUsername) {
            final boolean hasPassword = this.password != null && !this.password.isEmpty();
            if (!hasPassword) {
                throw new JoynrIllegalStateException("MQTT password not configured or empty");
            }
            connectOptions.setUserName(this.username);
            connectOptions.setPassword(this.password.toCharArray());
        }

        connectOptions.setAutomaticReconnect(false);
        connectOptions.setConnectionTimeout(this.connectionTimeoutSec);
        connectOptions.setKeepAliveInterval(this.keepAliveTimerSec);
        connectOptions.setMaxInflight(this.maxMsgsInflight);
        connectOptions.setCleanSession(this.cleanSession);

        if (!this.isSecureConnection) {
            return connectOptions;
        }

        final Properties sslProperties = new Properties();
        sslProperties.setProperty("com.ibm.ssl.keyStoreType", this.keyStoreType);
        sslProperties.setProperty("com.ibm.ssl.keyStore", this.keyStorePath);
        sslProperties.setProperty("com.ibm.ssl.keyStorePassword", this.keyStorePWD);
        sslProperties.setProperty("com.ibm.ssl.trustStoreType", this.trustStoreType);
        sslProperties.setProperty("com.ibm.ssl.trustStore", this.trustStorePath);
        sslProperties.setProperty("com.ibm.ssl.trustStorePassword", this.trustStorePWD);
        connectOptions.setSSLProperties(sslProperties);
        return connectOptions;
    }

    /**
     * Subscribes the client to {@code topic} unless the topic is already recorded as subscribed.
     *
     * <p>The work happens under the client read lock and the monitor of the topic set. Each pass of
     * the loop either marks the subscription as done or throws, so the loop only repeats if
     * neither happened, and it stops once shutdown has been requested.
     *
     * @param topic the topic to subscribe to
     * @throws JoynrIllegalStateException if the Paho client rejects the subscription or no client
     *         exists; the {@code MqttException} is attached as cause only for the reason codes
     *         listed in the switch, which {@code start()} uses to decide whether to retry
     * @throws JoynrRuntimeException if any other exception occurs
     */
    @Override
    public void subscribe(String topic) {
        boolean done = false;
        while (!done && !this.shutdown.get()) {
            logger.debug("Checking for subscription to: {}", topic);
            final long readStamp = this.mqttClientLock.readLock();
            try {
                synchronized (this.subscribedTopics) {
                    final boolean alreadySubscribed = this.subscribedTopics.contains(topic);
                    if (!alreadySubscribed) {
                        logger.info("Attempting to subscribe to: {}", topic);
                        if (this.mqttClient == null) {
                            throw new MqttException(32104);
                        }
                        this.mqttClient.subscribe(topic);
                        this.subscribedTopics.add(topic);
                        logger.info("Subscribed to: {}", topic);
                    }
                    done = true;
                }
            } catch (MqttException mqttError) {
                logger.debug("Subscribe to {} failed: {}. Error code {}", topic, mqttError.getMessage(), mqttError.getReasonCode(), mqttError);
                final String failureMessage = "Unexpected exception while subscribing to " + topic + ", error: " + mqttError;
                switch (mqttError.getReasonCode()) {
                    case 0:
                    case 3:
                    case 6:
                    case 128:
                    case 32000:
                    case 32001:
                    case 32002:
                    case 32102:
                    case 32103:
                    case 32104:
                    case 32109:
                    case 32110:
                    case 32202:
                        // Cause attached: callers inspect it to decide on a reconnect.
                        throw new JoynrIllegalStateException(failureMessage, mqttError);
                    default:
                        // Deliberately without cause for all other reason codes.
                        throw new JoynrIllegalStateException(failureMessage);
                }
            } catch (Exception unexpected) {
                throw new JoynrRuntimeException("Unable to start MqttPahoClient", unexpected);
            } finally {
                this.mqttClientLock.unlockRead(readStamp);
            }
        }
    }

    /**
     * Removes {@code topic} from the recorded subscriptions and, if it was recorded, unsubscribes
     * the client from it.
     *
     * <p>The client read lock is acquired before the monitor of the topic set, which is the same
     * order {@code subscribe} uses. Taking them in the opposite order could deadlock against a
     * concurrent {@code subscribe} once a writer is queued on the lock.
     *
     * <p>The topic stays removed from the recorded subscriptions even if the broker call fails.
     *
     * @param topic the topic to unsubscribe from
     * @throws JoynrRuntimeException if no client exists or the Paho client fails to unsubscribe
     */
    @Override
    public void unsubscribe(String topic) {
        final long stamp = this.mqttClientLock.readLock();
        try {
            synchronized (this.subscribedTopics) {
                if (this.subscribedTopics.remove(topic)) {
                    if (this.mqttClient == null) {
                        throw new MqttException(32104);
                    }
                    this.mqttClient.unsubscribe(topic);
                }
            }
        } catch (MqttException e) {
            throw new JoynrRuntimeException("Unable to unsubscribe from " + topic, e);
        } finally {
            this.mqttClientLock.unlockRead(stamp);
        }
    }

    /**
     * Shuts this instance down once; further calls return immediately.
     *
     * <p>Sets the shutdown flag, then, under the client write lock, notifies the status receiver
     * with {@code NOT_CONNECTED}, clears the client reference and disconnects the client
     * non-forcibly.
     */
    @Override
    @SuppressFBWarnings(value={"NN_NAKED_NOTIFY"}, justification="required to control shutdown of this instance")
    public synchronized void shutdown() {
        // Only the first caller flips the flag and proceeds.
        if (!this.shutdown.compareAndSet(false, true)) {
            return;
        }
        logger.info("Attempting to shutdown connection.");
        final long writeStamp = this.mqttClientLock.writeLock();
        try {
            final MqttClient clientToDisconnect = this.mqttClient;
            if (clientToDisconnect == null) {
                return;
            }
            this.mqttStatusReceiver.notifyConnectionStatusChanged(MqttStatusReceiver.ConnectionStatus.NOT_CONNECTED);
            this.mqttClient = null;
            final boolean forcibly = false;
            this.disconnect(clientToDisconnect, forcibly);
        } finally {
            this.mqttClientLock.unlockWrite(writeStamp);
        }
    }

    /**
     * Publishes {@code serializedMessage} to {@code topic} with QoS level 1.
     *
     * @param topic the topic to publish to
     * @param serializedMessage the payload to publish
     */
    @Override
    public void publishMessage(String topic, byte[] serializedMessage) {
        final int defaultQosLevel = 1;
        this.publishMessage(topic, serializedMessage, defaultQosLevel);
    }

    /**
     * Publishes {@code serializedMessage} to {@code topic} with the given QoS level as a
     * non-retained message.
     *
     * <p>The client read lock is only tried, never waited for. If it cannot be obtained, for
     * example while the client is being replaced, the message is reported as delayed.
     *
     * @param topic the topic to publish to
     * @param serializedMessage the payload to publish
     * @param qosLevel the MQTT QoS level for the message
     * @throws JoynrDelayMessageException if no message listener has been set while connections are
     *         not separated, if the client lock is not available, if no client exists, or if
     *         publishing fails with one of the reason codes listed in the switch
     * @throws JoynrMessageNotSentException if the payload exceeds the configured maximum size, the
     *         message cannot be built, or publishing fails for any other reason
     */
    @Override
    public void publishMessage(String topic, byte[] serializedMessage, int qosLevel) {
        assert (!this.separateConnections || !this.isReceiver);
        if (!this.separateConnections && this.messagingSkeleton == null) {
            throw new JoynrDelayMessageException("MQTT Publish failed: messagingSkeleton has not been set yet");
        }
        if (this.maxMsgSizeBytes != 0 && serializedMessage.length > this.maxMsgSizeBytes) {
            throw new JoynrMessageNotSentException("MQTT Publish failed: maximum allowed message size of " + this.maxMsgSizeBytes + " bytes exceeded, actual size is " + serializedMessage.length + " bytes");
        }
        MqttMessage message = new MqttMessage();
        try {
            message.setPayload(serializedMessage);
            message.setQos(qosLevel);
            message.setRetained(false);
        } catch (Exception e) {
            throw new JoynrMessageNotSentException(e.getMessage(), e);
        }
        // Non-blocking: a stamp of 0 means the lock is currently held exclusively.
        long stamp = this.mqttClientLock.tryReadLock();
        if (stamp == 0L) {
            throw new JoynrDelayMessageException("Mqtt client not available (not connected)");
        }
        try {
            logger.debug("Publish to: {}", topic);
            if (this.mqttClient == null) {
                throw new MqttException(32104);
            }
            this.mqttClient.publish(topic, message);
        } catch (MqttException e) {
            logger.debug("Publish failed: {}. Error code {}", e.getMessage(), e.getReasonCode(), e);
            switch (e.getReasonCode()) {
                case 0: {
                    // Prefer the message of the wrapped cause when there is one.
                    Throwable cause = e.getCause();
                    if (cause != null) {
                        throw new JoynrDelayMessageException("MqttException: " + cause.getMessage());
                    }
                    throw new JoynrDelayMessageException("MqttException: " + e.getMessage());
                }
                case 3:
                case 6:
                case 32000:
                case 32001:
                case 32002:
                case 32102:
                case 32103:
                case 32104:
                case 32109:
                case 32110:
                case 32202: {
                    throw new JoynrDelayMessageException("MqttException: " + e.getMessage());
                }
                default: {
                    // Keep the MqttException as cause so the failure stays diagnosable.
                    throw new JoynrMessageNotSentException(e.getMessage(), e);
                }
            }
        } catch (Exception e) {
            throw new JoynrMessageNotSentException(e.getMessage(), e);
        } finally {
            this.mqttClientLock.unlockRead(stamp);
        }
        // Decode the payload only when the line is actually going to be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Published message: {}", new String(serializedMessage, StandardCharsets.UTF_8));
        }
    }

    /**
     * Paho callback invoked when the connection to the broker is lost.
     *
     * <p>Errors that are not an {@code MqttException} shut this instance down. For an
     * {@code MqttException} the status receiver is notified with {@code NOT_CONNECTED} and the
     * reason code decides between restarting the connection, shutting down, or doing nothing.
     *
     * @param error the reason reported for the connection loss
     */
    @Override
    public void connectionLost(Throwable error) {
        logger.info("connectionLost: {}", error.getMessage());
        if (!(error instanceof MqttException)) {
            logger.error("Connection lost due to unknown error ", error);
            this.shutdown();
            return;
        }

        this.mqttStatusReceiver.notifyConnectionStatusChanged(MqttStatusReceiver.ConnectionStatus.NOT_CONNECTED);
        final MqttException mqttError = (MqttException) error;
        final int reasonCode = mqttError.getReasonCode();
        switch (reasonCode) {
            case 3:
            case 6:
            case 32000:
            case 32002:
            case 32104:
            case 32108:
            case 32109:
                logger.debug("Connection lost, trying to reconnect. Error code {}", reasonCode);
                this.attemptDisconnectAndRestart();
                break;
            case 0:
                final Throwable cause = mqttError.getCause();
                final String causeMessage;
                if (cause != null) {
                    causeMessage = cause.getMessage();
                } else {
                    causeMessage = "<not available>";
                }
                logger.error("Connection lost due to client exception. Cause {}", causeMessage);
                this.attemptDisconnectAndRestart();
                break;
            case 1:
            case 2:
            case 4:
            case 5:
            case 32105:
            case 32106:
                logger.error("Connection is incorrectly configured. Connection not possible: {}. Error code {}", mqttError.getMessage(), reasonCode);
                this.shutdown();
                break;
            case 32102:
            case 32111:
                // Nothing to do besides logging.
                logger.trace("Connection lost due to client shutting down. Error code {}", reasonCode);
                break;
            default:
                logger.error("Received error reason that should not have been thrown for connection loss: {}. Error code {}", mqttError.getMessage(), reasonCode);
                this.shutdown();
                break;
        }
    }

    /**
     * Replaces the client immediately, i.e. without waiting before the new client is created.
     *
     * @throws MqttException if the new client cannot be created
     */
    private void reconnect() throws MqttException {
        final int noDelayMs = 0;
        this.reconnect(noDelayMs);
    }

    /**
     * Discards the current client and creates a fresh, not yet connected one.
     *
     * <p>The old client is detached under the write lock, then forcibly disconnected outside the
     * lock. Unless shutdown has been requested, the method optionally sleeps and then creates the
     * new client under the write lock. Shutdown is checked again while holding the write lock so
     * that no client is created, and left unclosed, after {@link #shutdown()} has run.
     *
     * @param reconnectSleepMs time in milliseconds to wait before creating the new client; values
     *        of 0 or less skip the wait
     * @throws MqttException if the new client cannot be created
     */
    private void reconnect(int reconnectSleepMs) throws MqttException {
        MqttClient clientToDisconnect;
        long stamp = this.mqttClientLock.writeLock();
        try {
            clientToDisconnect = this.mqttClient;
            this.mqttClient = null;
        } finally {
            this.mqttClientLock.unlockWrite(stamp);
        }
        if (clientToDisconnect != null) {
            this.disconnect(clientToDisconnect, true);
        }
        if (this.shutdown.get()) {
            logger.debug("joynr is shutting down. Will not attempt a reconnect.");
            return;
        }
        if (reconnectSleepMs > 0) {
            logger.info("Waiting {}ms before attempting reconnect.", reconnectSleepMs);
            try {
                Thread.sleep(reconnectSleepMs);
            } catch (InterruptedException e) {
                logger.error("Interrupted while waiting before reconnecting mqtt client.", e);
                // Restore the interrupt status so callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
        }
        stamp = this.mqttClientLock.writeLock();
        try {
            // shutdown() sets its flag before taking the write lock, so checking here closes the
            // window in which shutdown could have completed during the sleep above.
            if (this.shutdown.get()) {
                logger.debug("joynr is shutting down. Will not attempt a reconnect.");
                return;
            }
            this.mqttClient = this.createMqttClient();
        } finally {
            this.mqttClientLock.unlockWrite(stamp);
        }
    }

    /**
     * Removes the callback from {@code clientToDisconnect}, disconnects it and finally closes it.
     *
     * <p>Failures while disconnecting or closing are logged and never propagated. If a regular
     * disconnect fails and the client still reports being connected, a forcible disconnect is
     * attempted as a fallback.
     *
     * @param clientToDisconnect the client to tear down
     * @param forcibly {@code true} to disconnect forcibly right away, {@code false} to try a regular
     *        disconnect first
     */
    private void disconnect(MqttClient clientToDisconnect, boolean forcibly) {
        logger.info("Attempting to remove callbacks from client.");
        clientToDisconnect.setCallback(null);
        try {
            if (forcibly) {
                logger.info("Attempting to disconnect client forcibly.");
                clientToDisconnect.disconnectForcibly(1L, 1L);
            } else {
                logger.info("Attempting to disconnect client.");
                clientToDisconnect.disconnect();
            }
            logger.info("Client disconnected.");
        } catch (Exception disconnectError) {
            if (forcibly) {
                logger.trace("Failed to disconnect client forcibly. Error ", disconnectError);
            } else {
                logger.trace("Failed to disconnect client. Error ", disconnectError);
                // Fallback: force the disconnect if the regular one left the client connected.
                try {
                    if (clientToDisconnect.isConnected()) {
                        logger.info("Attempting to disconnect client forcibly.");
                        clientToDisconnect.disconnectForcibly(1L, 1L);
                        logger.info("Client forcibly disconnected.");
                    }
                } catch (Exception forcedDisconnectError) {
                    logger.trace("Failed to disconnect client forcibly. Error ", forcedDisconnectError);
                }
            }
        } finally {
            logger.trace("Attempting to close client");
            try {
                clientToDisconnect.close();
                logger.info("Client closed.");
            } catch (Exception closeError) {
                logger.error("Failed to close client. Error ", closeError);
            }
        }
    }

    /**
     * Replaces the client without delay and starts the connection again. Any failure is logged
     * and not propagated.
     */
    private synchronized void attemptDisconnectAndRestart() {
        try {
            this.reconnect();
            this.start();
        } catch (Exception restartFailure) {
            logger.error("Problem while attempting to close and restart. Error ", restartFailure);
        }
    }

    /**
     * Paho callback invoked when delivery of a published message has completed; only logs the
     * message id.
     *
     * @param mqttDeliveryToken the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken mqttDeliveryToken) {
        final int messageId = mqttDeliveryToken.getMessageId();
        logger.debug("Message delivered. id: {}", messageId);
    }

    /**
     * Paho callback invoked for every incoming message; hands the payload to the registered
     * messaging skeleton.
     *
     * <p>If no skeleton has been set yet, the message is dropped with an error log.
     *
     * @param topic the topic the message was received on
     * @param mqttMessage the received message
     * @throws Exception declared by the Paho callback contract
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Decode the payload only when the line is actually going to be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Message received: id {}, topic {}, payload\n{}", mqttMessage.getId(), topic, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
        }
        if (this.messagingSkeleton == null) {
            logger.error("Message not processed: messagingSkeleton has not been set yet");
            return;
        }
        this.messagingSkeleton.transmit(mqttMessage.getPayload(), new FailureAction(){

            public void execute(Throwable error) {
                logger.error("Message not processed.", error);
            }
        });
    }

    /**
     * Registers the skeleton that incoming messages are handed to.
     *
     * @param messaging the skeleton to receive incoming payloads
     */
    @Override
    public void setMessageListener(IMqttMessagingSkeleton messaging) {
        final IMqttMessagingSkeleton listener = messaging;
        this.messagingSkeleton = listener;
    }

    /**
     * Reports whether {@link #shutdown()} has been called on this instance.
     *
     * @return {@code true} once the shutdown flag has been set
     */
    @Override
    public boolean isShutdown() {
        final boolean shutdownRequested = this.shutdown.get();
        return shutdownRequested;
    }

    /**
     * Returns the current Paho client reference, read without taking the client lock.
     *
     * @return the current client, or {@code null} if it has been cleared
     */
    MqttClient getMqttClient() {
        final MqttClient currentClient = this.mqttClient;
        return currentClient;
    }
}

