/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.store.rocksdb.proxy;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.StateConfigKeys;
import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.state.data.DataType;
import com.antgroup.geaflow.state.data.OneDegreeGraph;
import com.antgroup.geaflow.state.graph.encoder.IEdgeKVEncoder;
import com.antgroup.geaflow.state.graph.encoder.IGraphKVEncoder;
import com.antgroup.geaflow.state.graph.encoder.IVertexKVEncoder;
import com.antgroup.geaflow.state.iterator.IteratorWithFlatFn;
import com.antgroup.geaflow.state.iterator.IteratorWithFnThenFilter;
import com.antgroup.geaflow.state.pushdown.IStatePushDown;
import com.antgroup.geaflow.state.pushdown.filter.inner.IGraphFilter;
import com.antgroup.geaflow.store.iterator.KeysIterator;
import com.antgroup.geaflow.store.rocksdb.RocksdbClient;
import com.antgroup.geaflow.store.rocksdb.iterator.EdgeScanIterator;
import com.antgroup.geaflow.store.rocksdb.iterator.OneDegreeGraphScanIterator;
import com.antgroup.geaflow.store.rocksdb.iterator.RocksdbIterator;
import com.antgroup.geaflow.store.rocksdb.iterator.VertexScanIterator;
import com.antgroup.geaflow.store.rocksdb.proxy.IGraphMultiVersionedRocksdbProxy;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Longs;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class SyncGraphMultiVersionedProxy<K, VV, EV>
implements IGraphMultiVersionedRocksdbProxy<K, VV, EV> {
    private static final int VERSION_BYTES_SIZE = 8;
    private static final int VERTEX_INDEX_SUFFIX_SIZE = 8 + StateConfigKeys.DELIMITER.length;
    protected static final byte[] EMPTY_BYTES = new byte[0];
    protected final Configuration config;
    protected RocksdbClient rocksdbClient;
    protected IGraphKVEncoder<K, VV, EV> encoder;
    protected IEdgeKVEncoder<K, EV> edgeEncoder;
    protected IVertexKVEncoder<K, VV> vertexEncoder;

    public SyncGraphMultiVersionedProxy(RocksdbClient rocksdbStore, IGraphKVEncoder<K, VV, EV> encoder, Configuration config) {
        this.encoder = encoder;
        this.rocksdbClient = rocksdbStore;
        this.vertexEncoder = encoder.getVertexEncoder();
        this.edgeEncoder = encoder.getEdgeEncoder();
        this.config = config;
    }

    public void addVertex(long version, IVertex<K, VV> vertex) {
        Tuple tuple = this.vertexEncoder.format(vertex);
        byte[] bVersion = this.getBinaryVersion(version);
        this.rocksdbClient.write("default", this.concat(bVersion, (byte[])tuple.f0), (byte[])tuple.f1);
        this.rocksdbClient.write("v_index", this.concat((byte[])tuple.f0, bVersion), EMPTY_BYTES);
    }

    public void addEdge(long version, IEdge<K, EV> edge) {
        byte[] bVersion = this.getBinaryVersion(version);
        Tuple tuple = this.edgeEncoder.format(edge);
        this.rocksdbClient.write("e", this.concat(bVersion, (byte[])tuple.f0), (byte[])tuple.f1);
    }

    public IVertex<K, VV> getVertex(long version, K sid, IStatePushDown pushdown) {
        byte[] key = this.encoder.getKeyType().serialize(sid);
        byte[] bVersion = this.getBinaryVersion(version);
        byte[] value = this.rocksdbClient.get("default", this.concat(bVersion, key));
        if (value != null) {
            IVertex vertex = this.vertexEncoder.getVertex(key, value);
            if (pushdown == null || ((IGraphFilter)pushdown.getFilter()).filterVertex(vertex)) {
                return vertex;
            }
        }
        return null;
    }

    public List<IEdge<K, EV>> getEdges(long version, K sid, IStatePushDown pushdown) {
        ArrayList<IEdge<K, EV>> list = new ArrayList<IEdge<K, EV>>();
        byte[] bVersion = this.getBinaryVersion(version);
        byte[] prefix = this.concat(bVersion, this.edgeEncoder.getScanBytes(sid));
        IGraphFilter filter = (IGraphFilter)pushdown.getFilter();
        try (RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("e"), prefix);){
            while (it.hasNext()) {
                Tuple<byte[], byte[]> pair = it.next();
                IEdge edge = this.edgeEncoder.getEdge(this.getKeyFromVersionToKey((byte[])pair.f0), (byte[])pair.f1);
                if (!filter.filterEdge(edge)) continue;
                list.add(edge);
            }
        }
        return list;
    }

    public OneDegreeGraph<K, VV, EV> getOneDegreeGraph(long version, K sid, IStatePushDown pushdown) {
        IVertex<K, VV> vertex = this.getVertex(version, sid, pushdown);
        List<IEdge<K, EV>> edgeList = this.getEdges(version, sid, pushdown);
        OneDegreeGraph oneDegreeGraph = new OneDegreeGraph(sid, vertex, edgeList.iterator());
        if (((IGraphFilter)pushdown.getFilter()).filterOneDegreeGraph(oneDegreeGraph)) {
            return oneDegreeGraph;
        }
        return null;
    }

    public Iterator<K> vertexIDIterator() {
        this.flush();
        RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("v_index"));
        return new IteratorWithFnThenFilter((Iterator)it, tuple2 -> this.vertexEncoder.getVertexID(this.getKeyFromKeyToVersion((byte[])tuple2.f0)), new Predicate<K>(){
            K last = null;

            @Override
            public boolean test(K k) {
                boolean res = k.equals(this.last);
                this.last = k;
                return !res;
            }
        });
    }

    public Iterator<IVertex<K, VV>> getVertexIterator(long version, IStatePushDown pushdown) {
        this.flush();
        byte[] prefix = this.getVersionPrefix(version);
        RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("default"), prefix);
        return new VertexScanIterator(it, pushdown, (key, value) -> this.vertexEncoder.getVertex(this.getKeyFromVersionToKey((byte[])key), value));
    }

    public Iterator<IVertex<K, VV>> getVertexIterator(long version, List<K> keys, IStatePushDown pushdown) {
        return new KeysIterator(keys, (k, f) -> this.getVertex(version, (K)k, (IStatePushDown)f), pushdown);
    }

    public Iterator<IEdge<K, EV>> getEdgeIterator(long version, IStatePushDown pushdown) {
        this.flush();
        byte[] prefix = this.getVersionPrefix(version);
        RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("e"), prefix);
        return new EdgeScanIterator(it, pushdown, (key, value) -> this.edgeEncoder.getEdge(this.getKeyFromVersionToKey((byte[])key), value));
    }

    public Iterator<IEdge<K, EV>> getEdgeIterator(long version, List<K> keys, IStatePushDown pushdown) {
        return new IteratorWithFlatFn((Iterator)new KeysIterator(keys, (k, f) -> this.getEdges(version, (K)k, (IStatePushDown)f), pushdown), List::iterator);
    }

    public Iterator<OneDegreeGraph<K, VV, EV>> getOneDegreeGraphIterator(long version, IStatePushDown pushdown) {
        this.flush();
        return new OneDegreeGraphScanIterator<K, VV, EV>(this.encoder.getKeyType(), this.getVertexIterator(version, pushdown), this.getEdgeIterator(version, pushdown), pushdown);
    }

    public Iterator<OneDegreeGraph<K, VV, EV>> getOneDegreeGraphIterator(long version, List<K> keys, IStatePushDown pushdown) {
        return new KeysIterator(keys, (k, f) -> this.getOneDegreeGraph(version, (K)k, (IStatePushDown)f), pushdown);
    }

    public List<Long> getAllVersions(K id, DataType dataType) {
        this.flush();
        if (dataType == DataType.V || dataType == DataType.V_TOPO) {
            ArrayList<Long> list = new ArrayList<Long>();
            byte[] prefix = Bytes.concat((byte[][])new byte[][]{this.encoder.getKeyType().serialize(id), StateConfigKeys.DELIMITER});
            try (RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("v_index"), prefix);){
                while (it.hasNext()) {
                    Tuple<byte[], byte[]> pair = it.next();
                    list.add(this.getVersionFromKeyToVersion((byte[])pair.f0));
                }
            }
            return list;
        }
        throw new GeaflowRuntimeException(RuntimeErrors.INST.unsupportedError());
    }

    public long getLatestVersion(K id, DataType dataType) {
        this.flush();
        if (dataType == DataType.V || dataType == DataType.V_TOPO) {
            byte[] prefix = this.getKeyPrefix(id);
            try (RocksdbIterator it = new RocksdbIterator(this.rocksdbClient.getIterator("v_index"), prefix);){
                if (it.hasNext()) {
                    Tuple<byte[], byte[]> pair = it.next();
                    long l = this.getVersionFromKeyToVersion((byte[])pair.f0);
                    return l;
                }
            }
            return -1L;
        }
        throw new GeaflowRuntimeException(RuntimeErrors.INST.unsupportedError());
    }

    public Map<Long, IVertex<K, VV>> getAllVersionData(K id, IStatePushDown pushdown, DataType dataType) {
        List<Long> allVersions = this.getAllVersions(id, dataType);
        return this.getVersionData(id, allVersions, pushdown, dataType);
    }

    public Map<Long, IVertex<K, VV>> getVersionData(K id, Collection<Long> versions, IStatePushDown pushdown, DataType dataType) {
        if (dataType == DataType.V || dataType == DataType.V_TOPO) {
            HashMap<Long, IVertex<Long, VV>> map = new HashMap<Long, IVertex<Long, VV>>();
            for (long version : versions) {
                IVertex<K, VV> vertex = this.getVertex(version, id, pushdown);
                if (vertex == null) continue;
                map.put(version, vertex);
            }
            return map;
        }
        throw new GeaflowRuntimeException(RuntimeErrors.INST.unsupportedError());
    }

    @Override
    public RocksdbClient getClient() {
        return this.rocksdbClient;
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }

    private long getVersionFromKeyToVersion(byte[] key) {
        byte[] bVersion = Arrays.copyOfRange(key, key.length - 8, key.length);
        return Long.MAX_VALUE - Longs.fromByteArray((byte[])bVersion);
    }

    protected byte[] getKeyFromKeyToVersion(byte[] key) {
        return Arrays.copyOf(key, key.length - VERTEX_INDEX_SUFFIX_SIZE);
    }

    protected byte[] getBinaryVersion(long version) {
        return Longs.toByteArray((long)(Long.MAX_VALUE - version));
    }

    protected byte[] getKeyPrefix(K id) {
        return Bytes.concat((byte[][])new byte[][]{this.encoder.getKeyType().serialize(id), StateConfigKeys.DELIMITER});
    }

    protected byte[] getVersionPrefix(long version) {
        return Bytes.concat((byte[][])new byte[][]{this.getBinaryVersion(version), StateConfigKeys.DELIMITER});
    }

    protected byte[] getKeyFromVersionToKey(byte[] key) {
        return Arrays.copyOfRange(key, 10, key.length);
    }

    protected byte[] concat(byte[] a, byte[] b) {
        return Bytes.concat((byte[][])new byte[][]{a, StateConfigKeys.DELIMITER, b});
    }
}

