/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.bookie.rackawareness;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookieRackAffinityMapping
extends AbstractDNSToSwitchMapping
implements RackChangeNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(BookieRackAffinityMapping.class);
    public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
    public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
    private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
    private List<BookieId> bookieAddressListLastTime = new ArrayList<BookieId>();
    private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
    private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<String, BookieInfo>();

    public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
        MetadataStore store;
        Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
        if (storeProperty != null) {
            if (!(storeProperty instanceof MetadataStore)) {
                throw new RuntimeException("METADATA_STORE_INSTANCE is not an instance of MetadataStore");
            }
            store = (MetadataStore)storeProperty;
        } else {
            String url;
            String metadataServiceUri = (String)conf.getProperty("metadataServiceUri");
            if (StringUtils.isNotBlank((CharSequence)metadataServiceUri)) {
                try {
                    url = metadataServiceUri.replaceFirst("metadata-store:", "").replace(";", ",");
                }
                catch (Exception e) {
                    throw new MetadataException(Code.METADATA_SERVICE_ERROR, (Throwable)e);
                }
            } else {
                String zkServers = (String)conf.getProperty("zkServers");
                if (StringUtils.isBlank((CharSequence)zkServers)) {
                    String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
                    throw new RuntimeException(errorMsg);
                }
                url = zkServers;
            }
            try {
                int zkTimeout = Integer.parseInt((String)conf.getProperty("zkTimeout"));
                store = MetadataStoreExtended.create((String)url, (MetadataStoreConfig)MetadataStoreConfig.builder().sessionTimeoutMillis(zkTimeout).build());
            }
            catch (MetadataStoreException e) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, (Throwable)e);
            }
        }
        return store;
    }

    public void setConf(Configuration conf) {
        MetadataStore store;
        super.setConf(conf);
        try {
            store = BookieRackAffinityMapping.createMetadataStore(conf);
            this.bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
            this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
            for (Map bookieMapping : (Collection)((Optional)this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()).map(Map::values).orElse(Collections.emptyList())) {
                for (String address : bookieMapping.keySet()) {
                    this.bookieAddressListLastTime.add(BookieId.parse((String)address));
                }
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug("BookieRackAffinityMapping init, bookieAddressListLastTime {}", this.bookieAddressListLastTime);
            }
        }
        catch (InterruptedException | ExecutionException | MetadataException e) {
            throw new RuntimeException("METADATA_STORE_INSTANCE failed to init BookieId list");
        }
        store.registerListener(this::handleUpdates);
    }

    private void updateRacksWithHost(BookiesRackConfiguration racks) {
        BookiesRackConfiguration newRacksWithHost = new BookiesRackConfiguration();
        HashMap<String, BookieInfo> newBookieInfoMap = new HashMap<String, BookieInfo>();
        racks.forEach((group, bookies) -> bookies.forEach((addr, bi) -> {
            try {
                BookieId bookieId = BookieId.parse((String)addr);
                BookieAddressResolver addressResolver = this.getBookieAddressResolver();
                if (addressResolver == null) {
                    LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                } else {
                    BookieSocketAddress bsa = addressResolver.resolve(bookieId);
                    newRacksWithHost.updateBookie(group, bsa.toString(), bi);
                    String hostname = bsa.getSocketAddress().getHostName();
                    newBookieInfoMap.put(hostname, (BookieInfo)bi);
                    InetAddress address = bsa.getSocketAddress().getAddress();
                    if (null != address) {
                        String hostIp = address.getHostAddress();
                        if (null != hostIp) {
                            newBookieInfoMap.put(hostIp, (BookieInfo)bi);
                        }
                    } else {
                        LOG.info("Network address for {} is unresolvable yet.", addr);
                    }
                }
            }
            catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                LOG.info("Network address for {} is unresolvable yet. error is {}", addr, (Object)e);
            }
        }));
        this.racksWithHost = newRacksWithHost;
        this.bookieInfoMap = newBookieInfoMap;
    }

    public List<String> resolve(List<String> bookieAddressList) {
        ArrayList<String> racks = new ArrayList<String>(bookieAddressList.size());
        for (String bookieAddress : bookieAddressList) {
            racks.add(this.getRack(bookieAddress));
        }
        return racks;
    }

    private String getRack(String bookieAddress) {
        try {
            CompletableFuture future = this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
            Optional racks = future.isDone() && !future.isCompletedExceptionally() ? (Optional)future.join() : Optional.empty();
            this.updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new));
            if (!racks.isPresent()) {
                return null;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        BookieInfo bi = this.bookieInfoMap.get(bookieAddress);
        if (bi == null) {
            Optional biOpt = this.racksWithHost.getBookie(bookieAddress);
            if (biOpt.isPresent()) {
                bi = (BookieInfo)biOpt.get();
            } else {
                this.updateRacksWithHost(this.racksWithHost);
                bi = this.bookieInfoMap.get(bookieAddress);
            }
        }
        if (bi != null && !StringUtils.isEmpty((CharSequence)bi.getRack()) && !bi.getRack().trim().equals("/")) {
            String rack = bi.getRack();
            if (!rack.startsWith("/")) {
                rack = "/" + rack;
            }
            return rack;
        }
        return null;
    }

    public String toString() {
        return "zk based bookie rack affinity mapping";
    }

    public void reloadCachedMappings() {
    }

    private void handleUpdates(Notification n) {
        if (!n.getPath().equals(BOOKIE_INFO_ROOT_PATH)) {
            return;
        }
        if (this.rackawarePolicy != null) {
            this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).thenAccept(optVal -> {
                LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
                ArrayList<BookieId> bookieAddressList = new ArrayList<BookieId>();
                for (Map bookieMapping : (Collection)optVal.map(Map::values).orElse(Collections.emptyList())) {
                    for (String addr : bookieMapping.keySet()) {
                        bookieAddressList.add(BookieId.parse((String)addr));
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Bookies with rack update from {} to {}", this.bookieAddressListLastTime, bookieAddressList);
                }
                HashSet<BookieId> bookieIdSet = new HashSet<BookieId>(bookieAddressList);
                bookieIdSet.addAll(this.bookieAddressListLastTime);
                this.bookieAddressListLastTime = bookieAddressList;
                this.rackawarePolicy.onBookieRackChange(new ArrayList(bookieIdSet));
            });
        }
    }

    public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy) {
        this.rackawarePolicy = rackawarePolicy;
    }
}

