/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.compute.dynamic;

import com.antgroup.geaflow.api.function.iterator.RichIteratorFunction;
import com.antgroup.geaflow.api.graph.base.algo.AbstractIncVertexCentricComputeAlgo;
import com.antgroup.geaflow.api.graph.function.vc.IncVertexCentricComputeFunction;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.model.record.RecordArgs;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.IGraphVertexCentricOp;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.IncGraphContextImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import com.antgroup.geaflow.operator.impl.graph.compute.dynamic.AbstractDynamicGraphVertexCentricOp;
import com.antgroup.geaflow.operator.impl.iterator.IteratorOperator;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicGraphVertexCentricComputeOp<K, VV, EV, M, FUNC extends IncVertexCentricComputeFunction<K, VV, EV, M>>
extends AbstractDynamicGraphVertexCentricOp<K, VV, EV, M, AbstractIncVertexCentricComputeAlgo<K, VV, EV, M, FUNC>>
implements IGraphVertexCentricOp<K, VV, EV, M>,
IteratorOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicGraphVertexCentricComputeOp.class);
    protected IncGraphComputeContextImpl graphIncVCComputeCtx;
    protected IncVertexCentricComputeFunction<K, VV, EV, M> incVCComputeFunction;
    private Set<K> invokeVIds;
    private ICollector<IVertex<K, VV>> vertexCollector;

    public DynamicGraphVertexCentricComputeOp(GraphViewDesc graphViewDesc, AbstractIncVertexCentricComputeAlgo<K, VV, EV, M, FUNC> incVCAlgorithm) {
        super(graphViewDesc, incVCAlgorithm);
        this.opArgs.setOpType(OpArgs.OpType.INC_VERTEX_CENTRIC_COMPUTE);
        this.opArgs.setChainStrategy(OpArgs.ChainStrategy.NEVER);
    }

    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        this.incVCComputeFunction = ((AbstractIncVertexCentricComputeAlgo)this.function).getIncComputeFunction();
        this.graphIncVCComputeCtx = new IncGraphComputeContextImpl();
        this.incVCComputeFunction.init((IncVertexCentricComputeFunction.IncGraphComputeContext)this.graphIncVCComputeCtx);
        this.invokeVIds = new HashSet<K>();
        for (ICollector collector : this.collectors) {
            if (collector.getTag().equals(RecordArgs.GraphRecordNames.Message.name()) || collector.getTag().equals(RecordArgs.GraphRecordNames.Aggregate.name())) continue;
            this.vertexCollector = collector;
        }
    }

    @Override
    public void doFinishIteration(final long iterations) {
        LOGGER.info("finish iteration:{}", (Object)iterations);
        if (this.iterations == 1L) {
            Set vIds = this.temporaryGraphCache.getAllEvolveVId();
            this.invokeVIds.addAll(vIds);
            for (Object vId : vIds) {
                this.graphIncVCComputeCtx.init(iterations, vId);
                this.incVCComputeFunction.evolve(vId, this.graphIncVCComputeCtx.getTemporaryGraph());
            }
        } else {
            this.graphMsgBox.processInMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

                @Override
                public void process(K vertexId, List<M> ms) {
                    DynamicGraphVertexCentricComputeOp.this.graphIncVCComputeCtx.init(iterations, vertexId);
                    DynamicGraphVertexCentricComputeOp.this.invokeVIds.add(vertexId);
                    DynamicGraphVertexCentricComputeOp.this.incVCComputeFunction.compute(vertexId, ms.iterator());
                }
            });
            this.graphMsgBox.clearInBox();
        }
        if (this.incVCComputeFunction instanceof RichIteratorFunction) {
            ((RichIteratorFunction)this.incVCComputeFunction).finishIteration(iterations);
        }
        this.graphMsgBox.processOutMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

            @Override
            public void process(K vertexId, List<M> messages) {
                int size = messages.size();
                for (int i = 0; i < size; ++i) {
                    DynamicGraphVertexCentricComputeOp.this.messageCollector.partition(vertexId, (Object)new DefaultGraphMessage(vertexId, messages.get(i)));
                }
            }
        });
        this.messageCollector.finish();
        this.graphMsgBox.clearOutBox();
    }

    @Override
    public void finish() {
        LOGGER.info("current batch invokeIds:{}", this.invokeVIds);
        for (K vertexId : this.invokeVIds) {
            this.graphIncVCComputeCtx.init(this.iterations, vertexId);
            this.incVCComputeFunction.finish(vertexId, this.graphIncVCComputeCtx.getMutableGraph());
        }
        this.invokeVIds.clear();
        this.temporaryGraphCache.clear();
        this.vertexCollector.finish();
        this.checkpoint();
    }

    class IncGraphComputeContextImpl
    extends IncGraphContextImpl<K, VV, EV, M>
    implements IncVertexCentricComputeFunction.IncGraphComputeContext<K, VV, EV, M> {
        public IncGraphComputeContextImpl() {
            super(DynamicGraphVertexCentricComputeOp.this.opContext, DynamicGraphVertexCentricComputeOp.this.runtimeContext, DynamicGraphVertexCentricComputeOp.this.graphState, DynamicGraphVertexCentricComputeOp.this.temporaryGraphCache, DynamicGraphVertexCentricComputeOp.this.graphMsgBox, DynamicGraphVertexCentricComputeOp.this.maxIterations);
        }

        public void collect(IVertex vertex) {
            DynamicGraphVertexCentricComputeOp.this.vertexCollector.partition(vertex.getId(), (Object)vertex);
        }
    }
}

