/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.config;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.util.ReferenceCountUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.supports.cluster.EventBusLocalCache;
import org.jetlinks.supports.config.ClusterConfigStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class EventBusStorageManager
implements ConfigStorageManager {
    private static final Logger log = LoggerFactory.getLogger(EventBusStorageManager.class);
    private final ConcurrentMap<String, ClusterConfigStorage> cache;
    private final Supplier<Cache<String, Object>> cacheSupplier;
    private final Function<String, ClusterConfigStorage> storageBuilder;

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
        this(clusterManager, eventBus, () -> CacheBuilder.newBuilder().build());
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, Supplier<Cache<String, Object>> supplier) {
        this(clusterManager, eventBus, supplier, true);
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, Supplier<Cache<String, Object>> supplier, boolean cacheEmpty) {
        this.cache = supplier.get().asMap();
        this.cacheSupplier = supplier;
        this.storageBuilder = id -> new ClusterConfigStorage(new EventBusLocalCache<String, Object>((String)id, eventBus, (ClusterCache<String, Object>)clusterManager.getCache(id), this.cacheSupplier.get(), cacheEmpty));
        eventBus.subscribe(Subscription.builder().subscriberId("event-bus-storage-listener").topics(new String[]{"/_sys/cluster_cache/*/*/*"}).justBroker().build()).subscribe(payload -> {
            try {
                Map vars = payload.getTopicVars("/_sys/cluster_cache/{name}/{type}/{key}");
                ClusterConfigStorage storage = (ClusterConfigStorage)this.cache.get(vars.get("name"));
                if (storage != null) {
                    log.trace("clear local cache :{}", (Object)vars);
                    EventBusLocalCache eventBusLocalCache = (EventBusLocalCache)storage.getCache();
                    eventBusLocalCache.clearLocalCache(vars.get("key"));
                } else {
                    log.trace("ignore clear local cache :{}", (Object)vars);
                }
            }
            catch (Throwable error) {
                log.warn("clearn local cache error", error);
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)payload);
            }
        });
    }

    public Mono<ConfigStorage> getStorage(String id) {
        return Mono.fromSupplier(() -> this.cache.computeIfAbsent(id, this.storageBuilder));
    }
}

