/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.stats.sink;

import com.alibaba.fastjson.JSON;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
import com.antgroup.geaflow.state.serializer.IKeySerializer;
import com.antgroup.geaflow.stats.sink.IStatsWriter;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IStatsWriter} that stores every metric in a key-value store and
 * flushes the store immediately after each write.
 *
 * <p>The backing store is created once, in the constructor, from the supplied
 * {@link Configuration}.
 */
public class SyncKvStoreWriter implements IStatsWriter {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncKvStoreWriter.class);

    /** Namespace used when the configuration does not define a system meta table. */
    private static final String DEFAULT_NAMESPACE = "default";

    /** Store that receives the metrics; keys are metric keys, values are JSON strings. */
    private final IKVStore<String, String> kvStore;

    /**
     * Builds and initializes the backing key-value store.
     *
     * @param configuration configuration used to select the namespace and the store type
     */
    public SyncKvStoreWriter(Configuration configuration) {
        this.kvStore = this.createKvStore(configuration);
    }

    /**
     * Creates the key-value store described by {@code configuration} and initializes it.
     *
     * <p>The namespace is the value of {@code ExecutionConfigKeys.SYSTEM_META_TABLE} when
     * that key is present, otherwise {@link #DEFAULT_NAMESPACE}. The store type is read from
     * {@code ExecutionConfigKeys.STATS_METRIC_STORE_TYPE}.
     *
     * @param configuration configuration to read the namespace and store type from
     * @return the initialized store
     */
    private IKVStore<String, String> createKvStore(Configuration configuration) {
        String namespace = configuration.contains(ExecutionConfigKeys.SYSTEM_META_TABLE)
            ? configuration.getString(ExecutionConfigKeys.SYSTEM_META_TABLE)
            : DEFAULT_NAMESPACE;

        // Context handed to the store on init: string key/value serializer plus the full config.
        StoreContext context = new StoreContext(namespace);
        IKeySerializer keySerializer = new DefaultKVSerializer(String.class, String.class);
        context.withKeySerializer(keySerializer);
        context.withConfig(configuration);

        String storeType = configuration.getString(ExecutionConfigKeys.STATS_METRIC_STORE_TYPE);
        IStoreBuilder storeBuilder = StoreBuilderFactory.build(storeType);

        @SuppressWarnings("unchecked")
        IKVStore<String, String> store =
            (IKVStore<String, String>) storeBuilder.getStore(DataModel.KV, configuration);
        store.init(context);

        LOGGER.info("create stats store with type:{} namespace:{}", storeType, namespace);
        return store;
    }

    /**
     * Serializes {@code value} to a JSON string, stores it under {@code key}, and flushes
     * the store before returning.
     *
     * @param key   key the metric is stored under
     * @param value metric payload; converted with {@link JSON#toJSONString(Object)}
     */
    @Override
    public void addMetric(String key, Object value) {
        String json = JSON.toJSONString(value);
        this.kvStore.put(key, json);
        this.kvStore.flush();
    }

    /**
     * Does nothing.
     *
     * <p>NOTE(review): the backing store created in the constructor is not released here;
     * confirm whether it is expected to be closed by this writer or elsewhere.
     */
    @Override
    public void close() {
    }
}

