/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.client;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.noear.folkmq.client.MqAlarm;
import org.noear.folkmq.client.MqClientDefault;
import org.noear.folkmq.client.MqMessageReceivedImpl;
import org.noear.folkmq.client.MqSubscription;
import org.noear.snack.ONode;
import org.noear.socketd.exception.SocketDAlarmException;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.EventListener;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqClientListener
extends EventListener {
    private static final Logger log = LoggerFactory.getLogger(MqClientListener.class);
    private MqClientDefault client;

    protected void init(MqClientDefault client) {
        this.client = client;
        this.doOn("mq.event.distribute", (s, m) -> {
            block6: {
                try {
                    MqMessageReceivedImpl message = new MqMessageReceivedImpl(client, s, m);
                    try {
                        if (message.isSequence()) {
                            RunUtils.single(() -> this.onReceive(s, m, message, false));
                            break block6;
                        }
                        if (client.consumeExecutor == null) {
                            RunUtils.async(() -> this.onReceive(s, m, message, false));
                            break block6;
                        }
                        client.consumeExecutor.submit(() -> this.onReceive(s, m, message, false));
                    }
                    catch (Throwable e) {
                        log.warn("Client consume handle error, sid={}", (Object)m.sid(), (Object)e);
                        client.reply(s, message, false, (Entity)new MqAlarm(e.getMessage()));
                    }
                }
                catch (Throwable e) {
                    log.warn("Client consume handle error, sid={}", (Object)m.sid(), (Object)e);
                }
            }
        });
        this.doOn("mq.event.request", (s, m) -> {
            block5: {
                try {
                    MqMessageReceivedImpl message = new MqMessageReceivedImpl(client, s, m);
                    try {
                        if (client.consumeExecutor == null) {
                            RunUtils.async(() -> this.onReceive(s, m, message, true));
                            break block5;
                        }
                        client.consumeExecutor.submit(() -> this.onReceive(s, m, message, true));
                    }
                    catch (Throwable e) {
                        log.warn("Client consume handle error, sid={}", (Object)m.sid(), (Object)e);
                        client.reply(s, message, false, (Entity)new MqAlarm(e.getMessage()));
                    }
                }
                catch (Throwable e) {
                    log.warn("Client consume handle error, sid={}", (Object)m.sid(), (Object)e);
                }
            }
        });
    }

    protected void onReceive(Session s, Message m, MqMessageReceivedImpl message, boolean isRequest) {
        block21: {
            if (isRequest) {
                try {
                    if (message.isTransaction()) {
                        if (this.client.transactionCheckback != null) {
                            this.client.transactionCheckback.check(message);
                        } else {
                            s.sendAlarm(m, "Client no checkback handler!");
                        }
                        break block21;
                    }
                    if (this.client.listenHandler != null) {
                        this.client.listenHandler.consume(message);
                        break block21;
                    }
                    s.sendAlarm(m, "Client no request handler!");
                }
                catch (Throwable e) {
                    try {
                        if (s.isValid()) {
                            s.sendAlarm(m, "Client request handle error:" + e.getMessage());
                        }
                        log.warn("Client request handle error, key={}", (Object)message.getKey(), (Object)e);
                    }
                    catch (Throwable err) {
                        log.warn("Client request handle error, key={}", (Object)message.getKey(), (Object)e);
                    }
                }
            } else {
                MqSubscription subscription = this.client.getSubscription(message.getFullTopic(), message.getConsumerGroup());
                try {
                    if (subscription != null) {
                        subscription.consume(message);
                        if (subscription.isAutoAck()) {
                            this.client.reply(s, message, true, null);
                        }
                    } else {
                        this.client.reply(s, message, false, null);
                    }
                }
                catch (Throwable e) {
                    try {
                        if (subscription != null) {
                            if (subscription.isAutoAck()) {
                                this.client.reply(s, message, false, null);
                            }
                        } else {
                            this.client.reply(s, message, false, null);
                        }
                        log.warn("Client consume handle error, key={}", (Object)message.getKey(), (Object)e);
                    }
                    catch (Throwable err) {
                        log.warn("Client consume handle error, key={}", (Object)message.getKey(), (Object)e);
                    }
                }
            }
        }
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        log.info("Client session opened, sessionId={}", (Object)session.sessionId());
        if (this.client.getSubscriptionSize() == 0) {
            return;
        }
        HashMap<String, Set> subscribeData = new HashMap<String, Set>();
        for (MqSubscription subscription : this.client.getSubscriptionAll()) {
            Set queueNameSet = subscribeData.computeIfAbsent(subscription.getTopic(), n -> new HashSet());
            queueNameSet.add(subscription.getQueueName());
        }
        String json = ONode.stringify(subscribeData);
        EntityDefault entity = new StringEntity(json).metaPut("mq.batch", "1").metaPut("X-Unlimited", "1").at("folkmq-server");
        session.sendAndRequest("mq.event.subscribe", (Entity)entity, 30000L).await();
        log.info("Client onOpen batch subscribe successfully, sessionId={}", (Object)session.sessionId());
    }

    public void onClose(Session session) {
        super.onClose(session);
        log.info("Client session closed, sessionId={}", (Object)session.sessionId());
    }

    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            if (error instanceof SocketDAlarmException) {
                SocketDAlarmException alarmException = (SocketDAlarmException)error;
                log.warn("Client error, sessionId={}, from={}", new Object[]{session.sessionId(), alarmException.getAlarm(), error});
            } else {
                log.warn("Client error, sessionId={}", (Object)session.sessionId(), (Object)error);
            }
        }
    }
}

