/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.store.rocksdb.proxy;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.serialize.SerializerFactory;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.state.graph.encoder.IGraphKVEncoder;
import com.antgroup.geaflow.state.pushdown.IStatePushDown;
import com.antgroup.geaflow.state.pushdown.filter.inner.IGraphFilter;
import com.antgroup.geaflow.store.data.AsyncFlushMultiVersionedBuffer;
import com.antgroup.geaflow.store.data.GraphWriteMultiVersionedBuffer;
import com.antgroup.geaflow.store.rocksdb.RocksdbClient;
import com.antgroup.geaflow.store.rocksdb.proxy.SyncGraphMultiVersionedProxy;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.WriteBatch;

/**
 * Multi-versioned graph proxy that routes writes through an
 * {@link AsyncFlushMultiVersionedBuffer} instead of writing each vertex/edge
 * straight to RocksDB.
 *
 * <p>Reads consult the buffer first and then fall back to (or are merged with)
 * the results of {@link SyncGraphMultiVersionedProxy}. The buffer hands its
 * content back to {@link #flush(GraphWriteMultiVersionedBuffer)}, which persists
 * it with a single {@link WriteBatch}.
 *
 * @param <K> vertex id type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 */
public class AsyncGraphMultiVersionedProxy<K, VV, EV>
extends SyncGraphMultiVersionedProxy<K, VV, EV> {
    private final AsyncFlushMultiVersionedBuffer<K, VV, EV> flushBuffer;

    /**
     * Creates the proxy and the write buffer backing it.
     *
     * @param rocksdbStore client used by the parent proxy and by batch writes
     * @param encoder graph key/value encoder handed to the parent proxy
     * @param config configuration passed to both the parent proxy and the buffer
     */
    public AsyncGraphMultiVersionedProxy(RocksdbClient rocksdbStore, IGraphKVEncoder<K, VV, EV> encoder, Configuration config) {
        super(rocksdbStore, encoder, config);
        this.flushBuffer = new AsyncFlushMultiVersionedBuffer<>(config, this::flush, SerializerFactory.getKryoSerializer());
    }

    /**
     * Persists the content of one write buffer to RocksDB in a single batch.
     *
     * <p>Key layout written per column family:
     * <ul>
     *   <li>{@code "default"}: {@code version ++ encodedVertexKey -> encodedVertexValue}</li>
     *   <li>{@code "v_index"}: {@code encodedVertexKey ++ version -> EMPTY_BYTES}</li>
     *   <li>{@code "e"}: {@code version ++ encodedEdgeKey -> encodedEdgeValue}</li>
     * </ul>
     *
     * @param graphWriteBuffer buffer to persist; an empty buffer is a no-op
     * @throws GeaflowRuntimeException if encoding or adding an entry to the batch fails
     */
    private void flush(GraphWriteMultiVersionedBuffer<K, VV, EV> graphWriteBuffer) {
        if (graphWriteBuffer.getSize() == 0) {
            return;
        }
        ColumnFamilyHandle vertexCF = this.rocksdbClient.getColumnFamilyHandleMap().get("default");
        ColumnFamilyHandle indexCF = this.rocksdbClient.getColumnFamilyHandleMap().get("v_index");
        ColumnFamilyHandle edgeCF = this.rocksdbClient.getColumnFamilyHandleMap().get("e");
        // try-with-resources releases the native batch on every path, including
        // a failed put or a failed write.
        try (WriteBatch writeBatch = new WriteBatch()) {
            try {
                for (Map<Long, IVertex<K, VV>> versions : graphWriteBuffer.getVertexId2Vertex().values()) {
                    for (Map.Entry<Long, IVertex<K, VV>> versioned : versions.entrySet()) {
                        Tuple<byte[], byte[]> kv = this.vertexEncoder.format(versioned.getValue());
                        byte[] bVersion = this.getBinaryVersion(versioned.getKey());
                        writeBatch.put(vertexCF, this.concat(bVersion, kv.f0), kv.f1);
                        writeBatch.put(indexCF, this.concat(kv.f0, bVersion), EMPTY_BYTES);
                    }
                }
                for (Map<Long, List<IEdge<K, EV>>> versions : graphWriteBuffer.getVertexId2Edges().values()) {
                    for (Map.Entry<Long, List<IEdge<K, EV>>> versioned : versions.entrySet()) {
                        byte[] bVersion = this.getBinaryVersion(versioned.getKey());
                        for (IEdge<K, EV> edge : versioned.getValue()) {
                            Tuple<byte[], byte[]> kv = this.edgeEncoder.format(edge);
                            writeBatch.put(edgeCF, this.concat(bVersion, kv.f0), kv.f1);
                        }
                    }
                }
            }
            catch (Exception ex) {
                throw new GeaflowRuntimeException(ex);
            }
            // Kept outside the catch above so failures raised by the client are
            // propagated unchanged rather than wrapped a second time.
            this.rocksdbClient.write(writeBatch);
            writeBatch.clear();
        }
    }

    /**
     * Buffers a vertex for the given version.
     *
     * @param version version the vertex belongs to
     * @param vertex vertex to add
     */
    @Override
    public void addVertex(long version, IVertex<K, VV> vertex) {
        this.flushBuffer.addVertex(version, vertex);
    }

    /**
     * Buffers an edge for the given version.
     *
     * @param version version the edge belongs to
     * @param edge edge to add
     */
    @Override
    public void addEdge(long version, IEdge<K, EV> edge) {
        this.flushBuffer.addEdge(version, edge);
    }

    /**
     * Looks up a vertex, preferring the write buffer over the store.
     *
     * @param version version to read
     * @param sid vertex id
     * @param pushdown push-down whose filter must be an {@link IGraphFilter}
     * @return the buffered vertex if one exists and passes the filter; {@code null}
     *     if one exists but is rejected by the filter (the store is not consulted
     *     in that case); otherwise whatever the parent proxy returns
     */
    @Override
    public IVertex<K, VV> getVertex(long version, K sid, IStatePushDown pushdown) {
        IVertex<K, VV> buffered = this.flushBuffer.readBufferedVertex(version, sid);
        if (buffered == null) {
            return super.getVertex(version, sid, pushdown);
        }
        return ((IGraphFilter)pushdown.getFilter()).filterVertex(buffered) ? buffered : null;
    }

    /**
     * Collects the edges of a vertex from both the write buffer and the store.
     *
     * <p>Buffered edges that pass the filter come first, followed by the edges
     * returned by the parent proxy. Duplicates (per {@code equals}) are dropped,
     * keeping the first occurrence.
     *
     * @param version version to read
     * @param sid source vertex id
     * @param pushdown push-down whose filter must be an {@link IGraphFilter}
     * @return a new mutable list of the merged edges
     */
    @Override
    public List<IEdge<K, EV>> getEdges(long version, K sid, IStatePushDown pushdown) {
        List<IEdge<K, EV>> buffered = this.flushBuffer.readBufferedEdges(version, sid);
        // LinkedHashSet de-duplicates while preserving insertion order.
        LinkedHashSet<IEdge<K, EV>> merged = new LinkedHashSet<>();
        for (IEdge<K, EV> edge : buffered) {
            if (((IGraphFilter)pushdown.getFilter()).filterEdge(edge)) {
                merged.add(edge);
            }
        }
        merged.addAll(super.getEdges(version, sid, pushdown));
        return new ArrayList<>(merged);
    }

    /** Asks the write buffer to flush its pending content. */
    @Override
    public void flush() {
        this.flushBuffer.flush();
    }

    /**
     * Closes the write buffer.
     *
     * <p>NOTE(review): {@code super.close()} is not invoked here - confirm the
     * parent proxy holds nothing that needs releasing.
     */
    @Override
    public void close() {
        this.flushBuffer.close();
    }
}

