/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.algo.vc;

import com.antgroup.geaflow.api.function.iterator.RichIteratorFunction;
import com.antgroup.geaflow.api.graph.base.algo.VertexCentricAlgo;
import com.antgroup.geaflow.api.graph.function.vc.VertexCentricCombineFunction;
import com.antgroup.geaflow.api.partition.graph.edge.IGraphVCPartition;
import com.antgroup.geaflow.collector.AbstractCollector;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.context.AbstractRuntimeContext;
import com.antgroup.geaflow.metrics.common.MetricNameFormatter;
import com.antgroup.geaflow.metrics.common.api.Meter;
import com.antgroup.geaflow.model.graph.message.IGraphMessage;
import com.antgroup.geaflow.model.record.RecordArgs;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.AbstractOperator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.IGraphVertexCentricOp;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.GraphMsgBoxFactory;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.msgbox.IGraphMsgBox;
import com.antgroup.geaflow.operator.impl.iterator.IteratorOperator;
import com.antgroup.geaflow.state.GraphState;
import com.antgroup.geaflow.state.StateFactory;
import com.antgroup.geaflow.state.descriptor.GraphStateDescriptor;
import com.antgroup.geaflow.utils.keygroup.IKeyGroupAssigner;
import com.antgroup.geaflow.utils.keygroup.KeyGroup;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignment;
import com.antgroup.geaflow.view.IViewDesc;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import com.antgroup.geaflow.view.meta.ViewMetaBookKeeper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base operator for vertex-centric graph algorithms.
 *
 * <p>When opened, the operator builds the graph state for the key group owned by this task,
 * recovers that state to the latest recorded view version (if any), and wires a message box
 * to the collector tagged with the graph {@code Message} record name. Incoming messages are
 * buffered in the message box; the per-iteration work itself is supplied by subclasses via
 * {@link #doFinishIteration(long)}.
 *
 * <p>Operator chaining is disabled for this operator ({@code ChainStrategy.NEVER}).
 *
 * @param <K> vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <M> message type
 * @param <FUNC> concrete vertex-centric algorithm type
 */
public abstract class AbstractGraphVertexCentricOp<K, VV, EV, M, FUNC extends VertexCentricAlgo<K, VV, EV, M>>
        extends AbstractOperator<FUNC>
        implements IGraphVertexCentricOp<K, VV, EV, M>, IteratorOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGraphVertexCentricOp.class);

    /** Id of the task running this operator; set in {@link #open}. */
    protected int taskId;
    /** Combine function obtained from the algorithm; handed to the message box factory. */
    protected VertexCentricCombineFunction<M> msgCombineFunction;
    /** Graph partition obtained from the algorithm. */
    protected IGraphVCPartition<K> graphVCPartition;
    /** Description of the graph view this operator works on. */
    protected final GraphViewDesc graphViewDesc;
    /** Maximum iteration count as reported by the algorithm at construction time. */
    protected long maxIterations;
    /** Iteration id most recently passed to {@link #initIteration(long)}. */
    protected long iterations;
    /** Window id read from the runtime context. */
    protected long windowId;
    /** Key group range assigned to this task; computed in {@link #buildGraphStateDesc(String)}. */
    protected KeyGroup keyGroup;
    /** Graph state built in {@link #open}. */
    protected GraphState<K, VV, EV> graphState;
    /** Buffer for incoming and outgoing messages. */
    protected IGraphMsgBox<K, M> graphMsgBox;
    /** Collectors of this operator indexed by their tag. */
    protected Map<String, ICollector> collectorMap;
    /** Collector registered under the graph {@code Message} tag; {@code null} if none is registered. */
    protected ICollector<IGraphMessage<K, M>> messageCollector;
    /** Meter created under the iteration-message metric name. */
    protected Meter msgMeter;

    /**
     * Creates the operator.
     *
     * @param graphViewDesc description of the graph view to operate on
     * @param func the vertex-centric algorithm; its max iteration count is captured here
     */
    public AbstractGraphVertexCentricOp(GraphViewDesc graphViewDesc, FUNC func) {
        super(func);
        this.graphViewDesc = graphViewDesc;
        this.maxIterations = func.getMaxIterationCount();
        this.opArgs.setChainStrategy(OpArgs.ChainStrategy.NEVER);
    }

    /**
     * Opens the operator: reads algorithm settings, builds and recovers the graph state,
     * indexes the collectors by tag and creates the message box.
     *
     * @param opContext operator context, forwarded to the parent implementation
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);

        this.msgCombineFunction = this.function.getCombineFunction();
        this.graphVCPartition = this.function.getGraphPartition();
        this.windowId = this.runtimeContext.getWindowId();
        this.msgMeter = this.metricGroup.meter(
            MetricNameFormatter.iterationMsgMetricName(this.getClass(), this.opArgs.getOpId()));

        GraphStateDescriptor<K, VV, EV> desc = this.buildGraphStateDesc(this.opArgs.getOpName());
        desc.withMetricGroup(this.runtimeContext.getMetric());

        this.taskId = this.runtimeContext.getTaskArgs().getTaskId();
        int taskIndex = this.runtimeContext.getTaskArgs().getTaskIndex();
        LOGGER.info("opName:{} taskId:{} taskIndex:{} keyGroup:{}",
            this.opArgs.getOpName(), this.taskId, taskIndex, desc.getKeyGroup());

        this.graphState = StateFactory.buildGraphState(desc, this.runtimeContext.getConfiguration());
        // Recovery needs graphState, so it must run after the state has been built.
        this.recover();

        this.collectorMap = new HashMap<>();
        for (ICollector collector : this.collectors) {
            this.collectorMap.put(collector.getTag(), collector);
        }

        // The map stores raw collectors; the one under the Message tag is used as the message collector.
        @SuppressWarnings("unchecked")
        ICollector<IGraphMessage<K, M>> taggedMessageCollector =
            this.collectorMap.get(RecordArgs.GraphRecordNames.Message.name());
        this.messageCollector = taggedMessageCollector;
        if (this.messageCollector instanceof AbstractCollector) {
            ((AbstractCollector) this.messageCollector).setOutputMetric(this.msgMeter);
        }
        this.graphMsgBox = GraphMsgBoxFactory.buildMessageBox(this.messageCollector, this.msgCombineFunction);
    }

    /**
     * Builds the graph state descriptor for this task and stores the computed key group
     * in {@link #keyGroup}.
     *
     * @param name operator name supplied by the caller; not used by this implementation
     * @return descriptor named after the graph view, configured with the view's backend,
     *     this task's key group and a matching key group assigner
     * @throws IllegalArgumentException if the task parallelism exceeds the view's shard number
     */
    protected GraphStateDescriptor<K, VV, EV> buildGraphStateDesc(String name) {
        int taskIndex = this.runtimeContext.getTaskArgs().getTaskIndex();
        int taskPara = this.runtimeContext.getTaskArgs().getParallelism();
        IViewDesc.BackendType backendType = this.graphViewDesc.getBackend();
        GraphStateDescriptor<K, VV, EV> desc =
            GraphStateDescriptor.build(this.graphViewDesc.getName(), backendType.name());

        // The shard number of the view is the upper bound for the task parallelism.
        int maxPara = this.graphViewDesc.getShardNum();
        Preconditions.checkArgument(taskPara <= maxPara,
            "task parallelism '%s' must be <= shard num(max parallelism) '%s'", taskPara, maxPara);

        this.keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara, taskPara, taskIndex);
        IKeyGroupAssigner keyGroupAssigner =
            KeyGroupAssignerFactory.createKeyGroupAssigner(this.keyGroup, taskIndex, maxPara);
        desc.withKeyGroup(this.keyGroup);
        desc.withKeyGroupAssigner(keyGroupAssigner);
        return desc;
    }

    /**
     * Drains every payload of the given message into the in-box of its target vertex and
     * marks the operator input meter once per call.
     *
     * @param graphMessage message whose payloads are added to the in-box
     */
    @Override
    public void processMessage(IGraphMessage<K, M> graphMessage) {
        if (this.enableDebug) {
            LOGGER.info("taskId:{} windowId:{} Iteration:{} add message:{}",
                this.taskId, this.windowId, this.iterations, graphMessage);
        }
        K vertexId = graphMessage.getTargetVId();
        while (graphMessage.hasNext()) {
            this.graphMsgBox.addInMessages(vertexId, graphMessage.next());
        }
        this.opInputMeter.mark();
    }

    /**
     * Records the iteration id, refreshes the window id from the operator context and
     * forwards the iteration start to the algorithm when it is a {@link RichIteratorFunction}.
     *
     * @param iterations id of the iteration that is about to start
     */
    @Override
    public void initIteration(long iterations) {
        this.iterations = iterations;
        this.windowId = this.opContext.getRuntimeContext().getWindowId();
        ((AbstractRuntimeContext) this.runtimeContext).updateWindowId(this.windowId);
        if (this.enableDebug) {
            LOGGER.info("taskId:{} windowId:{} init Iteration:{}", this.taskId, this.windowId, iterations);
        }
        if (this.function instanceof RichIteratorFunction) {
            ((RichIteratorFunction) this.function).initIteration(iterations);
        }
    }

    /**
     * @return the maximum iteration count captured from the algorithm at construction time
     */
    @Override
    public long getMaxIterationCount() {
        return this.maxIterations;
    }

    /**
     * Runs {@link #doFinishIteration(long)} and records the elapsed time reported by the
     * tic/toc timer in the iteration-finish histogram for this iteration.
     *
     * @param iteration id of the iteration being finished
     */
    @Override
    public void finishIteration(long iteration) {
        this.ticToc.tic();
        this.doFinishIteration(iteration);
        this.metricGroup
            .histogram(MetricNameFormatter.iterationFinishMetricName(
                this.getClass(), this.opArgs.getOpId(), iteration))
            .update(this.ticToc.toc());
    }

    /**
     * Performs the subclass-specific work for finishing an iteration.
     *
     * @param iteration id of the iteration being finished
     */
    public abstract void doFinishIteration(long iteration);

    /**
     * Clears both message boxes and closes the graph state.
     *
     * <p>The state is closed in a {@code finally} block so it is released even when clearing
     * the message box fails; both members are null-checked because {@link #open} may have
     * failed before they were assigned.
     */
    @Override
    public void close() {
        try {
            if (this.graphMsgBox != null) {
                this.graphMsgBox.clearInBox();
                this.graphMsgBox.clearOutBox();
            }
        } finally {
            if (this.graphState != null) {
                this.graphState.manage().operate().close();
            }
        }
    }

    /**
     * Looks up the latest version of the graph view and, when it is non-negative, recovers
     * the graph state to that version.
     *
     * @throws GeaflowRuntimeException if reading the latest view version fails with an
     *     {@link IOException}
     */
    protected void recover() {
        String opName = this.opArgs.getOpName();
        LOGGER.info("opName: {} will do recover, windowId: {}", opName, this.windowId);

        long lastCheckPointId;
        try {
            ViewMetaBookKeeper keeper = new ViewMetaBookKeeper(
                this.graphViewDesc.getName(), this.runtimeContext.getConfiguration());
            lastCheckPointId = keeper.getLatestViewVersion(this.graphViewDesc.getName());
            LOGGER.info("opName: {} will do recover, ViewMetaBookKeeper version: {}", opName, lastCheckPointId);
        } catch (IOException e) {
            throw new GeaflowRuntimeException(e);
        }

        // A negative version skips recovery entirely.
        if (lastCheckPointId >= 0L) {
            LOGGER.info("opName: {} do recover to state VersionId: {}", opName, lastCheckPointId);
            this.graphState.manage().operate().setCheckpointId(lastCheckPointId);
            this.graphState.manage().operate().recover();
        }
    }
}

