/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server.util;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperCacheLoader
implements Closeable {
    private final ZooKeeper zkClient;
    private final ZooKeeperCache localZkCache;
    private final ZooKeeperDataCache<LoadManagerReport> brokerInfo;
    private final ZooKeeperChildrenCache availableBrokersCache;
    private volatile List<LoadManagerReport> availableBrokers;
    private final OrderedScheduler orderedExecutor = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(8).name("pulsar-proxy-ordered-cache").build();
    public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
    private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheLoader.class);

    public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
        this.zkClient = (ZooKeeper)factory.create(zookeeperServers, ZooKeeperClientFactory.SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
        int zkOperationTimeoutSeconds = (int)TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs);
        this.localZkCache = new LocalZooKeeperCache(this.zkClient, zkOperationTimeoutSeconds, (OrderedExecutor)this.orderedExecutor);
        this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(this.localZkCache){

            public LoadManagerReport deserialize(String key, byte[] content) throws Exception {
                return (LoadManagerReport)ObjectMapperFactory.getThreadLocal().readValue(content, LoadManagerReport.class);
            }
        };
        this.availableBrokersCache = new ZooKeeperChildrenCache(this.getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
        this.availableBrokersCache.registerListener((path, brokerNodes, stat) -> ((CompletableFuture)this.updateBrokerList((Set<String>)brokerNodes).thenRun(() -> log.info("Successfully updated broker info {}", brokerNodes))).exceptionally(ex -> {
            log.warn("Error updating broker info after broker list changed", ex);
            return null;
        }));
        try {
            this.updateBrokerList(this.availableBrokersCache.get()).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
        }
        catch (KeeperException.NoNodeException nne) {
            this.updateBrokerList(Collections.emptySet()).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
        }
    }

    public List<LoadManagerReport> getAvailableBrokers() {
        return this.availableBrokers;
    }

    public ZooKeeperCache getLocalZkCache() {
        return this.localZkCache;
    }

    @Override
    public void close() throws IOException {
        try {
            this.zkClient.close();
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new IOException(e);
        }
        this.orderedExecutor.shutdown();
    }

    private CompletableFuture<Void> updateBrokerList(Set<String> brokerNodes) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (brokerNodes.isEmpty()) {
            this.availableBrokers = new ArrayList<LoadManagerReport>();
            future.complete(null);
            return future;
        }
        ArrayList<CompletableFuture> loadReportFutureList = new ArrayList<CompletableFuture>();
        for (String broker : brokerNodes) {
            loadReportFutureList.add(this.brokerInfo.getAsync("/loadbalance/brokers/" + broker));
        }
        ((CompletableFuture)FutureUtil.waitForAll(loadReportFutureList).thenRun(() -> {
            ArrayList<LoadManagerReport> newAvailableBrokers = new ArrayList<LoadManagerReport>(brokerNodes.size());
            for (CompletableFuture loadReportFuture : loadReportFutureList) {
                try {
                    Optional loadReport = (Optional)loadReportFuture.get();
                    if (!loadReport.isPresent()) continue;
                    newAvailableBrokers.add((LoadManagerReport)loadReport.get());
                }
                catch (Exception e) {
                    future.completeExceptionally(e);
                    return;
                }
            }
            this.availableBrokers = newAvailableBrokers;
            future.complete(null);
        })).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }
}

