/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqBorkerListener;
import org.noear.folkmq.broker.MqDraft;
import org.noear.folkmq.broker.MqMessageHolder;
import org.noear.folkmq.broker.MqQueue;
import org.noear.folkmq.broker.MqQueueDefault;
import org.noear.folkmq.broker.MqWatcher;
import org.noear.folkmq.common.MqMetasResolver;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.listener.EventListener;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MqBorkerListenerBase
extends EventListener
implements MqBorkerInternal {
    protected static final Logger log = LoggerFactory.getLogger(MqBorkerListener.class);
    protected MqWatcher watcher;
    protected boolean proxyMode;
    protected final ReentrantLock subscribeLock = new ReentrantLock(true);
    protected final Map<String, Session> sessionAllMap = new ConcurrentHashMap<String, Session>();
    protected final Map<String, String> serverAccessMap = new ConcurrentHashMap<String, String>();
    protected final Map<String, Set<String>> subscribeMap = new ConcurrentHashMap<String, Set<String>>();
    protected final Map<String, MqQueue> queueMap = new ConcurrentHashMap<String, MqQueue>();
    protected final Map<String, String> transactionMessageMap = new ConcurrentHashMap<String, String>();
    protected Thread distributeThread;
    protected final AtomicBoolean isStarted = new AtomicBoolean(false);

    public String chanelType() {
        if (this.proxyMode) {
            return "proxy";
        }
        return "client";
    }

    @Override
    public Collection<Session> getSessionAll() {
        return this.sessionAllMap.values();
    }

    @Override
    public int getSessionCount() {
        return this.sessionAllMap.size();
    }

    @Override
    public Map<String, Set<String>> getSubscribeMap() {
        return Collections.unmodifiableMap(this.subscribeMap);
    }

    @Override
    public boolean hasSubscribe(String topic) {
        return this.subscribeMap.containsKey(topic);
    }

    @Override
    public Map<String, MqQueue> getQueueMap() {
        return Collections.unmodifiableMap(this.queueMap);
    }

    @Override
    public MqQueue getQueue(String queueName) {
        return this.queueMap.get(queueName);
    }

    @Override
    public void removeQueue(String queueName) {
        String[] ss = queueName.split("#");
        Set<String> tmp = this.subscribeMap.get(ss[0]);
        tmp.remove(queueName);
        this.queueMap.remove(queueName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribeDo(String topic, String consumerGroup, Session session) {
        String queueName = topic + "#" + consumerGroup;
        this.subscribeLock.lock();
        try {
            MqQueue queue = this.queueGetOrInit(topic, consumerGroup, queueName);
            if (session != null) {
                log.info("Broker: {} channel subscribe topic={}, consumerGroup={}, sessionId={}", new Object[]{this.chanelType(), topic, consumerGroup, session.sessionId()});
                session.attrPut(queueName, (Object)"1");
                queue.sessionAdd(session);
            }
        }
        finally {
            this.subscribeLock.unlock();
        }
    }

    protected MqQueue queueGetOrInit(String topic, String consumerGroup, String queueName) {
        Set queueNameSet = this.subscribeMap.computeIfAbsent(topic, n -> Collections.newSetFromMap(new ConcurrentHashMap()));
        queueNameSet.add(queueName);
        MqQueue queue = this.queueMap.get(queueName);
        if (queue == null) {
            queue = new MqQueueDefault((MqBorkerListener)this, this.watcher, topic, consumerGroup, queueName);
            this.queueMap.put(queueName, queue);
        }
        return queue;
    }

    @Override
    public void unsubscribeDo(String topic, String consumerGroup, Session session) {
        if (session == null) {
            return;
        }
        log.info("Broker: {} channel unsubscribe topic={}, consumerGroup={}, sessionId={}", new Object[]{this.chanelType(), topic, consumerGroup, session.sessionId()});
        String queueName = topic + "#" + consumerGroup;
        MqQueue queue = this.queueMap.get(queueName);
        session.attrMap().remove(queueName);
        if (queue != null) {
            queue.sessionRemove(session);
        }
    }

    @Override
    public void routingDo(MqMetasResolver mr, Message message) {
        MqDraft draft = new MqDraft(mr, message);
        Set<String> topicConsumerSet = this.subscribeMap.get(draft.topic);
        if (topicConsumerSet != null) {
            ArrayList<String> topicConsumerList = new ArrayList<String>(topicConsumerSet);
            for (String topicConsumer : topicConsumerList) {
                MqQueue queue = this.queueMap.get(topicConsumer);
                if (queue == null || queue.isTransaction()) continue;
                this.routingToQueueDo(draft, queue);
            }
        }
    }

    protected void routingToQueueName(MqDraft draft, String queueName) {
        MqQueue queue = this.queueMap.get(queueName);
        this.routingToQueueDo(draft, queue);
    }

    @Override
    public void routingToQueueDo(MqDraft draft, MqQueue queue) {
        if (queue != null) {
            MqMessageHolder messageHolder = new MqMessageHolder(draft, queue.getQueueName(), queue.getConsumerGroup());
            queue.add(messageHolder);
        }
    }

    public void unRoutingDo(Message message) {
        String key = message.meta("mq.tid");
        if (StrUtils.isEmpty((String)key)) {
            log.warn("Broker: message key cannot be null, sid={}", (Object)message.sid());
            return;
        }
        String topic = message.meta("mq.topic");
        Set<String> topicConsumerSet = this.subscribeMap.get(topic);
        if (topicConsumerSet != null) {
            ArrayList<String> topicConsumerList = new ArrayList<String>(topicConsumerSet);
            for (String topicConsumer : topicConsumerList) {
                MqQueue queue = this.queueMap.get(topicConsumer);
                queue.removeAt(key);
            }
        }
    }

    protected void distributeDo() {
        while (!this.distributeThread.isInterrupted()) {
            try {
                int count = 0;
                if (this.isStarted.get()) {
                    ArrayList<MqQueue> queueList = new ArrayList<MqQueue>(this.queueMap.values());
                    for (MqQueue queue : queueList) {
                        try {
                            if (!queue.distribute()) continue;
                            ++count;
                        }
                        catch (Throwable e) {
                            if (!log.isWarnEnabled()) continue;
                            log.warn("Broker: queue take error, queue={}", (Object)queue.getQueueName(), (Object)e);
                        }
                    }
                }
                if (count != 0) continue;
                Thread.sleep(10L);
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException || !log.isWarnEnabled()) continue;
                log.warn("Broker: queue distribute error", e);
            }
        }
        if (log.isWarnEnabled()) {
            log.warn("Broker: queue take stoped!");
        }
    }
}

