/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.traversal.statical;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.api.function.iterator.RichIteratorFunction;
import com.antgroup.geaflow.api.graph.base.algo.AbstractVertexCentricTraversalAlgo;
import com.antgroup.geaflow.api.graph.function.vc.VertexCentricTraversalFunction;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
import com.antgroup.geaflow.model.record.RecordArgs;
import com.antgroup.geaflow.model.traversal.ITraversalRequest;
import com.antgroup.geaflow.model.traversal.ITraversalResponse;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.IGraphTraversalOp;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.DynamicTraversalEdgeQueryImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.DynamicTraversalVertexQueryImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.statical.StaticGraphContextImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.statical.StaticTraversalEdgeQueryImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.statical.StaticTraversalVertexQueryImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import com.antgroup.geaflow.operator.impl.graph.compute.statical.AbstractStaticGraphVertexCentricOp;
import com.antgroup.geaflow.state.GraphState;
import com.antgroup.geaflow.view.graph.GraphSnapshotDesc;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base operator that runs a vertex-centric traversal function over a static graph.
 *
 * <p>Traversal requests are buffered through {@link #addRequest(ITraversalRequest)}. In
 * {@link #doFinishIteration(long)} the first iteration hands every buffered request to the traversal
 * function's {@code init}, while every later iteration feeds the in-box messages of the message box
 * to the function's {@code compute}. Responses reported through the function context are buffered in
 * {@link #responses} and emitted through {@link #responseCollector} in {@link #finish()}.
 *
 * @param <K> vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <M> message type exchanged between vertices
 * @param <R> traversal response payload type
 * @param <FUNC> concrete traversal function type
 */
public abstract class AbstractStaticGraphVertexCentricTraversalOp<K, VV, EV, M, R, FUNC extends VertexCentricTraversalFunction<K, VV, EV, M, R>>
extends AbstractStaticGraphVertexCentricOp<K, VV, EV, M, AbstractVertexCentricTraversalAlgo<K, VV, EV, M, R, FUNC>>
implements IGraphTraversalOp<K, VV, EV, M> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStaticGraphVertexCentricTraversalOp.class);

    /** Context handed to the traversal function; created in {@link #open(Operator.OpContext)}. */
    protected GraphVCTraversalCtxImpl graphVCTraversalCtx;
    /** User traversal function obtained from the algorithm in {@link #open(Operator.OpContext)}. */
    protected VertexCentricTraversalFunction<K, VV, EV, M, R> vcTraversalFunction;
    /** Responses collected during the traversal; emitted and cleared in {@link #finish()}. */
    protected List<ITraversalResponse<R>> responses;
    /** Collector the buffered responses are written to; selected in {@link #open(Operator.OpContext)}. */
    protected ICollector<ITraversalResponse<R>> responseCollector;
    /** Requests buffered by {@link #addRequest(ITraversalRequest)}; cleared in {@link #finish()}. */
    protected final List<ITraversalRequest<K>> traversalRequests;

    /**
     * Creates the operator and marks its op type as {@code VERTEX_CENTRIC_TRAVERSAL}.
     *
     * @param graphViewDesc description of the graph view, forwarded to the superclass
     * @param vcTraversal traversal algorithm, forwarded to the superclass
     */
    public AbstractStaticGraphVertexCentricTraversalOp(GraphViewDesc graphViewDesc, AbstractVertexCentricTraversalAlgo<K, VV, EV, M, R, FUNC> vcTraversal) {
        super(graphViewDesc, vcTraversal);
        this.opArgs.setOpType(OpArgs.OpType.VERTEX_CENTRIC_TRAVERSAL);
        this.traversalRequests = new ArrayList<>();
    }

    /**
     * Opens the superclass, obtains and opens the traversal function with a fresh context, and picks
     * the response collector.
     *
     * <p>Every collector whose tag is neither the {@code Message} nor the {@code Aggregate} record
     * name is treated as a response collector; when several match, the last one wins.
     * NOTE(review): if no collector matches, {@link #responseCollector} stays unset and
     * {@link #finish()} will fail — confirm that the job graph always wires one.
     *
     * @param opContext operator context supplied by the runtime
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        // The raw cast is retained because the declared type of the inherited `function` field is
        // not visible from this file.
        this.vcTraversalFunction = ((AbstractVertexCentricTraversalAlgo) this.function).getTraversalFunction();
        this.graphVCTraversalCtx = new GraphVCTraversalCtxImpl(opContext, this.runtimeContext, this.graphState,
            this.graphMsgBox, this.maxIterations, this.getIdentify(), this.messageCollector);
        this.vcTraversalFunction.open(
            (VertexCentricTraversalFunction.VertexCentricTraversalFuncContext<K, VV, EV, M, R>) this.graphVCTraversalCtx);
        this.responses = new ArrayList<>();

        String messageTag = RecordArgs.GraphRecordNames.Message.name();
        String aggregateTag = RecordArgs.GraphRecordNames.Aggregate.name();
        for (ICollector collector : this.collectors) {
            String tag = collector.getTag();
            // Constant-first comparison so that a collector without a tag cannot raise an NPE here.
            boolean internalStream = messageTag.equals(tag) || aggregateTag.equals(tag);
            if (!internalStream) {
                this.responseCollector = collector;
            }
        }
    }

    /**
     * Runs one traversal iteration and flushes the messages it produced.
     *
     * <p>Iteration {@code 1} starts the traversal from the buffered requests; any other iteration
     * processes the in-box messages. Afterwards a {@link RichIteratorFunction} is notified, and the
     * out-box messages are partitioned by target vertex id through the message collector.
     *
     * @param iterations current iteration number
     */
    @Override
    public void doFinishIteration(final long iterations) {
        if (iterations == 1L) {
            this.traversalByRequest(iterations);
        } else {
            this.computeOnInMessages(iterations);
        }
        if (this.vcTraversalFunction instanceof RichIteratorFunction) {
            ((RichIteratorFunction) this.vcTraversalFunction).finishIteration(iterations);
        }
        this.emitOutMessages();
    }

    /** Feeds the in-box messages of every vertex to the traversal function, then clears the in-box. */
    private void computeOnInMessages(final long iterations) {
        this.graphMsgBox.processInMessage(new IGraphMsgBox.MsgProcessFunc<K, M>() {
            @Override
            public void process(K vertexId, List<M> messages) {
                // The context must point at the current vertex before the function is invoked.
                graphVCTraversalCtx.init(iterations, vertexId);
                vcTraversalFunction.compute(vertexId, messages.iterator());
            }
        });
        this.graphMsgBox.clearInBox();
    }

    /** Sends every out-box message to the message collector, finishes it and clears the out-box. */
    private void emitOutMessages() {
        this.graphMsgBox.processOutMessage(new IGraphMsgBox.MsgProcessFunc<K, M>() {
            @Override
            public void process(K vertexId, List<M> messages) {
                for (M message : messages) {
                    messageCollector.partition(vertexId, new DefaultGraphMessage<>(vertexId, message));
                }
            }
        });
        this.messageCollector.finish();
        this.graphMsgBox.clearOutBox();
    }

    /**
     * Initialises the traversal function with every request returned by
     * {@link #getTraversalRequests()}, positioning the context on the request's vertex first.
     *
     * @param iterations current iteration number, passed on to the context
     */
    protected void traversalByRequest(long iterations) {
        Iterator<ITraversalRequest<K>> requests = this.getTraversalRequests();
        while (requests.hasNext()) {
            ITraversalRequest<K> request = requests.next();
            this.graphVCTraversalCtx.init(iterations, request.getVId());
            this.vcTraversalFunction.init(request);
        }
    }

    /**
     * Finishes the traversal function, emits all buffered responses partitioned by their response
     * id, finishes the response collector and resets the request and response buffers.
     */
    @Override
    public void finish() {
        LOGGER.info("vcTraversalFunction finish windowId:{}", this.windowId);
        this.vcTraversalFunction.finish();
        LOGGER.info("vcTraversalFunction has finish windowId:{}", this.windowId);
        for (ITraversalResponse<R> response : this.responses) {
            this.responseCollector.partition(response.getResponseId(), response);
        }
        this.responseCollector.finish();
        this.traversalRequests.clear();
        this.responses.clear();
        LOGGER.info("TraversalOp has finish windowId:{}", this.windowId);
    }

    /**
     * Closes the traversal function and then the superclass, and drops any buffered responses.
     *
     * <p>The superclass is closed even when the traversal function's {@code close()} throws, and
     * fields that are only assigned in {@link #open(Operator.OpContext)} are null-checked so that
     * closing an operator that was never (fully) opened does not fail with an NPE.
     */
    @Override
    public void close() {
        try {
            if (this.vcTraversalFunction != null) {
                this.vcTraversalFunction.close();
            }
        } finally {
            try {
                super.close();
            } finally {
                if (this.responses != null) {
                    this.responses.clear();
                }
            }
        }
    }

    /**
     * Buffers a traversal request until the traversal starts.
     *
     * @param request request to buffer
     */
    @Override
    public void addRequest(ITraversalRequest<K> request) {
        this.traversalRequests.add(request);
    }

    /**
     * Returns an iterator over the currently buffered traversal requests.
     *
     * @return iterator backed by the internal request list
     */
    @Override
    public Iterator<ITraversalRequest<K>> getTraversalRequests() {
        return this.traversalRequests.iterator();
    }

    /**
     * Function context passed to the traversal function. It collects responses into the enclosing
     * operator, exposes vertex and edge queries for the current vertex, and broadcasts messages.
     */
    class GraphVCTraversalCtxImpl
    extends StaticGraphContextImpl<K, VV, EV, M>
    implements VertexCentricTraversalFunction.VertexCentricTraversalFuncContext<K, VV, EV, M, R> {
        private final String opName;
        private final ICollector<IGraphMessage<K, M>> messageCollector;

        /**
         * @param opContext operator context, forwarded to the superclass
         * @param runtimeContext runtime context, forwarded to the superclass
         * @param graphState graph state, forwarded to the superclass
         * @param graphMsgBox message box, forwarded to the superclass
         * @param maxIteration maximum iteration count, forwarded to the superclass
         * @param opName name returned by {@link #getTraversalOpName()}
         * @param messageCollector collector used by {@link #broadcast(IGraphMessage)}
         */
        public GraphVCTraversalCtxImpl(Operator.OpContext opContext, RuntimeContext runtimeContext, GraphState<K, VV, EV> graphState, IGraphMsgBox<K, M> graphMsgBox, long maxIteration, String opName, ICollector<IGraphMessage<K, M>> messageCollector) {
            super(opContext, runtimeContext, graphState, graphMsgBox, maxIteration);
            this.opName = opName;
            this.messageCollector = messageCollector;
        }

        /**
         * Buffers a response in the enclosing operator until {@code finish()} emits it.
         *
         * @param response response produced by the traversal function
         */
        // The parameter is left raw because the interface declaration is not visible from this file
        // and a parameterized signature might not match it.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void takeResponse(ITraversalResponse response) {
            AbstractStaticGraphVertexCentricTraversalOp.this.responses.add(response);
        }

        /**
         * Returns a vertex query for the current vertex. A snapshot graph view gets the dynamic
         * query implementation, any other view the static one.
         */
        @Override
        public VertexCentricTraversalFunction.TraversalVertexQuery<K, VV> vertex() {
            if (this.isSnapshotView()) {
                // NOTE(review): 0L is presumably the snapshot version to read — confirm.
                return new DynamicTraversalVertexQueryImpl<>(this.vertexId, 0L,
                    AbstractStaticGraphVertexCentricTraversalOp.this.graphState,
                    AbstractStaticGraphVertexCentricTraversalOp.this.taskKeyGroup);
            }
            return new StaticTraversalVertexQueryImpl<>(this.vertexId,
                AbstractStaticGraphVertexCentricTraversalOp.this.graphState,
                AbstractStaticGraphVertexCentricTraversalOp.this.taskKeyGroup);
        }

        /**
         * Returns an edge query for the current vertex. A snapshot graph view gets the dynamic
         * query implementation, any other view the static one.
         */
        @Override
        public VertexCentricTraversalFunction.TraversalEdgeQuery<K, EV> edges() {
            if (this.isSnapshotView()) {
                // NOTE(review): 0L is presumably the snapshot version to read — confirm.
                return new DynamicTraversalEdgeQueryImpl<>(this.vertexId, 0L,
                    AbstractStaticGraphVertexCentricTraversalOp.this.graphState,
                    AbstractStaticGraphVertexCentricTraversalOp.this.taskKeyGroup);
            }
            return new StaticTraversalEdgeQueryImpl<>(this.vertexId,
                AbstractStaticGraphVertexCentricTraversalOp.this.graphState,
                AbstractStaticGraphVertexCentricTraversalOp.this.taskKeyGroup);
        }

        /** Tells whether the enclosing operator works on a {@link GraphSnapshotDesc} view. */
        private boolean isSnapshotView() {
            return AbstractStaticGraphVertexCentricTraversalOp.this.graphViewDesc instanceof GraphSnapshotDesc;
        }

        /**
         * Broadcasts a message through the message collector given at construction.
         *
         * @param message message to broadcast
         */
        public void broadcast(IGraphMessage<K, M> message) {
            this.messageCollector.broadcast(message);
        }

        /**
         * @return the operator name given at construction
         */
        public String getTraversalOpName() {
            return this.opName;
        }
    }
}

