/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.registry.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapZookeeperRegistryImpl
implements ServiceRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
    private static final String IPC_SERVICES = "services";
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";
    private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
    private static final String ROOT_NAMESPACE = "llap";
    private static final String USER_SCOPE_PATH_PREFIX = "user-";
    private static final String DISABLE_MESSAGE = "Set " + HiveConf.ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
    private final Configuration conf;
    private final CuratorFramework zooKeeperClient;
    private final String pathPrefix;
    private final String userPathPrefix;
    private String userNameFromPrincipal;
    private PersistentEphemeralNode znode;
    private String znodePath;
    private final RegistryUtils.ServiceRecordMarshal encoder;
    private DynamicServiceInstanceSet instances;
    private PathChildrenCache instancesCache;
    private static final UUID uniq = UUID.randomUUID();
    private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
    private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
    private static final String hostname;
    private final ACLProvider zooKeeperAclProvider = new ACLProvider(){

        public List<ACL> getDefaultAcl() {
            LOG.warn("getDefaultAcl was called");
            return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
        }

        public List<ACL> getAclForPath(String path) {
            if (!UserGroupInformation.isSecurityEnabled() || path == null || !path.contains(LlapZookeeperRegistryImpl.this.userPathPrefix)) {
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }
            return LlapZookeeperRegistryImpl.createSecureAcls();
        }
    };

    public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
        this.conf = new Configuration(conf);
        this.conf.addResource("yarn-site.xml");
        String zkEnsemble = this.getQuorumServers(this.conf);
        this.encoder = new RegistryUtils.ServiceRecordMarshal();
        int sessionTimeout = (int)HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
        int baseSleepTime = (int)HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
        int maxRetries = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
        this.zooKeeperClient = CuratorFrameworkFactory.builder().connectString(zkEnsemble).sessionTimeoutMs(sessionTimeout).aclProvider(this.zooKeeperAclProvider).namespace("llap").retryPolicy((RetryPolicy)new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
        this.userPathPrefix = USER_SCOPE_PATH_PREFIX + this.getZkPathUser(this.conf);
        this.pathPrefix = "/" + this.userPathPrefix + "/" + instanceName + "/workers/worker-";
        this.instancesCache = null;
        this.instances = null;
        this.stateChangeListeners = new HashSet<ServiceInstanceStateChangeListener>();
        LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName);
    }

    private static List<ACL> createSecureAcls() {
        ArrayList<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
        nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
        return nodeAcls;
    }

    private String getQuorumServers(Configuration conf) {
        String[] hosts = conf.getTrimmedStrings(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
        String port = conf.get(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
        StringBuilder quorum = new StringBuilder();
        for (int i = 0; i < hosts.length; ++i) {
            quorum.append(hosts[i].trim());
            if (!hosts[i].contains(":")) {
                quorum.append(":");
                quorum.append(port);
            }
            if (i == hosts.length - 1) continue;
            quorum.append(",");
        }
        return quorum.toString();
    }

    private String getZkPathUser(Configuration conf) {
        String user = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
        return user;
    }

    public Endpoint getRpcEndpoint() {
        int rpcPort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
        return RegistryTypeUtils.ipcEndpoint((String)"llap", (InetSocketAddress)new InetSocketAddress(hostname, rpcPort));
    }

    public Endpoint getShuffleEndpoint() {
        int shufflePort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
        return RegistryTypeUtils.inetAddrEndpoint((String)IPC_SHUFFLE, (String)"tcp", (String)hostname, (int)shufflePort);
    }

    public Endpoint getServicesEndpoint() {
        int servicePort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT);
        boolean isSSL = HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_SSL);
        String scheme = isSSL ? "https" : "http";
        try {
            URL serviceURL = new URL(scheme, hostname, servicePort, "");
            return RegistryTypeUtils.webEndpoint((String)IPC_SERVICES, (URI[])new URI[]{serviceURL.toURI()});
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException("llap service URI for " + hostname + " is invalid", e);
        }
    }

    public Endpoint getMngEndpoint() {
        return RegistryTypeUtils.ipcEndpoint((String)IPC_MNG, (InetSocketAddress)new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
    }

    public Endpoint getOutputFormatEndpoint() {
        return RegistryTypeUtils.ipcEndpoint((String)IPC_OUTPUTFORMAT, (InetSocketAddress)new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)));
    }

    @Override
    public String register() throws IOException {
        ServiceRecord srv = new ServiceRecord();
        Endpoint rpcEndpoint = this.getRpcEndpoint();
        srv.addInternalEndpoint(rpcEndpoint);
        srv.addInternalEndpoint(this.getMngEndpoint());
        srv.addInternalEndpoint(this.getShuffleEndpoint());
        srv.addExternalEndpoint(this.getServicesEndpoint());
        srv.addInternalEndpoint(this.getOutputFormatEndpoint());
        for (Map.Entry kv : this.conf) {
            if (!((String)kv.getKey()).startsWith("llap.") && !((String)kv.getKey()).startsWith("hive.llap.")) continue;
            srv.set((String)kv.getKey(), kv.getValue());
        }
        srv.set(UNIQUE_IDENTIFIER, (Object)uniq.toString());
        try {
            this.znode = new PersistentEphemeralNode(this.zooKeeperClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, this.pathPrefix, this.encoder.toBytes((Object)srv));
            this.znode.start();
            long znodeCreationTimeout = 120L;
            if (!this.znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
                throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
            }
            this.znodePath = this.znode.getActualPath();
            if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_VALIDATE_ACLS)) {
                try {
                    this.checkAndSetAcls();
                }
                catch (Exception ex) {
                    throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex);
                }
            }
            if (this.zooKeeperClient.checkExists().forPath(this.znodePath) == null) {
                throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
            }
            LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}, webui: {}, mgmt: {}, znodePath: {} ", new Object[]{rpcEndpoint, this.getShuffleEndpoint(), this.getServicesEndpoint(), this.getMngEndpoint(), this.znodePath});
        }
        catch (Exception e) {
            LOG.error("Unable to create a znode for this server instance", (Throwable)e);
            CloseableUtils.closeQuietly((Closeable)this.znode);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created zknode with path: {} service record: {}", (Object)this.znodePath, (Object)srv);
        }
        return uniq.toString();
    }

    private void checkAndSetAcls() throws Exception {
        List acls;
        if (!UserGroupInformation.isSecurityEnabled()) {
            return;
        }
        String pathToCheck = this.znodePath;
        int ix = pathToCheck.lastIndexOf(47);
        if (ix > 0) {
            pathToCheck = pathToCheck.substring(0, ix);
        }
        if ((acls = (List)this.zooKeeperClient.getACL().forPath(pathToCheck)) == null || acls.isEmpty()) {
            LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE);
            this.setUpAcls(pathToCheck);
            return;
        }
        assert (this.userNameFromPrincipal != null);
        Id currentUser = new Id("sasl", this.userNameFromPrincipal);
        for (ACL acl : acls) {
            if ((acl.getPerms() & 0xFFFFFFFE) == 0 || currentUser.equals((Object)acl.getId())) continue;
            LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE);
            this.setUpAcls(pathToCheck);
            return;
        }
    }

    private void setUpAcls(String path) throws Exception {
        List<ACL> acls = LlapZookeeperRegistryImpl.createSecureAcls();
        LinkedList<String> paths = new LinkedList<String>();
        paths.add(path);
        while (!paths.isEmpty()) {
            String currentPath = (String)paths.poll();
            List children = (List)this.zooKeeperClient.getChildren().forPath(currentPath);
            if (children != null) {
                for (String child : children) {
                    paths.add(currentPath + "/" + child);
                }
            }
            ((BackgroundPathable)this.zooKeeperClient.setACL().withACL(acls)).forPath(currentPath);
        }
    }

    @Override
    public void unregister() throws IOException {
    }

    @Override
    public ServiceInstanceSet getInstances(String component) throws IOException {
        this.checkPathChildrenCache();
        if (this.instances == null) {
            this.instances = new DynamicServiceInstanceSet(this.instancesCache);
        }
        return this.instances;
    }

    @Override
    public synchronized void registerStateChangeListener(ServiceInstanceStateChangeListener listener) throws IOException {
        this.checkPathChildrenCache();
        this.stateChangeListeners.add(listener);
    }

    private synchronized void checkPathChildrenCache() throws IOException {
        Preconditions.checkArgument(this.zooKeeperClient != null && this.zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
        if (this.instancesCache == null) {
            this.instancesCache = new PathChildrenCache(this.zooKeeperClient, RegistryPathUtils.parentOf((String)this.pathPrefix).toString(), true);
            this.instancesCache.getListenable().addListener((Object)new InstanceStateChangeListener(), (Executor)Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()));
            try {
                this.instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            }
            catch (Exception e) {
                LOG.error("Unable to start curator PathChildrenCache. Exception: {}", (Throwable)e);
                throw new IOException(e);
            }
        }
    }

    @Override
    public void start() throws IOException {
        if (this.zooKeeperClient != null) {
            this.setupZookeeperAuth(this.conf);
            this.zooKeeperClient.start();
        }
        CloseableUtils.class.getName();
    }

    @Override
    public void stop() throws IOException {
        CloseableUtils.closeQuietly((Closeable)this.znode);
        CloseableUtils.closeQuietly((Closeable)this.instancesCache);
        CloseableUtils.closeQuietly((Closeable)this.zooKeeperClient);
    }

    private void setupZookeeperAuth(Configuration conf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.info("UGI security is enabled. Setting up ZK auth.");
            String llapPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL);
            if (llapPrincipal == null || llapPrincipal.isEmpty()) {
                throw new IOException("Llap Kerberos principal is empty");
            }
            String llapKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
            if (llapKeytab == null || llapKeytab.isEmpty()) {
                throw new IOException("Llap Kerberos keytab is empty");
            }
            this.setZookeeperClientKerberosJaasConfig(llapPrincipal, llapKeytab);
        } else {
            LOG.info("UGI security is not enabled. Skipping setting up ZK auth.");
        }
    }

    private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
        String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
        System.setProperty("zookeeper.sasl.clientconfig", "LlapZooKeeperClient");
        principal = SecurityUtil.getServerPrincipal((String)principal, (String)"0.0.0.0");
        this.userNameFromPrincipal = this.getUserNameFromPrincipal(principal);
        JaasConfiguration jaasConf = new JaasConfiguration("LlapZooKeeperClient", principal, keyTabFile);
        javax.security.auth.login.Configuration.setConfiguration(jaasConf);
    }

    private String getUserNameFromPrincipal(String principal) {
        String[] components = principal.split("[/@]");
        return components == null || components.length != 3 ? principal : components[0];
    }

    static {
        String localhost = "localhost";
        try {
            localhost = InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (UnknownHostException unknownHostException) {
            // empty catch block
        }
        hostname = localhost;
    }

    private static class JaasConfiguration
    extends javax.security.auth.login.Configuration {
        private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration.getConfiguration();
        private final String loginContextName;
        private final String principal;
        private final String keyTabFile;

        public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) {
            this.loginContextName = llapLoginContextName;
            this.principal = principal;
            this.keyTabFile = keyTabFile;
        }

        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
            if (this.loginContextName.equals(appName)) {
                HashMap<String, String> krbOptions = new HashMap<String, String>();
                krbOptions.put("doNotPrompt", "true");
                krbOptions.put("storeKey", "true");
                krbOptions.put("useKeyTab", "true");
                krbOptions.put("principal", this.principal);
                krbOptions.put("keyTab", this.keyTabFile);
                krbOptions.put("refreshKrb5Config", "true");
                AppConfigurationEntry llapZooKeeperClientEntry = new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions);
                return new AppConfigurationEntry[]{llapZooKeeperClientEntry};
            }
            if (this.baseConfig != null) {
                return this.baseConfig.getAppConfigurationEntry(appName);
            }
            return null;
        }
    }

    private class InstanceStateChangeListener
    implements PathChildrenCacheListener {
        private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);

        private InstanceStateChangeListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
            InstanceStateChangeListener instanceStateChangeListener = this;
            synchronized (instanceStateChangeListener) {
                if (!LlapZookeeperRegistryImpl.this.stateChangeListeners.isEmpty()) {
                    byte[] data;
                    DynamicServiceInstance instance = null;
                    ChildData childData = event.getData();
                    if (childData != null && (data = childData.getData()) != null) {
                        try {
                            ServiceRecord srv = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(event.getData().getPath(), data);
                            instance = new DynamicServiceInstance(srv);
                        }
                        catch (IOException e) {
                            this.LOG.error("Unable to decode data for zknode: {}. Dropping notification of type: {}", (Object)childData.getPath(), (Object)event.getType());
                        }
                    }
                    for (ServiceInstanceStateChangeListener listener : LlapZookeeperRegistryImpl.this.stateChangeListeners) {
                        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                            this.LOG.info("Added zknode {} to llap namespace. Notifying state change listener.", (Object)event.getData().getPath());
                            listener.onCreate(instance);
                            continue;
                        }
                        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                            this.LOG.info("Updated zknode {} in llap namespace. Notifying state change listener.", (Object)event.getData().getPath());
                            listener.onUpdate(instance);
                            continue;
                        }
                        if (event.getType() != PathChildrenCacheEvent.Type.CHILD_REMOVED) continue;
                        this.LOG.info("Removed zknode {} from llap namespace. Notifying state change listener.", (Object)event.getData().getPath());
                        listener.onRemove(instance);
                    }
                }
            }
        }
    }

    private class DynamicServiceInstanceSet
    implements ServiceInstanceSet {
        private final PathChildrenCache instancesCache;

        public DynamicServiceInstanceSet(PathChildrenCache cache) {
            this.instancesCache = cache;
        }

        @Override
        public Map<String, ServiceInstance> getAll() {
            LinkedHashMap<String, ServiceInstance> instances = new LinkedHashMap<String, ServiceInstance>();
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                byte[] data;
                if (childData == null || (data = childData.getData()) == null) continue;
                try {
                    ServiceRecord srv = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(childData.getPath(), data);
                    DynamicServiceInstance instance = new DynamicServiceInstance(srv);
                    instances.put(childData.getPath(), instance);
                }
                catch (IOException e) {
                    LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", (Object)childData.getPath());
                }
            }
            return instances;
        }

        @Override
        public List<ServiceInstance> getAllInstancesOrdered() {
            LinkedList<ServiceInstance> list = new LinkedList<ServiceInstance>();
            list.addAll(LlapZookeeperRegistryImpl.this.instances.getAll().values());
            Collections.sort(list, new Comparator<ServiceInstance>(){

                @Override
                public int compare(ServiceInstance o1, ServiceInstance o2) {
                    return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
                }
            });
            return list;
        }

        @Override
        public ServiceInstance getInstance(String name) {
            byte[] data;
            ChildData childData = this.instancesCache.getCurrentData(name);
            if (childData != null && (data = childData.getData()) != null) {
                try {
                    ServiceRecord srv = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(name, data);
                    DynamicServiceInstance instance = new DynamicServiceInstance(srv);
                    return instance;
                }
                catch (IOException e) {
                    LOG.error("Unable to decode data for zkpath: {}", (Object)name);
                    return null;
                }
            }
            return null;
        }

        @Override
        public Set<ServiceInstance> getByHost(String host) {
            HashSet<ServiceInstance> byHost = new HashSet<ServiceInstance>();
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                byte[] data;
                if (childData == null || (data = childData.getData()) == null) continue;
                try {
                    ServiceRecord srv = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(childData.getPath(), data);
                    DynamicServiceInstance instance = new DynamicServiceInstance(srv);
                    if (host.equals(instance.getHost())) {
                        byHost.add(instance);
                    }
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Locality comparing " + host + " to " + instance.getHost());
                }
                catch (IOException e) {
                    LOG.error("Unable to decode data for zkpath: {}. Ignoring host from current instances list..", (Object)childData.getPath());
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
            }
            return byHost;
        }

        @Override
        public int size() {
            return this.instancesCache.getCurrentData().size();
        }
    }

    private class DynamicServiceInstance
    implements ServiceInstance {
        private final ServiceRecord srv;
        private boolean alive = true;
        private final String host;
        private final int rpcPort;
        private final int mngPort;
        private final int shufflePort;
        private final int outputFormatPort;
        private final String serviceAddress;

        public DynamicServiceInstance(ServiceRecord srv) throws IOException {
            this.srv = srv;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Working with ServiceRecord: {}", (Object)srv);
            }
            Endpoint shuffle = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_SHUFFLE);
            Endpoint rpc = srv.getInternalEndpoint("llap");
            Endpoint mng = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_MNG);
            Endpoint outputFormat = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_OUTPUTFORMAT);
            Endpoint services = srv.getExternalEndpoint(LlapZookeeperRegistryImpl.IPC_SERVICES);
            this.host = RegistryTypeUtils.getAddressField((Map)((Map)rpc.addresses.get(0)), (String)"host");
            this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map)((Map)rpc.addresses.get(0)), (String)"port"));
            this.mngPort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map)((Map)mng.addresses.get(0)), (String)"port"));
            this.shufflePort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map)((Map)shuffle.addresses.get(0)), (String)"port"));
            this.outputFormatPort = Integer.valueOf(RegistryTypeUtils.getAddressField((Map)((Map)outputFormat.addresses.get(0)), (String)"port"));
            this.serviceAddress = RegistryTypeUtils.getAddressField((Map)((Map)services.addresses.get(0)), (String)"uri");
        }

        @Override
        public String getWorkerIdentity() {
            return this.srv.get(LlapZookeeperRegistryImpl.UNIQUE_IDENTIFIER);
        }

        @Override
        public String getHost() {
            return this.host;
        }

        @Override
        public int getRpcPort() {
            return this.rpcPort;
        }

        @Override
        public int getShufflePort() {
            return this.shufflePort;
        }

        @Override
        public String getServicesAddress() {
            return this.serviceAddress;
        }

        @Override
        public boolean isAlive() {
            return this.alive;
        }

        public void kill() {
            LOG.info("Killing service instance: " + this);
            this.alive = false;
        }

        @Override
        public Map<String, String> getProperties() {
            return this.srv.attributes();
        }

        @Override
        public Resource getResource() {
            int memory = Integer.parseInt(this.srv.get(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
            int vCores = Integer.parseInt(this.srv.get(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
            return Resource.newInstance((int)memory, (int)vCores);
        }

        public String toString() {
            return "DynamicServiceInstance [alive=" + this.alive + ", host=" + this.host + ":" + this.rpcPort + " with resources=" + this.getResource() + ", shufflePort=" + this.getShufflePort() + ", servicesAddress=" + this.getServicesAddress() + ", mgmtPort=" + this.getManagementPort() + "]";
        }

        @Override
        public int getManagementPort() {
            return this.mngPort;
        }

        @Override
        public int getOutputFormatPort() {
            return this.outputFormatPort;
        }
    }
}

