/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.operator.impl.graph.source;

import com.antgroup.geaflow.api.function.io.GraphSourceFunction;
import com.antgroup.geaflow.collector.ICollector;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.metrics.common.MetricNameFormatter;
import com.antgroup.geaflow.metrics.common.api.Meter;
import com.antgroup.geaflow.model.graph.GraphRecord;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.impl.io.WindowSourceOperator;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source operator that pulls graph data from a {@link GraphSourceFunction} and routes the
 * produced vertices and edges to the downstream collectors tagged {@link #VERTEX_TAG} and
 * {@link #EDGE_TAG} respectively.
 *
 * <p>Vertices can optionally be de-duplicated by id within a single {@link #emitRecord(long)}
 * call; the id set is reset at the end of every call.
 *
 * @param <K>  vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 */
public class GraphSourceOperator<K, VV, EV>
        extends WindowSourceOperator<GraphRecord<IVertex<K, VV>, IEdge<K, EV>>> {

    /** Output tag identifying collectors that receive edges. */
    public static final String EDGE_TAG = "edge";
    /** Output tag identifying collectors that receive vertices. */
    public static final String VERTEX_TAG = "vertex";

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphSourceOperator.class);

    private GraphSourceFunction.GraphSourceContext<K, VV, EV> sourceCxt;
    private GraphSourceFunction<K, VV, EV> sourceFunction;
    private long edgeCnt;
    private long vertexCnt;
    private long filteredVertexCnt;
    // NOTE(review): nothing in this class assigns this flag, so de-duplication is effectively
    // disabled here — confirm whether it is meant to be populated from configuration.
    private boolean isDedupEnabled;
    // Initialised at declaration so that instances created through the no-arg constructor
    // also own a usable set (previously it stayed null and emitRecord failed on clear()).
    private Set<K> vertexIdSet = new HashSet<K>();
    protected Meter vertexTps;
    protected Meter edgeTps;

    /** Creates an operator without a source function. */
    public GraphSourceOperator() {
    }

    /**
     * Creates an operator backed by the given source function.
     *
     * @param sourceFunction function that fetches vertices and edges for each batch
     */
    public GraphSourceOperator(GraphSourceFunction<K, VV, EV> sourceFunction) {
        this.sourceFunction = sourceFunction;
    }

    /**
     * Opens the parent operator, registers the vertex/edge throughput meters and builds the
     * source context from the collectors available at this point.
     *
     * @param opContext operator context handed to the parent {@code open}
     */
    @Override
    public void open(Operator.OpContext opContext) {
        super.open(opContext);
        int opId = (int) this.opArgs.getOpId();
        this.vertexTps = this.metricGroup.meter(
                MetricNameFormatter.vertexTpsMetricName(this.getClass(), opId));
        this.edgeTps = this.metricGroup.meter(
                MetricNameFormatter.edgeTpsMetricName(this.getClass(), opId));
        this.sourceCxt = new DefaultGraphSourceContext();
    }

    /**
     * Fetches the data of one batch from the source function and emits it through the source
     * context.
     *
     * @param batchId id of the batch to fetch
     * @throws GeaflowRuntimeException wrapping any exception raised while fetching
     */
    public void emitRecord(long batchId) {
        try {
            this.sourceFunction.fetch(batchId, this.sourceCxt);
            LOGGER.info("totalVertex: {}, filteredVertex: {}", this.vertexCnt, this.filteredVertexCnt);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            throw new GeaflowRuntimeException(e);
        } finally {
            // Reset even when the fetch failed, so ids seen during a failed attempt cannot
            // cause vertices to be filtered out on a later call.
            this.vertexIdSet.clear();
        }
    }

    /**
     * Records the vertex id and reports whether it has already been seen since the id set was
     * last cleared.
     *
     * @param vertexId id of the vertex being collected
     * @return {@code true} if the id is a duplicate and the vertex should be dropped
     */
    private boolean filterVertex(K vertexId) {
        // Set.add returns false when the element is already present.
        if (!this.vertexIdSet.add(vertexId)) {
            ++this.filteredVertexCnt;
            return true;
        }
        return false;
    }

    /**
     * Source context that splits the operator's collectors by output tag and partitions
     * vertices by vertex id and edges by source id.
     */
    class DefaultGraphSourceContext
            implements GraphSourceFunction.GraphSourceContext<K, VV, EV> {

        private final List<ICollector> vertexCollectors = new ArrayList<ICollector>();
        private final List<ICollector> edgeCollectors = new ArrayList<ICollector>();

        /** Builds the context from the enclosing operator's current collectors. */
        public DefaultGraphSourceContext() {
            this.filterCollectors(GraphSourceOperator.this.collectors, this.vertexCollectors, this.edgeCollectors);
        }

        /**
         * Emits a vertex, unless de-duplication is enabled and its id was already seen.
         *
         * @param vertex vertex to emit
         * @throws Exception if a collector fails
         */
        public void collectVertex(IVertex<K, VV> vertex) throws Exception {
            if (GraphSourceOperator.this.isDedupEnabled
                    && GraphSourceOperator.this.filterVertex(vertex.getId())) {
                return;
            }
            this.collect(new GraphRecord(vertex));
        }

        /**
         * Emits an edge.
         *
         * @param edge edge to emit
         * @throws Exception if a collector fails
         */
        public void collectEdge(IEdge<K, EV> edge) throws Exception {
            this.collect(new GraphRecord(edge));
        }

        /**
         * Sorts the given collectors into vertex and edge collectors according to the output
         * tag registered for each collector id.
         *
         * @param collectors       collectors to classify
         * @param vertexCollectors receives collectors tagged {@code "vertex"}
         * @param edgeCollectors   receives collectors tagged {@code "edge"}
         * @throws GeaflowRuntimeException if a collector carries any other tag
         */
        private void filterCollectors(List<ICollector> collectors, List<ICollector> vertexCollectors,
                                      List<ICollector> edgeCollectors) {
            for (ICollector collector : collectors) {
                int collectorId = collector.getId();
                String outputTag = (String) GraphSourceOperator.this.outputTags.get(collectorId);
                if (GraphSourceOperator.VERTEX_TAG.equals(outputTag)) {
                    vertexCollectors.add(collector);
                } else if (GraphSourceOperator.EDGE_TAG.equals(outputTag)) {
                    edgeCollectors.add(collector);
                } else {
                    throw new GeaflowRuntimeException("unrecognized tag: " + outputTag);
                }
            }
        }

        /**
         * Sends the record to every matching collector and updates the counters and meters
         * once per collector that received it.
         *
         * @param element vertex or edge record to emit
         * @return always {@code true}
         * @throws Exception if a collector fails
         */
        public boolean collect(GraphRecord<IVertex<K, VV>, IEdge<K, EV>> element) throws Exception {
            if (element.getViewType() == GraphRecord.ViewType.vertex) {
                for (ICollector collector : this.vertexCollectors) {
                    collector.partition(element.getVertex().getId(), element.getVertex());
                    GraphSourceOperator.this.vertexCnt++;
                    GraphSourceOperator.this.vertexTps.mark();
                }
            } else {
                for (ICollector collector : this.edgeCollectors) {
                    collector.partition(element.getEdge().getSrcId(), element.getEdge());
                    GraphSourceOperator.this.edgeCnt++;
                    GraphSourceOperator.this.edgeTps.mark();
                }
            }
            return true;
        }
    }
}

