/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.web.cache.supports;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Generated;
import org.hswebframework.web.cache.ReactiveCache;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

public abstract class AbstractReactiveCache<E>
implements ReactiveCache<E> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractReactiveCache.class);
    static final Sinks.EmitFailureHandler emitFailureHandler = Sinks.EmitFailureHandler.busyLooping((Duration)Duration.ofSeconds(30L));
    private final Map<Object, CacheLoader> cacheLoading = new ConcurrentHashMap<Object, CacheLoader>();

    protected abstract Mono<Object> getNow(Object var1);

    public abstract Mono<Void> putNow(Object var1, Object var2);

    @Override
    public final Mono<E> getMono(Object key) {
        return this.cacheLoading.computeIfAbsent(key, _key -> new CacheLoader(this, _key, this.getNow(_key))).onErrorResume(err -> this.handleLoaderError(key, (Throwable)err));
    }

    @Override
    public final Mono<E> getMono(Object key, Supplier<Mono<E>> loader) {
        return Mono.deferContextual(ctx -> {
            CacheLoader cacheLoader = this.cacheLoading.compute(key, (arg_0, arg_1) -> this.lambda$getMono$2((Supplier)loader, ctx, arg_0, arg_1));
            return cacheLoader;
        }).onErrorResume(err -> this.handleLoaderError(key, (Throwable)err));
    }

    @Override
    public final Flux<E> getFlux(Object key) {
        return this.cacheLoading.computeIfAbsent(key, _key -> new CacheLoader(this, _key, this.getNow(_key))).flatMapIterable(e -> (List)e).onErrorResume(err -> this.handleLoaderError(key, (Throwable)err));
    }

    @Override
    public final Flux<E> getFlux(Object key, Supplier<Flux<E>> loader) {
        return Flux.deferContextual(ctx -> {
            CacheLoader cacheLoader = this.cacheLoading.compute(key, (arg_0, arg_1) -> this.lambda$getFlux$8((Supplier)loader, ctx, arg_0, arg_1));
            return cacheLoader.flatMapIterable(e -> (List)e);
        }).onErrorResume(err -> this.handleLoaderError(key, (Throwable)err));
    }

    protected Mono<E> handleLoaderError(Object key, Throwable err) {
        log.warn("load cache error,key:{},evict it.", key, (Object)err);
        return this.evict(key).then(Mono.empty());
    }

    @Override
    public final Mono<Void> put(Object key, Publisher<E> data) {
        if (data instanceof Mono) {
            return Mono.from(data).flatMap(e -> this.putNow(key, e));
        }
        return Flux.from(data).collectList().flatMap(e -> this.putNow(key, e));
    }

    @Override
    public abstract Mono<Void> evict(Object var1);

    @Override
    public Flux<E> getAll(Object ... keys) {
        return Flux.just((Object[])keys).flatMap(this::getMono);
    }

    @Override
    public abstract Mono<Void> evictAll(Iterable<?> var1);

    @Override
    public abstract Mono<Void> clear();

    private /* synthetic */ CacheLoader lambda$getFlux$8(Supplier loader, ContextView ctx, Object _key, CacheLoader old) {
        CacheLoader cl = new CacheLoader(this, _key, this.getNow(_key));
        cl.defaultValue((Mono<? extends Object>)((Flux)loader.get()).collectList(), ctx);
        return cl;
    }

    private /* synthetic */ CacheLoader lambda$getMono$2(Supplier loader, ContextView ctx, Object _key, CacheLoader old) {
        CacheLoader cl = new CacheLoader(this, _key, this.getNow(_key));
        cl.defaultValue((Mono<? extends Object>)((Mono)loader.get()), ctx);
        return cl;
    }

    protected static class CacheLoader
    extends MonoOperator<Object, Object> {
        private final AbstractReactiveCache<?> parent;
        private final Object key;
        private Mono<? extends Object> defaultValue;
        private final Sinks.One<Object> holder = Sinks.one();
        private volatile Disposable loading;

        protected CacheLoader(AbstractReactiveCache<?> parent, Object key, Mono<? extends Object> source) {
            super(source.cache());
            this.parent = parent;
            this.key = key;
        }

        protected void defaultValue(Mono<? extends Object> defaultValue, ContextView context) {
            if (this.defaultValue != null) {
                return;
            }
            this.defaultValue = defaultValue;
            this.tryLoad(context);
        }

        private void tryLoad(ContextView context) {
            if (this.holder.currentSubscriberCount() == 1 && this.loading == null) {
                Mono source = this.source;
                if (this.defaultValue != null) {
                    source = source.switchIfEmpty(this.defaultValue.flatMap(val -> this.parent.putNow(this.key, val).thenReturn(val)));
                }
                this.loading = source.subscribe(val -> {
                    this.complete();
                    this.holder.emitValue(val, emitFailureHandler);
                }, err -> {
                    this.complete();
                    this.holder.emitError(err, emitFailureHandler);
                }, () -> {
                    this.complete();
                    this.holder.emitEmpty(emitFailureHandler);
                }, Context.of((ContextView)context));
            }
        }

        public void subscribe(CoreSubscriber<? super Object> actual) {
            this.holder.asMono().subscribe(actual);
            this.tryLoad((ContextView)actual.currentContext());
        }

        private void complete() {
            this.parent.cacheLoading.remove(this.key, (Object)this);
        }
    }
}

