/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.store.rocksdb.proxy;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.serialize.SerializerFactory;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.state.graph.encoder.IGraphKVEncoder;
import com.antgroup.geaflow.state.pushdown.IStatePushDown;
import com.antgroup.geaflow.state.pushdown.filter.IFilter;
import com.antgroup.geaflow.state.pushdown.filter.inner.GraphFilter;
import com.antgroup.geaflow.state.pushdown.filter.inner.IGraphFilter;
import com.antgroup.geaflow.state.pushdown.limit.IEdgeLimit;
import com.antgroup.geaflow.store.data.AsyncFlushBuffer;
import com.antgroup.geaflow.store.data.GraphWriteBuffer;
import com.antgroup.geaflow.store.rocksdb.RocksdbClient;
import com.antgroup.geaflow.store.rocksdb.proxy.SyncGraphRocksdbProxy;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Graph RocksDB proxy that stages vertex and edge writes in an {@link AsyncFlushBuffer}
 * instead of writing them to RocksDB directly.
 *
 * <p>Reads consult the flush buffer first so that data which has been added but not yet
 * written is still visible, and then fall back to / merge with the results of
 * {@link SyncGraphRocksdbProxy}.
 *
 * @param <K> vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 */
public class AsyncGraphRocksdbProxy<K, VV, EV>
extends SyncGraphRocksdbProxy<K, VV, EV> {
    /** Staging area for writes; it calls back into {@link #flush(GraphWriteBuffer)} to persist them. */
    private final AsyncFlushBuffer<K, VV, EV> flushBuffer;

    /**
     * Creates the proxy and its flush buffer.
     *
     * @param rocksdbClient client handed to the parent proxy and used to persist flushed data
     * @param encoder graph key/value encoder handed to the parent proxy
     * @param config configuration handed to both the parent proxy and the flush buffer
     */
    public AsyncGraphRocksdbProxy(RocksdbClient rocksdbClient, IGraphKVEncoder<K, VV, EV> encoder, Configuration config) {
        super(rocksdbClient, encoder, config);
        this.flushBuffer = new AsyncFlushBuffer<>(config, this::flush, SerializerFactory.getKryoSerializer());
    }

    /**
     * Flush callback registered with the {@link AsyncFlushBuffer}: encodes the buffered
     * vertices and edges and writes them through the RocksDB client.
     *
     * <p>Vertices are written under {@code "default"} and edges under {@code "e"}
     * (presumably the column family names -- confirm against {@code RocksdbClient.write}).
     * An empty buffer results in no writes at all.
     *
     * @param graphWriteBuffer the buffer whose content should be persisted
     */
    private void flush(GraphWriteBuffer<K, VV, EV> graphWriteBuffer) {
        if (graphWriteBuffer.getSize() == 0) {
            return;
        }

        List<Tuple<byte[], byte[]>> vertexEntries = graphWriteBuffer.getVertexId2Vertex().values().stream()
            .map(v -> this.vertexEncoder.format(v))
            .collect(Collectors.toList());
        this.rocksdbClient.write("default", vertexEntries);

        // Build a separate list for the edges rather than clearing and refilling the vertex
        // list: Collectors.toList() makes no mutability guarantee, and the vertex batch is
        // left untouched after it has been handed to the client.
        List<Tuple<byte[], byte[]>> edgeEntries = graphWriteBuffer.getVertexId2Edges().values().stream()
            .flatMap(edges -> edges.stream())
            .map(e -> this.edgeEncoder.format(e))
            .collect(Collectors.toList());
        this.rocksdbClient.write("e", edgeEntries);
    }

    /**
     * Stages the vertex in the flush buffer; nothing is written to RocksDB by this call itself.
     *
     * @param vertex the vertex to add
     */
    @Override
    public void addVertex(IVertex<K, VV> vertex) {
        this.flushBuffer.addVertex(vertex);
    }

    /**
     * Stages the edge in the flush buffer; nothing is written to RocksDB by this call itself.
     *
     * @param edge the edge to add
     */
    @Override
    public void addEdge(IEdge<K, EV> edge) {
        this.flushBuffer.addEdge(edge);
    }

    /**
     * Looks up a vertex, preferring the buffered copy over the stored one.
     *
     * <p>If the flush buffer holds a vertex for {@code sid}, that vertex decides the result:
     * it is returned when the pushdown filter accepts it and {@code null} otherwise, without
     * consulting the store. Only when the buffer has no such vertex is the lookup delegated
     * to the parent proxy.
     *
     * @param sid id of the vertex to read
     * @param pushdown pushdown whose filter must be an {@link IGraphFilter}
     * @return the matching vertex, or {@code null} if the buffered vertex is filtered out
     *     (the parent's result is returned as-is when the buffer has no entry)
     */
    @Override
    public IVertex<K, VV> getVertex(K sid, IStatePushDown pushdown) {
        IVertex<K, VV> vertex = this.flushBuffer.readBufferedVertex(sid);
        if (vertex != null) {
            return ((IGraphFilter)pushdown.getFilter()).filterVertex(vertex) ? vertex : null;
        }
        return super.getVertex(sid, pushdown);
    }

    /**
     * Returns the edges of {@code sid}, merging buffered edges with stored ones.
     *
     * <p>Buffered edges are visited in reverse list order (presumably newest first -- this
     * depends on the order {@code readBufferedEdges} returns) and kept when the filter
     * accepts them. Unless the filter then reports that all remaining edges should be
     * dropped, the stored edges obtained from the parent proxy with the same filter instance
     * are appended. Duplicates (by {@code equals}) are removed, keeping the first occurrence.
     *
     * @param sid id of the source vertex
     * @param pushdown pushdown providing the filter and the edge limit
     * @return a new list of the merged edges, buffered edges first
     */
    @Override
    public List<IEdge<K, EV>> getEdges(K sid, IStatePushDown pushdown) {
        List<IEdge<K, EV>> list = this.flushBuffer.readBufferedEdges(sid);
        // Insertion-ordered set: de-duplicates while keeping buffered edges ahead of stored ones.
        LinkedHashSet<IEdge<K, EV>> set = new LinkedHashSet<>();
        IGraphFilter filter = GraphFilter.of((IFilter)pushdown.getFilter(), (IEdgeLimit)pushdown.getEdgeLimit());
        Lists.reverse(list).stream().filter(filter::filterEdge).forEach(set::add);
        if (!filter.dropAllRemaining()) {
            set.addAll(super.getEdges(sid, filter));
        }
        return new ArrayList<IEdge<K, EV>>(set);
    }

    /** Asks the flush buffer to flush its staged writes. */
    @Override
    public void flush() {
        this.flushBuffer.flush();
    }

    /**
     * Closes the flush buffer.
     *
     * <p>NOTE(review): this does not call {@code super.close()}; confirm the parent proxy
     * holds nothing that needs releasing here.
     */
    @Override
    public void close() {
        this.flushBuffer.close();
    }
}

