/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.algorithms.gelly.randomjump.functions;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageIterator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.gradoop.flink.algorithms.gelly.randomjump.functions.VCIVertexValue;

public class VCIComputeFunction
extends ComputeFunction<Long, VCIVertexValue, Long, NullValue> {
    public static final String START_INDICES_BROADCAST_SET = "startIndices";
    public static final String VERTEX_INDICES_BROADCAST_SET = "vertexIndices";
    public static final String VISITED_VERTICES_AGGREGATOR_NAME = "visitedVerticesAggregator";
    private final double jumpProbability;
    private final long verticesToVisit;
    private List<Long> startIndices;
    private List<Long> vertexIndices;
    private LongSumAggregator visitedVerticesAggregator;
    private long currentVisitedCount;

    public VCIComputeFunction(double jumpProbability, long verticesToVisit) {
        this.jumpProbability = jumpProbability;
        this.verticesToVisit = verticesToVisit;
        this.visitedVerticesAggregator = new LongSumAggregator();
        this.currentVisitedCount = 0L;
    }

    public void preSuperstep() {
        this.startIndices = (List)this.getBroadcastSet(START_INDICES_BROADCAST_SET);
        this.vertexIndices = (List)this.getBroadcastSet(VERTEX_INDICES_BROADCAST_SET);
        this.visitedVerticesAggregator = (LongSumAggregator)this.getIterationAggregator(VISITED_VERTICES_AGGREGATOR_NAME);
        LongValue previousAggregate = (LongValue)this.getPreviousIterationAggregate(VISITED_VERTICES_AGGREGATOR_NAME);
        if (previousAggregate != null) {
            this.currentVisitedCount += previousAggregate.getValue();
        }
    }

    public void compute(Vertex<Long, VCIVertexValue> vertex, MessageIterator<NullValue> messages) {
        if (this.currentVisitedCount < this.verticesToVisit) {
            ArrayList edgesList = Lists.newArrayList((Iterable)this.getEdges());
            Tuple2<VCIVertexValue, Boolean> valueWithHasChanged = Tuple2.of((Object)vertex.getValue(), (Object)false);
            if (this.startIndices.contains(vertex.getId()) && !((VCIVertexValue)((Object)valueWithHasChanged.f0)).isVisited()) {
                valueWithHasChanged = this.walkToRandomNeighbor(valueWithHasChanged, edgesList);
            } else if (messages.hasNext()) {
                for (NullValue msg : messages) {
                    valueWithHasChanged = this.walkToRandomNeighbor(valueWithHasChanged, edgesList);
                }
            }
            if (((Boolean)valueWithHasChanged.f1).booleanValue()) {
                this.setNewVertexValue(valueWithHasChanged.f0);
            }
        }
    }

    private Tuple2<VCIVertexValue, Boolean> walkToRandomNeighbor(Tuple2<VCIVertexValue, Boolean> valueWithHasChanged, List<Edge<Long, Long>> edgesList) {
        if (!((VCIVertexValue)((Object)valueWithHasChanged.f0)).isVisited()) {
            this.visitedVerticesAggregator.aggregate(1L);
            ((VCIVertexValue)((Object)valueWithHasChanged.f0)).setVisited();
            valueWithHasChanged.f1 = true;
        }
        if (this.jumpProbability == 0.0 || this.jumpProbability < ThreadLocalRandom.current().nextDouble()) {
            ArrayList<Tuple2> unvisitedNeighborWithEdgeId = new ArrayList<Tuple2>();
            for (Edge<Long, Long> edge : edgesList) {
                if (((VCIVertexValue)((Object)valueWithHasChanged.f0)).getVisitedOutEdges().contains(edge.getValue())) continue;
                unvisitedNeighborWithEdgeId.add(Tuple2.of((Object)edge.getTarget(), (Object)edge.getValue()));
            }
            if (!unvisitedNeighborWithEdgeId.isEmpty()) {
                int randomIndex = ThreadLocalRandom.current().nextInt(unvisitedNeighborWithEdgeId.size());
                Long randomNeighborIndex = (Long)((Tuple2)unvisitedNeighborWithEdgeId.get((int)randomIndex)).f0;
                ((VCIVertexValue)((Object)valueWithHasChanged.f0)).addVisitedOutEdge((Long)((Tuple2)unvisitedNeighborWithEdgeId.get((int)randomIndex)).f1);
                this.sendMessageTo(randomNeighborIndex, new NullValue());
                valueWithHasChanged.f1 = true;
            } else {
                this.jumpToRandomVertex();
            }
        } else {
            this.jumpToRandomVertex();
        }
        return valueWithHasChanged;
    }

    private void jumpToRandomVertex() {
        int randomIndex = ThreadLocalRandom.current().nextInt(this.vertexIndices.size());
        Long randomVertexIndex = this.vertexIndices.get(randomIndex);
        this.sendMessageTo(randomVertexIndex, new NullValue());
    }
}

