/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt.hivemq.client;

import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.exceptions.MqttClientStateException;
import com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserPropertiesBuilder;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictionsBuilder;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5SubscribeBuilder;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5SubscriptionBuilder;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5UnsubscribeBuilder;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrMessageExpiredException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.SuccessAction;
import io.joynr.messaging.mqtt.IMqttMessagingSkeleton;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.statusmetrics.ConnectionStatusMetricsImpl;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HivemqMqttClient
implements JoynrMqttClient {
    private static final Logger logger = LoggerFactory.getLogger(HivemqMqttClient.class);
    private static final long NOT_CONNECTED_RETRY_INTERVAL_MS = 5000L;
    private final Mqtt5RxClient client;
    private final Mqtt5ClientConfig clientConfig;
    private int maxMsgSizeBytes = 0x10000004;
    private final boolean cleanSession;
    private final int keepAliveTimeSeconds;
    private final int connectionTimeoutSec;
    private final int reconnectDelayMs;
    private final int receiveMaximum;
    private final boolean isReceiver;
    private final boolean isSender;
    private final String clientInformation;
    private AtomicBoolean shuttingDown = new AtomicBoolean(true);
    private boolean connected = false;
    private boolean retain;
    private IMqttMessagingSkeleton messagingSkeleton;
    private ConnectionStatusMetricsImpl connectionStatusMetrics;
    private Disposable publishesDisposable;
    private HashMap<String, Disposable> subscriptionDisposables = new HashMap();
    private AtomicLong obsoleteUnsubscribeDisposableCount = new AtomicLong(0L);
    private List<Disposable> unsubscribeDisposables = new ArrayList<Disposable>();
    private Map<String, Mqtt5Subscription> subscriptions = new HashMap<String, Mqtt5Subscription>();

    public HivemqMqttClient(Mqtt5RxClient client, int keepAliveTimeSeconds, boolean cleanSession, int connectionTimeoutSec, int reconnectDelayMs, int receiveMaximum, boolean isReceiver, boolean isSender, boolean retain, String gbid, ConnectionStatusMetricsImpl connectionStatusMetrics) {
        this.client = client;
        this.clientConfig = client.getConfig();
        this.keepAliveTimeSeconds = keepAliveTimeSeconds;
        this.cleanSession = cleanSession;
        this.connectionTimeoutSec = connectionTimeoutSec;
        this.reconnectDelayMs = reconnectDelayMs;
        this.receiveMaximum = receiveMaximum;
        this.isReceiver = isReceiver;
        this.isSender = isSender;
        this.retain = retain;
        this.clientInformation = this.createClientInformationString(gbid);
        this.connectionStatusMetrics = connectionStatusMetrics;
        this.publishesDisposable = null;
    }

    protected void registerPublishCallback() {
        if (!this.isReceiver || this.publishesDisposable != null) {
            return;
        }
        this.publishesDisposable = this.client.publishes(MqttGlobalPublishFilter.ALL, true).subscribe(this::handleIncomingMessage, throwable -> {
            if (!this.cleanSession && throwable instanceof MqttSessionExpiredException) {
                logger.warn("{}: MqttSessionExpiredException encountered in publish callback, trying to resubscribe.", (Object)this.clientInformation, throwable);
            } else {
                logger.error("{}: Error encountered in publish callback, trying to resubscribe.", (Object)this.clientInformation, throwable);
            }
            HivemqMqttClient hivemqMqttClient = this;
            synchronized (hivemqMqttClient) {
                if (this.publishesDisposable != null) {
                    this.publishesDisposable.dispose();
                    this.publishesDisposable = null;
                }
                this.registerPublishCallback();
            }
        });
    }

    String getClientInformationString() {
        return this.clientInformation;
    }

    private String createClientInformationString(String gbid) {
        StringBuilder clientIdBuilder = new StringBuilder();
        clientIdBuilder.append("(clientHash=");
        clientIdBuilder.append(Integer.toHexString(this.client.hashCode()));
        clientIdBuilder.append(", GBID=");
        clientIdBuilder.append(gbid);
        clientIdBuilder.append(", ");
        if (this.isReceiver && this.isSender) {
            clientIdBuilder.append("bidirectional");
        } else if (this.isReceiver) {
            clientIdBuilder.append("receiver");
        } else {
            clientIdBuilder.append("sender");
        }
        clientIdBuilder.append(")");
        return clientIdBuilder.toString();
    }

    public synchronized void connect() {
        if (this.shuttingDown.get()) {
            logger.error("{}: Client not started.", (Object)this.clientInformation);
            return;
        }
        if (!this.client.getConfig().getState().isConnected()) {
            while (!this.client.getConfig().getState().isConnected()) {
                logger.info("{}: Attempting to connect client, clean session={} ...", (Object)this.clientInformation, (Object)this.cleanSession);
                Mqtt5Connect mqtt5Connect = ((Mqtt5ConnectBuilder)((Mqtt5ConnectBuilder)((Mqtt5ConnectBuilder)((Mqtt5ConnectBuilder)((Mqtt5ConnectRestrictionsBuilder.Nested)Mqtt5Connect.builder().restrictions().receiveMaximum(this.receiveMaximum)).applyRestrictions()).cleanStart(this.cleanSession)).keepAlive(this.keepAliveTimeSeconds)).noSessionExpiry()).build();
                try {
                    this.connectionStatusMetrics.increaseConnectionAttempts();
                    this.client.connect(mqtt5Connect).timeout((long)this.connectionTimeoutSec, TimeUnit.SECONDS).doOnSuccess(connAck -> {
                        this.maxMsgSizeBytes = connAck.getRestrictions().getMaximumPacketSize();
                        logger.info("{}: MQTT client connected: {}, maxMsgSizeBytes = {}.", new Object[]{this.clientInformation, connAck, this.maxMsgSizeBytes});
                    }).blockingGet();
                    this.connected = true;
                }
                catch (Exception e) {
                    logger.error("{}: Exception encountered while connecting MQTT client.", (Object)this.clientInformation, (Object)e);
                    do {
                        try {
                            logger.debug("{}: Waiting to reconnect, state: {}.", (Object)this.clientInformation, (Object)this.client.getConfig().getState());
                            this.wait(this.reconnectDelayMs);
                        }
                        catch (InterruptedException exception) {
                            logger.error("{}: Exception while waiting to reconnect.", (Object)this.clientInformation, (Object)exception);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    } while (this.client.getConfig().getState() == MqttClientState.CONNECTING || this.client.getConfig().getState() == MqttClientState.CONNECTING_RECONNECT || this.client.getConfig().getState() == MqttClientState.DISCONNECTED_RECONNECT);
                    logger.debug("{}: Leaving reconnect loop, state: {}.", (Object)this.clientInformation, (Object)this.client.getConfig().getState());
                }
            }
        } else {
            logger.info("{}: MQTT client already connected - skipping.", (Object)this.clientInformation);
        }
    }

    public synchronized void disconnect() {
        if (!this.connected) {
            return;
        }
        this.connected = false;
        try {
            logger.info("{}: Attempting to disconnect.", (Object)this.clientInformation);
            ((Completable)((Mqtt5DisconnectBuilder.Nested)this.client.disconnectWith().noSessionExpiry()).applyDisconnect()).doOnComplete(() -> logger.info("{}: Disconnected.", (Object)this.clientInformation)).onErrorComplete(throwable -> {
                logger.error("{}: Error encountered from disconnect.", (Object)this.clientInformation, throwable);
                return true;
            }).blockingAwait(5000L, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            logger.error("{}: Exception thrown on disconnect.", (Object)this.clientInformation, (Object)e);
        }
    }

    public synchronized void start() {
        if (!this.shuttingDown.getAndSet(false)) {
            logger.warn("{}: Client already started.", (Object)this.clientInformation);
            return;
        }
        logger.info("{}: Initializing HiveMQ MQTT client for address {}.", (Object)this.clientInformation, (Object)this.client.getConfig().getServerAddress());
        assert (!this.isReceiver || this.messagingSkeleton != null);
        this.registerPublishCallback();
    }

    public void setMessageListener(IMqttMessagingSkeleton rawMessaging) {
        if (this.isReceiver && this.messagingSkeleton == null) {
            this.messagingSkeleton = rawMessaging;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void shutdown() {
        if (this.shuttingDown.getAndSet(true)) {
            logger.warn("{}: Client already shutdown.", (Object)this.clientInformation);
            return;
        }
        this.disconnect();
        logger.debug("{}: Shutdown.", (Object)this.clientInformation);
        if (this.publishesDisposable != null) {
            this.publishesDisposable.dispose();
            this.publishesDisposable = null;
        }
        Map<String, Mqtt5Subscription> map = this.subscriptions;
        synchronized (map) {
            this.disposeSubscriptions();
            for (Disposable unsubscribeDisposable : this.unsubscribeDisposables) {
                unsubscribeDisposable.dispose();
            }
            this.obsoleteUnsubscribeDisposableCount.set(0L);
        }
    }

    private void disposeSubscriptions() {
        for (Disposable subscriptionDisposable : this.subscriptionDisposables.values()) {
            subscriptionDisposable.dispose();
        }
        this.subscriptionDisposables.clear();
    }

    public void publishMessage(String topic, byte[] serializedMessage, Map<String, String> prefixedCustomHeaders, int qosLevel, long messageExpiryIntervalSec, SuccessAction successAction, FailureAction failureAction) {
        assert (this.isSender);
        if (prefixedCustomHeaders == null) {
            throw new JoynrMessageNotSentException("prefixedCustomHeaders must not be null");
        }
        if (this.maxMsgSizeBytes != 0 && serializedMessage.length > this.maxMsgSizeBytes) {
            throw new JoynrMessageNotSentException("Publish failed: maximum allowed message size of " + this.maxMsgSizeBytes + " bytes exceeded, actual size is " + serializedMessage.length + " bytes");
        }
        if (!this.clientConfig.getState().isConnected()) {
            failureAction.execute((Throwable)new JoynrDelayMessageException(5000L, "Publish failed: Mqtt client not connected."));
            return;
        }
        Mqtt5UserPropertiesBuilder mqtt5UserPropertiesBuilder = Mqtt5UserProperties.builder();
        for (Map.Entry<String, String> entry : prefixedCustomHeaders.entrySet()) {
            if (entry.getKey().isEmpty() || entry.getValue().isEmpty()) {
                logger.trace("{}: Did not add MQTT empty user property {} / {}", new Object[]{this.clientInformation, entry.getKey(), entry.getValue()});
                continue;
            }
            mqtt5UserPropertiesBuilder.add(entry.getKey(), entry.getValue());
        }
        Mqtt5UserProperties mqtt5UserProperties = mqtt5UserPropertiesBuilder.build();
        Mqtt5Publish mqtt5Publish = ((Mqtt5PublishBuilder.Complete)((Mqtt5PublishBuilder.Complete)((Mqtt5PublishBuilder.Complete)((Mqtt5PublishBuilder.Complete)((Mqtt5PublishBuilder.Complete)((Mqtt5PublishBuilder.Complete)Mqtt5Publish.builder().topic(topic)).qos(this.safeParseQos(qosLevel))).payload(serializedMessage)).messageExpiryInterval(messageExpiryIntervalSec)).retain(this.retain)).userProperties(mqtt5UserProperties)).build();
        logger.debug("{}: Publishing to topic: {}, size: {}, qos: {}", new Object[]{this.clientInformation, topic, serializedMessage.length, qosLevel});
        this.client.toAsync().publish(mqtt5Publish).whenComplete((publishResult, throwable) -> {
            if (throwable != null) {
                logger.error("{}: Publishing to topic: {}, size: {}, qos: {} failed with exception.", new Object[]{this.clientInformation, topic, serializedMessage.length, qosLevel, throwable});
                if (throwable instanceof MqttClientStateException) {
                    failureAction.execute((Throwable)new JoynrDelayMessageException(5000L, "Publish failed: " + throwable.toString()));
                } else {
                    failureAction.execute((Throwable)new JoynrDelayMessageException("Publish failed: " + throwable.toString()));
                }
            } else if (publishResult.getError().isPresent()) {
                logger.error("{}: Publishing to topic: {}, size: {}, qos: {} failed with error result: {}", new Object[]{this.clientInformation, topic, serializedMessage.length, qosLevel, publishResult, publishResult.getError().get()});
                failureAction.execute((Throwable)new JoynrDelayMessageException("Publish failed: " + ((Throwable)publishResult.getError().get()).toString()));
            } else {
                this.connectionStatusMetrics.increaseSentMessages();
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: Publishing to topic: {}, size: {}, qos: {} succeeded: {}", new Object[]{this.clientInformation, topic, serializedMessage.length, qosLevel, publishResult});
                } else {
                    logger.debug("{}: Publishing to topic: {}, size: {}, qos: {}, retain: {} succeeded.", new Object[]{this.clientInformation, topic, serializedMessage.length, qosLevel, this.retain});
                }
                successAction.execute();
            }
        });
    }

    private MqttQos safeParseQos(int qosLevel) {
        MqttQos result = MqttQos.fromCode((int)qosLevel);
        if (result == null) {
            result = MqttQos.AT_LEAST_ONCE;
            logger.error("{}: Got invalid QoS level {} for publish, using default: {}", new Object[]{this.clientInformation, qosLevel, result.getCode()});
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(String topic) {
        assert (this.isReceiver);
        Map<String, Mqtt5Subscription> map = this.subscriptions;
        synchronized (map) {
            logger.info("{}: Subscribing to topic: {}", (Object)this.clientInformation, (Object)topic);
            Mqtt5Subscription subscription = this.subscriptions.computeIfAbsent(topic, t -> ((Mqtt5SubscriptionBuilder.Complete)((Mqtt5SubscriptionBuilder.Complete)Mqtt5Subscription.builder().topicFilter(t)).qos(MqttQos.AT_LEAST_ONCE)).build());
            if (this.shuttingDown.get()) {
                return;
            }
            this.doSubscribe(subscription, topic);
        }
    }

    private void doSubscribe(Mqtt5Subscription subscription, String topic) {
        Mqtt5Subscribe subscribe = ((Mqtt5SubscribeBuilder.Complete)Mqtt5Subscribe.builder().addSubscription(subscription)).build();
        Disposable disposable = this.client.subscribeStream(subscribe).doOnSingle(mqtt5SubAck -> logger.debug("{}: Subscribed to topic: {}, result: {}", new Object[]{this.clientInformation, subscription, mqtt5SubAck})).subscribe(mqtt5Publish -> {}, throwable -> logger.error("{}: Error encountered for subscription {}.", new Object[]{this.clientInformation, subscription, throwable}));
        this.subscriptionDisposables.put(topic, disposable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resubscribe() {
        logger.debug("{}: Resubscribe triggered.", (Object)this.clientInformation);
        Map<String, Mqtt5Subscription> map = this.subscriptions;
        synchronized (map) {
            this.disposeSubscriptions();
            this.subscriptions.forEach((topic, subscription) -> {
                logger.info("{}: Resubscribing to topic: {}", (Object)this.clientInformation, topic);
                this.doSubscribe((Mqtt5Subscription)subscription, (String)topic);
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribe(String topic) {
        assert (this.isReceiver);
        Mqtt5Unsubscribe unsubscribe = ((Mqtt5UnsubscribeBuilder.Complete)Mqtt5Unsubscribe.builder().addTopicFilter(topic)).build();
        Map<String, Mqtt5Subscription> map = this.subscriptions;
        synchronized (map) {
            logger.info("{}: Unsubscribing from topic: {}", (Object)this.clientInformation, (Object)topic);
            this.subscriptions.remove(topic);
            AtomicBoolean callbackCalled = new AtomicBoolean(false);
            Disposable unsubscribeDisposable = this.client.unsubscribe(unsubscribe).subscribe(unused -> {
                logger.debug("{}: Unsubscribed from topic: {}", (Object)this.clientInformation, (Object)topic);
                if (callbackCalled.compareAndSet(false, true)) {
                    this.obsoleteUnsubscribeDisposableCount.getAndIncrement();
                }
            }, throwable -> {
                logger.error("{}: Unable to unsubscribe from topic: {}", new Object[]{this.clientInformation, topic, throwable});
                if (callbackCalled.compareAndSet(false, true)) {
                    this.obsoleteUnsubscribeDisposableCount.getAndIncrement();
                }
            });
            this.unsubscribeDisposables.add(unsubscribeDisposable);
            Disposable disposable = this.subscriptionDisposables.remove(topic);
            if (disposable != null) {
                disposable.dispose();
            }
            while (this.obsoleteUnsubscribeDisposableCount.get() > 0L) {
                unsubscribeDisposable = this.unsubscribeDisposables.remove(0);
                if (unsubscribeDisposable != null) {
                    unsubscribeDisposable.dispose();
                }
                this.obsoleteUnsubscribeDisposableCount.decrementAndGet();
            }
        }
    }

    public synchronized boolean isShutdown() {
        return this.shuttingDown.get();
    }

    private void handleIncomingMessage(Mqtt5Publish mqtt5Publish) {
        if (logger.isDebugEnabled()) {
            ByteBuffer payload = mqtt5Publish.getPayload().orElse(null);
            logger.debug("{}: Received publication: topic: {}, size: {}, qos: {}, retain: {}, expiryInterval: {}.", new Object[]{this.clientInformation, mqtt5Publish.getTopic(), payload == null ? 0 : payload.remaining(), mqtt5Publish.getQos(), mqtt5Publish.isRetain(), mqtt5Publish.getMessageExpiryInterval().orElse(0L)});
        }
        this.connectionStatusMetrics.increaseReceivedMessages();
        Mqtt5UserProperties mqtt5UserProperties = mqtt5Publish.getUserProperties();
        List mqtt5UserPropertiesList = mqtt5UserProperties.asList();
        HashMap<String, String> prefixedCustomHeaders = new HashMap<String, String>();
        for (Mqtt5UserProperty entry : mqtt5UserPropertiesList) {
            if (!entry.getName().toString().startsWith("c-")) continue;
            prefixedCustomHeaders.put(entry.getName().toString(), entry.getValue().toString());
        }
        this.messagingSkeleton.transmit(mqtt5Publish, prefixedCustomHeaders, throwable -> {
            if (throwable instanceof JoynrMessageExpiredException) {
                logger.warn("{}: Unable to handle incoming {}", new Object[]{this.clientInformation, mqtt5Publish, throwable});
            } else {
                logger.error("{}: Unable to handle incoming {}", new Object[]{this.clientInformation, mqtt5Publish, throwable});
            }
        });
    }

    Mqtt5RxClient getClient() {
        return this.client;
    }
}

