/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.pdata.graph.window;

import com.antgroup.geaflow.api.function.base.KeySelector;
import com.antgroup.geaflow.api.graph.base.algo.GraphExecAlgo;
import com.antgroup.geaflow.api.graph.base.algo.VertexCentricAlgo;
import com.antgroup.geaflow.api.partition.graph.edge.CustomEdgeVCPartition;
import com.antgroup.geaflow.api.partition.graph.edge.CustomVertexVCPartition;
import com.antgroup.geaflow.api.partition.graph.edge.IGraphVCPartition;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowStream;
import com.antgroup.geaflow.common.encoder.EncoderResolver;
import com.antgroup.geaflow.common.encoder.IEncoder;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
import com.antgroup.geaflow.model.graph.message.encoder.GraphMessageEncoders;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.pdata.graph.window.WindowStreamGraph;
import com.antgroup.geaflow.pdata.stream.Stream;
import com.antgroup.geaflow.pdata.stream.window.WindowDataStream;
import com.antgroup.geaflow.pipeline.context.IPipelineContext;

/**
 * Base class for window-stream graph computations.
 *
 * <p>Holds the vertex and edge window streams of a graph together with the
 * execution settings (algorithm kind, iteration limit, message encoder) that
 * are filled in by {@link #processOnVertexCentric(VertexCentricAlgo)}.
 *
 * @param <K>  vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <M>  type of the messages carried by {@link IGraphMessage}
 * @param <R>  element type of the resulting window data stream
 */
public abstract class AbstractGraphWindow<K, VV, EV, M, R> extends WindowDataStream<R> {

    /** Iteration limit copied from the algorithm in {@link #processOnVertexCentric}. */
    protected long maxIterations;

    /** Vertex stream as handed to the constructor. */
    protected PWindowStream<IVertex<K, VV>> vertexStream;

    /** Edge stream; replaced by its keyed variant in {@link #processOnVertexCentric}. */
    protected PWindowStream<IEdge<K, EV>> edgeStream;

    /** Execution model of the graph algorithm; set in {@link #processOnVertexCentric}. */
    protected GraphExecAlgo graphExecAlgo;

    /** Encoder for graph messages, built from a key encoder and a message encoder. */
    protected IEncoder<? extends IGraphMessage<K, M>> msgEncoder;

    /**
     * Stores both input streams and sets the inherited {@code parallelism} to the
     * larger of the two streams' parallelism values.
     *
     * @param pipelineContext    context forwarded to the super constructor
     * @param vertexWindowStream stream of graph vertices
     * @param edgeWindowStream   stream of graph edges
     */
    public AbstractGraphWindow(IPipelineContext pipelineContext,
                               PWindowStream<IVertex<K, VV>> vertexWindowStream,
                               PWindowStream<IEdge<K, EV>> edgeWindowStream) {
        super(pipelineContext);
        this.vertexStream = vertexWindowStream;
        this.edgeStream = edgeWindowStream;
        this.parallelism = Math.max(
            vertexWindowStream.getParallelism(),
            edgeWindowStream.getParallelism());
    }

    /**
     * Configures this graph window for a vertex-centric algorithm.
     *
     * <p>Records the execution model and iteration limit, keys the vertex and edge
     * streams (with the algorithm's partition when it supplies one, otherwise with
     * the {@link WindowStreamGraph} default partitions), and builds the graph
     * message encoder. The keyed vertex stream becomes the inherited {@code input};
     * the keyed edge stream replaces {@link #edgeStream}.
     *
     * @param vertexCentricAlgo algorithm providing iteration count, optional
     *                          partition and optional key/message encoders
     */
    protected void processOnVertexCentric(VertexCentricAlgo<K, VV, EV, M> vertexCentricAlgo) {
        this.graphExecAlgo = GraphExecAlgo.VertexCentric;
        this.maxIterations = vertexCentricAlgo.getMaxIterationCount();

        IGraphVCPartition partition = vertexCentricAlgo.getGraphPartition();
        if (partition != null) {
            // The algorithm supplied its own partition: key both streams with it.
            this.input = (Stream)this.vertexStream.keyBy((KeySelector)new CustomVertexVCPartition(partition));
            this.edgeStream = this.edgeStream.keyBy((KeySelector)new CustomEdgeVCPartition(partition));
        } else {
            // No custom partition: fall back to the default vertex/edge partitions.
            this.input = (Stream)this.vertexStream.keyBy(new WindowStreamGraph.DefaultVertexPartition());
            this.edgeStream = this.edgeStream.keyBy(new WindowStreamGraph.DefaultEdgePartition());
        }

        // Indices 0 and 3 are passed to the resolver as in the original code;
        // presumably the positions of K and M among the algorithm's type
        // arguments - TODO confirm against EncoderResolver.
        IEncoder keyEncoder = encoderOrResolved(vertexCentricAlgo.getKeyEncoder(), vertexCentricAlgo, 0);
        IEncoder messageEncoder = encoderOrResolved(vertexCentricAlgo.getMessageEncoder(), vertexCentricAlgo, 3);
        this.msgEncoder = GraphMessageEncoders.build(keyEncoder, messageEncoder);
    }

    /**
     * Returns {@code provided} when it is non-null, otherwise asks
     * {@link EncoderResolver} to resolve an encoder for the given index.
     */
    private static IEncoder encoderOrResolved(IEncoder provided, VertexCentricAlgo algo, int index) {
        if (provided != null) {
            return provided;
        }
        return EncoderResolver.resolveFunction(VertexCentricAlgo.class, algo, index);
    }

    /**
     * @return the iteration limit recorded by {@link #processOnVertexCentric};
     *         the field's default value if that method has not been called
     */
    public long getMaxIterations() {
        return this.maxIterations;
    }

    /**
     * @return the current edge stream (the keyed stream once
     *         {@link #processOnVertexCentric} has run)
     */
    public PWindowStream<IEdge<K, EV>> getEdges() {
        return this.edgeStream;
    }

    /**
     * @return the graph message encoder, or {@code null} if
     *         {@link #processOnVertexCentric} has not been called
     */
    public IEncoder<? extends IGraphMessage<K, M>> getMsgEncoder() {
        return this.msgEncoder;
    }
}

