/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.compute.statical;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.api.function.iterator.RichIteratorFunction;
import com.antgroup.geaflow.api.graph.base.algo.AbstractVertexCentricComputeAlgo;
import com.antgroup.geaflow.api.graph.function.vc.VertexCentricComputeFunction;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.iterator.CloseableIterator;
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.model.record.RecordArgs;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.statical.StaticGraphContextImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import com.antgroup.geaflow.operator.impl.graph.compute.statical.AbstractStaticGraphVertexCentricOp;
import com.antgroup.geaflow.state.GraphState;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StaticGraphVertexCentricComputeOp<K, VV, EV, M, FUNC extends VertexCentricComputeFunction<K, VV, EV, M>>
extends AbstractStaticGraphVertexCentricOp<K, VV, EV, M, AbstractVertexCentricComputeAlgo<K, VV, EV, M, FUNC>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StaticGraphVertexCentricComputeOp.class);
    protected GraphVCComputeCtxImpl graphVCComputeCtx;
    protected VertexCentricComputeFunction<K, VV, EV, M> vcComputeFunction;
    private ICollector<IVertex<K, VV>> vertexCollector;

    public StaticGraphVertexCentricComputeOp(GraphViewDesc graphViewDesc, AbstractVertexCentricComputeAlgo<K, VV, EV, M, FUNC> vcAlgorithm) {
        super(graphViewDesc, vcAlgorithm);
        this.opArgs.setOpType(OpArgs.OpType.VERTEX_CENTRIC_COMPUTE);
    }

    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        this.vcComputeFunction = ((AbstractVertexCentricComputeAlgo)this.function).getComputeFunction();
        this.graphVCComputeCtx = new GraphVCComputeCtxImpl(opContext, this.runtimeContext, this.graphState, this.graphMsgBox, this.maxIterations);
        this.vcComputeFunction.init((VertexCentricComputeFunction.VertexCentricComputeFuncContext)this.graphVCComputeCtx);
        for (ICollector collector : this.collectors) {
            if (collector.getTag().equals(RecordArgs.GraphRecordNames.Message.name()) || collector.getTag().equals(RecordArgs.GraphRecordNames.Aggregate.name())) continue;
            this.vertexCollector = collector;
        }
    }

    @Override
    public void doFinishIteration(final long iterations) {
        if (iterations == 1L) {
            for (IVertex vertex : this.graphState.staticGraph().V()) {
                Object vertexId = vertex.getId();
                this.graphVCComputeCtx.init(iterations, vertexId);
                this.vcComputeFunction.compute(vertexId, Collections.emptyIterator());
            }
        } else {
            this.graphMsgBox.processInMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

                @Override
                public void process(K vertexId, List<M> ms) {
                    StaticGraphVertexCentricComputeOp.this.graphVCComputeCtx.init(iterations, vertexId);
                    StaticGraphVertexCentricComputeOp.this.vcComputeFunction.compute(vertexId, ms.iterator());
                }
            });
            this.graphMsgBox.clearInBox();
        }
        if (this.vcComputeFunction instanceof RichIteratorFunction) {
            ((RichIteratorFunction)this.vcComputeFunction).finishIteration(iterations);
        }
        this.graphMsgBox.processOutMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

            @Override
            public void process(K vertexId, List<M> messages) {
                int size = messages.size();
                for (int i = 0; i < size; ++i) {
                    StaticGraphVertexCentricComputeOp.this.messageCollector.partition(vertexId, (Object)new DefaultGraphMessage(vertexId, messages.get(i)));
                }
            }
        });
        this.messageCollector.finish();
        this.graphMsgBox.clearOutBox();
    }

    @Override
    public void finish() {
        try (CloseableIterator vertexIterator = this.graphState.staticGraph().V().query().iterator();){
            while (vertexIterator.hasNext()) {
                IVertex vertex = (IVertex)vertexIterator.next();
                this.vertexCollector.partition(vertex.getId(), (Object)vertex);
            }
        }
        catch (Exception e) {
            throw new GeaflowRuntimeException((Throwable)e);
        }
        this.vcComputeFunction.finish();
        this.vertexCollector.finish();
    }

    class GraphVCComputeCtxImpl
    extends StaticGraphContextImpl<K, VV, EV, M>
    implements VertexCentricComputeFunction.VertexCentricComputeFuncContext<K, VV, EV, M> {
        public GraphVCComputeCtxImpl(Operator.OpContext opContext, RuntimeContext runtimeContext, GraphState<K, VV, EV> graphState, IGraphMsgBox<K, M> graphMsgBox, long maxIteration) {
            super(opContext, runtimeContext, graphState, graphMsgBox, maxIteration);
        }

        public void setNewVertexValue(VV value) {
            IVertex valueVertex = (IVertex)StaticGraphVertexCentricComputeOp.this.graphState.staticGraph().V().query(this.vertexId).get();
            valueVertex = valueVertex.withValue(value);
            StaticGraphVertexCentricComputeOp.this.graphState.staticGraph().V().add((Object)valueVertex);
        }
    }
}

