/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Joiner;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerDiscoveryProvider
implements Closeable {
    final ZookeeperCacheLoader localZkCache;
    final GlobalZooKeeperCache globalZkCache;
    private final AtomicInteger counter = new AtomicInteger();
    private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(4, "pulsar-proxy-ordered");
    private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4, (ThreadFactory)new DefaultThreadFactory("pulsar-proxy-scheduled-executor"));
    private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
    private static final Logger LOG = LoggerFactory.getLogger(BrokerDiscoveryProvider.class);

    public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory zkClientFactory) throws PulsarServerException {
        try {
            this.localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
            this.globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), config.getGlobalZookeeperServers(), this.orderedExecutor, this.scheduledExecutorScheduler);
            this.globalZkCache.start();
        }
        catch (Exception e) {
            LOG.error("Failed to start Zookkeeper {}", (Object)e.getMessage(), (Object)e);
            throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), (Throwable)e);
        }
    }

    LoadManagerReport nextBroker() throws PulsarServerException {
        List<LoadManagerReport> availableBrokers = this.localZkCache.getAvailableBrokers();
        if (availableBrokers.isEmpty()) {
            throw new PulsarServerException("No active broker is available");
        }
        int brokersCount = availableBrokers.size();
        int nextIdx = MathUtils.signSafeMod((long)this.counter.getAndIncrement(), (int)brokersCount);
        return availableBrokers.get(nextIdx);
    }

    CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(ProxyService service, DestinationName destination, String role, AuthenticationDataSource authenticationData) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        try {
            BrokerDiscoveryProvider.checkAuthorization(service, destination, role, authenticationData);
            String path = BrokerDiscoveryProvider.path(PARTITIONED_TOPIC_PATH_ZNODE, destination.getProperty(), destination.getCluster(), destination.getNamespacePortion(), "persistent", destination.getEncodedLocalName());
            ((CompletableFuture)this.globalZkCache.getDataAsync(path, (key, content) -> (PartitionedTopicMetadata)ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class)).thenAccept(metadata -> {
                if (metadata.isPresent()) {
                    metadataFuture.complete((PartitionedTopicMetadata)metadata.get());
                } else {
                    metadataFuture.complete(new PartitionedTopicMetadata());
                }
            })).exceptionally(ex -> {
                metadataFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        }
        catch (Exception e) {
            metadataFuture.completeExceptionally(e);
        }
        return metadataFuture;
    }

    protected static void checkAuthorization(ProxyService service, DestinationName destination, String role, AuthenticationDataSource authenticationData) throws Exception {
        if (!service.getConfiguration().isAuthorizationEnabled() || service.getConfiguration().getSuperUserRoles().contains(role)) {
            return;
        }
        if (!service.getAuthorizationService().canLookup(destination, role, authenticationData)) {
            PropertyAdmin propertyAdmin;
            LOG.warn("[{}] Role {} is not allowed to lookup topic", (Object)destination, (Object)role);
            try {
                propertyAdmin = (PropertyAdmin)service.getConfigurationCacheService().propertiesCache().get(BrokerDiscoveryProvider.path("policies", destination.getProperty())).orElseThrow(() -> new IllegalAccessException("Property does not exist"));
            }
            catch (KeeperException.NoNodeException e) {
                LOG.warn("Failed to get property admin data for non existing property {}", (Object)destination.getProperty());
                throw new IllegalAccessException("Property does not exist");
            }
            catch (Exception e) {
                LOG.error("Failed to get property admin data for property");
                throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s", destination.getProperty(), e.getMessage()));
            }
            if (!propertyAdmin.getAdminRoles().contains(role)) {
                throw new IllegalAccessException("Don't have permission to administrate resources on this property");
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully authorized {} on property {}", (Object)role, (Object)destination.getProperty());
        }
    }

    public static String path(String ... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append("/admin/");
        Joiner.on((char)'/').appendTo(sb, (Object[])parts);
        return sb.toString();
    }

    @Override
    public void close() throws IOException {
        this.localZkCache.close();
        this.globalZkCache.close();
        this.orderedExecutor.shutdown();
        this.scheduledExecutorScheduler.shutdownNow();
    }
}

