/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.stats.sink;

import com.alibaba.fastjson.JSON;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.antgroup.geaflow.common.utils.ThreadUtil;
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
import com.antgroup.geaflow.state.serializer.IKeySerializer;
import com.antgroup.geaflow.stats.sink.IStatsWriter;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IStatsWriter} that buffers metrics in a bounded in-memory queue and writes them to a
 * key/value store from background flusher threads.
 *
 * <p>{@link #addMetric(String, Object)} never blocks: when the queue is full the oldest queued
 * metric is dropped to make room. Each flusher thread periodically polls up to
 * {@code batchFlushSize} metrics, writes every value as a JSON string under its key and then
 * flushes the store. Metrics that cannot be written are discarded (and logged), never retried.
 *
 * <p>{@link #close()} asks the flusher threads to stop; each of them makes one last bounded
 * best-effort attempt to write what is still queued before it exits. {@code close()} does not
 * wait for that to finish.
 */
public class AsyncKvStoreWriter
implements IStatsWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncKvStoreWriter.class);
    /** Capacity of the pending-metric queue; also bounds the final drain performed on close. */
    private static final int MAX_METRIC_QUEUE_SIZE = 1024;
    /** Store namespace used when the configuration does not set {@code SYSTEM_META_TABLE}. */
    private static final String DEFAULT_NAMESPACE = "default";
    /** Maximum number of metrics a flusher takes from the queue per cycle. */
    private final int batchFlushSize;
    /** Pause between two flush cycles of a flusher thread, in milliseconds. */
    private final int flushIntervalMs;
    /** Read by the flusher threads on every cycle; cleared by {@link #close()}. */
    private volatile boolean running;
    private final IKVStore<String, String> kvStore;
    private final Queue<Tuple<String, Object>> metricQueue;
    private final ExecutorService executorService;

    /**
     * Creates the backing store and starts the flusher threads.
     *
     * @param configuration supplies the flush batch size, flush interval, number of flusher
     *                      threads, store type and (optionally) the store namespace
     */
    public AsyncKvStoreWriter(Configuration configuration) {
        this.batchFlushSize = configuration.getInteger(ExecutionConfigKeys.STATS_METRIC_FLUSH_BATCH_SIZE);
        this.flushIntervalMs = configuration.getInteger(ExecutionConfigKeys.STATS_METRIC_FLUSH_INTERVAL_MS);
        this.kvStore = this.createKvStore(configuration);
        this.metricQueue = new LinkedBlockingQueue<Tuple<String, Object>>(MAX_METRIC_QUEUE_SIZE);
        int threadNum = configuration.getInteger(ExecutionConfigKeys.STATS_METRIC_FLUSH_THREADS);
        // NOTE(review): the boolean passed to namedThreadFactory presumably marks the threads
        // as daemon threads - confirm against ThreadUtil.
        this.executorService = new ThreadPoolExecutor(threadNum, threadNum, 30L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(threadNum),
            ThreadUtil.namedThreadFactory(true, "stats-flusher"));
        // The flag must be set before the tasks are submitted: they test it on their first
        // iteration and would exit immediately if they still saw the default 'false'.
        this.running = true;
        for (int i = 0; i < threadNum; ++i) {
            this.executorService.submit(new MetricFlushTask());
        }
    }

    /**
     * Builds and initializes the key/value store the metrics are written to.
     *
     * @param configuration source of the store type and the optional namespace
     * @return the initialized store
     */
    private IKVStore<String, String> createKvStore(Configuration configuration) {
        String namespace = DEFAULT_NAMESPACE;
        if (configuration.contains(ExecutionConfigKeys.SYSTEM_META_TABLE)) {
            namespace = configuration.getString(ExecutionConfigKeys.SYSTEM_META_TABLE);
        }
        StoreContext storeContext = new StoreContext(namespace);
        storeContext.withKeySerializer((IKeySerializer)new DefaultKVSerializer(String.class, String.class));
        storeContext.withConfig(configuration);
        String storeType = configuration.getString(ExecutionConfigKeys.STATS_METRIC_STORE_TYPE);
        IStoreBuilder builder = StoreBuilderFactory.build(storeType);
        // The builder is asked for the KV data model, so the result is used as a String->String
        // KV store, exactly as the original raw-typed code did.
        @SuppressWarnings("unchecked")
        IKVStore<String, String> store = (IKVStore<String, String>)builder.getStore(DataModel.KV, configuration);
        store.init(storeContext);
        LOGGER.info("create stats store with type:{} namespace:{}", storeType, namespace);
        return store;
    }

    /**
     * Queues a metric for asynchronous persistence without blocking the caller.
     *
     * <p>If the queue is full, the oldest queued metrics are dropped (with a warning) until the
     * new one fits.
     *
     * @param key   store key of the metric
     * @param value metric value; serialized to JSON when it is flushed
     */
    @Override
    public void addMetric(String key, Object value) {
        Tuple<String, Object> tuple = Tuple.of(key, value);
        while (!this.metricQueue.offer(tuple)) {
            // Queue is full: evict the head to make room. poll() may return null when a
            // flusher emptied the queue in the meantime; the next offer then succeeds.
            Tuple<String, Object> expired = this.metricQueue.poll();
            if (expired != null) {
                LOGGER.warn("discard metric: {} due to capacity limit", expired.getF0());
            }
        }
    }

    /**
     * Stops the flusher threads. Calling it again after the writer is closed has no effect.
     *
     * <p>The threads are not interrupted: each one notices the cleared flag after its current
     * cycle (at most one flush interval later), makes a final attempt to write the metrics
     * still queued and then exits. This method does not wait for them.
     */
    @Override
    public void close() {
        if (!this.running) {
            return;
        }
        // Clear the flag first so the flush loops actually terminate; shutdown() alone never
        // stops tasks that are already running.
        this.running = false;
        this.executorService.shutdown();
    }

    /**
     * Background task that repeatedly moves queued metrics into the key/value store until the
     * enclosing writer is closed.
     */
    public class MetricFlushTask
    implements Runnable {
        /** Number of metrics polled for the batch currently held in {@link #buffers}. */
        private int flushSize = 0;
        /** Batch being written; always empty between two flush cycles. */
        private final Queue<Tuple<String, Object>> buffers = new LinkedList<Tuple<String, Object>>();

        /**
         * Runs flush cycles (fill, write, sleep) while the writer is running, then drains what
         * is left in the queue.
         */
        @Override
        public void run() {
            while (AsyncKvStoreWriter.this.running) {
                try {
                    this.fillBuffers();
                    if (this.flushSize > 0) {
                        this.doFlush();
                    }
                    SleepUtils.sleepMilliSecond((long)AsyncKvStoreWriter.this.flushIntervalMs);
                }
                catch (Throwable e) {
                    // Deliberately broad: a failed cycle must not kill the flusher thread.
                    LOGGER.warn("flush stats metrics failed:{}", e.getMessage(), e);
                }
            }
            this.drainRemaining();
        }

        /**
         * Final best-effort pass after the writer was closed. Bounded by the queue capacity so
         * that producers which keep adding metrics cannot keep this thread alive forever.
         */
        private void drainRemaining() {
            int drained = 0;
            while (drained < MAX_METRIC_QUEUE_SIZE) {
                this.fillBuffers();
                if (this.flushSize == 0) {
                    break;
                }
                // Read the count before doFlush() resets it.
                drained += this.flushSize;
                this.doFlush();
            }
        }

        /** Polls up to {@code batchFlushSize} metrics from the shared queue into the batch. */
        private void fillBuffers() {
            int count = 0;
            while (count < AsyncKvStoreWriter.this.batchFlushSize) {
                Tuple<String, Object> tuple = AsyncKvStoreWriter.this.metricQueue.poll();
                if (tuple == null) {
                    break;
                }
                this.buffers.add(tuple);
                ++count;
            }
            this.flushSize = count;
        }

        /**
         * Writes the current batch to the store and flushes it. On any failure the rest of the
         * batch is discarded, as the warning states.
         */
        private void doFlush() {
            try {
                while (!this.buffers.isEmpty()) {
                    Tuple<String, Object> tuple = this.buffers.poll();
                    AsyncKvStoreWriter.this.kvStore.put(tuple.f0, JSON.toJSONString(tuple.f1));
                }
                AsyncKvStoreWriter.this.kvStore.flush();
            }
            catch (Throwable e) {
                LOGGER.warn("discard {} metrics due to: {}", this.flushSize, e.getMessage(), e);
            }
            finally {
                // Really drop whatever was not written. Otherwise a persistently failing store
                // makes the batch grow by up to batchFlushSize entries on every cycle.
                this.buffers.clear();
                this.flushSize = 0;
            }
        }
    }
}

