/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.broker;

import io.moquette.broker.Authorizator;
import io.moquette.broker.ClientDescriptor;
import io.moquette.broker.IQueueRepository;
import io.moquette.broker.PostOffice;
import io.moquette.broker.Session;
import io.moquette.broker.SessionCorruptedException;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SessionRegistry.class);
    /** Sessions known to this registry, keyed by client id. */
    private final ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();
    /** Non-clean sessions queued by {@link #sessionDisconnected(Session)} until their cleanup time elapses. */
    private final DelayQueue<Session> inactiveSessions = new DelayQueue<>();
    /** Thread running the cleaner loop; stays {@code null} until {@code startSessionCleaner} is invoked. */
    private Thread sessionCleaner;
    private final ISubscriptionsDirectory subscriptionsDirectory;
    private final IQueueRepository queueRepository;
    private final Authorizator authorizator;
    /** Amount of time added to "now" to compute the cleanup time of a disconnected session. */
    private final TemporalAmount defaultTimeout;

    /**
     * Creates the registry and immediately rebuilds the session pool from the
     * subscriptions directory and the queue repository.
     *
     * @param subscriptionsDirectory source of the known session ids and target of subscription removals
     * @param queueRepository        provider of the per-client message queues
     * @param authorizator           used to re-check read permissions when a session is reopened
     * @param sessionTimeoutSeconds  timeout, in seconds, applied to disconnected non-clean sessions
     */
    SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory, IQueueRepository queueRepository, Authorizator authorizator, long sessionTimeoutSeconds) {
        this.defaultTimeout = Duration.ofSeconds(sessionTimeoutSeconds);
        this.authorizator = authorizator;
        this.queueRepository = queueRepository;
        this.subscriptionsDirectory = subscriptionsDirectory;
        // Must run last: it reads the repositories and the timeout assigned above.
        this.recreateSessionPool();
    }

    /**
     * Rebuilds the pool from persisted state: every session id listed by the subscriptions
     * directory that also has a stored queue becomes a non-clean session, which is registered
     * and then handed to {@link #sessionDisconnected(Session)}. Queues that match no listed
     * session id are reported as an error.
     */
    private void recreateSessionPool() {
        final Map<String, Queue<EnqueuedMessage>> storedQueues = this.queueRepository.listAllQueues();
        for (String sessionId : this.subscriptionsDirectory.listAllSessionIds()) {
            // remove() both fetches the queue and marks it as consumed for the leftover check below
            final Queue<EnqueuedMessage> storedQueue = storedQueues.remove(sessionId);
            if (storedQueue != null) {
                final Session restored = new Session(sessionId, false, storedQueue);
                this.pool.put(sessionId, restored);
                restored.setSessionRegistry(this);
                this.sessionDisconnected(restored);
            }
        }
        if (storedQueues.isEmpty()) {
            return;
        }
        LOG.error("Recreating sessions left {} unused queues. This is probably bug. Session IDs: {}", storedQueues.size(), Arrays.toString(storedQueues.keySet().toArray()));
    }

    /**
     * Returns the session to use for a CONNECT: a brand new one when the pool holds no session
     * for {@code clientId}, otherwise the outcome of reopening the existing one.
     *
     * @param msg      the CONNECT message
     * @param clientId id of the connecting client
     * @param username user name passed on to the authorization check when a session is reopened
     * @return the session together with how it was obtained
     */
    SessionCreationResult createOrReopenSession(MqttConnectMessage msg, String clientId, String username) {
        final Session existing = this.retrieve(clientId);
        if (existing != null) {
            return this.reopenExistingSession(msg, clientId, existing, username);
        }

        final Session created = this.createNewSession(msg, clientId);
        final SessionCreationResult outcome = new SessionCreationResult(created, CreationModeEnum.CREATED_CLEAN_NEW, false);
        final Session displaced = this.pool.put(clientId, created);
        created.setSessionRegistry(this);
        if (displaced != null) {
            // The lookup above saw nothing, so a non-null value here means a concurrent insert.
            LOG.error("Another thread added a Session for our clientId {}, this is a bug!", clientId);
        }
        LOG.trace("case 1, not existing session with CId {}", clientId);
        return outcome;
    }

    /**
     * Handles a CONNECT for a client id that already has a session in the pool.
     * The old session is closed first if it is not disconnected. With the clean-session flag
     * set, the old session is destroyed and replaced by a new one; otherwise the old session
     * is moved to CONNECTING and reused.
     *
     * @throws SessionCorruptedException when the old session is not in DISCONNECTED state at
     *                                   the moment its state transition is attempted
     */
    private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, String clientId, Session oldSession, String username) {
        final boolean cleanRequested = msg.variableHeader().isCleanSession();
        if (!oldSession.disconnected()) {
            oldSession.closeImmediately();
        }

        if (!cleanRequested) {
            // Reuse path: keep the old session and refresh its configuration.
            if (!oldSession.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING)) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            this.copySessionConfig(msg, oldSession);
            this.reactivateSubscriptions(oldSession, username);
            LOG.trace("case 3, oldSession with same CId {} disconnected", clientId);
            return new SessionCreationResult(oldSession, CreationModeEnum.REOPEN_EXISTING, true);
        }

        // Replace path: drop the old session and everything attached to it.
        if (!oldSession.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.DESTROYED)) {
            throw new SessionCorruptedException("old session has already changed state");
        }
        this.unsubscribe(oldSession);
        this.remove(clientId);
        final Session replacement = this.createNewSession(msg, clientId);
        this.pool.put(clientId, replacement);
        replacement.setSessionRegistry(this);
        LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
        return new SessionCreationResult(replacement, CreationModeEnum.CREATED_CLEAN_NEW, true);
    }

    /**
     * Re-checks every subscription of the session against the authorizator and removes from
     * the subscriptions directory those whose topic filter {@code username} may not read.
     */
    private void reactivateSubscriptions(Session session, String username) {
        for (Subscription subscription : session.getSubscriptions()) {
            final boolean readable = this.authorizator.canRead(subscription.getTopicFilter(), username, session.getClientID());
            if (!readable) {
                this.subscriptionsDirectory.removeSubscription(subscription.getTopicFilter(), session.getClientID());
            }
        }
    }

    /**
     * Removes every subscription held by the session from the subscriptions directory.
     */
    private void unsubscribe(Session session) {
        for (Subscription subscription : session.getSubscriptions()) {
            this.subscriptionsDirectory.removeSubscription(
                subscription.getTopicFilter(),
                session.getClientID());
        }
    }

    /**
     * Builds a session for {@code clientId} backed by a queue obtained from the queue
     * repository, attaching a will when the CONNECT message carries one, and marks it as
     * connecting. The session is not added to the pool here.
     */
    private Session createNewSession(MqttConnectMessage msg, String clientId) {
        final boolean cleanSession = msg.variableHeader().isCleanSession();
        final Queue<EnqueuedMessage> messageQueue = this.queueRepository.createQueue(clientId, cleanSession);
        final Session created = msg.variableHeader().isWillFlag()
            ? new Session(clientId, cleanSession, this.createWill(msg), messageQueue)
            : new Session(clientId, cleanSession, messageQueue);
        created.markConnecting();
        return created;
    }

    /**
     * Updates an existing session with the clean flag and the will (or {@code null} when the
     * will flag is not set) taken from the CONNECT message.
     */
    private void copySessionConfig(MqttConnectMessage msg, Session session) {
        final boolean cleanSession = msg.variableHeader().isCleanSession();
        Session.Will will = null;
        if (msg.variableHeader().isWillFlag()) {
            will = this.createWill(msg);
        }
        session.update(cleanSession, will);
    }

    /**
     * Extracts the will (topic, payload, QoS, retain flag) from a CONNECT message.
     * The payload bytes are copied into a new buffer.
     */
    private Session.Will createWill(MqttConnectMessage msg) {
        final ByteBuf payloadCopy = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
        final String topic = msg.payload().willTopic();
        final boolean retain = msg.variableHeader().isWillRetain();
        final MqttQoS willQos = MqttQoS.valueOf(msg.variableHeader().willQos());
        return new Session.Will(topic, payloadCopy, willQos, retain);
    }

    /**
     * Looks up the session registered for a client id.
     *
     * @param clientID the client id to look up
     * @return the pooled session, or {@code null} when none is registered
     */
    Session retrieve(String clientID) {
        final Session found = this.pool.get(clientID);
        return found;
    }

    /**
     * Drops the session registered for a client id from the pool and, if one was present,
     * calls its {@code cleanUp()}.
     *
     * @param clientID the client id whose session must be removed
     */
    void remove(String clientID) {
        final Session removed = this.pool.remove(clientID);
        if (removed == null) {
            return;
        }
        removed.cleanUp();
    }

    /**
     * Describes every pooled session that reports itself as connected and has a remote address.
     *
     * @return descriptors (client id, host, port) of the connected clients
     */
    Collection<ClientDescriptor> listConnectedClients() {
        return this.pool.values().stream()
            .filter(session -> session.connected())
            .map(session -> this.createClientDescriptor(session))
            // sessions without a remote address yield an empty Optional and are skipped
            .filter(descriptor -> descriptor.isPresent())
            .map(descriptor -> descriptor.get())
            .collect(Collectors.toList());
    }

    /**
     * Builds the descriptor of a session from its client id and remote address.
     *
     * @return the descriptor, or an empty Optional when the session has no remote address
     */
    private Optional<ClientDescriptor> createClientDescriptor(Session s) {
        final String id = s.getClientID();
        final Optional<InetSocketAddress> address = s.remoteAddress();
        if (!address.isPresent()) {
            return Optional.empty();
        }
        final InetSocketAddress remote = address.get();
        return Optional.of(new ClientDescriptor(id, remote.getHostString(), remote.getPort()));
    }

    /**
     * Starts the background thread that runs the cleaner loop.
     *
     * @param postOffice used by the loop to route the clean-up commands
     */
    void startSessionCleaner(PostOffice postOffice) {
        LOG.info("Starting Session Cleaner.");
        final Thread cleaner = new Thread(() -> this.cleanerLoop(postOffice));
        this.sessionCleaner = cleaner;
        cleaner.start();
    }

    /**
     * Stops the session cleaner thread: interrupts it and waits up to five seconds for it
     * to finish. Does nothing when the cleaner was never started.
     * If the calling thread is interrupted while waiting, its interrupt status is restored.
     */
    public void terminate() {
        final Thread cleaner = this.sessionCleaner;
        if (cleaner == null) {
            // The cleaner was never started, so there is nothing to interrupt or join.
            return;
        }
        cleaner.interrupt();
        try {
            cleaner.join(5000L);
        } catch (InterruptedException ex) {
            LOG.info("Interrupted while joining session event loop {}", cleaner.getName(), ex);
            // Preserve the interruption for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules a non-clean session for cleanup: its cleanup time is set to now plus the
     * default timeout and it is added to the inactive sessions queue. Clean sessions are ignored.
     */
    void sessionDisconnected(Session session) {
        if (session.isClean()) {
            return;
        }
        final Instant cleanupAt = Instant.now().plus(this.defaultTimeout);
        session.setCleanupTime(cleanupAt);
        this.inactiveSessions.add(session);
        LOG.debug("Added session {} with timeout {}s", session.getClientID(), session.getDelay(TimeUnit.SECONDS));
    }

    /**
     * Takes a non-clean session out of the inactive sessions queue so it is no longer
     * scheduled for cleanup. Clean sessions are ignored.
     */
    void sessionReconnected(Session session) {
        if (session.isClean()) {
            return;
        }
        final boolean wasQueued = this.inactiveSessions.remove(session);
        LOG.debug("Removing session {} success: {}", session.getClientID(), wasQueued);
    }

    /**
     * Body of the cleaner thread. Repeatedly takes the next expired session from the inactive
     * queue and routes a "CleanSession" command for its client id through the post office;
     * the command runs {@link #checkSession(Session)}. A session whose command could not be
     * routed (unsuccessful result or runtime exception) is kept and retried on the next
     * iteration. The loop ends once the thread is interrupted.
     */
    private void cleanerLoop(PostOffice postOffice) {
        // Session still waiting to be handed over; null means "take a new one from the queue".
        Session pending = null;
        while (!Thread.interrupted()) {
            try {
                if (pending == null) {
                    pending = this.inactiveSessions.take();
                }
                final Session target = pending;
                final PostOffice.RouteResult outcome = postOffice.routeCommand(target.getClientID(), "CleanSession", () -> {
                    this.checkSession(target);
                    return null;
                });
                if (outcome.isSuccess()) {
                    LOG.debug("Session passed to thread for cleaning.");
                    pending = null;
                }
            } catch (InterruptedException ex) {
                LOG.info("SessionEventLoop {} interrupted", Thread.currentThread().getName());
                // Re-set the flag so the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            } catch (RuntimeException ex) {
                LOG.warn("Failed to execute Clean command.", ex);
            }
        }
    }

    /**
     * Destroys the session when it is still disconnected and its delay has run out;
     * otherwise leaves it untouched.
     */
    private void checkSession(Session session) {
        final boolean timedOut = session.disconnected() && session.getDelay(TimeUnit.MILLISECONDS) <= 0L;
        if (timedOut) {
            this.destroyTimedoutSession(session);
        }
    }

    /**
     * Logs the destruction and removes the session from the pool, provided it is still
     * disconnected at that point.
     */
    private void destroyTimedoutSession(Session session) {
        final String id = session.getClientID();
        LOG.debug("Destroying timed out session for client {}", id);
        if (!session.disconnected()) {
            return;
        }
        this.remove(id);
    }

    /**
     * Outcome of {@code createOrReopenSession}: the session to use, how it was obtained and
     * whether a session for the same client id was already present in the registry.
     */
    public static class SessionCreationResult {
        /** The session to use for the connection. */
        final Session session;
        /** How the session was obtained. */
        final CreationModeEnum mode;
        /** {@code true} when the registry already held a session for the client id. */
        final boolean alreadyStored;

        public SessionCreationResult(Session session, CreationModeEnum mode, boolean alreadyStored) {
            this.alreadyStored = alreadyStored;
            this.mode = mode;
            this.session = session;
        }
    }

    /**
     * Ways a session can result from a CONNECT handled by the registry.
     */
    public static enum CreationModeEnum {
        /** A new session object was created. */
        CREATED_CLEAN_NEW,
        /** The previously stored session was reused. */
        REOPEN_EXISTING,
        /** Not produced by the registry code in this file. */
        DROP_EXISTING;
    }

    /**
     * Queue entry without any payload of its own; it inherits the no-op
     * {@code release()} / {@code retain()} of {@link EnqueuedMessage}.
     */
    public static final class PubRelMarker extends EnqueuedMessage {
    }

    /**
     * Queue entry holding a published message: topic, QoS, payload buffer and retain flag.
     * {@link #retain()} and {@link #release()} are forwarded to the payload buffer.
     */
    public static class PublishedMessage
    extends EnqueuedMessage {
        final Topic topic;
        final MqttQoS publishingQos;
        final ByteBuf payload;
        final boolean retained;

        /**
         * @param topic         topic the message was published on
         * @param publishingQos QoS the message was published with
         * @param payload       message payload; its reference count is not changed here
         * @param retained      retain flag of the message
         */
        public PublishedMessage(Topic topic, MqttQoS publishingQos, ByteBuf payload, boolean retained) {
            this.topic = topic;
            this.publishingQos = publishingQos;
            this.payload = payload;
            // Store the caller's flag; it was previously discarded and hard-coded to false.
            this.retained = retained;
        }

        public Topic getTopic() {
            return this.topic;
        }

        public MqttQoS getPublishingQos() {
            return this.publishingQos;
        }

        public ByteBuf getPayload() {
            return this.payload;
        }

        /** Releases one reference of the payload buffer. */
        @Override
        public void release() {
            this.payload.release();
        }

        /** Retains one more reference of the payload buffer. */
        @Override
        public void retain() {
            this.payload.retain();
        }
    }

    /**
     * Base type of the entries stored in a session queue. Both hooks do nothing by default;
     * subclasses that hold a buffer override them.
     */
    public static abstract class EnqueuedMessage {
        /** Default: nothing to release. */
        public void release() {
            // intentionally empty
        }

        /** Default: nothing to retain. */
        public void retain() {
            // intentionally empty
        }
    }
}

