/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.InternalProducer;
import io.streamnative.pulsar.handlers.kop.InternalServerCnx;
import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
import io.streamnative.pulsar.handlers.kop.LookupClient;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Per-connection helper that resolves topics, topic consumer managers and internal producers
 * on behalf of a single {@link KafkaRequestHandler}.
 *
 * <p>All caches touched by this class (consumer-manager cache, producer references, topic
 * futures) live in the shared state obtained from
 * {@code requestHandler.getKafkaTopicManagerSharedState()}; this class only holds the
 * per-connection pieces: the remote address, the internal server connection and a
 * {@code closed} flag. Once {@link #close()} has run, the lookup methods short-circuit and
 * return "empty" results instead of touching the shared state.
 */
public class KafkaTopicManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicManager.class);

    private final KafkaRequestHandler requestHandler;
    private final BrokerService brokerService;
    private final LookupClient lookupClient;
    private final KafkaTopicLookupService kafkaTopicLookupService;
    // Written by setRemoteAddress(); stays null until then.
    private volatile SocketAddress remoteAddress;
    private final InternalServerCnx internalServerCnx;
    // Flipped exactly once by close(); checked by every lookup method.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a manager bound to the given request handler.
     *
     * @param kafkaRequestHandler handler supplying the Pulsar service, the lookup client and
     *                            the shared topic-manager state
     */
    KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
        this.requestHandler = kafkaRequestHandler;
        PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
        this.brokerService = pulsarService.getBrokerService();
        this.internalServerCnx = new InternalServerCnx(this.requestHandler);
        this.lookupClient = kafkaRequestHandler.getLookupClient();
        this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
    }

    /**
     * Records the remote address of the connection and forwards it to the internal server
     * connection.
     *
     * @param remoteAddress address of the remote peer
     */
    public void setRemoteAddress(SocketAddress remoteAddress) {
        this.internalServerCnx.updateCtx(remoteAddress);
        this.remoteAddress = remoteAddress;
    }

    /**
     * Returns the (possibly cached) consumer manager for a topic.
     *
     * <p>The lookup goes through the shared consumer-manager cache, keyed by topic name and
     * this connection's remote address. On a cache miss the topic is resolved through
     * {@link #getTopic(String)} and a new {@link KafkaTopicConsumerManager} is created.
     *
     * @param topicName name of the topic to look up
     * @return a future that completes with the consumer manager, or with {@code null} when
     *         this manager is closed, the remote address has not been set, the topic lookup
     *         failed, or the topic lookup returned an empty result
     */
    public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(String topicName) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopicConsumerManager({}) since channel closing",
                        this.requestHandler.ctx.channel(), topicName);
            }
            return CompletableFuture.completedFuture(null);
        }
        // Read the volatile field once so the null check and the cache key are guaranteed to
        // use the same value even if setRemoteAddress() runs concurrently.
        final SocketAddress address = this.remoteAddress;
        if (address == null) {
            log.error("[{}] Try to getTopicConsumerManager({}) while remoteAddress is not set",
                    this.requestHandler.ctx.channel(), topicName);
            return CompletableFuture.completedFuture(null);
        }
        return this.requestHandler.getKafkaTopicManagerSharedState()
                .getKafkaTopicConsumerManagerCache()
                .computeIfAbsent(topicName, address, () -> {
                    final CompletableFuture<KafkaTopicConsumerManager> tcmFuture = new CompletableFuture<>();
                    this.getTopic(topicName).whenComplete((persistentTopic, throwable) -> {
                        if (throwable == null && persistentTopic.isPresent()) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.",
                                        this.requestHandler.ctx.channel(), topicName, persistentTopic);
                            }
                            tcmFuture.complete(
                                    new KafkaTopicConsumerManager(this.requestHandler, persistentTopic.get()));
                            return;
                        }
                        if (throwable != null) {
                            // The trailing throwable is not bound to a placeholder, so SLF4J
                            // logs it with its stack trace in addition to the message text.
                            log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}",
                                    this.requestHandler.ctx.channel(), topicName, throwable.getMessage(),
                                    throwable);
                        } else {
                            log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' returns empty",
                                    this.requestHandler.ctx.channel(), topicName);
                        }
                        // Failures are reported to callers as a null manager, not as an
                        // exceptionally completed future.
                        tcmFuture.complete(null);
                    });
                    return tcmFuture;
                });
    }

    /**
     * Creates an {@link InternalProducer} for the topic and adds it to the topic.
     *
     * <p>NOTE(review): the future returned by {@code addProducer} is not inspected, so a
     * failed registration is not observed here — confirm this is intended.
     *
     * @param persistentTopic topic to attach the producer to
     * @return the newly created producer
     */
    private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
        Producer producer = new InternalProducer(
                persistentTopic,
                this.internalServerCnx,
                this.lookupClient.getPulsarClient().newRequestId(),
                this.brokerService.generateUniqueProducerName());
        if (log.isDebugEnabled()) {
            log.debug("[{}] Register Mock Producer {} into PersistentTopic {}",
                    this.requestHandler.ctx.channel(), producer, persistentTopic.getName());
        }
        persistentTopic.addProducer(producer, new CompletableFuture<>());
        return producer;
    }

    /**
     * Returns the producer registered for {@code topicName} in the shared references map,
     * registering a new one in {@code persistentTopic} if none exists yet.
     *
     * @param topicName       key under which the producer is tracked in the shared state
     * @param persistentTopic topic to register a new producer in when there is no entry
     * @return the producer, or {@link Optional#empty()} when this manager is already closed
     */
    public Optional<Producer> registerProducerInPersistentTopic(String topicName, PersistentTopic persistentTopic) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to registerProducerInPersistentTopic for topic '{}'",
                        this.requestHandler.ctx.channel(), topicName);
            }
            return Optional.empty();
        }
        Producer producer = this.requestHandler.getKafkaTopicManagerSharedState()
                .getReferences()
                .computeIfAbsent(topicName, key -> this.registerInPersistentTopic(persistentTopic));
        return Optional.of(producer);
    }

    /**
     * Marks this manager as closed and notifies the shared state that the owning request
     * handler has closed. Only the first call has an effect; later calls return after an
     * optional debug log.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            // NOTE(review): this branch runs when close() was already called earlier, yet
            // the message reads "Closing"; text kept unchanged for log compatibility.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closing TopicManager", this.requestHandler.ctx.channel());
            }
            return;
        }
        final SocketAddress address = this.remoteAddress;
        this.requestHandler.getKafkaTopicManagerSharedState()
                .handlerKafkaRequestHandlerClosed(address, this.requestHandler);
    }

    /**
     * Looks up a topic through the {@link KafkaTopicLookupService} and stores the resulting
     * future in the shared state's topics collection under {@code topicName}.
     *
     * @param topicName name of the topic to resolve
     * @return the lookup future, or an already completed future holding
     *         {@link Optional#empty()} when this manager is closed
     */
    public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopic({}) since channel is closing",
                        this.requestHandler.ctx.channel(), topicName);
            }
            return CompletableFuture.completedFuture(Optional.empty());
        }
        CompletableFuture<Optional<PersistentTopic>> topicFuture =
                this.kafkaTopicLookupService.getTopic(topicName, this.requestHandler.ctx.channel());
        // Every call stores its future via put(), whether or not an entry already exists.
        this.requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicFuture);
        return topicFuture;
    }

    /**
     * Drops the shared-state reference for a topic after a fenced managed-ledger error.
     *
     * @param fullTopicName name of the topic to de-reference in the shared state
     */
    public void invalidateCacheForFencedManagerLedgerOnTopic(String fullTopicName) {
        log.info("Invalidating cache for fenced error on topic {} (maybe topic was deleted)", fullTopicName);
        this.requestHandler.getKafkaTopicManagerSharedState().deReference(fullTopicName);
    }
}

