/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataUtils {
    private static final Logger log = LoggerFactory.getLogger(MetadataUtils.class);
    public static final int MAX_COMPACTION_THRESHOLD = 0x6400000;

    public static String constructOffsetsTopicBaseName(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKafkaMetadataNamespace() + "/__consumer_offsets";
    }

    public static String constructTxnLogTopicBaseName(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKafkaMetadataNamespace() + "/__transaction_state";
    }

    public static String constructTxnProducerIdTopicBaseName(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKafkaMetadataNamespace() + "/__transaction_producerid_generator";
    }

    public static String constructSchemaRegistryTopicName(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKopSchemaRegistryNamespace() + "/" + conf.getKopSchemaRegistryTopicName();
    }

    public static String constructMetadataNamespace(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKafkaMetadataNamespace();
    }

    public static String constructUserTopicsNamespace(String tenant, KafkaServiceConfiguration conf) {
        return tenant + "/" + conf.getKafkaNamespace();
    }

    public static void createOffsetMetadataIfMissing(String tenant, PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration conf) throws PulsarAdminException {
        KopTopic kopTopic = new KopTopic(MetadataUtils.constructOffsetsTopicBaseName(tenant, conf), MetadataUtils.constructMetadataNamespace(tenant, conf));
        MetadataUtils.createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, conf.getOffsetsTopicNumPartitions(), false);
    }

    public static void createTxnMetadataIfMissing(String tenant, PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration conf) throws PulsarAdminException {
        KopTopic kopTopic = new KopTopic(MetadataUtils.constructTxnLogTopicBaseName(tenant, conf), MetadataUtils.constructMetadataNamespace(tenant, conf));
        MetadataUtils.createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, conf.getKafkaTxnLogTopicNumPartitions(), false);
        if (conf.isKafkaTransactionProducerIdsStoredOnPulsar()) {
            KopTopic producerIdKopTopic = new KopTopic(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, conf), MetadataUtils.constructMetadataNamespace(tenant, conf));
            MetadataUtils.createTopicIfNotExist(pulsarAdmin, producerIdKopTopic.getFullName(), 1);
        }
    }

    private static void createKafkaMetadataIfMissing(String tenant, String namespace, PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration conf, KopTopic kopTopic, int partitionNum, boolean infiniteRetention) throws PulsarAdminException {
        if (!conf.isKafkaManageSystemNamespaces()) {
            log.info("Skipping initialization of topic {} for tenant {}", (Object)kopTopic.getFullName(), (Object)tenant);
            return;
        }
        String kafkaMetadataNamespace = tenant + "/" + namespace;
        String cluster = conf.getClusterName();
        boolean clusterExists = false;
        boolean tenantExists = false;
        boolean namespaceExists = false;
        boolean offsetsTopicExists = false;
        try {
            Clusters clusters = pulsarAdmin.clusters();
            if (!clusters.getClusters().contains(cluster)) {
                try {
                    pulsarAdmin.clusters().createCluster(cluster, clusterData);
                }
                catch (PulsarAdminException e) {
                    if (e instanceof PulsarAdminException.ConflictException) {
                        log.info("Attempted to create cluster {} however it was created concurrently.", (Object)cluster);
                    }
                    throw e;
                }
            } else {
                ClusterData configuredClusterData = clusters.getCluster(cluster);
                log.info("Cluster {} found: {}", (Object)cluster, (Object)configuredClusterData);
            }
            clusterExists = true;
            Tenants tenants = pulsarAdmin.tenants();
            MetadataUtils.createTenantIfMissing(tenant, conf, cluster, tenants);
            tenantExists = true;
            Namespaces namespaces = pulsarAdmin.namespaces();
            MetadataUtils.createNamespaceIfMissing(tenant, conf, cluster, kafkaMetadataNamespace, namespaces, infiniteRetention, true);
            namespaceExists = true;
            MetadataUtils.createTopicIfNotExist(pulsarAdmin, kopTopic.getFullName(), partitionNum);
            offsetsTopicExists = true;
        }
        catch (PulsarAdminException e) {
            block11: {
                try {
                    if (!(e instanceof PulsarAdminException.ConflictException)) break block11;
                    log.info("Resources concurrent creating and cause e: ", (Throwable)e);
                }
                catch (Throwable throwable) {
                    log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaMetadataNamespace, namespaceExists, kopTopic.getOriginalName(), offsetsTopicExists});
                    throw throwable;
                }
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaMetadataNamespace, namespaceExists, kopTopic.getOriginalName(), offsetsTopicExists});
                return;
            }
            log.error("Failed to successfully initialize Kafka Metadata {}", (Object)kafkaMetadataNamespace, (Object)e);
            throw e;
        }
        log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}, topic: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaMetadataNamespace, namespaceExists, kopTopic.getOriginalName(), offsetsTopicExists});
    }

    private static void createTenantIfMissing(String tenant, KafkaServiceConfiguration conf, String cluster, Tenants tenants) throws PulsarAdminException {
        if (!tenants.getTenants().contains(tenant)) {
            log.info("Tenant: {} does not exist, creating it ...", (Object)tenant);
            tenants.createTenant(tenant, TenantInfo.builder().adminRoles(conf.getSuperUserRoles()).allowedClusters(Collections.singleton(cluster)).build());
        } else {
            TenantInfo kafkaMetadataTenantInfo = tenants.getTenantInfo(tenant);
            Set allowedClusters = kafkaMetadataTenantInfo.getAllowedClusters();
            if (!allowedClusters.contains(cluster)) {
                log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...", (Object)tenant, (Object)cluster);
                allowedClusters.add(cluster);
                tenants.updateTenant(tenant, kafkaMetadataTenantInfo);
            }
        }
    }

    private static void createNamespaceIfMissing(String tenant, KafkaServiceConfiguration conf, String cluster, String kafkaNamespace, Namespaces namespaces, boolean infiniteRetention, boolean isMetadataNamespace) throws PulsarAdminException {
        if (!namespaces.getNamespaces(tenant).contains(kafkaNamespace)) {
            log.info("Namespaces: {} does not exist in tenant: {}, creating it ...", (Object)kafkaNamespace, (Object)tenant);
            HashSet replicationClusters = Sets.newHashSet((Object[])new String[]{cluster});
            namespaces.createNamespace(kafkaNamespace, (Set)replicationClusters);
            namespaces.setNamespaceReplicationClusters(kafkaNamespace, (Set)replicationClusters);
            if (isMetadataNamespace) {
                if (infiniteRetention) {
                    log.info("Namespaces: {}, setting infinite retention", (Object)kafkaNamespace, (Object)tenant);
                    namespaces.setRetention(kafkaNamespace, new RetentionPolicies(-1, -1));
                } else {
                    namespaces.setRetention(kafkaNamespace, new RetentionPolicies((int)conf.getOffsetsRetentionMinutes(), conf.getSystemTopicRetentionSizeInMB()));
                    namespaces.setNamespaceMessageTTL(kafkaNamespace, conf.getOffsetsMessageTTL());
                }
                namespaces.setCompactionThreshold(kafkaNamespace, 0x6400000L);
            }
        } else {
            List replicationClusters = namespaces.getNamespaceReplicationClusters(kafkaNamespace);
            if (!replicationClusters.contains(cluster)) {
                log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,updating it ...", (Object)kafkaNamespace, (Object)cluster);
                HashSet newReplicationClusters = Sets.newHashSet((Iterable)replicationClusters);
                newReplicationClusters.add(cluster);
                namespaces.setNamespaceReplicationClusters(kafkaNamespace, (Set)newReplicationClusters);
            }
        }
    }

    public static void createKafkaNamespaceIfMissing(PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration conf) throws PulsarAdminException {
        String cluster = conf.getClusterName();
        String tenant = conf.getKafkaTenant();
        String kafkaNamespace = MetadataUtils.constructUserTopicsNamespace(tenant, conf);
        boolean clusterExists = false;
        boolean tenantExists = false;
        boolean namespaceExists = false;
        try {
            Clusters clusters = pulsarAdmin.clusters();
            if (!clusters.getClusters().contains(cluster)) {
                try {
                    pulsarAdmin.clusters().createCluster(cluster, clusterData);
                }
                catch (PulsarAdminException e) {
                    if (e instanceof PulsarAdminException.ConflictException) {
                        log.info("Attempted to create cluster {} however it was created concurrently.", (Object)cluster);
                    }
                    throw e;
                }
            } else {
                ClusterData configuredClusterData = clusters.getCluster(cluster);
                log.info("Cluster {} found: {}", (Object)cluster, (Object)configuredClusterData);
            }
            clusterExists = true;
            Tenants tenants = pulsarAdmin.tenants();
            MetadataUtils.createTenantIfMissing(tenant, conf, cluster, tenants);
            tenantExists = true;
            Namespaces namespaces = pulsarAdmin.namespaces();
            MetadataUtils.createNamespaceIfMissing(tenant, conf, cluster, kafkaNamespace, namespaces, false, false);
            namespaceExists = true;
        }
        catch (PulsarAdminException e) {
            block10: {
                try {
                    if (!(e instanceof PulsarAdminException.ConflictException)) break block10;
                    log.info("Resources concurrent creating and cause e: ", (Throwable)e);
                }
                catch (Throwable throwable) {
                    log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaNamespace, namespaceExists});
                    throw throwable;
                }
                log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaNamespace, namespaceExists});
                return;
            }
            log.error("Failed to successfully initialize Kafka Metadata {}", (Object)kafkaNamespace, (Object)e);
            throw e;
        }
        log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}, namespace: {} exists: {}", new Object[]{cluster, clusterExists, tenant, tenantExists, kafkaNamespace, namespaceExists});
    }

    private static void createTopicIfNotExist(PulsarAdmin admin, String topic, int numPartitions) throws PulsarAdminException {
        try {
            admin.topics().createPartitionedTopic(topic, numPartitions);
        }
        catch (PulsarAdminException.ConflictException e) {
            log.info("Resources concurrent creating for topic : {}, caused by : {}", (Object)topic, (Object)e.getMessage());
        }
        try {
            admin.topics().createMissedPartitions(topic);
        }
        catch (PulsarAdminException pulsarAdminException) {
            // empty catch block
        }
    }

    public static void createSchemaRegistryMetadataIfMissing(String tenant, PulsarAdmin pulsarAdmin, ClusterData clusterData, KafkaServiceConfiguration conf) throws PulsarAdminException {
        KopTopic kopTopic = new KopTopic(MetadataUtils.constructSchemaRegistryTopicName(tenant, conf), MetadataUtils.constructMetadataNamespace(tenant, conf));
        MetadataUtils.createKafkaMetadataIfMissing(tenant, conf.getKopSchemaRegistryNamespace(), pulsarAdmin, clusterData, conf, kopTopic, 1, true);
    }
}

