/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.noear.folkmq.common.MqMetasResolver;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqQueue;
import org.noear.folkmq.server.MqQueueDefault;
import org.noear.folkmq.server.MqServiceInternal;
import org.noear.folkmq.server.MqServiceListener;
import org.noear.folkmq.server.MqWatcher;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.listener.EventListener;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MqServiceListenerBase
extends EventListener
implements MqServiceInternal {
    protected static final Logger log = LoggerFactory.getLogger(MqServiceListener.class);
    protected MqWatcher watcher;
    protected boolean brokerMode;
    protected final Object SUBSCRIBE_LOCK = new Object();
    protected final Map<String, Session> sessionAllMap = new ConcurrentHashMap<String, Session>();
    protected final Map<String, String> serverAccessMap = new ConcurrentHashMap<String, String>();
    protected final Map<String, Set<String>> subscribeMap = new ConcurrentHashMap<String, Set<String>>();
    protected final Map<String, MqQueue> queueMap = new ConcurrentHashMap<String, MqQueue>();
    protected final Map<String, String> transactionMessageMap = new ConcurrentHashMap<String, String>();
    protected Thread distributeThread;
    protected final AtomicBoolean isStarted = new AtomicBoolean(false);

    @Override
    public Collection<Session> getSessionAll() {
        return this.sessionAllMap.values();
    }

    @Override
    public int getSessionCount() {
        return this.sessionAllMap.size();
    }

    @Override
    public Map<String, Set<String>> getSubscribeMap() {
        return Collections.unmodifiableMap(this.subscribeMap);
    }

    @Override
    public Map<String, MqQueue> getQueueMap() {
        return Collections.unmodifiableMap(this.queueMap);
    }

    @Override
    public MqQueue getQueue(String queueName) {
        return this.queueMap.get(queueName);
    }

    @Override
    public void removeQueue(String queueName) {
        String[] ss = queueName.split("#");
        Set<String> tmp = this.subscribeMap.get(ss[0]);
        tmp.remove(queueName);
        this.queueMap.remove(queueName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribeDo(String topic, String consumerGroup, Session session) {
        String queueName = topic + "#" + consumerGroup;
        Object object = this.SUBSCRIBE_LOCK;
        synchronized (object) {
            MqQueue queue = this.queueGetOrInit(topic, consumerGroup, queueName);
            if (session != null) {
                log.info("Server channel subscribe topic={}, consumerGroup={}, sessionId={}", new Object[]{topic, consumerGroup, session.sessionId()});
                session.attrPut(queueName, (Object)"1");
                queue.addSession(session);
            }
        }
    }

    protected MqQueue queueGetOrInit(String topic, String consumerGroup, String queueName) {
        Set queueNameSet = this.subscribeMap.computeIfAbsent(topic, n -> Collections.newSetFromMap(new ConcurrentHashMap()));
        queueNameSet.add(queueName);
        MqQueue queue = this.queueMap.get(queueName);
        if (queue == null) {
            queue = new MqQueueDefault((MqServiceListener)this, this.watcher, topic, consumerGroup, queueName);
            this.queueMap.put(queueName, queue);
        }
        return queue;
    }

    @Override
    public void unsubscribeDo(String topic, String consumerGroup, Session session) {
        if (session == null) {
            return;
        }
        log.info("Server channel unsubscribe topic={}, consumerGroup={}, sessionId={}", new Object[]{topic, consumerGroup, session.sessionId()});
        String queueName = topic + "#" + consumerGroup;
        MqQueue queue = this.queueMap.get(queueName);
        session.attrMap().remove(queueName);
        if (queue != null) {
            queue.removeSession(session);
        }
    }

    @Override
    public void routingDo(MqMetasResolver mr, Message message) {
        Set<String> topicConsumerSet;
        String sender = mr.getSender((Entity)message);
        String key = mr.getKey((Entity)message);
        String topic = mr.getTopic((Entity)message);
        int qos = mr.getQos((Entity)message);
        int times = mr.getTimes((Entity)message);
        long expiration = mr.getExpiration((Entity)message);
        long scheduled = mr.getScheduled((Entity)message);
        boolean sequence = mr.isSequence((Entity)message);
        boolean transaction = mr.isTransaction((Entity)message);
        if (scheduled == 0L) {
            scheduled = System.currentTimeMillis();
        }
        if ((topicConsumerSet = this.subscribeMap.get(topic)) != null) {
            ArrayList<String> topicConsumerList = new ArrayList<String>(topicConsumerSet);
            for (String topicConsumer : topicConsumerList) {
                MqQueue queue = this.queueMap.get(topicConsumer);
                if (queue == null || queue.isTransaction()) continue;
                this.routingToQueueDo(mr, queue, message, key, qos, sequence, expiration, transaction, sender, times, scheduled);
            }
        }
    }

    protected void routingToQueueName(MqMetasResolver mr, Message message, String queueName) {
        String sender = mr.getSender((Entity)message);
        String key = mr.getKey((Entity)message);
        int qos = mr.getQos((Entity)message);
        int times = mr.getTimes((Entity)message);
        long expiration = mr.getExpiration((Entity)message);
        long scheduled = mr.getScheduled((Entity)message);
        boolean sequence = mr.isSequence((Entity)message);
        boolean transaction = mr.isTransaction((Entity)message);
        if (scheduled == 0L) {
            scheduled = System.currentTimeMillis();
        }
        MqQueue queue = this.queueMap.get(queueName);
        this.routingToQueueDo(mr, queue, message, key, qos, sequence, expiration, transaction, sender, times, scheduled);
    }

    @Override
    public void routingToQueueDo(MqMetasResolver mr, MqQueue queue, Message message, String key, int qos, boolean sequence, long expiration, boolean transaction, String sender, int times, long scheduled) {
        if (queue != null) {
            MqMessageHolder messageHolder = new MqMessageHolder(mr, queue.getQueueName(), queue.getConsumerGroup(), message, key, qos, sequence, expiration, transaction, sender, times, scheduled);
            queue.add(messageHolder);
        }
    }

    public void unRoutingDo(Message message) {
        String key = message.meta("mq.tid");
        if (StrUtils.isEmpty((String)key)) {
            log.warn("The key cannot be null, sid={}", (Object)message.sid());
            return;
        }
        String topic = message.meta("mq.topic");
        Set<String> topicConsumerSet = this.subscribeMap.get(topic);
        if (topicConsumerSet != null) {
            ArrayList<String> topicConsumerList = new ArrayList<String>(topicConsumerSet);
            for (String topicConsumer : topicConsumerList) {
                MqQueue queue = this.queueMap.get(topicConsumer);
                queue.removeAt(key);
            }
        }
    }

    protected void distributeDo() {
        while (!this.distributeThread.isInterrupted()) {
            try {
                int count = 0;
                if (this.isStarted.get()) {
                    ArrayList<MqQueue> queueList = new ArrayList<MqQueue>(this.queueMap.values());
                    for (MqQueue queue : queueList) {
                        try {
                            if (!queue.distribute()) continue;
                            ++count;
                        }
                        catch (Throwable e) {
                            if (!log.isWarnEnabled()) continue;
                            log.warn("MqQueue take error, queue={}", (Object)queue.getQueueName(), (Object)e);
                        }
                    }
                }
                if (count != 0) continue;
                Thread.sleep(10L);
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException || !log.isWarnEnabled()) continue;
                log.warn("MqQueue distribute error", e);
            }
        }
        if (log.isWarnEnabled()) {
            log.warn("MqQueue take stoped!");
        }
    }
}

