/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.materialize;

import com.antgroup.geaflow.api.graph.materialize.GraphMaterializeFunction;
import com.antgroup.geaflow.api.trait.CheckpointTrait;
import com.antgroup.geaflow.api.trait.TransactionTrait;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.meta.GraphMeta;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.window.AbstractOneInputOperator;
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.GraphState;
import com.antgroup.geaflow.state.StateFactory;
import com.antgroup.geaflow.state.descriptor.GraphStateDescriptor;
import com.antgroup.geaflow.utils.keygroup.IKeyGroupAssigner;
import com.antgroup.geaflow.utils.keygroup.KeyGroup;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignment;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import com.antgroup.geaflow.view.meta.ViewMetaBookKeeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that writes every vertex and edge it receives into the dynamic graph state that
 * backs a graph view.
 *
 * <p>The graph state is created in {@link #open(Operator.OpContext)} from the
 * {@link GraphViewDesc} given at construction time. Checkpointing and rollback are performed
 * through the state's {@code manage().operate()} handle.
 *
 * @param <K> vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 */
public class GraphViewMaterializeOp<K, VV, EV>
        extends AbstractOneInputOperator<Object, GraphMaterializeFunction<K, VV, EV>>
        implements TransactionTrait, CheckpointTrait {

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphViewMaterializeOp.class);

    /** Description of the graph view this operator materializes into. */
    private final GraphViewDesc graphViewDesc;

    /** Graph state built in {@link #open(Operator.OpContext)}; {@code null} until then. */
    protected transient GraphState<K, VV, EV> graphState;

    /**
     * Creates the operator for the given graph view.
     *
     * @param graphViewDesc descriptor supplying the view name, backend, graph meta type,
     *                      shard number and window-to-checkpoint mapping
     */
    public GraphViewMaterializeOp(GraphViewDesc graphViewDesc) {
        this.graphViewDesc = graphViewDesc;
    }

    /**
     * Builds the graph state for this task, recovers it, and installs the materialize function.
     *
     * <p>The state descriptor is configured with the view name and backend, the
     * {@link DataModel#DYNAMIC_GRAPH} data model, the view's graph meta, the runtime metric
     * group, and the key group computed for this task index.
     *
     * @param opContext operator context forwarded to the superclass
     * @throws IllegalArgumentException if the task parallelism exceeds the view's shard number
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        String name = this.graphViewDesc.getName();
        String storeType = this.graphViewDesc.getBackend().name();
        GraphStateDescriptor<K, VV, EV> descriptor = GraphStateDescriptor.build(name, storeType);
        descriptor.withDataModel(DataModel.DYNAMIC_GRAPH);
        descriptor.withGraphMeta(new GraphMeta(this.graphViewDesc.getGraphMetaType()));
        descriptor.withMetricGroup(this.runtimeContext.getMetric());

        // The shard number acts as the maximum parallelism for key group assignment.
        int maxPara = this.graphViewDesc.getShardNum();
        int taskPara = this.runtimeContext.getTaskArgs().getParallelism();
        Preconditions.checkArgument(taskPara <= maxPara,
                "task parallelism '%s' must be <= shard num(max parallelism) '%s'",
                taskPara, maxPara);

        int taskIndex = this.runtimeContext.getTaskArgs().getTaskIndex();
        KeyGroup keyGroup =
                KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara, taskPara, taskIndex);
        descriptor.withKeyGroup(keyGroup);
        IKeyGroupAssigner keyGroupAssigner =
                KeyGroupAssignerFactory.createKeyGroupAssigner(keyGroup, taskIndex, maxPara);
        descriptor.withKeyGroupAssigner(keyGroupAssigner);

        int taskId = this.runtimeContext.getTaskArgs().getTaskId();
        LOGGER.info("opName:{} taskId:{} taskIndex:{} keyGroup:{}", name, taskId, taskIndex, keyGroup);

        this.graphState =
                StateFactory.buildGraphState(descriptor, this.runtimeContext.getConfiguration());
        // Recover before installing the function so the function wraps the recovered state.
        this.recover();
        this.function = new DynamicGraphMaterializeFunction<>(this.graphState);
    }

    /**
     * Materializes one input record.
     *
     * <p>A record that is an {@link IVertex} is written as a vertex; any other record is cast
     * to {@link IEdge} and written as an edge.
     *
     * @param record the incoming vertex or edge
     * @throws ClassCastException if the record is neither an {@link IVertex} nor an {@link IEdge}
     */
    @Override
    @SuppressWarnings("unchecked") // the input is declared as Object; element types are erased
    protected void process(Object record) throws Exception {
        if (record instanceof IVertex) {
            this.function.materializeVertex((IVertex<K, VV>) record);
        } else {
            this.function.materializeEdge((IEdge<K, EV>) record);
        }
    }

    /**
     * Checkpoints the graph state for the given window.
     *
     * <p>The window id is translated to a checkpoint id by the graph view descriptor; that id
     * is set on the state, after which the state's {@code finish()} and {@code archive()}
     * operations are invoked in that order.
     *
     * @param windowId id of the window being checkpointed
     */
    public void checkpoint(long windowId) {
        long checkpointId = this.graphViewDesc.getCheckpoint(windowId);
        this.graphState.manage().operate().setCheckpointId(checkpointId);
        this.graphState.manage().operate().finish();
        this.graphState.manage().operate().archive();
        LOGGER.info("do checkpoint over, checkpointId: {}", checkpointId);
    }

    /**
     * Does nothing; this operator has no per-window finish work.
     *
     * @param windowId id of the finished window (unused)
     */
    public void finish(long windowId) {
    }

    /**
     * Rolls the graph state back by running the recovery procedure with the given batch id.
     *
     * @param batchId batch id passed to the recovery procedure as its window id
     */
    public void rollback(long batchId) {
        this.recover(batchId);
    }

    /** Closes the graph state if it has been created. */
    @Override
    public void close() {
        if (this.graphState != null) {
            this.graphState.manage().operate().close();
        }
    }

    /** Recovers the graph state using the runtime context's current window id. */
    protected void recover() {
        this.recover(this.runtimeContext.getWindowId());
    }

    /**
     * Recovers the graph state to a previously stored version.
     *
     * <p>The latest view version recorded by {@link ViewMetaBookKeeper} is used when it is
     * non-negative. Otherwise, if {@code windowId} is greater than 1, the state is recovered
     * to the checkpoint that the graph view descriptor maps to {@code windowId - 1}. In the
     * remaining case nothing is recovered.
     *
     * @param windowId window id used only when no view version has been recorded
     * @throws GeaflowRuntimeException if reading the view meta fails with an {@link IOException}
     */
    private void recover(long windowId) {
        long lastCheckPointId;
        try {
            ViewMetaBookKeeper keeper = new ViewMetaBookKeeper(
                    this.graphViewDesc.getName(), this.runtimeContext.getConfiguration());
            lastCheckPointId = keeper.getLatestViewVersion(this.graphViewDesc.getName());
            LOGGER.info("opName: {} will do recover, ViewMetaBookKeeper version: {}",
                    this.opArgs.getOpName(), lastCheckPointId);
        } catch (IOException e) {
            throw new GeaflowRuntimeException(e);
        }

        if (lastCheckPointId >= 0L) {
            LOGGER.info("opName: {} do recover to graph VersionId: {}",
                    this.opArgs.getOpName(), lastCheckPointId);
            this.recoverGraphStateTo(lastCheckPointId);
            return;
        }

        LOGGER.info("lastCheckPointId < 0, windowId: {}", windowId);
        if (windowId > 1L) {
            long recoverVersionId = this.graphViewDesc.getCheckpoint(windowId - 1L);
            LOGGER.info("opName: {} do recover to latestVersionId: {}",
                    this.opArgs.getOpName(), recoverVersionId);
            this.recoverGraphStateTo(recoverVersionId);
        }
    }

    /** Sets the given checkpoint id on the graph state and triggers its recovery. */
    private void recoverGraphStateTo(long checkpointId) {
        this.graphState.manage().operate().setCheckpointId(checkpointId);
        this.graphState.manage().operate().recover();
    }

    /**
     * Returns the graph state held by this operator.
     *
     * @return the graph state, or {@code null} if the operator has not been opened
     */
    @VisibleForTesting
    public GraphState<K, VV, EV> getGraphState() {
        return this.graphState;
    }

    /**
     * Materialize function that adds vertices and edges to the dynamic graph of a
     * {@link GraphState}, always under {@link #VERSION}.
     */
    private static class DynamicGraphMaterializeFunction<K, VV, EV>
            implements GraphMaterializeFunction<K, VV, EV> {

        /** Version under which every vertex and edge is added to the dynamic graph. */
        public static final long VERSION = 0L;

        private final GraphState<K, VV, EV> graphState;

        /**
         * @param graphState state whose dynamic graph receives the vertices and edges
         */
        public DynamicGraphMaterializeFunction(GraphState<K, VV, EV> graphState) {
            this.graphState = graphState;
        }

        /** Adds the vertex to the dynamic graph under {@link #VERSION}. */
        @Override
        public void materializeVertex(IVertex<K, VV> vertex) {
            this.graphState.dynamicGraph().V().add(VERSION, vertex);
        }

        /** Adds the edge to the dynamic graph under {@link #VERSION}. */
        @Override
        public void materializeEdge(IEdge<K, EV> edge) {
            this.graphState.dynamicGraph().E().add(VERSION, edge);
        }
    }
}

