/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.services.impl;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.genie.common.internal.util.GenieHostInfo;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import com.netflix.genie.web.properties.AgentRoutingServiceProperties;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotBlank;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

public class AgentRoutingServiceCuratorDiscoveryImpl
implements AgentRoutingService {
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(AgentRoutingServiceCuratorDiscoveryImpl.class);
    /** Service name used when registering and querying agent instances in service discovery. */
    private static final String SERVICE_NAME = "agent_connections";
    /** Common prefix shared by all the metric names declared below. */
    private static final String METRICS_PREFIX = "genie.agents.connections.";
    /** Gauge reporting the size of {@code connectedAgentsSet}. */
    private static final String CONNECTED_AGENTS_GAUGE_NAME = "genie.agents.connections.connected.gauge";
    /** Gauge reporting the size of {@code registeredAgentsMap}. */
    private static final String REGISTERED_AGENTS_GAUGE_NAME = "genie.agents.connections.registered.gauge";
    /** Counter incremented on every Curator connection state change notification. */
    private static final String ZOOKEEPER_SESSION_STATE_COUNTER_NAME = "genie.agents.connections.zookeeperSessionState.counter";
    /** Timer recorded for each attempt to register an agent with service discovery. */
    private static final String AGENT_REGISTERED_TIMER_NAME = "genie.agents.connections.registered.timer";
    /** Timer recorded for each attempt to unregister an agent from service discovery. */
    private static final String AGENT_UNREGISTERED_TIMER_NAME = "genie.agents.connections.unregistered.timer";
    /** Timer recorded for each attempt to refresh an existing agent registration. */
    private static final String AGENT_REFRESH_TIMER_NAME = "genie.agents.connections.refreshed.timer";
    /** Counter incremented when a job id is newly added to {@code connectedAgentsSet}. */
    private static final String AGENT_CONNECTED_COUNTER_NAME = "genie.agents.connections.connected.counter";
    /** Counter incremented when a job id is actually removed from {@code connectedAgentsSet}. */
    private static final String AGENT_DISCONNECTED_COUNTER_NAME = "genie.agents.connections.disconnected.counter";
    /** Timer recorded for lookups of non-local agent connections. */
    private static final String AGENT_LOOKUP_TIMER_NAME = "genie.agents.connections.lookup.timer";
    /** Tag key carrying the Curator connection state name. */
    private static final String ZK_CONNECTION_STATE_TAG_NAME = "connectionState";
    /** Tag key carrying whether a lookup produced an address ("true"/"false"). */
    private static final String ROUTE_FOUND_TAG_NAME = "found";
    /** Shared empty tag set used when registering the gauges. */
    private static final Set<Tag> EMPTY_TAG_SET = ImmutableSet.of();
    /** Hostname of this node, as returned by {@code GenieHostInfo#getHostname()} at construction time. */
    private final String localHostname;
    /** Service discovery client used to register, update, unregister and query agent instances. */
    private final ServiceDiscovery<Agent> serviceDiscovery;
    /** Scheduler used to enqueue a refresh mutation after the configured refresh interval. */
    private final TaskScheduler taskScheduler;
    /** Registry where all gauges, counters and timers of this class are published. */
    private final MeterRegistry registry;
    /** Configuration properties; this class reads the refresh interval from it. */
    private final AgentRoutingServiceProperties properties;
    /** Job ids added by {@code handleClientConnected} and removed by {@code handleClientDisconnected}. */
    private final Set<String> connectedAgentsSet;
    /** Job id to service instance, stored after a successful register and removed after a successful unregister. */
    private final Map<String, ServiceInstance<Agent>> registeredAgentsMap;
    /** Pending mutations, consumed one at a time by the registration task. */
    private final PriorityBlockingQueue<RegisterMutation> registrationQueue;
    /** The registration thread currently tracked, or {@code null} after {@code stopRegistrationThread}. */
    private final AtomicReference<Thread> registrationTaskThread;
    /** Factory used to create each registration thread. */
    private final ThreadFactory threadFactory;

    /**
     * Creates the service with a default thread factory for the registration thread.
     *
     * <p>Delegates to the package-private constructor, supplying a factory that names each
     * thread {@code AgentRoutingServiceCuratorDiscoveryImpl-registration-N}, where {@code N}
     * is an increasing counter.
     *
     * @param genieHostInfo                    source of the local hostname
     * @param serviceDiscovery                 service discovery client for agent instances
     * @param taskScheduler                    scheduler used for periodic refreshes
     * @param listenableCuratorConnectionState listenable to which a connection state listener is added
     * @param registry                         metrics registry
     * @param properties                       service properties
     */
    public AgentRoutingServiceCuratorDiscoveryImpl(GenieHostInfo genieHostInfo, ServiceDiscovery<Agent> serviceDiscovery, TaskScheduler taskScheduler, Listenable<ConnectionStateListener> listenableCuratorConnectionState, MeterRegistry registry, AgentRoutingServiceProperties properties) {
        this(genieHostInfo, serviceDiscovery, taskScheduler, listenableCuratorConnectionState, registry, properties, new ThreadFactory(){
            private final AtomicLong threadCounter = new AtomicLong();

            @Override
            public Thread newThread(Runnable r) {
                // getClass() here would be the anonymous factory, whose simple name is the empty
                // string; use the enclosing service class so threads get a meaningful name.
                return new Thread(r, AgentRoutingServiceCuratorDiscoveryImpl.class.getSimpleName() + "-registration-" + this.threadCounter.incrementAndGet());
            }
        });
    }

    /**
     * Constructor that also accepts the factory used to create the registration thread.
     *
     * <p>Besides storing its arguments, it creates the internal collections, registers the two
     * size gauges, adds a connection state listener and starts the registration thread.
     *
     * @param genieHostInfo                    source of the local hostname
     * @param serviceDiscovery                 service discovery client for agent instances
     * @param taskScheduler                    scheduler used for periodic refreshes
     * @param listenableCuratorConnectionState listenable to which a connection state listener is added
     * @param registry                         metrics registry
     * @param properties                       service properties
     * @param threadFactory                    factory for registration threads
     */
    @VisibleForTesting
    AgentRoutingServiceCuratorDiscoveryImpl(GenieHostInfo genieHostInfo, ServiceDiscovery<Agent> serviceDiscovery, TaskScheduler taskScheduler, Listenable<ConnectionStateListener> listenableCuratorConnectionState, MeterRegistry registry, AgentRoutingServiceProperties properties, ThreadFactory threadFactory) {
        // Collaborators handed in by the caller.
        this.localHostname = genieHostInfo.getHostname();
        this.serviceDiscovery = serviceDiscovery;
        this.taskScheduler = taskScheduler;
        this.registry = registry;
        this.threadFactory = threadFactory;
        this.properties = properties;

        // Internal state.
        this.registeredAgentsMap = Maps.newConcurrentMap();
        this.connectedAgentsSet = Sets.newConcurrentHashSet();
        this.registrationQueue = new PriorityBlockingQueue<>();
        this.registrationTaskThread = new AtomicReference<>();

        // Gauges tracking the sizes of the two collections.
        this.registry.gauge(CONNECTED_AGENTS_GAUGE_NAME, EMPTY_TAG_SET, this.connectedAgentsSet, Set::size);
        this.registry.gaugeMapSize(REGISTERED_AGENTS_GAUGE_NAME, EMPTY_TAG_SET, this.registeredAgentsMap);

        // React to connection state changes, then begin consuming mutations.
        listenableCuratorConnectionState.addListener(this::handleConnectionStateChange);
        this.startRegistrationThread();
    }

    /**
     * Creates a new registration thread, makes it the tracked one, interrupts the previously
     * tracked thread (if any) and finally starts the new thread.
     */
    private void startRegistrationThread() {
        final Thread replacement = this.threadFactory.newThread(this::registrationTask);
        final Thread previous = this.registrationTaskThread.getAndSet(replacement);
        // A previously tracked thread is asked to terminate before the new one starts.
        Optional.ofNullable(previous).ifPresent(Thread::interrupt);
        replacement.start();
    }

    /**
     * Clears the tracked registration thread and interrupts it if there was one.
     */
    private void stopRegistrationThread() {
        final Thread tracked = this.registrationTaskThread.getAndSet(null);
        Optional.ofNullable(tracked).ifPresent(Thread::interrupt);
    }

    /**
     * Body of the registration thread: processes queued mutations one after another until
     * the thread is interrupted, at which point it logs and returns.
     */
    private void registrationTask() {
        try {
            for (;;) {
                this.processNextRegistrationMutation();
            }
        } catch (InterruptedException interrupted) {
            // Interruption is the only way out of the loop above.
            log.debug("Registration thread terminating");
        }
    }

    /**
     * Takes the next mutation from the queue (blocking until one is available) and applies it.
     *
     * <p>If the job id is not in the connected set, the agent is unregistered. Otherwise it is
     * registered (or refreshed, for refresh mutations) and a follow-up refresh mutation is
     * scheduled after the configured refresh interval.
     *
     * @throws InterruptedException if interrupted while waiting or while applying the mutation;
     *                              a mutation already taken is put back on the queue first
     */
    private void processNextRegistrationMutation() throws InterruptedException {
        RegisterMutation pending = null;
        try {
            pending = this.registrationQueue.take();
            final String jobId = pending.getJobId();

            if (!this.connectedAgentsSet.contains(pending.getJobId())) {
                this.unregisterAgentConnection(jobId);
                return;
            }

            if (pending.isRefresh()) {
                this.refreshAgentConnection(jobId);
            } else {
                this.registerAgentConnection(jobId);
            }

            // Keep the registration alive by enqueueing a refresh later on.
            final Instant nextRefresh = Instant.now().plus(this.properties.getRefreshInterval());
            this.taskScheduler.schedule(() -> this.registrationQueue.add(RegisterMutation.refresh(jobId)), nextRefresh);
        } catch (InterruptedException interrupted) {
            log.warn("Registration task interrupted", interrupted);
            if (pending != null) {
                // Do not lose the mutation that was in flight.
                this.registrationQueue.add(pending);
            }
            throw interrupted;
        }
    }

    /**
     * Registers a service instance for the given job with service discovery and, on success,
     * remembers it in {@code registeredAgentsMap}.
     *
     * <p>Failures other than interruption are logged and reflected in the timer tags but not
     * propagated. The registration timer is recorded in every case.
     *
     * @param jobId the job id whose agent connection should be registered
     * @throws InterruptedException if interrupted while registering
     */
    private void registerAgentConnection(String jobId) throws InterruptedException {
        log.debug("Registering route for job: {}", jobId);
        final ServiceInstance<Agent> instance = new ServiceInstance<>(
            SERVICE_NAME,
            jobId,
            this.localHostname,
            null,
            null,
            new Agent(jobId),
            Instant.now().getEpochSecond(),
            ServiceType.DYNAMIC,
            null
        );
        Set<Tag> timerTags = MetricsUtils.newSuccessTagsSet();
        final long startNanos = System.nanoTime();
        try {
            this.serviceDiscovery.registerService(instance);
            this.registeredAgentsMap.put(jobId, instance);
        } catch (InterruptedException interrupted) {
            log.debug("Interrupted while registering {}", jobId);
            timerTags = MetricsUtils.newFailureTagsSetForException(interrupted);
            throw interrupted;
        } catch (Exception failure) {
            log.error("Failed to register agent executing job: {}", jobId, failure);
            timerTags = MetricsUtils.newFailureTagsSetForException(failure);
        } finally {
            final long elapsedNanos = System.nanoTime() - startNanos;
            this.registry.timer(AGENT_REGISTERED_TIMER_NAME, timerTags).record(elapsedNanos, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Refreshes the service discovery registration of the given job.
     *
     * <p>If no instance record is held locally, or service discovery reports that the node no
     * longer exists, the agent is registered again instead. Other failures (except
     * interruption) are logged with their cause and reflected in the timer tags, but not
     * propagated. The refresh timer is recorded whenever an update was attempted.
     *
     * @param jobId the job id whose registration should be refreshed
     * @throws InterruptedException if interrupted while refreshing or re-registering
     */
    private void refreshAgentConnection(String jobId) throws InterruptedException {
        log.debug("Refreshing route for job: {}", jobId);
        final ServiceInstance<Agent> serviceInstance = this.registeredAgentsMap.get(jobId);
        if (serviceInstance == null) {
            log.warn("Instance record not found for job {}", jobId);
            this.registerAgentConnection(jobId);
            return;
        }
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        final long start = System.nanoTime();
        try {
            this.serviceDiscovery.updateService(serviceInstance);
        } catch (KeeperException.NoNodeException e) {
            // The node is gone: fall back to a fresh registration.
            log.warn("Failed to update registration of agent executing job id: {}", jobId);
            this.registerAgentConnection(jobId);
        } catch (InterruptedException e) {
            log.debug("Interrupted while refreshing {}", jobId);
            tags = MetricsUtils.newFailureTagsSetForException(e);
            throw e;
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.error("Failed to refresh agent executing job id: {}", jobId, e);
            tags = MetricsUtils.newFailureTagsSetForException(e);
        } finally {
            this.registry.timer(AGENT_REFRESH_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Unregisters the service instance held for the given job, if any, and on success removes
     * it from {@code registeredAgentsMap}.
     *
     * <p>Failures other than interruption are logged with their cause and reflected in the
     * timer tags, but not propagated. The unregistration timer is recorded whenever an
     * unregistration was attempted.
     *
     * @param jobId the job id whose agent connection should be unregistered
     * @throws InterruptedException if interrupted while unregistering
     */
    private void unregisterAgentConnection(String jobId) throws InterruptedException {
        final ServiceInstance<Agent> serviceInstance = this.registeredAgentsMap.get(jobId);
        if (serviceInstance == null) {
            log.debug("Skipping unregistration, already removed");
            return;
        }
        log.debug("Unregistering route for job: {}", jobId);
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        final long start = System.nanoTime();
        try {
            this.serviceDiscovery.unregisterService(serviceInstance);
            this.registeredAgentsMap.remove(jobId);
        } catch (InterruptedException e) {
            log.debug("Interrupted while unregistering {}", jobId);
            tags = MetricsUtils.newFailureTagsSetForException(e);
            throw e;
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.error("Failed to unregister agent executing job id: {}", jobId, e);
            tags = MetricsUtils.newFailureTagsSetForException(e);
        } finally {
            this.registry.timer(AGENT_UNREGISTERED_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Connection state listener callback: counts the new state, logs it, and starts or stops
     * the registration thread accordingly.
     *
     * @param client   the Curator client (unused)
     * @param newState the new connection state
     */
    private void handleConnectionStateChange(CuratorFramework client, ConnectionState newState) {
        final Set<Tag> stateTags = Sets.newHashSet(Tag.of(ZK_CONNECTION_STATE_TAG_NAME, newState.name()));
        this.registry.counter(ZOOKEEPER_SESSION_STATE_COUNTER_NAME, stateTags).increment();
        log.info("Zookeeper/Curator client: {}", newState);

        if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
            // Connection is (back) up: resume processing of registrations.
            this.startRegistrationThread();
        } else if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
            // Connection is down: pause processing of registrations.
            this.stopRegistrationThread();
        } else {
            log.warn("Zookeeper/Curator unhandled connection state: {}", newState);
        }
    }

    /**
     * Looks up the hostname of the node holding the agent connection for the given job.
     *
     * <p>If the job id is in the local connected set, the local hostname is returned without
     * querying service discovery. Otherwise service discovery is queried; a missing instance
     * or a failed lookup both yield an empty result. For remote lookups the lookup timer is
     * recorded exactly once, tagged with the outcome and with whether an address was found.
     *
     * @param jobId the job id
     * @return the hostname holding the connection, or empty if none was found or the lookup failed
     */
    @Override
    public Optional<String> getHostnameForAgentConnection(@NotBlank String jobId) {
        log.debug("Looking up agent executing job: {}", jobId);
        if (this.isAgentConnectionLocal(jobId)) {
            return Optional.of(this.localHostname);
        }
        final long start = System.nanoTime();
        final Set<Tag> tags = Sets.newHashSet();
        String address = null;
        try {
            final ServiceInstance<Agent> instance = this.serviceDiscovery.queryForInstance(SERVICE_NAME, jobId);
            if (instance == null) {
                log.debug("Could not find agent connection for job {}", jobId);
            } else {
                address = instance.getAddress();
            }
            MetricsUtils.addSuccessTags(tags);
        } catch (Exception e) {
            log.error("Error looking up agent connection for job {}", jobId, e);
            address = null;
            MetricsUtils.addFailureTagsWithException(tags, e);
        } finally {
            // Single recording point: success and failure paths each record the timer once.
            tags.add(Tag.of(ROUTE_FOUND_TAG_NAME, String.valueOf(address != null)));
            this.registry.timer(AGENT_LOOKUP_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
        return Optional.ofNullable(address);
    }

    /**
     * Tells whether the given job id is currently in the local connected set.
     *
     * @param jobId the job id
     * @return {@code true} if the job id is in {@code connectedAgentsSet}
     */
    @Override
    public boolean isAgentConnectionLocal(@NotBlank String jobId) {
        final boolean connectedHere = this.connectedAgentsSet.contains(jobId);
        return connectedHere;
    }

    /**
     * Records that an agent connected: adds the job id to the connected set and enqueues an
     * update mutation. The connected counter is incremented only if the job id was not
     * already in the set.
     *
     * @param jobId the job id of the agent that connected
     */
    @Override
    public void handleClientConnected(@NotBlank String jobId) {
        log.debug("Adding to routing table (pending registration): {}", jobId);
        final boolean firstTimeSeen = this.connectedAgentsSet.add(jobId);
        this.registrationQueue.add(RegisterMutation.update(jobId));
        if (!firstTimeSeen) {
            return;
        }
        this.registry.counter(AGENT_CONNECTED_COUNTER_NAME).increment();
    }

    /**
     * Records that an agent disconnected: removes the job id from the connected set and
     * enqueues an update mutation. The disconnected counter is incremented only if the job id
     * was actually present in the set.
     *
     * @param jobId the job id of the agent that disconnected
     */
    @Override
    public void handleClientDisconnected(@NotBlank String jobId) {
        log.debug("Removing from routing table (pending un-registration): {}", jobId);
        final boolean wasPresent = this.connectedAgentsSet.remove(jobId);
        this.registrationQueue.add(RegisterMutation.update(jobId));
        if (!wasPresent) {
            return;
        }
        this.registry.counter(AGENT_DISCONNECTED_COUNTER_NAME).increment();
    }

    /**
     * Tells whether a hostname can be resolved for the agent connection of the given job.
     *
     * @param jobId the job id
     * @return {@code true} if {@link #getHostnameForAgentConnection(String)} yields a value
     */
    @Override
    public boolean isAgentConnected(String jobId) {
        final Optional<String> hostname = this.getHostnameForAgentConnection(jobId);
        return hostname.isPresent();
    }

    /**
     * A pending change to the registration of one job's agent connection.
     *
     * <p>Instances are ordered so that update mutations come before refresh mutations and,
     * within the same kind, older mutations come before newer ones (ties broken by job id).
     */
    private static final class RegisterMutation
    implements Comparable<RegisterMutation> {
        private final String jobId;
        private final boolean refresh;
        /** Creation time from {@link System#nanoTime()}; only differences between values are meaningful. */
        private final long timestamp;

        private RegisterMutation(String jobId, boolean refresh) {
            this.jobId = jobId;
            this.refresh = refresh;
            this.timestamp = System.nanoTime();
        }

        /**
         * Creates a refresh mutation for the given job.
         *
         * @param jobId the job id
         * @return a new refresh mutation
         */
        static RegisterMutation refresh(String jobId) {
            return new RegisterMutation(jobId, true);
        }

        /**
         * Creates an update (non-refresh) mutation for the given job.
         *
         * @param jobId the job id
         * @return a new update mutation
         */
        static RegisterMutation update(String jobId) {
            return new RegisterMutation(jobId, false);
        }

        /**
         * Orders updates before refreshes, then older before newer, then by job id.
         *
         * @param other the mutation to compare with
         * @return negative, zero or positive as this mutation sorts before, equal to or after {@code other}
         */
        @Override
        public int compareTo(RegisterMutation other) {
            if (this.isRefresh() != other.isRefresh()) {
                return this.isRefresh() ? 1 : -1;
            }
            // nanoTime values must be compared through their difference, not directly.
            final long timestampDifference = this.getTimestamp() - other.getTimestamp();
            if (timestampDifference != 0L) {
                // Use the sign of the difference (not of this timestamp) so the order is antisymmetric.
                return timestampDifference > 0L ? 1 : -1;
            }
            return this.getJobId().compareTo(other.getJobId());
        }

        public String getJobId() {
            return this.jobId;
        }

        public boolean isRefresh() {
            return this.refresh;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RegisterMutation)) {
                return false;
            }
            final RegisterMutation other = (RegisterMutation)o;
            final String thisJobId = this.getJobId();
            final String otherJobId = other.getJobId();
            if (thisJobId == null ? otherJobId != null : !thisJobId.equals(otherJobId)) {
                return false;
            }
            if (this.isRefresh() != other.isRefresh()) {
                return false;
            }
            return this.getTimestamp() == other.getTimestamp();
        }

        @Override
        public int hashCode() {
            int result = 1;
            final String id = this.getJobId();
            result = result * 59 + (id == null ? 43 : id.hashCode());
            result = result * 59 + (this.isRefresh() ? 79 : 97);
            final long ts = this.getTimestamp();
            result = result * 59 + (int)(ts >>> 32 ^ ts);
            return result;
        }
    }

    /**
     * Payload stored with each service instance: carries the id of the job the agent executes.
     * Two payloads are equal when their job ids are equal.
     */
    public static final class Agent {
        private final String jobId;

        /** No-argument constructor; leaves the job id {@code null}. */
        private Agent() {
            this.jobId = null;
        }

        /**
         * Creates a payload for the given job.
         *
         * @param jobId the job id
         */
        public Agent(@JsonProperty(value="jobId", required=true) String jobId) {
            this.jobId = jobId;
        }

        /**
         * @return the job id, possibly {@code null}
         */
        public String getJobId() {
            return this.jobId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Agent)) {
                return false;
            }
            final String mine = this.getJobId();
            final String theirs = ((Agent)o).getJobId();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        @Override
        public int hashCode() {
            final String id = this.getJobId();
            final int idHash = id == null ? 43 : id.hashCode();
            return 59 + idHash;
        }
    }
}

