/*
 * Decompiled with CFR 0.152.
 */
package com.visualization.cloud.sse;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.visualization.cloud.scriptBuilder.DAO.entity.po.VSseSendInfo;
import com.visualization.cloud.scriptBuilder.DAO.mapper.VSseSendInfoMapper;
import com.visualization.cloud.util.resp.ResultData;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Component
public class SSEUtil {
    private static final Logger log = LoggerFactory.getLogger(SSEUtil.class);
    private static VSseSendInfoMapper vSseSendInfoMapper;

    @Autowired
    private SSEUtil(VSseSendInfoMapper vSseSendInfoMapper) {
        SSEUtil.vSseSendInfoMapper = vSseSendInfoMapper;
    }

    public static List<String> SSEHistoryDataRecovery(String key, LocalDate queryDate) {
        QueryWrapper queryWrapper = new QueryWrapper();
        queryWrapper.eq((Object)"send_key", (Object)key);
        queryWrapper.eq((Object)"send_date", (Object)queryDate);
        List vSseSendInfoList = vSseSendInfoMapper.selectList((Wrapper)queryWrapper);
        List<String> responseList = vSseSendInfoList.stream().map(VSseSendInfo::getSendData).flatMap(json -> {
            try {
                return JSON.parseArray((String)json, String.class).stream();
            }
            catch (JSONException e) {
                return Stream.empty();
            }
        }).toList();
        return responseList;
    }

