/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTopicConsumerManagerCache {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicConsumerManagerCache.class);
    private final Map<String, Map<SocketAddress, CompletableFuture<KafkaTopicConsumerManager>>> cache = new ConcurrentHashMap<String, Map<SocketAddress, CompletableFuture<KafkaTopicConsumerManager>>>();

    public CompletableFuture<KafkaTopicConsumerManager> computeIfAbsent(String fullTopicName, SocketAddress remoteAddress, Supplier<CompletableFuture<KafkaTopicConsumerManager>> mappingFunction) {
        return this.cache.computeIfAbsent(fullTopicName, ignored -> new ConcurrentHashMap()).computeIfAbsent(remoteAddress, ignored -> (CompletableFuture)mappingFunction.get());
    }

    public void forEach(Consumer<CompletableFuture<KafkaTopicConsumerManager>> action) {
        this.cache.values().forEach((? super T internalMap) -> internalMap.values().forEach(action));
    }

    private static void closeTcmFuture(CompletableFuture<KafkaTopicConsumerManager> tcmFuture) {
        tcmFuture.thenAccept(tcm -> {
            if (tcm != null) {
                tcm.close();
            }
        });
    }

    public void removeAndCloseByTopic(String fullTopicName) {
        Optional.ofNullable(this.cache.remove(fullTopicName)).ifPresent(map -> map.forEach((? super K remoteAddress, ? super V future) -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Remove and close TCM", (Object)fullTopicName, remoteAddress);
            }
            KafkaTopicConsumerManagerCache.closeTcmFuture(future);
        }));
    }

    public void removeAndCloseByAddress(SocketAddress remoteAddress) {
        this.cache.forEach((? super K fullTopicName, ? super V internalMap) -> Optional.ofNullable((CompletableFuture)internalMap.remove(remoteAddress)).ifPresent(future -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Remove and close TCM", fullTopicName, (Object)remoteAddress);
            }
            KafkaTopicConsumerManagerCache.closeTcmFuture(future);
        }));
    }

    public void close() {
        this.cache.forEach((? super K fullTopicName, ? super V internalMap) -> internalMap.forEach((? super K remoteAddress, ? super V future) -> {
            try {
                Optional.ofNullable((KafkaTopicConsumerManager)future.get(100L, TimeUnit.MILLISECONDS)).ifPresent(KafkaTopicConsumerManager::close);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.warn("[{}][{}] Failed to get TCM future when trying to close it", fullTopicName, remoteAddress);
            }
        }));
    }

    @VisibleForTesting
    public int getCount() {
        AtomicInteger count = new AtomicInteger(0);
        this.forEach(ignored -> count.incrementAndGet());
        return count.get();
    }

    @VisibleForTesting
    @NonNull
    public List<KafkaTopicConsumerManager> getTopicConsumerManagers(String fullTopicName) {
        return this.cache.getOrDefault(fullTopicName, Collections.emptyMap()).values().stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}

