/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManagerCache;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaTopicManagerSharedState {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicManagerSharedState.class);
    private final KafkaTopicConsumerManagerCache kafkaTopicConsumerManagerCache = new KafkaTopicConsumerManagerCache();
    private static final long checkPeriodMillis = 60000L;
    private static final long expirePeriodMillis = 120000L;
    private static volatile ScheduledFuture<?> cursorExpireTask = null;
    private final ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>> topics = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, Producer> references = new ConcurrentHashMap();

    public KafkaTopicManagerSharedState(BrokerService brokerService) {
        this.initializeCursorExpireTask((ScheduledExecutorService)brokerService.executor());
    }

    private void initializeCursorExpireTask(ScheduledExecutorService executor) {
        if (executor == null) {
            return;
        }
        cursorExpireTask = executor.scheduleWithFixedDelay(() -> {
            long current = System.currentTimeMillis();
            this.kafkaTopicConsumerManagerCache.forEach(future -> {
                if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
                    ((KafkaTopicConsumerManager)future.join()).deleteExpiredCursor(current, 120000L);
                }
            });
        }, 60000L, 60000L, TimeUnit.MILLISECONDS);
    }

    public void close() {
        this.cancelCursorExpireTask();
        this.kafkaTopicConsumerManagerCache.close();
        this.references.clear();
        this.topics.clear();
    }

    private void cancelCursorExpireTask() {
        if (cursorExpireTask != null) {
            cursorExpireTask.cancel(true);
            cursorExpireTask = null;
        }
    }

    private void removePersistentTopicAndReferenceProducer(String topicName) {
        CompletableFuture<Optional<PersistentTopic>> topicFuture = this.topics.remove(topicName);
        Producer producer = this.references.remove(topicName);
        if (topicFuture == null) {
            KopBrokerLookupManager.removeTopicManagerCache(topicName);
            return;
        }
        ((CompletableFuture)topicFuture.thenAccept(persistentTopic -> {
            if (producer != null && persistentTopic.isPresent()) {
                try {
                    ((PersistentTopic)persistentTopic.get()).removeProducer(producer);
                }
                catch (IllegalArgumentException ignored) {
                    log.error("[{}] The producer's topic ({}) doesn't match the current PersistentTopic", (Object)topicName, (Object)(producer.getTopic() == null ? "null" : producer.getTopic().getName()));
                }
            }
        })).exceptionally(e -> {
            log.error("Failed to get topic '{}' in removeTopicAndReferenceProducer", (Object)topicName, e);
            return null;
        });
    }

    public void handlerKafkaRequestHandlerClosed(SocketAddress remoteAddress, KafkaRequestHandler requestHandler) {
        try {
            this.kafkaTopicConsumerManagerCache.removeAndCloseByAddress(remoteAddress);
            ((ConcurrentHashMap.KeySetView)this.topics.keySet()).forEach(topicName -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] remove producer {} for topic {} at close()", new Object[]{requestHandler.ctx.channel(), this.references.get(topicName), topicName});
                }
                this.removePersistentTopicAndReferenceProducer((String)topicName);
            });
        }
        catch (Exception e) {
            log.error("[{}] Failed to close KafkaTopicManager. exception:", (Object)requestHandler.ctx.channel(), (Object)e);
        }
    }

    public void deReference(String topicName) {
        try {
            KopBrokerLookupManager.removeTopicManagerCache(topicName);
            this.kafkaTopicConsumerManagerCache.removeAndCloseByTopic(topicName);
            this.removePersistentTopicAndReferenceProducer(topicName);
        }
        catch (Exception e) {
            log.error("Failed to close reference for individual topic {}. exception:", (Object)topicName, (Object)e);
        }
    }

    public KafkaTopicConsumerManagerCache getKafkaTopicConsumerManagerCache() {
        return this.kafkaTopicConsumerManagerCache;
    }

    public ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>> getTopics() {
        return this.topics;
    }

    public ConcurrentHashMap<String, Producer> getReferences() {
        return this.references;
    }
}

