/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.adapter;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.internal.adapter.AdapterImplementationFactory;
import org.apache.pulsar.reactive.client.internal.adapter.ProducerCacheEntry;
import org.apache.pulsar.reactive.client.internal.adapter.ProducerCacheKey;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link ReactiveMessageSenderCache} implementation that keeps its
 * {@link ProducerCacheEntry} instances in a {@link ProducerCacheProvider} and runs
 * caller-supplied reactive actions against the cached producer while a lease on the
 * entry is active.
 */
class ProducerCache implements ReactiveMessageSenderCache {
    private final ProducerCacheProvider cacheProvider;

    /**
     * Creates a cache backed by the given provider.
     *
     * @param cacheProvider the provider that stores the cache entries; {@link #close()}
     *                      tolerates a {@code null} provider
     */
    ProducerCache(ProducerCacheProvider cacheProvider) {
        this.cacheProvider = cacheProvider;
    }

    /**
     * Subscribes to {@code producerMono} and wraps the emitted producer in a new
     * {@link ProducerCacheEntry}.
     *
     * @param producerMono              source of the producer to wrap
     * @param producerActionTransformer supplier handed to the new cache entry
     * @param <T>                       message type of the producer
     * @return a future completed with the new cache entry
     */
    static <T> CompletableFuture<ProducerCacheEntry> createCacheEntry(Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer) {
        return producerMono
                .map(producer -> new ProducerCacheEntry(producer, producerActionTransformer))
                .toFuture();
    }

    /**
     * Looks up the entry for {@code cacheKey} in the provider, creating it through
     * {@link #createCacheEntry(Mono, Supplier)} when the provider asks for a new one, and
     * then passes the entry through {@code recreateIfClosed}.
     */
    private <T> Mono<ProducerCacheEntry> getProducerCacheEntry(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer) {
        // The provider passes the key to the creation function; it is not needed here
        // because everything required to build the entry is captured by the lambda.
        Mono<ProducerCacheEntry> cachedEntry = AdapterImplementationFactory.adaptPulsarFuture(
                () -> this.cacheProvider.getOrCreateCachedEntry(cacheKey,
                        ignoredKey -> createCacheEntry(producerMono, producerActionTransformer)));
        return cachedEntry.flatMap(producerCacheEntry -> producerCacheEntry.recreateIfClosed(producerMono));
    }

    /**
     * Runs a single-valued action against the cached producer for {@code cacheKey}.
     * The entry is leased before the action runs and handed to
     * {@link #returnCacheEntry(ProducerCacheEntry)} by {@code Mono.usingWhen} as its
     * cleanup step.
     *
     * @param cacheKey                  key identifying the cached producer
     * @param producerMono              source used to create (or recreate) the producer
     * @param producerActionTransformer supplier handed to a newly created cache entry
     * @param usingProducerAction       action receiving the producer and the entry's transformer
     * @param <T>                       message type of the producer
     * @param <R>                       result type of the action
     * @return the result of {@code usingProducerAction}
     */
    <T, R> Mono<R> usingCachedProducer(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer, BiFunction<Producer<T>, PublisherTransformer, Mono<R>> usingProducerAction) {
        return Mono.usingWhen(
                this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
                producerCacheEntry -> usingProducerAction.apply(
                        producerCacheEntry.getProducer(),
                        producerCacheEntry.getProducerActionTransformer()),
                this::returnCacheEntry);
    }

    /**
     * Builds a {@link Mono} that calls {@code releaseLease} on the entry when subscribed.
     */
    private Mono<Object> returnCacheEntry(ProducerCacheEntry producerCacheEntry) {
        return Mono.fromRunnable(producerCacheEntry::releaseLease);
    }

    /**
     * Resolves the cache entry and calls {@code activateLease} on it as it is emitted.
     */
    private <T> Mono<ProducerCacheEntry> leaseCacheEntry(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer) {
        return this.getProducerCacheEntry(cacheKey, producerMono, producerActionTransformer)
                .doOnNext(ProducerCacheEntry::activateLease);
    }

    /**
     * Multi-valued counterpart of
     * {@link #usingCachedProducer(ProducerCacheKey, Mono, Supplier, BiFunction)}: the
     * action yields a {@link Flux} and the lease is managed by {@code Flux.usingWhen}.
     *
     * @param cacheKey                  key identifying the cached producer
     * @param producerMono              source used to create (or recreate) the producer
     * @param producerActionTransformer supplier handed to a newly created cache entry
     * @param usingProducerAction       action receiving the producer and the entry's transformer
     * @param <T>                       message type of the producer
     * @param <R>                       element type produced by the action
     * @return the elements produced by {@code usingProducerAction}
     */
    <T, R> Flux<R> usingCachedProducerMany(ProducerCacheKey cacheKey, Mono<Producer<T>> producerMono, Supplier<PublisherTransformer> producerActionTransformer, BiFunction<Producer<T>, PublisherTransformer, Flux<R>> usingProducerAction) {
        return Flux.usingWhen(
                this.leaseCacheEntry(cacheKey, producerMono, producerActionTransformer),
                producerCacheEntry -> usingProducerAction.apply(
                        producerCacheEntry.getProducer(),
                        producerCacheEntry.getProducerActionTransformer()),
                this::returnCacheEntry);
    }

    /**
     * Closes the underlying cache provider, if one was supplied.
     *
     * @throws Exception if closing the provider fails
     */
    @Override
    public void close() throws Exception {
        if (this.cacheProvider != null) {
            this.cacheProvider.close();
        }
    }
}

