/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Joiner;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerDiscoveryProvider
implements Closeable {
    final ZookeeperCacheLoader localZkCache;
    final GlobalZooKeeperCache globalZkCache;
    private final AtomicInteger counter = new AtomicInteger();
    private final OrderedScheduler orderedExecutor = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(4).name("pulsar-proxy-ordered").build();
    private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4, (ThreadFactory)new DefaultThreadFactory("pulsar-proxy-scheduled-executor"));
    private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
    private static final Logger LOG = LoggerFactory.getLogger(BrokerDiscoveryProvider.class);

    public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory zkClientFactory) throws PulsarServerException {
        try {
            this.localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
            this.globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), (int)TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()), config.getConfigurationStoreServers(), (OrderedExecutor)this.orderedExecutor, this.scheduledExecutorScheduler, config.getZookeeperSessionTimeoutMs());
            this.globalZkCache.start();
        }
        catch (Exception e) {
            LOG.error("Failed to start ZooKeeper {}", (Object)e.getMessage(), (Object)e);
            throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), (Throwable)e);
        }
    }

    LoadManagerReport nextBroker() throws PulsarServerException {
        List<LoadManagerReport> availableBrokers = this.localZkCache.getAvailableBrokers();
        if (availableBrokers.isEmpty()) {
            throw new PulsarServerException("No active broker is available");
        }
        int brokersCount = availableBrokers.size();
        int nextIdx = MathUtils.signSafeMod((long)this.counter.getAndIncrement(), (int)brokersCount);
        return availableBrokers.get(nextIdx);
    }

    CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(ProxyService service, TopicName topicName, String role, AuthenticationDataSource authenticationData) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        try {
            BrokerDiscoveryProvider.checkAuthorization(service, topicName, role, authenticationData);
            String path = BrokerDiscoveryProvider.path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
            ((CompletableFuture)this.globalZkCache.getDataAsync(path, (key, content) -> (PartitionedTopicMetadata)ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class)).thenAccept(metadata -> {
                if (metadata.isPresent()) {
                    metadataFuture.complete((PartitionedTopicMetadata)metadata.get());
                } else {
                    metadataFuture.complete(new PartitionedTopicMetadata());
                }
            })).exceptionally(ex -> {
                metadataFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        }
        catch (Exception e) {
            metadataFuture.completeExceptionally(e);
        }
        return metadataFuture;
    }

    protected static void checkAuthorization(ProxyService service, TopicName topicName, String role, AuthenticationDataSource authenticationData) throws Exception {
        if (!service.getConfiguration().isAuthorizationEnabled() || service.getConfiguration().getSuperUserRoles().contains(role)) {
            return;
        }
        if (!service.getAuthorizationService().canLookup(topicName, role, authenticationData)) {
            TenantInfo tenantInfo;
            LOG.warn("[{}] Role {} is not allowed to lookup topic", (Object)topicName, (Object)role);
            try {
                tenantInfo = (TenantInfo)service.getConfigurationCacheService().propertiesCache().get(BrokerDiscoveryProvider.path("policies", topicName.getTenant())).orElseThrow(() -> new IllegalAccessException("Property does not exist"));
            }
            catch (KeeperException.NoNodeException e) {
                LOG.warn("Failed to get property admin data for non existing property {}", (Object)topicName.getTenant());
                throw new IllegalAccessException("Property does not exist");
            }
            catch (Exception e) {
                LOG.error("Failed to get property admin data for property");
                throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s", topicName.getTenant(), e.getMessage()));
            }
            if (!((Boolean)service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()).booleanValue()) {
                throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully authorized {} on property {}", (Object)role, (Object)topicName.getTenant());
        }
    }

    public static String path(String ... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append("/admin/");
        Joiner.on((char)'/').appendTo(sb, (Object[])parts);
        return sb.toString();
    }

    @Override
    public void close() throws IOException {
        this.localZkCache.close();
        this.globalZkCache.close();
        this.orderedExecutor.shutdown();
        this.scheduledExecutorScheduler.shutdownNow();
    }
}

