/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BrokerRegistry} implementation that publishes this broker's {@link BrokerLookupData}
 * as a lock under {@link #LOADBALANCE_BROKERS_ROOT} and forwards metadata-store notifications
 * for that subtree to registered listeners.
 *
 * <p>Lifecycle: {@code Init -> Started -> Registered -> Closed}. State transitions happen under
 * the instance monitor; {@code state} is {@code volatile} and {@code listeners} is a
 * copy-on-write list so that the unsynchronized readers ({@link #isStarted()}, the state check
 * in the async accessors, and the notification handler) observe up-to-date values and can
 * iterate listeners while they are being added or cleared.
 */
public class BrokerRegistryImpl
implements BrokerRegistry {
    private static final Logger log = LoggerFactory.getLogger(BrokerRegistryImpl.class);

    /** Metadata-store path under which every broker's lookup data lock is created. */
    protected static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";

    private final PulsarService pulsar;
    private final ServiceConfiguration conf;
    private final BrokerLookupData brokerLookupData;
    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
    private final String brokerId;
    private final ScheduledExecutorService scheduler;
    // Iterated on the scheduler thread while addListener()/close() may mutate it.
    private final List<BiConsumer<String, NotificationType>> listeners;
    private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
    // Written under the instance monitor, read without it by isStarted()/checkState().
    private volatile State state;

    /**
     * Builds the registry and the lookup data describing this broker from the given service.
     * Nothing is written to the metadata store until {@link #start()} is called.
     *
     * @param pulsar the broker service supplying configuration, addresses and executors
     */
    public BrokerRegistryImpl(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.conf = pulsar.getConfiguration();
        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
        this.scheduler = pulsar.getLoadManagerExecutor();
        this.listeners = new CopyOnWriteArrayList<>();
        this.brokerId = pulsar.getLookupServiceAddress();
        this.brokerLookupData = new BrokerLookupData(
                pulsar.getSafeWebServiceAddress(),
                pulsar.getWebServiceAddressTls(),
                pulsar.getBrokerServiceUrl(),
                pulsar.getBrokerServiceUrlTls(),
                pulsar.getAdvertisedListeners(),
                pulsar.getProtocolDataToAdvertise(),
                pulsar.getConfiguration().isEnablePersistentTopics(),
                pulsar.getConfiguration().isEnableNonPersistentTopics(),
                this.conf.getLoadManagerClassName(),
                System.currentTimeMillis(),
                pulsar.getBrokerVersion());
        this.state = State.Init;
    }

    /**
     * Subscribes to local metadata-store notifications and registers this broker.
     * Does nothing unless the registry is still in the {@code Init} state.
     *
     * @throws PulsarServerException if the registration fails
     */
    @Override
    public synchronized void start() throws PulsarServerException {
        if (this.state != State.Init) {
            return;
        }
        this.pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        try {
            this.state = State.Started;
            this.register();
        } catch (MetadataStoreException e) {
            throw new PulsarServerException(e);
        }
    }

    /**
     * @return {@code true} while the registry is in the {@code Started} or {@code Registered} state
     */
    @Override
    public boolean isStarted() {
        State current = this.state;
        return current == State.Started || current == State.Registered;
    }

    /**
     * Acquires the lock at this broker's key path with its lookup data as the value, waiting up
     * to the configured metadata-store operation timeout. Only acts in the {@code Started}
     * state; on success the state becomes {@code Registered}.
     *
     * @throws MetadataStoreException if the lock cannot be acquired, the wait times out, or the
     *         calling thread is interrupted (the interrupt flag is restored in that case)
     */
    @Override
    public synchronized void register() throws MetadataStoreException {
        if (this.state != State.Started) {
            return;
        }
        try {
            this.brokerLookupDataLock = this.brokerLookupDataLockManager
                    .acquireLock(keyPath(this.brokerId), this.brokerLookupData)
                    .get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            this.state = State.Registered;
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw MetadataStoreException.unwrap(e);
        } catch (ExecutionException | TimeoutException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    /**
     * Releases this broker's lock, waiting up to the configured metadata-store operation
     * timeout. Only acts in the {@code Registered} state; on success the state goes back to
     * {@code Started}.
     *
     * @throws MetadataStoreException if the release fails, the wait times out, or the calling
     *         thread is interrupted (the interrupt flag is restored in that case)
     */
    @Override
    public synchronized void unregister() throws MetadataStoreException {
        if (this.state != State.Registered) {
            return;
        }
        try {
            this.brokerLookupDataLock.release()
                    .get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            this.state = State.Started;
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw MetadataStoreException.unwrap(e);
        } catch (CompletionException | ExecutionException | TimeoutException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    /**
     * @return the identifier this broker registers under
     */
    @Override
    public String getBrokerId() {
        return this.brokerId;
    }

    /**
     * Lists the locks currently present under {@link #LOADBALANCE_BROKERS_ROOT}.
     *
     * @return a future completing with a fresh list of broker identifiers
     * @throws IllegalStateException if the registry is closed
     */
    @Override
    public CompletableFuture<List<String>> getAvailableBrokersAsync() {
        this.checkState();
        return this.brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenApply(ArrayList::new);
    }

    /**
     * Reads the lookup data stored at the given broker's key path.
     *
     * @param broker the broker identifier to look up
     * @return a future completing with the lookup data, or an empty optional if none was read
     * @throws IllegalStateException if the registry is closed
     */
    @Override
    public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
        this.checkState();
        return this.brokerLookupDataLockManager.readLock(keyPath(broker));
    }

    /**
     * Lists the available brokers and looks up each one's data. Brokers whose lookup yields an
     * empty result are logged and left out of the returned map.
     *
     * @return a future completing with a map from broker identifier to its lookup data
     * @throws IllegalStateException if the registry is closed
     */
    @Override
    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
        this.checkState();
        return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> {
            // Lookups complete on arbitrary threads, hence the concurrent map.
            Map<String, BrokerLookupData> lookupDataByBroker = new ConcurrentHashMap<>();
            List<CompletableFuture<Void>> futures = new ArrayList<>(availableBrokers.size());
            for (String availableBroker : availableBrokers) {
                futures.add(this.lookupAsync(availableBroker).thenAccept(lookupDataOpt -> {
                    if (lookupDataOpt.isPresent()) {
                        lookupDataByBroker.put(availableBroker, lookupDataOpt.get());
                    } else {
                        log.warn("Got an empty lookup data, brokerId: {}", availableBroker);
                    }
                }));
            }
            return FutureUtil.waitForAll(futures).thenApply(__ -> lookupDataByBroker);
        });
    }

    /**
     * Adds a listener that is invoked with the broker identifier and notification type for
     * every verified notification under {@link #LOADBALANCE_BROKERS_ROOT}.
     *
     * @param listener the callback to register
     * @throws IllegalStateException if the registry is closed
     */
    @Override
    public synchronized void addListener(BiConsumer<String, NotificationType> listener) {
        this.checkState();
        this.listeners.add(listener);
    }

    /**
     * Clears the listeners, unregisters this broker and closes the lock manager. The registry
     * always ends up in the {@code Closed} state, even if one of those steps fails. Calling
     * this on an already closed registry does nothing.
     *
     * @throws PulsarServerException if unregistering or closing the lock manager fails; a
     *         {@link PulsarServerException.NotFoundException} is thrown when the failure's
     *         cause is a metadata-store not-found error
     */
    @Override
    public synchronized void close() throws PulsarServerException {
        if (this.state == State.Closed) {
            return;
        }
        try {
            this.listeners.clear();
            this.unregister();
            this.brokerLookupDataLockManager.close();
        } catch (Exception ex) {
            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
            }
            throw new PulsarServerException(MetadataStoreException.unwrap(ex));
        } finally {
            this.state = State.Closed;
        }
    }

    /**
     * Metadata-store callback: for verified notifications received while the registry is
     * started, dispatches the affected broker identifier and notification type to every
     * listener on the load-manager scheduler.
     */
    private void handleMetadataStoreNotification(Notification t) {
        if (!this.isStarted() || !isVerifiedNotification(t)) {
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Handle notification: [{}]", t);
            }
            if (this.listeners.isEmpty()) {
                return;
            }
            this.scheduler.submit(() -> {
                String changedBrokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
                NotificationType type = t.getType();
                for (BiConsumer<String, NotificationType> listener : this.listeners) {
                    try {
                        listener.accept(changedBrokerId, type);
                    } catch (RuntimeException e) {
                        // submit() would otherwise bury the failure in an unobserved Future and
                        // skip the remaining listeners.
                        log.warn("Broker registry listener failed, brokerId: {}, type: {}",
                                changedBrokerId, type, e);
                    }
                }
            });
        } catch (RejectedExecutionException ignored) {
            // The scheduler no longer accepts tasks (e.g. it has been shut down); dropping the
            // notification is intentional.
        }
    }

    /**
     * Checks that the notification path is a non-empty child of {@link #LOADBALANCE_BROKERS_ROOT}.
     *
     * @param t the notification to check
     * @return {@code true} if the path starts with the root plus {@code '/'} and has at least
     *         one more character after it
     */
    @VisibleForTesting
    protected static boolean isVerifiedNotification(Notification t) {
        String path = t.getPath();
        return path.startsWith(LOADBALANCE_BROKERS_ROOT + "/")
                && path.length() > LOADBALANCE_BROKERS_ROOT.length() + 1;
    }

    /**
     * Builds the metadata-store path for a broker.
     *
     * @param brokerId the broker identifier
     * @return {@link #LOADBALANCE_BROKERS_ROOT}, a {@code '/'}, then {@code brokerId}
     */
    @VisibleForTesting
    protected static String keyPath(String brokerId) {
        return String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, brokerId);
    }

    /**
     * Rejects operations on a closed registry.
     *
     * @throws IllegalStateException if the registry is in the {@code Closed} state
     */
    private void checkState() throws IllegalStateException {
        if (this.state == State.Closed) {
            throw new IllegalStateException("The registry already closed.");
        }
    }

    /** Lifecycle states of the registry. */
    protected enum State {
        Init,
        Started,
        Registered,
        Closed;

    }
}

