/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.gateway.services.ratelimit;

import io.gravitee.gateway.services.ratelimit.BaseSchedulerProvider;
import io.gravitee.gateway.services.ratelimit.LocalRateLimit;
import io.gravitee.gateway.services.ratelimit.LocalRateLimitRepository;
import io.gravitee.repository.ratelimit.api.RateLimitRepository;
import io.gravitee.repository.ratelimit.model.RateLimit;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.MaybeSource;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.BiFunction;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rate-limit repository that applies increments to a local repository and periodically
 * pushes the locally accumulated counts to a remote repository.
 *
 * <p>{@link #incrementAndGet(String, long, Supplier)} only touches the local repository and
 * records the key as pending. {@link #merge()} then sends the pending local count of each key
 * to the remote repository, copies the counter returned by the remote back into the local
 * entry and resets the local count to zero.
 *
 * <p>A per-key {@link Semaphore} with a single permit is held while a key is being merged;
 * increments for that key wait until the permit is available again.
 */
public class AsyncRateLimitRepository implements RateLimitRepository<RateLimit> {

    /** Delay, in milliseconds, between two merge cycles started by {@link #initialize()}. */
    private static final long MERGE_PERIOD_MS = 5000L;

    private final Logger logger = LoggerFactory.getLogger(AsyncRateLimitRepository.class);
    private LocalRateLimitRepository localCacheRateLimitRepository;
    private RateLimitRepository<RateLimit> remoteCacheRateLimitRepository;

    /** Keys incremented locally since they were last handed to a merge. */
    private final Set<String> keys = new CopyOnWriteArraySet<>();
    private final BaseSchedulerProvider schedulerProvider;

    /** One single-permit semaphore per key, created lazily by {@link #lock(String)}. */
    private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();

    public AsyncRateLimitRepository(BaseSchedulerProvider schedulerProvider) {
        this.schedulerProvider = schedulerProvider;
    }

    /**
     * Starts the repeating timer that calls {@link #merge()} every {@link #MERGE_PERIOD_MS}
     * milliseconds.
     *
     * <p>The subscription is not retained, so the timer cannot be cancelled once started.
     */
    public void initialize() {
        Observable
            .timer(MERGE_PERIOD_MS, TimeUnit.MILLISECONDS)
            .repeat()
            .subscribe(tick -> {
                try {
                    merge();
                } catch (RuntimeException e) {
                    // An exception escaping the tick consumer would cancel the repeating timer
                    // for good, so a failing cycle is logged and the schedule is kept alive.
                    logger.error("An unexpected error occurs while merging asynchronous rate-limit keys", e);
                }
            });
    }

    /**
     * Increments the local rate limit for {@code key} and marks the key as pending for the next
     * merge.
     *
     * <p>The lock check runs on the computation scheduler of the scheduler provider and waits
     * while the key is locked by a merge.
     *
     * @param key      rate-limit key
     * @param weight   amount passed to the local repository's increment
     * @param supplier provides the initial rate limit, wrapped into a {@link LocalRateLimit}
     *                 for the local repository
     * @return the rate limit returned by the local repository
     */
    public Single<RateLimit> incrementAndGet(String key, long weight, Supplier<RateLimit> supplier) {
        return isLocked(key)
            .subscribeOn(schedulerProvider.computation())
            .andThen(
                Single.defer(() ->
                    localCacheRateLimitRepository
                        .incrementAndGet(key, weight, () -> new LocalRateLimit(supplier.get()))
                        .doOnSuccess(localRateLimit -> keys.add(localRateLimit.getKey()))
                )
            );
    }

    /**
     * Starts a merge for every pending key.
     *
     * <p>Each key is removed from the pending set individually, right before it is processed.
     * Clearing the whole set afterwards would also drop keys added by concurrent increments
     * while this method iterates, and their local counts would then not be merged.
     */
    void merge() {
        // CopyOnWriteArraySet iterates over a snapshot, so removing while iterating is safe.
        for (String key : keys) {
            keys.remove(key);
            mergeKey(key);
        }
    }

    /**
     * Pushes the local count of {@code key} to the remote repository and stores the counter
     * returned by the remote back into the local repository, under the key's lock.
     */
    private void mergeKey(String key) {
        lock(key)
            .andThen(
                localCacheRateLimitRepository
                    .get(key)
                    // The remote counter is incremented by the locally accumulated count.
                    .<RateLimit>flatMapSingle(localRateLimit ->
                        remoteCacheRateLimitRepository.incrementAndGet(key, localRateLimit.getLocal(), () -> localRateLimit)
                    )
                    .zipWith(
                        localCacheRateLimitRepository.get(key),
                        (RateLimit remoteRateLimit, LocalRateLimit localRateLimit) -> {
                            // Take over the counter returned by the remote and restart local counting.
                            localRateLimit.setCounter(remoteRateLimit.getCounter());
                            localRateLimit.setLocal(0L);
                            return localRateLimit;
                        }
                    )
                    .<LocalRateLimit>flatMapSingle(updated -> localCacheRateLimitRepository.save(updated))
                    // Release the key whether the merge succeeded, found nothing or failed.
                    .doAfterTerminate(() -> unlock(key))
            )
            .subscribe(
                saved -> {},
                // Passing an error consumer keeps failures out of the global RxJava error handler.
                throwable -> logger.error("An unexpected error occurs while refreshing asynchronous rate-limit", throwable)
            );
    }

    /**
     * Returns a completable that completes once {@code key} is not locked. If no semaphore
     * exists for the key it completes immediately; otherwise it blocks the subscribing thread
     * until the permit can be taken, then gives the permit straight back.
     */
    private Completable isLocked(String key) {
        return Completable.create(emitter -> {
            Semaphore semaphore = locks.get(key);
            if (semaphore != null) {
                // Blocks instead of spinning on tryAcquire(); interruption is ignored either way.
                semaphore.acquireUninterruptibly();
                semaphore.release();
            }
            emitter.onComplete();
        });
    }

    /**
     * Returns a completable that takes the permit for {@code key}, creating the semaphore on
     * first use, and blocks the subscribing thread until the permit is available.
     */
    private Completable lock(String key) {
        return Completable.create(emitter -> {
            Semaphore semaphore = locks.computeIfAbsent(key, ignored -> new Semaphore(1));
            semaphore.acquireUninterruptibly();
            emitter.onComplete();
        });
    }

    /** Releases the permit for {@code key}; does nothing if the key has no semaphore. */
    private void unlock(String key) {
        Semaphore semaphore = locks.get(key);
        if (semaphore != null) {
            semaphore.release();
        }
    }

    public void setLocalCacheRateLimitRepository(LocalRateLimitRepository localCacheRateLimitRepository) {
        this.localCacheRateLimitRepository = localCacheRateLimitRepository;
    }

    public void setRemoteCacheRateLimitRepository(RateLimitRepository<RateLimit> remoteCacheRateLimitRepository) {
        this.remoteCacheRateLimitRepository = remoteCacheRateLimitRepository;
    }
}