    public static void simpleBroadcastEvent(String key, String data) {
        List<SseEmitter> sseEmitterList = SSEEmitterManager.sseCache.get(key);
        for (SseEmitter sseEmitter : sseEmitterList) {
            try {
                sseEmitter.send(SseEmitter.event().name("simple").data((Object)data));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static List<String> getLinkingFromDB() {
        QueryWrapper queryWrapper = new QueryWrapper();
        queryWrapper.select((Object[])new String[]{"DISTINCT send_key"});
        List vSseSendInfoList = vSseSendInfoMapper.selectList((Wrapper)queryWrapper);
        List<String> distinctSeeds = vSseSendInfoList.stream().map(VSseSendInfo::getSendKey).toList();
        return distinctSeeds;
    }

    public static List<LocalDate> getLinkRecordDateFromDB(String key) {
        QueryWrapper queryWrapper = new QueryWrapper();
        queryWrapper.eq((Object)"send_key", (Object)key);
        List vSseSendInfoList = vSseSendInfoMapper.selectList((Wrapper)queryWrapper);
        List<LocalDate> linkRecordDateList = vSseSendInfoList.stream().map(VSseSendInfo::getSendDate).toList();
        return linkRecordDateList;
    }

    public static class SSEEmitterManager {
        private static final Logger log = LoggerFactory.getLogger(SSEEmitterManager.class);
        private static final Map<String, List<SseEmitter>> sseCache = new ConcurrentHashMap<String, List<SseEmitter>>();

        public static SseEmitter sseEmitterBuilder(String key, Long timeout, Long reLinkTime) {
            SseEmitter emitter = new SseEmitter(timeout);
            List emitters = sseCache.computeIfAbsent(key, k -> new ArrayList());
            emitters.add(emitter);
            emitter.onTimeout(() -> {
                log.info("\u8fde\u63a5\u8d85\u65f6: {}", (Object)key);
                SSEEmitterManager.removeEmitter(key, emitter);
            });
            emitter.onCompletion(() -> {
                log.info("\u8fde\u63a5\u5b8c\u6210: {}", (Object)key);
                SSEEmitterManager.removeEmitter(key, emitter);
            });
            emitter.onError(ex -> {
                log.error("\u8fde\u63a5\u9519\u8bef: {}, \u9519\u8bef: {}", (Object)key, (Object)ex.getMessage());
                SSEEmitterManager.removeEmitter(key, emitter);
            });
            try {
                emitter.send(SseEmitter.event().name("connect").data((Object)"\u8fde\u63a5\u6210\u529f").reconnectTime(reLinkTime.longValue()));
                log.info("\u521b\u5efa\u65b0\u8fde\u63a5: {}", (Object)key);
            }
            catch (IOException e) {
                log.error("\u53d1\u9001\u521d\u59cb\u6d88\u606f\u5931\u8d25: {}", (Object)e.getMessage());
                SSEEmitterManager.removeEmitter(key, emitter);
            }
            return emitter;
        }

        public static void removeEmitter(String key, SseEmitter emitter) {
            List<SseEmitter> emitters = sseCache.get(key);
            if (emitters != null) {
                emitters.remove(emitter);
                if (emitters.isEmpty()) {
                    sseCache.remove(key);
                    log.info("\u79fb\u9664key: {}", (Object)key);
                }
            }
        }

        public static List<SseEmitter> getConnections(String key) {
            return sseCache.getOrDefault(key, new ArrayList());
        }
    }

    @Component
    public static class MessageBuffer {
        private static final Map<String, LinkedBlockingQueue<String>> messageBuffers = new ConcurrentHashMap<String, LinkedBlockingQueue<String>>();
        private static final Map<String, LinkedBlockingQueue<String>> dbBuffers = new ConcurrentHashMap<String, LinkedBlockingQueue<String>>();
        private static final int MAX_SSE_BUFFER_SIZE = 200;
        private static final int MAX_DB_BUFFER_SIZE = 200;
        private static final int SSE_BRUSH_INTERVAL = 1000;
        private static final int DB_BRUSH_INTERVAL = 10000;

        public static void addMessage(String key, String message) {
            LinkedBlockingQueue sseBuffer = messageBuffers.computeIfAbsent(key, k -> new LinkedBlockingQueue(200));
            if (sseBuffer.size() >= 200) {
                sseBuffer.poll();
                log.warn("\u6d88\u606f\u53d1\u9001\u7f13\u5b58\u5df2\u6ee1\uff0c\u79fb\u9664\u6700\u65e7\u6d88\u606f: {}", (Object)key);
            }
            sseBuffer.offer(message);
            log.info("\u6d88\u606f\u5df2\u6dfb\u52a0\u5230 \u6d88\u606f\u53d1\u9001\u7f13\u5b58\u4e2d: {}, \u5185\u5bb9: {}", (Object)key, (Object)message);
            LinkedBlockingQueue dbBuffer = dbBuffers.computeIfAbsent(key, k -> new LinkedBlockingQueue(200));
            if (dbBuffer.size() >= 200) {
                dbBuffer.poll();
                log.warn("\u6570\u636e\u5e93\u53d1\u9001\u7f13\u5b58\u5df2\u6ee1\uff0c\u79fb\u9664\u6700\u65e7\u6d88\u606f: {}", (Object)key);
            }
            dbBuffer.offer("\"" + message + "\"");
            log.info("\u6d88\u606f\u5df2\u6dfb\u52a0\u5230 \u6570\u636e\u5e93\u53d1\u9001\u7f13\u5b58\u4e2d: {}, \u5185\u5bb9: {}", (Object)key, (Object)message);
        }

        @Scheduled(fixedRate=1000L)
        @Async
        public void sendBufferedMessages() {
            for (String key : new ArrayList<String>(messageBuffers.keySet())) {
                List<SseEmitter> emitters;
                LinkedBlockingQueue<String> buffer = messageBuffers.get(key);
                if (buffer == null || buffer.isEmpty() || (emitters = SSEEmitterManager.getConnections(key)).isEmpty()) continue;
                ArrayList messages = new ArrayList();
                buffer.drainTo(messages);
                if (messages.isEmpty()) continue;
                for (SseEmitter emitter : new ArrayList<SseEmitter>(emitters)) {
                    try {
                        emitter.send(SseEmitter.event().name("message").data(ResultData.success(messages)));
                    }
                    catch (IOException e) {
                        log.error("\u53d1\u9001\u6d88\u606f\u5931\u8d25: {}", (Object)e.getMessage());
                        SSEEmitterManager.removeEmitter(key, emitter);
                    }
                }
                log.info("\u53d1\u9001\u6279\u91cf\u6d88\u606f\u5230 {}: {} \u6761\u6d88\u606f", (Object)key, (Object)messages.size());
            }
        }

        @Scheduled(fixedRate=10000L)
        @Async
        public void saveBufferedMessagesToDb() {
            for (String key : new ArrayList<String>(dbBuffers.keySet())) {
                LinkedBlockingQueue<String> buffer = dbBuffers.get(key);
                ArrayList messages = new ArrayList();
                buffer.drainTo(messages);
                if (messages.isEmpty()) continue;
                VSseSendInfo vSseSendInfo = new VSseSendInfo();
                vSseSendInfo.setSendKey(key);
                vSseSendInfo.setSendData(((Object)messages).toString());
                vSseSendInfo.setSendDate(LocalDate.now());
                vSseSendInfoMapper.insert(vSseSendInfo);
                log.info("\u4fdd\u5b58\u6d88\u606f{},\u5230\u6570\u636e\u5e93: {} \u6761\u6d88\u606f", (Object)key, (Object)messages.size());
            }
        }
    }

    public static class MessageSender {
        private SSEEmitterManager sseEmitterManager;

        public static void sendMessage(String key, String message) {
            MessageBuffer.addMessage(key, message);
            log.info("\u5df2\u5c06\u6d88\u606f\u8f6c\u53d1\u81f3\u7f13\u5b58\u5904\u7406: {}, \u5185\u5bb9: {}", (Object)key, (Object)message);
        }
    }
}

