/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt.session.impl;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayProvider;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.smallrye.reactive.messaging.mqtt.session.SessionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
import io.smallrye.reactive.messaging.mqtt.session.impl.SessionEventImpl;
import io.smallrye.reactive.messaging.mqtt.session.impl.SubscriptionEventImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Default {@link MqttClientSession} implementation.
 *
 * <p>The session owns a single {@link MqttClient} at a time, keeps the set of requested
 * subscriptions in {@link #subscriptions} and re-creates the connection after it is lost for as
 * long as the session is running, waiting between attempts for the delay supplied by the
 * configured {@link ReconnectDelayProvider}.
 *
 * <p>All public operations are dispatched through {@code vertx.runOnContext(...)} and report
 * their outcome through the returned {@link Future}.
 */
public class MqttClientSessionImpl implements MqttClientSession {
    private static final Logger log = LoggerFactory.getLogger(MqttClientSessionImpl.class);

    private final VertxInternal vertx;
    private final MqttClientSessionOptions options;
    /** Requested subscriptions (topic filter to requested QoS); re-sent after a reconnect. */
    private final Map<String, RequestedQoS> subscriptions = new HashMap<>();
    /** SUBSCRIBE requests that were sent but not yet acknowledged, keyed by packet id. */
    private final Map<Integer, LinkedHashMap<String, RequestedQoS>> pendingSubscribes = new HashMap<>();
    /** UNSUBSCRIBE requests that were sent but not yet acknowledged, keyed by packet id. */
    private final Map<Integer, List<String>> pendingUnsubscribes = new HashMap<>();
    private final ReconnectDelayProvider reconnectDelay;
    private volatile SessionState state = SessionState.DISCONNECTED;
    private volatile boolean running;
    /** Last subscription state reported for each topic filter. */
    private final Map<String, SubscriptionState> subscriptionStates = new ConcurrentHashMap<>();
    /** Current client, or {@code null} while no connection exists. */
    private MqttClient client;
    /** Id of the pending reconnect timer, or {@code null} if none is scheduled. */
    private Long reconnectTimer;
    private volatile Handler<MqttPublishMessage> messageHandler;
    private volatile Handler<Throwable> exceptionHandler;
    private volatile Handler<SessionEvent> sessionStateHandler;
    private volatile Handler<SubscriptionEvent> subscriptionStateHandler;
    private volatile Handler<Integer> publishCompleteHandler;
    private volatile Handler<Integer> publishCompletionExpirationHandler;
    private volatile Handler<Integer> publishCompletionUnknownPacketIdHandler;
    /** Callers of {@link #start()} waiting for the session to become connected. */
    private final List<Handler<AsyncResult<Void>>> notifyConnected = new LinkedList<>();
    /** Callers of {@link #stop()} waiting for the session to become disconnected. */
    private final List<Handler<AsyncResult<Void>>> notifyStopped = new LinkedList<>();
    /** Callers of {@link #subscribe} waiting for the outcome, keyed by topic filter. */
    private final Map<String, List<Handler<AsyncResult<Integer>>>> notifySubscribed = new HashMap<>();
    /** Callers of {@link #unsubscribe} waiting for the outcome, keyed by topic filter. */
    private final Map<String, List<Handler<AsyncResult<Void>>>> notifyUnsubscribed = new HashMap<>();

    /**
     * Creates a new, stopped session.
     *
     * @param vertx the Vert.x instance; must be a {@link VertxInternal}
     * @param options the session options, also used as the MQTT client options
     */
    public MqttClientSessionImpl(Vertx vertx, MqttClientSessionOptions options) {
        this.vertx = (VertxInternal) vertx;
        this.options = options;
        this.reconnectDelay = options.getReconnectDelay().createProvider();
    }

    /**
     * Starts the session.
     *
     * @return a future completed once the session is connected, or failed with
     *         "Session stopped" if the session is stopped before that happens
     */
    @Override
    public Future<Void> start() {
        Promise<Void> promise = Promise.promise();
        this.vertx.runOnContext(x -> this.doStart(promise));
        return promise.future();
    }

    /**
     * Stops the session and closes the connection, if any.
     *
     * @return a future completed once the session is disconnected
     */
    @Override
    public Future<Void> stop() {
        Promise<Void> promise = Promise.promise();
        this.vertx.runOnContext(x -> this.doStop(promise));
        return promise.future();
    }

    /** Returns the current session state. */
    @Override
    public SessionState getState() {
        return this.state;
    }

    /**
     * Returns the last state reported for a topic filter.
     *
     * @param topicFilter the topic filter to look up
     * @return the recorded state, or {@code null} if none was ever reported for the filter
     */
    @Override
    public SubscriptionState getSubscriptionState(String topicFilter) {
        return this.subscriptionStates.get(topicFilter);
    }

    /**
     * Requests a subscription.
     *
     * @param topic the topic filter
     * @param qos the requested QoS
     * @return a future completed with the granted QoS, or with the previously requested QoS when
     *         the topic filter is already part of the session's subscriptions
     */
    @Override
    public Future<Integer> subscribe(String topic, RequestedQoS qos) {
        Promise<Integer> result = Promise.promise();
        this.vertx.runOnContext(x -> this.doSubscribe(topic, qos, result));
        return result.future();
    }

    /**
     * Removes a subscription.
     *
     * @param topic the topic filter
     * @return a future completed once the topic filter is reported as unsubscribed, or
     *         immediately when the topic filter was not subscribed
     */
    @Override
    public Future<Void> unsubscribe(String topic) {
        Promise<Void> result = Promise.promise();
        this.vertx.runOnContext(x -> this.doUnsubscribe(topic, result));
        return result.future();
    }

    /**
     * Marks the session as running and opens a connection when currently disconnected.
     *
     * @param handler notified once connected; may be {@code null}
     */
    private void doStart(Handler<AsyncResult<Void>> handler) {
        if (this.running) {
            // Already started: answer right away when connected, otherwise wait for the connection.
            if (handler != null) {
                if (this.state == SessionState.CONNECTED) {
                    handler.handle(Future.succeededFuture());
                } else {
                    this.notifyConnected.add(handler);
                }
            }
            return;
        }
        if (handler != null) {
            this.notifyConnected.add(handler);
        }
        this.reconnectDelay.reset();
        this.running = true;
        // In every other state a transition is already in flight; setState() picks up the
        // running flag once that transition settles.
        if (this.state == SessionState.DISCONNECTED) {
            this.createConnection();
        }
    }

    /**
     * Marks the session as stopped, cancels a pending reconnect and closes an open connection.
     *
     * @param handler notified once disconnected; may be {@code null}
     */
    private void doStop(Handler<AsyncResult<Void>> handler) {
        if (!this.running) {
            // Already stopped: answer right away when disconnected, otherwise wait for it.
            if (handler != null) {
                if (this.state == SessionState.DISCONNECTED) {
                    handler.handle(Future.succeededFuture());
                } else {
                    this.notifyStopped.add(handler);
                }
            }
            return;
        }
        if (handler != null) {
            this.notifyStopped.add(handler);
        }
        this.running = false;
        if (this.reconnectTimer != null) {
            this.vertx.cancelTimer(this.reconnectTimer);
            this.reconnectTimer = null;
        }
        switch (this.state) {
            case CONNECTED:
                this.closeConnection(new VertxException("Stop requested"));
                break;
            case DISCONNECTED:
                // Only a (now cancelled) reconnect was pending, so no further state transition
                // will happen that could notify the waiting callers: do it here.
                this.notifyStopCompleted();
                break;
            default:
                // CONNECTING / DISCONNECTING: setState() completes the stop once the
                // transition that is in flight settles.
                break;
        }
    }

    /** Fails everyone waiting for a connection and completes everyone waiting for the stop. */
    private void notifyStopCompleted() {
        for (Handler<AsyncResult<Void>> waiting : this.notifyConnected) {
            waiting.handle(Future.failedFuture("Session stopped"));
        }
        this.notifyConnected.clear();
        for (Handler<AsyncResult<Void>> waiting : this.notifyStopped) {
            waiting.handle(Future.succeededFuture());
        }
        this.notifyStopped.clear();
    }

    /** Sets the handler notified about exceptions reported by the MQTT client. */
    @Override
    public MqttClientSession exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    /** Sets the handler notified whenever the session state changes. */
    @Override
    public MqttClientSession sessionStateHandler(Handler<SessionEvent> sessionStateHandler) {
        this.sessionStateHandler = sessionStateHandler;
        return this;
    }

    /** Sets the handler notified whenever a subscription state is reported. */
    @Override
    public MqttClientSession subscriptionStateHandler(Handler<SubscriptionEvent> subscriptionStateHandler) {
        this.subscriptionStateHandler = subscriptionStateHandler;
        return this;
    }

    /** Sets the handler receiving the packet id of each completed publish. */
    @Override
    public MqttClientSession publishCompletionHandler(Handler<Integer> publishCompleteHandler) {
        this.publishCompleteHandler = publishCompleteHandler;
        return this;
    }

    /** Sets the handler receiving the packet id of each publish whose completion expired. */
    @Override
    public MqttClientSession publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) {
        this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
        return this;
    }

    /** Sets the handler receiving packet ids of publish completions for unknown packets. */
    @Override
    public MqttClientSession publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionUnknownPacketIdHandler) {
        this.publishCompletionUnknownPacketIdHandler = publishCompletionUnknownPacketIdHandler;
        return this;
    }

    /** Sets the handler receiving messages published by the server. */
    @Override
    public MqttClientSession messageHandler(Handler<MqttPublishMessage> messageHandler) {
        this.messageHandler = messageHandler;
        return this;
    }

    /**
     * Moves the session to a new state, fires the session state handler on an actual change and
     * then applies the follow-up action for the state that was reached.
     *
     * @param sessionState the state to enter
     * @param cause the reason for the transition; may be {@code null}
     */
    private void setState(SessionState sessionState, Throwable cause) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("setState - current: %s, next: %s", this.state, sessionState), cause);
        }

        // Bookkeeping that has to happen before listeners hear about the new state.
        switch (sessionState) {
            case CONNECTED:
                this.reconnectDelay.reset();
                break;
            case DISCONNECTED:
                // Nothing in flight can be acknowledged any more.
                this.pendingUnsubscribes.clear();
                this.pendingSubscribes.clear();
                for (String topic : this.subscriptions.keySet()) {
                    this.notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
                }
                break;
            default:
                break;
        }

        if (this.state != sessionState) {
            this.state = sessionState;
            Handler<SessionEvent> listener = this.sessionStateHandler;
            if (listener != null) {
                listener.handle(new SessionEventImpl(sessionState, cause));
            }
        }

        // Follow-up actions depend on whether the session is still supposed to be running.
        switch (this.state) {
            case CONNECTED:
                if (!this.running) {
                    // Stop was requested while the connection was being established.
                    this.closeConnection((Throwable) null);
                    break;
                }
                for (Handler<AsyncResult<Void>> waiting : this.notifyConnected) {
                    waiting.handle(Future.succeededFuture());
                }
                this.notifyConnected.clear();
                break;
            case DISCONNECTED:
                if (this.running) {
                    this.scheduleReconnect();
                } else {
                    this.notifyStopCompleted();
                }
                break;
            default:
                break;
        }
    }

    /**
     * Records a subscription state, fires the subscription state handler and completes the
     * callers waiting for that outcome.
     *
     * @param topic the topic filter
     * @param state the state reached
     * @param grantedQoS the QoS granted by the server, or {@code null} if none
     */
    private void notifySubscriptionState(String topic, SubscriptionState state, Integer grantedQoS) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("notifySubscriptionState - topic: %s, state: %s, grantedQoS: %s", topic, state, grantedQoS));
        }
        this.subscriptionStates.put(topic, state);

        Handler<SubscriptionEvent> listener = this.subscriptionStateHandler;
        if (listener != null) {
            listener.handle(new SubscriptionEventImpl(topic, state, grantedQoS));
        }

        if (state == SubscriptionState.SUBSCRIBED || state == SubscriptionState.FAILED) {
            List<Handler<AsyncResult<Integer>>> waiting = this.notifySubscribed.remove(topic);
            if (waiting != null) {
                for (Handler<AsyncResult<Integer>> subscriber : waiting) {
                    if (grantedQoS != null) {
                        subscriber.handle(Future.succeededFuture(grantedQoS));
                    } else {
                        subscriber.handle(Future.failedFuture("Unable to subscribe"));
                    }
                }
            }
        }

        if (state == SubscriptionState.UNSUBSCRIBED) {
            List<Handler<AsyncResult<Void>>> waiting = this.notifyUnsubscribed.remove(topic);
            if (waiting != null) {
                // An unsubscribe never carries a granted QoS, so reaching UNSUBSCRIBED is the
                // success signal by itself.
                for (Handler<AsyncResult<Void>> unsubscriber : waiting) {
                    unsubscriber.handle(Future.succeededFuture());
                }
            }
        }
    }

    /** Arms the reconnect timer with the next delay, unless a timer is already pending. */
    private void scheduleReconnect() {
        log.debug("Scheduling reconnect");
        if (this.reconnectTimer != null) {
            return;
        }
        Duration delay = this.nextDelay();
        if (log.isDebugEnabled()) {
            log.debug("Next delay: " + delay);
        }
        long timer = this.vertx.setTimer(delay.toMillis(), x -> this.createConnection());
        if (log.isDebugEnabled()) {
            log.debug("Timer set: " + timer);
        }
        this.reconnectTimer = timer;
    }

    /** Returns the delay to wait before the next reconnect attempt. */
    private Duration nextDelay() {
        return this.reconnectDelay.nextDelay();
    }

    /** Creates a fresh client, wires all handlers and starts connecting. */
    private void createConnection() {
        log.debug("Creating connection");
        this.reconnectTimer = null;
        this.client = MqttClient.create(this.vertx, this.options);
        this.client.exceptionHandler(this::exceptionCaught);
        this.client.closeHandler(x -> this.connectionClosed());
        this.client.publishHandler(this::serverPublished);
        this.client.subscribeCompletionHandler(this::subscribeCompleted);
        this.client.unsubscribeCompletionHandler(this::unsubscribeCompleted);
        this.client.publishCompletionHandler(this::publishComplete);
        this.client.publishCompletionExpirationHandler(this::publishExpired);
        this.client.publishCompletionUnknownPacketIdHandler(this::publishCompletionUnknown);
        this.setState(SessionState.CONNECTING, null);
        String hostname = this.options.getHostname();
        this.client.connect(this.options.getPort(), hostname, this.options.getServerName().orElse(hostname), this::connectCompleted);
    }

    /** Closes the connection after a client error and forwards the error to the exception handler. */
    private void exceptionCaught(Throwable cause) {
        log.debug("Caught exception", cause);
        this.closeConnection(cause);
        Handler<Throwable> listener = this.exceptionHandler;
        if (listener != null) {
            listener.handle(cause);
        }
    }

    /**
     * Enters {@code DISCONNECTING} and asks the client to disconnect.
     *
     * @param cause the reason for closing; may be {@code null}
     */
    private void closeConnection(Throwable cause) {
        log.debug("Closing connection", cause);
        MqttClient current = this.client;
        if (current == null) {
            // Asynchronous callbacks may arrive after the connection was already torn down;
            // there is nothing left to disconnect then.
            return;
        }
        this.setState(SessionState.DISCONNECTING, cause);
        current.disconnect().onComplete(this::disconnectCompleted);
    }

    /** Handles the outcome of a connect attempt and re-subscribes when required. */
    private void connectCompleted(AsyncResult<MqttConnAckMessage> result) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Connect completed - result: %s, cause: %s", result.result(), result.cause()));
        }
        if (result.failed() || result.result() == null) {
            this.setState(SessionState.DISCONNECTED, result.cause());
            return;
        }
        MqttConnAckMessage ack = result.result();
        this.setState(SessionState.CONNECTED, null);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Subscriptions: %s, cleanSession: %s, sessionPresent: %s", !this.subscriptions.isEmpty(), this.options.isCleanSession(), ack.isSessionPresent()));
        }
        // A non-clean session that the server still holds keeps its subscriptions server-side.
        boolean sessionResumed = !this.options.isCleanSession() && ack.isSessionPresent();
        if (!this.subscriptions.isEmpty() && !sessionResumed) {
            if (log.isDebugEnabled()) {
                log.debug("Re-subscribe to: " + this.subscriptions);
            }
            this.requestSubscribe(new LinkedHashMap<>(this.subscriptions));
        }
    }

    /** Handles the outcome of a disconnect requested by {@link #closeConnection(Throwable)}. */
    private void disconnectCompleted(AsyncResult<?> result) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Disconnect completed - result: %s, cause: %s", result.result(), result.cause()));
        }
        this.connectionClosed(result.cause());
    }

    /** Closes the connection, wrapping the reason in a {@link VertxException}. */
    private void closeConnection(String reason) {
        this.closeConnection(new VertxException(reason).fillInStackTrace());
    }

    /** Close handler of the client; ignored while a requested disconnect is in progress. */
    private void connectionClosed() {
        if (this.state != SessionState.DISCONNECTING) {
            this.connectionClosed(new VertxException("Connection closed"));
        }
    }

    /**
     * Detaches all handlers from the client, drops it and enters {@code DISCONNECTED}.
     *
     * @param cause the reason the connection was closed; may be {@code null}
     */
    private void connectionClosed(Throwable cause) {
        if (log.isDebugEnabled()) {
            log.debug("Connection closed", cause);
        } else {
            log.info("Connection closed: " + (cause != null ? cause.getMessage() : "<unknown>"));
        }
        if (this.client != null) {
            // Detach every handler installed by createConnection().
            this.client.exceptionHandler(null);
            this.client.publishHandler(null);
            this.client.closeHandler(null);
            this.client.subscribeCompletionHandler(null);
            this.client.unsubscribeCompletionHandler(null);
            this.client.publishCompletionHandler(null);
            this.client.publishCompletionExpirationHandler(null);
            this.client.publishCompletionUnknownPacketIdHandler(null);
            this.client = null;
        }
        this.setState(SessionState.DISCONNECTED, cause);
    }

    /** Forwards a message published by the server to the message handler, if one is set. */
    private void serverPublished(MqttPublishMessage message) {
        if (log.isDebugEnabled()) {
            log.debug("Server published: " + message);
        }
        Handler<MqttPublishMessage> listener = this.messageHandler;
        if (listener != null) {
            listener.handle(message);
        }
    }

    /**
     * Adds a topic filter to the session's subscriptions and requests the subscription.
     *
     * @param topic the topic filter
     * @param qos the requested QoS
     * @param handler notified with the outcome; may be {@code null}
     */
    private void doSubscribe(String topic, RequestedQoS qos, Handler<AsyncResult<Integer>> handler) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Request to subscribe to: %s / %s", topic, qos));
        }
        RequestedQoS current = this.subscriptions.get(topic);
        if (current != null) {
            if (log.isDebugEnabled()) {
                log.debug("Already subscribed with: " + current);
            }
            if (handler != null) {
                handler.handle(Future.succeededFuture(current.toInteger()));
            }
            return;
        }
        this.subscriptions.put(topic, qos);
        if (handler != null) {
            this.notifySubscribed.computeIfAbsent(topic, x -> new LinkedList<>()).add(handler);
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Requesting subscribe: %s / %s", topic, qos));
        }
        this.requestSubscribe(new LinkedHashMap<>(Collections.singletonMap(topic, qos)));
    }

    /**
     * Removes a topic filter from the session's subscriptions and requests the unsubscribe.
     *
     * @param topic the topic filter
     * @param handler notified with the outcome; may be {@code null}
     */
    private void doUnsubscribe(String topic, Handler<AsyncResult<Void>> handler) {
        if (this.subscriptions.remove(topic) == null) {
            // Not subscribed: nothing to do.
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
            return;
        }
        // NOTE(review): if the client is not connected, requestUnsubscribe() sends nothing and a
        // disconnect discards pending unsubscribes, so the handler stays queued - confirm whether
        // it should be completed in those cases.
        if (handler != null) {
            this.notifyUnsubscribed.computeIfAbsent(topic, x -> new LinkedList<>()).add(handler);
        }
        if (log.isDebugEnabled()) {
            log.debug("Requesting unsubscribe: " + topic);
        }
        this.requestUnsubscribe(Collections.singletonList(topic));
    }

    /**
     * Sends one SUBSCRIBE request for the given topics; does nothing when not connected.
     *
     * @param topics the topic filters and requested QoS, in request order
     */
    private void requestSubscribe(LinkedHashMap<String, RequestedQoS> topics) {
        if (topics.isEmpty() || this.client == null || !this.client.isConnected()) {
            return;
        }
        // subscribeCompleted() pairs the acknowledged QoS levels with the topics by position,
        // so the request has to keep the iteration order of 'topics'.
        Map<String, Integer> request = topics.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toInteger(), (first, second) -> first, LinkedHashMap::new));
        this.client.subscribe(request).onComplete(result -> this.subscribeSent(result, topics));
    }

    /**
     * Sends one UNSUBSCRIBE request per topic; does nothing when not connected.
     *
     * @param topics the topic filters to unsubscribe from
     */
    private void requestUnsubscribe(List<String> topics) {
        if (topics.isEmpty() || this.client == null || !this.client.isConnected()) {
            return;
        }
        for (String topic : topics) {
            this.client.unsubscribe(topic).onComplete(result -> this.unsubscribeSent(result, Collections.singletonList(topic)));
        }
    }

    /** Records a sent SUBSCRIBE as pending, or reports its topics as unsubscribed on failure. */
    private void subscribeSent(AsyncResult<Integer> result, LinkedHashMap<String, RequestedQoS> topics) {
        boolean sent = result.succeeded() && result.result() != null;
        SubscriptionState reached = sent ? SubscriptionState.SUBSCRIBING : SubscriptionState.UNSUBSCRIBED;
        for (String topic : topics.keySet()) {
            this.notifySubscriptionState(topic, reached, null);
        }
        if (sent) {
            this.pendingSubscribes.put(result.result(), topics);
        }
    }

    /** Records a sent UNSUBSCRIBE as pending, or closes the connection when sending failed. */
    private void unsubscribeSent(AsyncResult<Integer> result, List<String> topics) {
        if (result.failed() || result.result() == null) {
            this.closeConnection(String.format("Failed to send unsubscribe request: %s", result.cause()));
        } else {
            this.pendingUnsubscribes.put(result.result(), topics);
        }
    }

    /**
     * Matches a subscription acknowledgement with its pending request and reports each topic as
     * subscribed; closes the connection when the acknowledgement does not match a request.
     */
    private void subscribeCompleted(MqttSubAckMessage ack) {
        LinkedHashMap<String, RequestedQoS> request = this.pendingSubscribes.remove(ack.messageId());
        if (request == null) {
            this.closeConnection(String.format("Unexpected subscription ack response - messageId: %s", ack.messageId()));
            return;
        }
        List<Integer> granted = ack.grantedQoSLevels();
        if (request.size() != granted.size()) {
            this.closeConnection(String.format("Mismatch of topics on subscription ack - expected: %d, actual: %d", request.size(), granted.size()));
            return;
        }
        int idx = 0;
        for (String topic : request.keySet()) {
            this.notifySubscriptionState(topic, SubscriptionState.SUBSCRIBED, granted.get(idx));
            ++idx;
        }
    }

    /** Reports the topics of an acknowledged UNSUBSCRIBE as unsubscribed. */
    private void unsubscribeCompleted(Integer messageId) {
        List<String> request = this.pendingUnsubscribes.remove(messageId);
        if (request == null) {
            return;
        }
        for (String topic : request) {
            this.notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
        }
    }

    /**
     * Publishes a message through the current connection.
     *
     * @return a future with the result of the client's publish call, or failed with
     *         "Session is not connected" when there is no connected client
     */
    @Override
    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
        Promise<Integer> promise = Promise.promise();
        this.vertx.runOnContext(x -> this.doPublish(topic, payload, qosLevel, isDup, isRetain).onComplete(promise));
        return promise.future();
    }

    /** Publishes through the client when connected, otherwise returns a failed future. */
    private Future<Integer> doPublish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
        if (this.client != null && this.client.isConnected()) {
            return this.client.publish(topic, payload, qosLevel, isDup, isRetain);
        }
        return Future.failedFuture("Session is not connected");
    }

    /** Forwards a publish completion to the registered handler, if any. */
    private void publishComplete(Integer messageId) {
        Handler<Integer> listener = this.publishCompleteHandler;
        if (listener != null) {
            listener.handle(messageId);
        }
    }

    /** Forwards a publish completion expiration to the registered handler, if any. */
    private void publishExpired(Integer messageId) {
        Handler<Integer> listener = this.publishCompletionExpirationHandler;
        if (listener != null) {
            listener.handle(messageId);
        }
    }

    /** Forwards a publish completion for an unknown packet id to the registered handler, if any. */
    private void publishCompletionUnknown(Integer messageId) {
        Handler<Integer> listener = this.publishCompletionUnknownPacketIdHandler;
        if (listener != null) {
            listener.handle(messageId);
        }
    }
}

