/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox;

import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Message box that buffers incoming messages per vertex but does not buffer
 * outgoing ones: every outgoing message is handed to the collector as soon as
 * it is added.
 *
 * <p>Because nothing is stored on the outgoing side,
 * {@link #processOutMessage(IGraphMsgBox.MsgProcessFunc)} and
 * {@link #clearOutBox()} are intentionally empty.
 *
 * <p>The incoming box is a plain {@link HashMap} with no synchronization, so
 * the order in which vertices are visited during processing is unspecified.
 *
 * @param <K> type of the vertex id
 * @param <MESSAGE> type of the message payload
 */
public class DirectEmitMsgBox<K, MESSAGE> implements IGraphMsgBox<K, MESSAGE> {

    /** Incoming messages grouped by the id of the vertex they were added for. */
    private final Map<K, List<MESSAGE>> inMessageBox = new HashMap<>();

    /** Collector that receives every outgoing message at the moment it is added. */
    private final ICollector<IGraphMessage<K, MESSAGE>> msgCollector;

    /**
     * Creates a message box that emits outgoing messages through the given collector.
     *
     * @param msgCollector collector used by {@link #addOutMessage(Object, Object)}
     */
    public DirectEmitMsgBox(ICollector<IGraphMessage<K, MESSAGE>> msgCollector) {
        this.msgCollector = msgCollector;
    }

    /**
     * Appends a message to the incoming list of the given vertex, creating the
     * list on first use.
     *
     * @param vertexId id of the vertex the message belongs to
     * @param message message to buffer
     */
    @Override
    public void addInMessages(K vertexId, MESSAGE message) {
        List<MESSAGE> messages = this.inMessageBox.computeIfAbsent(vertexId, k -> new ArrayList<>());
        messages.add(message);
    }

    /**
     * Invokes {@code processFunc} once per vertex that currently has buffered
     * incoming messages. The box is not cleared afterwards; see {@link #clearInBox()}.
     *
     * @param processFunc callback receiving each vertex id and its message list
     */
    @Override
    public void processInMessage(IGraphMsgBox.MsgProcessFunc<K, MESSAGE> processFunc) {
        this.processMessage(this.inMessageBox, processFunc);
    }

    /** Removes all buffered incoming messages. */
    @Override
    public void clearInBox() {
        this.inMessageBox.clear();
    }

    /**
     * Wraps the message in a {@link DefaultGraphMessage} and passes it straight
     * to the collector, using the vertex id as the partition key.
     *
     * @param vertexId id of the vertex the message is addressed to
     * @param message message to emit
     */
    @Override
    public void addOutMessage(K vertexId, MESSAGE message) {
        this.msgCollector.partition(vertexId, new DefaultGraphMessage<>(vertexId, message));
    }

    /**
     * Does nothing: outgoing messages are emitted in
     * {@link #addOutMessage(Object, Object)} and never stored, so there is
     * nothing to process.
     *
     * @param processFunc ignored
     */
    @Override
    public void processOutMessage(IGraphMsgBox.MsgProcessFunc<K, MESSAGE> processFunc) {
        // Intentionally empty: this box keeps no outgoing messages.
    }

    /** Does nothing: this box keeps no outgoing messages to clear. */
    @Override
    public void clearOutBox() {
        // Intentionally empty: this box keeps no outgoing messages.
    }

    /**
     * Calls {@code processFunc} for every entry of {@code messageBox}.
     *
     * <p>The callback receives the stored list itself, not a copy.
     *
     * @param messageBox messages grouped by vertex id
     * @param processFunc callback receiving each vertex id and its message list
     */
    private void processMessage(Map<K, List<MESSAGE>> messageBox,
                                IGraphMsgBox.MsgProcessFunc<K, MESSAGE> processFunc) {
        for (Map.Entry<K, List<MESSAGE>> entry : messageBox.entrySet()) {
            processFunc.process(entry.getKey(), entry.getValue());
        }
    }
}

