/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.traversal.dynamic;

import com.antgroup.geaflow.api.function.iterator.RichIteratorFunction;
import com.antgroup.geaflow.api.graph.base.algo.AbstractIncVertexCentricTraversalAlgo;
import com.antgroup.geaflow.api.graph.function.vc.IncVertexCentricTraversalFunction;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.model.graph.message.DefaultGraphMessage;
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
import com.antgroup.geaflow.model.record.RecordArgs;
import com.antgroup.geaflow.model.traversal.ITraversalRequest;
import com.antgroup.geaflow.model.traversal.ITraversalResponse;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.IGraphTraversalOp;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.IncGraphContextImpl;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.IncHistoricalGraph;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.context.dynamic.TraversalIncHistoricalGraph;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import com.antgroup.geaflow.operator.impl.graph.compute.dynamic.AbstractDynamicGraphVertexCentricOp;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDynamicGraphVertexCentricTraversalOp<K, VV, EV, M, R, FUNC extends IncVertexCentricTraversalFunction<K, VV, EV, M, R>>
extends AbstractDynamicGraphVertexCentricOp<K, VV, EV, M, AbstractIncVertexCentricTraversalAlgo<K, VV, EV, M, R, FUNC>>
implements IGraphTraversalOp<K, VV, EV, M> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicGraphVertexCentricTraversalOp.class);
    protected IncGraphVCTraversalCtxImpl graphVCTraversalCtx;
    protected IncVertexCentricTraversalFunction<K, VV, EV, M, R> incVcTraversalFunction;
    protected Set<K> invokeVIds;
    protected List<ITraversalResponse<R>> responses;
    protected ICollector<ITraversalResponse<R>> responseCollector;
    protected final List<ITraversalRequest<K>> traversalRequests;

    public AbstractDynamicGraphVertexCentricTraversalOp(GraphViewDesc graphViewDesc, AbstractIncVertexCentricTraversalAlgo<K, VV, EV, M, R, FUNC> incVertexCentricTraversal) {
        super(graphViewDesc, incVertexCentricTraversal);
        this.opArgs.setOpType(OpArgs.OpType.INC_VERTEX_CENTRIC_TRAVERSAL);
        this.traversalRequests = new ArrayList<ITraversalRequest<K>>();
    }

    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        this.incVcTraversalFunction = ((AbstractIncVertexCentricTraversalAlgo)this.function).getIncTraversalFunction();
        this.graphVCTraversalCtx = new IncGraphVCTraversalCtxImpl(this.getIdentify(), this.messageCollector);
        this.incVcTraversalFunction.open((IncVertexCentricTraversalFunction.IncVertexCentricTraversalFuncContext)this.graphVCTraversalCtx);
        this.invokeVIds = new HashSet<K>();
        this.responses = new ArrayList<ITraversalResponse<R>>();
        for (ICollector collector : this.collectors) {
            if (collector.getTag().equals(RecordArgs.GraphRecordNames.Message.name()) || collector.getTag().equals(RecordArgs.GraphRecordNames.Aggregate.name())) continue;
            this.responseCollector = collector;
        }
    }

    @Override
    public void doFinishIteration(final long iterations) {
        if (iterations == 1L) {
            Set vIds = this.temporaryGraphCache.getAllEvolveVId();
            this.invokeVIds.addAll(vIds);
            for (Object vId : vIds) {
                this.graphVCTraversalCtx.init(iterations, vId);
                this.incVcTraversalFunction.evolve(vId, this.graphVCTraversalCtx.getTemporaryGraph());
            }
            this.traversalByRequest();
        } else {
            this.graphMsgBox.processInMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

                @Override
                public void process(K vertexId, List<M> messages) {
                    AbstractDynamicGraphVertexCentricTraversalOp.this.invokeVIds.add(vertexId);
                    AbstractDynamicGraphVertexCentricTraversalOp.this.graphVCTraversalCtx.init(iterations, vertexId);
                    AbstractDynamicGraphVertexCentricTraversalOp.this.incVcTraversalFunction.compute(vertexId, messages.iterator());
                }
            });
            this.graphMsgBox.clearInBox();
        }
        if (this.incVcTraversalFunction instanceof RichIteratorFunction) {
            ((RichIteratorFunction)this.incVcTraversalFunction).finishIteration(iterations);
        }
        this.graphMsgBox.processOutMessage(new IGraphMsgBox.MsgProcessFunc<K, M>(){

            @Override
            public void process(K vertexId, List<M> messages) {
                int size = messages.size();
                for (int i = 0; i < size; ++i) {
                    AbstractDynamicGraphVertexCentricTraversalOp.this.messageCollector.partition(vertexId, (Object)new DefaultGraphMessage(vertexId, messages.get(i)));
                }
            }
        });
        this.messageCollector.finish();
        this.graphMsgBox.clearOutBox();
    }

    protected void traversalByRequest() {
        Iterator<ITraversalRequest<K>> iterator = this.getTraversalRequests();
        while (iterator.hasNext()) {
            ITraversalRequest<K> traversalRequest = iterator.next();
            Object vertexId = traversalRequest.getVId();
            this.graphVCTraversalCtx.init(this.iterations, vertexId);
            this.incVcTraversalFunction.init(traversalRequest);
        }
    }

    @Override
    public void finish() {
        LOGGER.info("current batch invokeIds size:{}", (Object)this.invokeVIds.size());
        for (K k : this.invokeVIds) {
            this.graphVCTraversalCtx.init(this.iterations, k);
            this.incVcTraversalFunction.finish(k, this.graphVCTraversalCtx.getMutableGraph());
        }
        this.temporaryGraphCache.clear();
        this.invokeVIds.clear();
        this.traversalRequests.clear();
        LOGGER.info("incVcTraversalFunction finish, windowId:{}, invokeIds size:{}", (Object)this.windowId, (Object)this.invokeVIds.size());
        this.incVcTraversalFunction.finish();
        LOGGER.info("incVcTraversalFunction has finish windowId:{}", (Object)this.windowId);
        for (ITraversalResponse iTraversalResponse : this.responses) {
            this.responseCollector.partition((Object)iTraversalResponse.getResponseId(), (Object)iTraversalResponse);
        }
        this.responseCollector.finish();
        this.responses.clear();
        this.checkpoint();
        LOGGER.info("TraversalOp has finish windowId:{}", (Object)this.windowId);
    }

    @Override
    public void close() {
        super.close();
        this.incVcTraversalFunction.close();
        this.responses.clear();
    }

    @Override
    public void addRequest(ITraversalRequest<K> request) {
        LOGGER.info("add request:{}", request);
        this.traversalRequests.add(request);
    }

    @Override
    public Iterator<ITraversalRequest<K>> getTraversalRequests() {
        return this.traversalRequests.iterator();
    }

    class IncGraphVCTraversalCtxImpl
    extends IncGraphContextImpl<K, VV, EV, M>
    implements IncVertexCentricTraversalFunction.IncVertexCentricTraversalFuncContext<K, VV, EV, M, R> {
        private final ICollector<IGraphMessage<K, M>> messageCollector;
        private final String opName;
        private final IncVertexCentricTraversalFunction.TraversalHistoricalGraph<K, VV, EV> traversalHistoricalGraph;

        protected IncGraphVCTraversalCtxImpl(String opName, ICollector<IGraphMessage<K, M>> messageCollector) {
            super(AbstractDynamicGraphVertexCentricTraversalOp.this.opContext, AbstractDynamicGraphVertexCentricTraversalOp.this.runtimeContext, AbstractDynamicGraphVertexCentricTraversalOp.this.graphState, AbstractDynamicGraphVertexCentricTraversalOp.this.temporaryGraphCache, AbstractDynamicGraphVertexCentricTraversalOp.this.graphMsgBox, AbstractDynamicGraphVertexCentricTraversalOp.this.maxIterations);
            this.opName = opName;
            this.messageCollector = messageCollector;
            this.traversalHistoricalGraph = new TraversalIncHistoricalGraph((IncHistoricalGraph)super.getHistoricalGraph());
        }

        public void activeRequest(ITraversalRequest<K> request) {
        }

        public void takeResponse(ITraversalResponse<R> response) {
            AbstractDynamicGraphVertexCentricTraversalOp.this.responses.add(response);
        }

        public void broadcast(IGraphMessage<K, M> message) {
            this.messageCollector.broadcast(message);
        }

        @Override
        public IncVertexCentricTraversalFunction.TraversalHistoricalGraph<K, VV, EV> getHistoricalGraph() {
            return this.traversalHistoricalGraph;
        }

        public String getTraversalOpName() {
            return this.opName;
        }
    }
}

