/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.store.data;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.StateConfigKeys;
import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.serialize.ISerializer;
import com.antgroup.geaflow.common.utils.ExecutorUtil;
import com.antgroup.geaflow.model.graph.edge.IEdge;
import com.antgroup.geaflow.model.graph.vertex.IVertex;
import com.antgroup.geaflow.store.data.GraphWriteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncFlushBuffer<K, VV, EV> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncFlushBuffer.class);
    private static final int SLEEP_MILLI_SECOND = 100;
    private final boolean deepCopy;
    private final int bufferNum;
    private final int bufferSize;
    private final Consumer<GraphWriteBuffer<K, VV, EV>> flushFun;
    private final ISerializer serializer;
    private final ExecutorUtil.ExceptionHandler flushError;
    private List<GraphWriteBuffer<K, VV, EV>> buffers;
    private int curWriteBufferIdx;
    private ThreadPoolExecutor flushService;
    protected AtomicLong writeCounter = new AtomicLong(0L);
    protected AtomicLong flushCounter = new AtomicLong(0L);
    protected volatile Throwable exp;

    public AsyncFlushBuffer(Configuration config, Consumer<GraphWriteBuffer<K, VV, EV>> flushFun, ISerializer serializer) {
        this.flushFun = flushFun;
        this.serializer = serializer;
        this.deepCopy = config.getBoolean(StateConfigKeys.STATE_WRITE_BUFFER_DEEP_COPY);
        this.bufferNum = config.getInteger(StateConfigKeys.STATE_WRITE_BUFFER_NUMBER);
        this.bufferSize = config.getInteger(StateConfigKeys.STATE_WRITE_BUFFER_SIZE);
        this.flushError = exception -> {
            this.exp = exception;
            LOGGER.error("flush error", exception);
        };
        this.initBuffer();
    }

    private void initBuffer() {
        this.buffers = new ArrayList<GraphWriteBuffer<K, VV, EV>>(this.bufferNum);
        for (int i = 0; i < this.bufferNum; ++i) {
            this.buffers.add(new GraphWriteBuffer(this.bufferSize));
        }
        this.curWriteBufferIdx = 0;
        this.flushService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(this.bufferNum + 1), (ThreadFactory)new BasicThreadFactory.Builder().namingPattern("flush-%d").build());
    }

    public void flush() {
        if (this.writeCounter.get() == 0L) {
            return;
        }
        for (int i = 0; i < this.bufferNum; ++i) {
            if (this.buffers.get(i).isFlushing()) continue;
            this.tryFlushBuffer(i, true);
        }
        ExecutorUtil.spinLockMs(() -> this.flushCounter.get() == this.writeCounter.get(), this::exceptionCheck, (long)100L);
        this.flushCounter.set(0L);
        this.writeCounter.set(0L);
        this.exceptionCheck();
    }

    private void exceptionCheck() {
        if (this.exp != null) {
            LOGGER.error("encounter exception");
            throw new GeaflowRuntimeException(RuntimeErrors.INST.runError(this.exp.getMessage()), this.exp);
        }
    }

    private void tryFlushBuffer(int idx, boolean force) {
        boolean needFlush;
        GraphWriteBuffer<K, VV, EV> buffer = this.buffers.get(idx);
        boolean bl = needFlush = buffer.needFlush() || force && buffer.getSize() > 0;
        if (!needFlush) {
            return;
        }
        buffer.setFlushing();
        this.exceptionCheck();
        ExecutorUtil.execute((ExecutorService)this.flushService, () -> this.flushWriteBuffer(buffer), (ExecutorUtil.ExceptionHandler)this.flushError);
        int toWriteBufferIdx = (idx + 1) % this.buffers.size();
        ExecutorUtil.spinLockMs(() -> !this.buffers.get(toWriteBufferIdx).needFlush(), this::exceptionCheck, (long)100L);
        this.curWriteBufferIdx = toWriteBufferIdx;
    }

    private void flushWriteBuffer(GraphWriteBuffer<K, VV, EV> buffer) {
        this.flushFun.accept(buffer);
        this.flushCounter.addAndGet(buffer.getSize());
        buffer.clear();
    }

    public IVertex<K, VV> readBufferedVertex(K id) {
        for (int i = 0; i < this.bufferNum; ++i) {
            int idx = (this.bufferNum + this.curWriteBufferIdx - i) % this.bufferNum;
            GraphWriteBuffer<K, VV, EV> buffer = this.buffers.get(idx);
            IVertex vertex = buffer.getVertexId2Vertex().get(id);
            if (vertex == null) continue;
            return this.deepCopy ? (IVertex)this.serializer.copy(vertex) : vertex;
        }
        return null;
    }

    public List<IEdge<K, EV>> readBufferedEdges(K srcId) {
        ArrayList<IEdge<K, EV>> list = new ArrayList<IEdge<K, EV>>();
        for (int i = 0; i < this.bufferNum; ++i) {
            GraphWriteBuffer<K, VV, EV> buffer = this.buffers.get(i);
            List<IEdge<K, EV>> edgeList = buffer.getVertexId2Edges().get(srcId);
            if (edgeList == null) continue;
            list.addAll(this.deepCopy ? (Collection)this.serializer.copy(edgeList) : edgeList);
        }
        return list;
    }

    public void addVertex(IVertex<K, VV> vertex) {
        this.writeCounter.incrementAndGet();
        this.buffers.get(this.curWriteBufferIdx).addVertex(vertex);
        this.tryFlushBuffer(this.curWriteBufferIdx, false);
    }

    public void addEdge(IEdge<K, EV> edge) {
        this.writeCounter.incrementAndGet();
        this.buffers.get(this.curWriteBufferIdx).addEdge(edge);
        this.tryFlushBuffer(this.curWriteBufferIdx, false);
    }

    public void close() {
        this.flushService.shutdown();
        this.buffers.forEach(GraphWriteBuffer::clear);
        this.buffers.clear();
    }
}

