/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.config;

import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import lombok.Generated;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.config.CacheNotify;
import org.jetlinks.supports.config.EventBusStorageManager;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

public class LocalCacheClusterConfigStorage
implements ConfigStorage {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LocalCacheClusterConfigStorage.class);
    private static final AtomicReferenceFieldUpdater<Cache, Mono> CACHE_REF = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Mono.class, "ref");
    private static final AtomicReferenceFieldUpdater<Cache, Value> CACHED = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Value.class, "cached");
    private static final AtomicReferenceFieldUpdater<Cache, Sinks.One> AWAIT = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Sinks.One.class, "await");
    private static final AtomicIntegerFieldUpdater<Cache> CACHE_VERSION = AtomicIntegerFieldUpdater.newUpdater(Cache.class, "version");
    private static final AtomicReferenceFieldUpdater<Cache, Disposable> CACHE_LOADING = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Disposable.class, "loading");
    private static final Map<Value, Value> shared = new ConcurrentReferenceHashMap(1024);
    private static final Set<String> sharedKey = ConcurrentHashMap.newKeySet();
    public static final Value NULL = Value.simple(null);
    private final Map<String, Cache> caches;
    final String id;
    private final EventBusStorageManager manager;
    private final ClusterCache<String, Object> clusterCache;
    private final long expires;
    private final Consumer<LocalCacheClusterConfigStorage> doOnClear;
    Throwable lastError;

    LocalCacheClusterConfigStorage(String id, EventBusStorageManager manager, ClusterCache<String, Object> clusterCache, long expires, Consumer<LocalCacheClusterConfigStorage> doOnClear, Map<String, Cache> cacheContainer) {
        this.id = id;
        this.manager = manager;
        this.clusterCache = clusterCache;
        this.expires = expires;
        this.doOnClear = doOnClear;
        this.caches = cacheContainer;
    }

    LocalCacheClusterConfigStorage(String id, EventBusStorageManager manager, ClusterCache<String, Object> clusterCache, long expires, Consumer<LocalCacheClusterConfigStorage> doOnClear) {
        this(id, manager, clusterCache, expires, doOnClear, new ConcurrentHashMap<String, Cache>());
    }

    boolean isEmpty() {
        return this.caches.isEmpty();
    }

    void cleanup() {
        if (this.expires > 0L) {
            this.caches.forEach((k, v) -> {
                if (v.isExpired()) {
                    this.caches.remove(k, v);
                }
            });
        }
    }

    void error(Throwable err) {
        this.lastError = err;
    }

    private Cache createCache(String key) {
        return new Cache(key);
    }

    private Cache getOrCreateCache(String key) {
        return this.caches.computeIfAbsent(key, this::createCache);
    }

    public Mono<Value> getConfig(String key) {
        return this.getConfig(key, (Mono<Object>)Mono.empty());
    }

    public Mono<Value> getConfig(String key, Mono<Object> loader) {
        return this.getOrCreateCache(key).getRef(loader);
    }

    public Mono<Values> getConfigs(Collection<String> keys) {
        int hits = 0;
        HashMap loaded = Maps.newHashMapWithExpectedSize((int)keys.size());
        for (String key : keys) {
            Cache local = this.getOrCreateCache(key);
            Value cached = local.getCached();
            if (cached == null) continue;
            ++hits;
            loaded.put(key, cached.get());
        }
        Values wrap = Values.of((Map)Maps.filterValues((Map)loaded, Objects::nonNull));
        if (hits == keys.size()) {
            return Mono.just((Object)wrap);
        }
        HashSet<String> needLoadKeys = new HashSet<String>(keys);
        needLoadKeys.removeAll(loaded.keySet());
        if (needLoadKeys.isEmpty()) {
            return Mono.just((Object)wrap);
        }
        HashMap versions = Maps.newHashMapWithExpectedSize((int)needLoadKeys.size());
        for (String needLoadKey : needLoadKeys) {
            Cache cache = this.caches.get(needLoadKey);
            if (cache == null) continue;
            versions.put(needLoadKey, cache.version);
        }
        return new MultiCacheLoaderMono((Flux<? extends Map.Entry<String, Object>>)this.clusterCache.get(needLoadKeys), needLoadKeys, versions, loaded, wrap);
    }

    private void updateValue(Cache cache, int version, Object value) {
        if (cache != null && CACHE_VERSION.get(cache) == version) {
            cache.setValue(version, value);
        }
    }

    public Mono<Boolean> setConfigs(Map<String, Object> values) {
        if (CollectionUtils.isEmpty(values)) {
            return Reactors.ALWAYS_TRUE;
        }
        values.forEach(this::trySetLocalValue);
        return this.clusterCache.putAll(values).then(this.notify(CacheNotify.expires(this.id, values.keySet()))).then(Reactors.ALWAYS_TRUE);
    }

    public Mono<Boolean> setConfig(String key, Object value) {
        if (key == null) {
            return Reactors.ALWAYS_FALSE;
        }
        if (value == null) {
            return this.remove(key);
        }
        this.trySetLocalValue(key, value);
        return this.clusterCache.put((Object)key, value).then(this.notifyRemoveKey(key)).then(Reactors.ALWAYS_TRUE);
    }

    private void trySetLocalValue(String key, Object value) {
        if (value == null) {
            return;
        }
        Cache cache = this.caches.get(key);
        if (cache != null && cache.loading == null) {
            cache.setValue(cache.version, value);
        }
    }

    public Mono<Boolean> remove(String key) {
        return this.clusterCache.remove((Object)key).then(this.notifyRemoveKey(key)).then(Reactors.ALWAYS_TRUE);
    }

    public Mono<Value> getAndRemove(String key) {
        return this.clusterCache.getAndRemove((Object)key).flatMap(res -> this.notify(CacheNotify.expires(this.id, Collections.singleton(key))).thenReturn(res)).map(Value::simple);
    }

    public Mono<Boolean> remove(Collection<String> key) {
        return this.clusterCache.remove(key).then(this.notify(CacheNotify.expires(this.id, key))).then(Reactors.ALWAYS_TRUE);
    }

    public Mono<Boolean> clear() {
        return this.clusterCache.clear().then(this.notify(CacheNotify.clear(this.id))).then(Reactors.ALWAYS_TRUE);
    }

    void clearLocalCache(CacheNotify notify) {
        if (CollectionUtils.isEmpty(notify.getKeys())) {
            this.caches.clear();
        } else {
            for (String key : notify.getKeys()) {
                Cache cache = this.caches.get(key);
                if (cache == null || CACHE_LOADING.get(cache) != null) continue;
                this.caches.remove(key, cache);
            }
        }
        if (notify.isClear() && this.doOnClear != null) {
            this.doOnClear.accept(this);
        }
    }

    Mono<Void> notify(CacheNotify notify) {
        return Mono.defer(() -> {
            this.clearLocalCache(notify);
            return this.manager.doNotify(notify);
        });
    }

    Mono<Void> notifyRemoveKey(String key) {
        return this.notify(CacheNotify.expires(this.id, Collections.singleton(key)));
    }

    Map<String, Object> getAll() {
        return Collections.unmodifiableMap(Maps.transformValues(this.caches, Cache::getCachedValue));
    }

    public Mono<Void> refresh() {
        return this.notify(CacheNotify.expiresAll(this.id));
    }

    public Mono<Void> refresh(Collection<String> keys) {
        return this.notify(CacheNotify.expires(this.id, keys));
    }

    public static void addSharedKey(ConfigKey<?> ... key) {
        for (ConfigKey<?> configKey : key) {
            sharedKey.add(configKey.getKey());
        }
    }

    public static void addSharedKey(String ... key) {
        sharedKey.addAll(Arrays.asList(key));
    }

    private static Value tryShare(String key, Value val) {
        if (!sharedKey.contains(key)) {
            return val;
        }
        return shared.computeIfAbsent(val, Function.identity());
    }

    public String toString() {
        return this.caches.toString();
    }

    static {
        LocalCacheClusterConfigStorage.addSharedKey(new ConfigKey[]{DeviceConfigKey.productId});
        LocalCacheClusterConfigStorage.addSharedKey(new ConfigKey[]{DeviceConfigKey.protocol});
        LocalCacheClusterConfigStorage.addSharedKey(new ConfigKey[]{DeviceConfigKey.connectionServerId});
        LocalCacheClusterConfigStorage.addSharedKey("state");
        LocalCacheClusterConfigStorage.addSharedKey("productName");
    }

    public class Cache {
        final String key;
        long t;
        volatile int version;
        volatile Value cached;
        volatile Mono<Value> ref;
        volatile Sinks.One<Value> await;
        volatile Disposable loading;

        public String toString() {
            return this.key + ":" + this.version + "(" + this.cached + ")";
        }

        public Cache(String key) {
            this.key = key;
            this.updateTime();
        }

        boolean isExpired() {
            return LocalCacheClusterConfigStorage.this.expires > 0L && System.currentTimeMillis() - this.t > LocalCacheClusterConfigStorage.this.expires;
        }

        Mono<Value> getRef(Mono<Object> loader) {
            Mono ref = CACHE_REF.get(this);
            if (this.isExpired() || ref == null) {
                return this.reload(loader);
            }
            this.updateTime();
            Value cached = CACHED.get(this);
            if (cached != null && !(ref instanceof Callable)) {
                if (CACHE_LOADING.get(this) == null && AWAIT.get(this) != null) {
                    this.error((Throwable)new BusinessException.NoStackTrace("conflict cache load"));
                }
                return cached.get() == null ? Mono.empty() : Mono.just((Object)cached);
            }
            return ref;
        }

        public Value getCached() {
            if (this.isExpired()) {
                return null;
            }
            this.updateTime();
            return this.cached;
        }

        public Object getCachedValue() {
            this.updateTime();
            Value cached = this.cached;
            return cached == null ? null : cached.get();
        }

        void updateTime() {
            if (LocalCacheClusterConfigStorage.this.expires > 0L) {
                this.t = System.currentTimeMillis();
            }
        }

        boolean setValue(int version, Object value) {
            return this.setValue(this.await(), CACHE_LOADING.get(this), version, value);
        }

        boolean setValue(Sinks.One<Value> await, Disposable loading, int version, Object value) {
            return this.setValue(await, loading, version, value == null ? null : Value.simple((Object)value));
        }

        boolean updateValue(int version, Value value) {
            Value cached = CACHED.get(this);
            if (CACHE_VERSION.compareAndSet(this, version, version + 1)) {
                if (value != null) {
                    if (CACHED.compareAndSet(this, cached, value = LocalCacheClusterConfigStorage.tryShare(this.key, value))) {
                        CACHE_REF.set(this, Mono.just((Object)value));
                    }
                } else if (CACHED.compareAndSet(this, cached, NULL)) {
                    CACHE_REF.set(this, Mono.empty());
                }
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug("local cache [id=`{},`key=`{}` value={}] version not match, expect:{},actual:{}", new Object[]{LocalCacheClusterConfigStorage.this.id, this.key, value == null ? null : value.get(), version, this.version});
            }
            CACHE_REF.set(this, null);
            return false;
        }

        boolean setValue(Sinks.One<Value> await, Disposable loading, int version, Value value) {
            this.updateTime();
            boolean success = this.updateValue(version, value);
            Value cached = CACHED.get(this);
            this.dispose(await, loading, cached == null ? value : cached);
            return success;
        }

        Mono<Value> reload(Mono<Object> loader) {
            Sinks.One await;
            do {
                if ((await = AWAIT.get(this)) == null) continue;
                return await.asMono();
            } while (!AWAIT.compareAndSet(this, null, await = Sinks.one()));
            Mono ref = await.asMono();
            CACHE_REF.set(this, ref);
            Loader _loader = new Loader(this.version);
            Disposable old = CACHE_LOADING.getAndSet(this, (Disposable)_loader);
            if (old != null) {
                old.dispose();
            }
            LocalCacheClusterConfigStorage.this.clusterCache.get((Object)this.key).switchIfEmpty(loader).subscribe((CoreSubscriber)_loader);
            return ref;
        }

        void clear(Value value) {
            Value val = CACHED.get(this);
            if (val == null) {
                val = value;
            }
            this.dispose(val);
            CACHE_REF.set(this, null);
            CACHE_VERSION.incrementAndGet(this);
        }

        void dispose(Sinks.One<Value> sink, Disposable loading, Value value) {
            if (sink != null) {
                AWAIT.compareAndSet(this, sink, null);
                if (value == null || value.get() == null) {
                    sink.emitEmpty(Reactors.emitFailureHandler());
                } else {
                    sink.emitValue((Object)value, Reactors.emitFailureHandler());
                }
            }
            if (loading != null) {
                CACHE_LOADING.compareAndSet(this, loading, null);
                loading.dispose();
            }
        }

        void dispose(Value value) {
            this.dispose(this.await(), CACHE_LOADING.get(this), value);
        }

        void error(Throwable error) {
            LocalCacheClusterConfigStorage.this.error(error);
            Value cached = CACHED.get(this);
            Sinks.One await = AWAIT.getAndSet(this, null);
            if (await != null) {
                if (cached != null) {
                    log.warn("load cache [{} {}] failed,use cached value:{} ,ver:{}", new Object[]{LocalCacheClusterConfigStorage.this.id, this.key, cached, this.version, error});
                    if (cached.get() == null) {
                        await.emitEmpty(Reactors.emitFailureHandler());
                    } else {
                        await.emitValue((Object)cached, Reactors.emitFailureHandler());
                    }
                } else {
                    await.emitError(error, Reactors.emitFailureHandler());
                }
            }
            this.clear(cached);
        }

        Sinks.One<Value> await() {
            return AWAIT.get(this);
        }

        private class Loader
        extends BaseSubscriber<Object> {
            private final int loadingVersion;
            private boolean hasValue;

            protected void hookOnSubscribe(@Nonnull Subscription subscription) {
                log.trace("loading cache `{}` key `{}`", (Object)LocalCacheClusterConfigStorage.this.id, (Object)Cache.this.key);
                super.hookOnSubscribe(subscription);
            }

            protected void hookOnNext(@Nonnull Object value) {
                this.hasValue = true;
                if (!Cache.this.setValue(Cache.this.await(), (Disposable)this, this.loadingVersion, value)) {
                    Cache.this.clear(Value.simple((Object)value));
                }
            }

            protected void hookOnComplete() {
                Sinks.One<Value> await = Cache.this.await();
                if (!this.hasValue && await != null && this.loadingVersion == CACHE_VERSION.get(Cache.this)) {
                    Cache.this.setValue(await, (Disposable)this, this.loadingVersion, null);
                }
            }

            protected void hookOnError(@Nonnull Throwable throwable) {
                Cache.this.error(throwable);
            }

            protected void hookFinally(@Nonnull SignalType type) {
                CACHE_LOADING.compareAndSet(Cache.this, (Disposable)this, null);
            }

            @Generated
            public Loader(int loadingVersion) {
                this.loadingVersion = loadingVersion;
            }
        }
    }

    class MultiCacheLoaderMono
    extends Mono<Values> {
        private final Flux<? extends Map.Entry<String, Object>> source;
        private final Set<String> keys;
        private final Map<String, Integer> versions;
        private final Map<String, Object> container;
        private final Values wrapper;

        public void subscribe(@Nonnull CoreSubscriber<? super Values> actual) {
            this.source.subscribe((CoreSubscriber)new MultiCacheLoader(this.keys, this.versions, this.container, actual, this.wrapper));
        }

        @Generated
        public MultiCacheLoaderMono(Flux<? extends Map.Entry<String, Object>> source, Set<String> keys, Map<String, Integer> versions, Map<String, Object> container, Values wrapper) {
            this.source = source;
            this.keys = keys;
            this.versions = versions;
            this.container = container;
            this.wrapper = wrapper;
        }
    }

    class MultiCacheLoader
    extends BaseSubscriber<Map.Entry<String, Object>>
    implements Scannable {
        private final Set<String> keys;
        private final Map<String, Integer> versions;
        private final Map<String, Object> container;
        private final CoreSubscriber<? super Values> actual;
        private final Values wrapper;

        protected void hookOnSubscribe(@Nonnull Subscription subscription) {
            this.actual.onSubscribe((Subscription)this);
        }

        @Nonnull
        public Context currentContext() {
            return this.actual.currentContext();
        }

        protected void hookOnNext(Map.Entry<String, Object> entry) {
            String key = entry.getKey();
            Object value = entry.getValue();
            Cache cache = LocalCacheClusterConfigStorage.this.getOrCreateCache(key);
            int version = this.versions.getOrDefault(key, cache.version);
            LocalCacheClusterConfigStorage.this.updateValue(cache, version, value);
            Object val = cache.getCachedValue();
            if (val == null) {
                val = value;
            }
            this.container.put(key, val);
            if (null != value) {
                this.keys.remove(key);
            }
            this.request(1L);
        }

        protected void hookOnError(@Nonnull Throwable throwable) {
            this.actual.onError(throwable);
        }

        protected void hookOnComplete() {
            if (!this.keys.isEmpty()) {
                for (String needLoadKey : this.keys) {
                    Cache cache = LocalCacheClusterConfigStorage.this.getOrCreateCache(needLoadKey);
                    int version = this.versions.getOrDefault(needLoadKey, cache.version);
                    LocalCacheClusterConfigStorage.this.updateValue(cache, version, null);
                    this.container.put(needLoadKey, cache.getCachedValue());
                }
            }
            this.actual.onNext((Object)this.wrapper);
            this.actual.onComplete();
        }

        public Object scanUnsafe(@Nonnull Scannable.Attr key) {
            if (key == Scannable.Attr.RUN_STYLE) {
                return Scannable.Attr.RunStyle.SYNC;
            }
            if (key == Scannable.Attr.ACTUAL) {
                return this.actual;
            }
            return null;
        }

        @Generated
        public MultiCacheLoader(Set<String> keys, Map<String, Integer> versions, Map<String, Object> container, CoreSubscriber<? super Values> actual, Values wrapper) {
            this.keys = keys;
            this.versions = versions;
            this.container = container;
            this.actual = actual;
            this.wrapper = wrapper;
        }
    }
}

