/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.cats.redis.cluster;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.spinnaker.cats.agent.Agent;
import com.netflix.spinnaker.cats.agent.AgentExecution;
import com.netflix.spinnaker.cats.agent.AgentLock;
import com.netflix.spinnaker.cats.agent.AgentScheduler;
import com.netflix.spinnaker.cats.agent.AgentSchedulerAware;
import com.netflix.spinnaker.cats.agent.ExecutionInstrumentation;
import com.netflix.spinnaker.cats.cluster.AgentIntervalProvider;
import com.netflix.spinnaker.cats.cluster.NodeIdentity;
import com.netflix.spinnaker.cats.cluster.NodeStatusProvider;
import com.netflix.spinnaker.cats.cluster.ShardingFilter;
import com.netflix.spinnaker.cats.module.CatsModuleAware;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.params.SetParams;

public class ClusteredAgentScheduler
extends CatsModuleAware
implements AgentScheduler<AgentLock>,
Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ClusteredAgentScheduler.class);

    // Collaborators supplied through the constructor.
    private final RedisClientDelegate redisClientDelegate;
    private final NodeIdentity nodeIdentity;
    private final AgentIntervalProvider intervalProvider;
    private final ExecutorService agentExecutionPool;
    private final Pattern enabledAgentPattern;

    // Every agent registered through schedule(), keyed by agent type.
    private final Map<String, AgentExecutionAction> agents = new ConcurrentHashMap<>();
    // Agents whose run key this node acquired and that have not yet completed or been pruned, keyed by agent type.
    private final Map<String, NextAttempt> activeAgents = new ConcurrentHashMap<>();

    private final NodeStatusProvider nodeStatusProvider;
    private final DynamicConfigService dynamicConfigService;
    private final ShardingFilter shardingFilter;

    // Millisecond threshold below which a remaining TTL is treated as already elapsed.
    private static final long MIN_TTL_THRESHOLD = 500L;
    // Redis SET option keywords.
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_EXPIRE_TIME_MILLIS = "PX";
    // Reply expected from a successful Redis SET.
    private static final String SUCCESS_RESPONSE = "OK";
    // Reply expected from the delete script when exactly one key was removed.
    private static final Long DEL_SUCCESS = 1L;
    // Lua script: delete KEYS[1] only when its current value equals ARGV[1]; otherwise return 0.
    private static final String DELETE_LOCK_KEY = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    // Lua script: when KEYS[1] currently holds ARGV[1], re-set it with a new expiry of ARGV[2] milliseconds; otherwise return nil.
    private static final String TTL_LOCK_KEY = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2], 'XX') else return nil end";

    /**
     * Creates a scheduler that uses default executors: a single-threaded scheduled executor
     * for lock polling and a cached thread pool for agent executions.
     *
     * @param redisClientDelegate client used for all lock operations
     * @param nodeIdentity supplies the value stored in each lock key
     * @param intervalProvider supplies per-agent interval, error interval and timeout
     * @param nodeStatusProvider consulted on every polling cycle
     * @param enabledAgentPattern regular expression an agent type must match to be scheduled
     * @param agentLockAcquisitionIntervalSeconds polling period in seconds; {@code null} means 1
     * @param dynamicConfigService source of the {@code redis.agent.max-concurrent-agents} setting
     * @param shardingFilter decides which agents this node may run
     */
    public ClusteredAgentScheduler(
            RedisClientDelegate redisClientDelegate,
            NodeIdentity nodeIdentity,
            AgentIntervalProvider intervalProvider,
            NodeStatusProvider nodeStatusProvider,
            String enabledAgentPattern,
            Integer agentLockAcquisitionIntervalSeconds,
            DynamicConfigService dynamicConfigService,
            ShardingFilter shardingFilter) {
        this(
                redisClientDelegate,
                nodeIdentity,
                intervalProvider,
                nodeStatusProvider,
                newLockPollingScheduler(),
                newAgentExecutionPool(),
                enabledAgentPattern,
                agentLockAcquisitionIntervalSeconds,
                dynamicConfigService,
                shardingFilter);
    }

    /** Builds the default single-threaded scheduler whose threads are named after this class. */
    private static ScheduledExecutorService newLockPollingScheduler() {
        String nameFormat = ClusteredAgentScheduler.class.getSimpleName() + "-%d";
        return Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
    }

    /** Builds the default cached pool whose threads are named after {@code AgentExecutionAction}. */
    private static ExecutorService newAgentExecutionPool() {
        String nameFormat = AgentExecutionAction.class.getSimpleName() + "-%d";
        return Executors.newCachedThreadPool(
                new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
    }

    /**
     * Creates a scheduler with caller-supplied executors and immediately registers this
     * instance with {@code lockPollingScheduler} at a fixed rate, starting with no delay.
     *
     * @param redisClientDelegate client used for all lock operations
     * @param nodeIdentity supplies the value stored in each lock key
     * @param intervalProvider supplies per-agent interval, error interval and timeout
     * @param nodeStatusProvider consulted on every polling cycle
     * @param lockPollingScheduler executor that periodically invokes {@link #run()}
     * @param agentExecutionPool executor that agent jobs are submitted to
     * @param enabledAgentPattern regular expression an agent type must match to be scheduled
     * @param agentLockAcquisitionIntervalSeconds polling period in seconds; {@code null} means 1
     * @param dynamicConfigService source of the {@code redis.agent.max-concurrent-agents} setting
     * @param shardingFilter decides which agents this node may run
     */
    public ClusteredAgentScheduler(
            RedisClientDelegate redisClientDelegate,
            NodeIdentity nodeIdentity,
            AgentIntervalProvider intervalProvider,
            NodeStatusProvider nodeStatusProvider,
            ScheduledExecutorService lockPollingScheduler,
            ExecutorService agentExecutionPool,
            String enabledAgentPattern,
            Integer agentLockAcquisitionIntervalSeconds,
            DynamicConfigService dynamicConfigService,
            ShardingFilter shardingFilter) {
        // Redis access and identity.
        this.redisClientDelegate = redisClientDelegate;
        this.nodeIdentity = nodeIdentity;

        // Scheduling policy.
        this.intervalProvider = intervalProvider;
        this.nodeStatusProvider = nodeStatusProvider;
        this.enabledAgentPattern = Pattern.compile(enabledAgentPattern);
        this.dynamicConfigService = dynamicConfigService;
        this.shardingFilter = shardingFilter;

        // Execution.
        this.agentExecutionPool = agentExecutionPool;

        final int pollIntervalSeconds =
                agentLockAcquisitionIntervalSeconds != null ? agentLockAcquisitionIntervalSeconds : 1;
        lockPollingScheduler.scheduleAtFixedRate(this, 0L, pollIntervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Tries to acquire run keys for registered agents that are not already active on this node.
     *
     * <p>The number of keys acquired in one call is capped at the configured
     * {@code redis.agent.max-concurrent-agents} value (default 1000) minus the number of
     * currently active agents. Candidates are visited in random order.
     *
     * @return the agents whose run key was acquired, mapped to the timing data captured at
     *     acquisition; empty when no capacity is available
     */
    private Map<String, NextAttempt> acquire() {
        HashSet<String> skip = new HashSet<>(this.activeAgents.keySet());
        int maxConcurrentAgents = this.dynamicConfigService.getConfig(Integer.class, "redis.agent.max-concurrent-agents", 1000);
        int availableAgents = maxConcurrentAgents - skip.size();
        if (availableAgents <= 0) {
            logger.debug(
                    "Not acquiring more locks (maxConcurrentAgents: {} activeAgents: {}, runningAgents: {})",
                    maxConcurrentAgents,
                    skip.size(),
                    skip.stream().sorted().collect(Collectors.joining(",")));
            return Collections.emptyMap();
        }

        HashMap<String, NextAttempt> acquired = new HashMap<>(this.agents.size());
        // Shuffle a snapshot so no agent is systematically favoured by map iteration order.
        ArrayList<Map.Entry<String, AgentExecutionAction>> candidates = new ArrayList<>(this.agents.entrySet());
        Collections.shuffle(candidates);

        for (Map.Entry<String, AgentExecutionAction> candidate : candidates) {
            String agentType = candidate.getKey();
            AgentExecutionAction action = candidate.getValue();
            if (this.shardingFilter.filter(action.getAgent()) && !skip.contains(agentType)) {
                AgentIntervalProvider.Interval interval = this.intervalProvider.getInterval(action.getAgent());
                if (this.acquireRunKey(agentType, interval.getTimeout())) {
                    acquired.put(
                            agentType,
                            new NextAttempt(
                                    System.currentTimeMillis(),
                                    interval.getInterval(),
                                    interval.getErrorInterval(),
                                    interval.getTimeout()));
                }
            }
            // Checked on every iteration, including ones that acquired nothing.
            if (acquired.size() >= availableAgents) {
                return acquired;
            }
        }
        return acquired;
    }

    /**
     * Performs one polling cycle: when the node is enabled, prunes timed-out active agents
     * and then acquires and launches new ones.
     *
     * <p>This instance is registered with {@code scheduleAtFixedRate} in the constructor, and
     * a task that throws from such a schedule is never run again. Everything, including the
     * node status check, therefore runs inside the {@code try} so that no failure can
     * escape and silently stop the scheduler.
     */
    @Override
    public void run() {
        try {
            if (!this.nodeStatusProvider.isNodeEnabled()) {
                return;
            }
            this.pruneActiveAgents();
            this.runAgents();
        } catch (Throwable t) {
            logger.error("Unable to run agents", t);
        }
    }

    /**
     * Removes entries from the active agents map whose acquisition time plus timeout, plus
     * the {@code MIN_TTL_THRESHOLD} grace period, lies in the past.
     */
    private void pruneActiveAgents() {
        long now = System.currentTimeMillis();
        int pruned = 0;
        for (Map.Entry<String, NextAttempt> activeAgent : this.activeAgents.entrySet()) {
            NextAttempt attempt = activeAgent.getValue();
            long removalTime = attempt.currentTime + attempt.timeout;
            if (removalTime + MIN_TTL_THRESHOLD < now) {
                logger.info("removing agent: {} from the active agents map as its max execution time has elapsed", activeAgent.getKey());
                this.activeAgents.remove(activeAgent.getKey());
                ++pruned;
            }
        }
        if (pruned > 0) {
            logger.info("removed {} accounts from the active agents map as their max execution times have elapsed", pruned);
        }
    }

    /**
     * Acquires run keys, records the acquired agents as active and submits one job per
     * agent to the execution pool.
     *
     * <p>An agent can be unscheduled between acquisition and submission, in which case it is
     * no longer present in {@code agents}. Submitting a job with a null action would fail
     * before it could report completion, leaving the lock and the active entry in place
     * until they expire. Such agents are released immediately instead.
     */
    private void runAgents() {
        Map<String, NextAttempt> thisRun = this.acquire();
        this.activeAgents.putAll(thisRun);
        logger.debug("scheduling {} new agents, total number of active agents: {}", thisRun.size(), this.activeAgents.size());
        for (Map.Entry<String, NextAttempt> toRun : thisRun.entrySet()) {
            String agentType = toRun.getKey();
            AgentExecutionAction exec = this.agents.get(agentType);
            if (exec == null) {
                logger.debug("Agent {} is no longer registered; releasing its lock without running it", agentType);
                try {
                    // A release time of 0 is always in the past, so the lock is deleted rather than re-armed.
                    this.agentCompleted(agentType, 0L);
                } catch (RuntimeException e) {
                    // Keep going so the remaining acquired agents are still submitted.
                    logger.warn("Unable to release lock for unregistered agent {}", agentType, e);
                }
                continue;
            }
            this.agentExecutionPool.submit(new AgentJob(toRun.getValue(), exec, this));
        }
    }

    /**
     * Attempts to set the Redis key named after the agent type to this node's identity,
     * only if the key does not already exist, with an expiry of {@code timeout} milliseconds.
     *
     * @param agentType key to set
     * @param timeout expiry in milliseconds
     * @return {@code true} when Redis replied {@code OK}
     */
    private boolean acquireRunKey(String agentType, long timeout) {
        final SetParams onlyIfAbsentWithExpiry = SetParams.setParams().nx().px(timeout);
        final Boolean acquired = this.redisClientDelegate.withCommandsClient(client ->
                SUCCESS_RESPONSE.equals(
                        client.set(agentType, this.nodeIdentity.getNodeIdentity(), onlyIfAbsentWithExpiry)));
        return acquired;
    }

    /**
     * Runs the {@code DELETE_LOCK_KEY} script, which deletes the agent's key only when its
     * value equals this node's identity.
     *
     * @param agentType key to delete
     * @return {@code true} when the script reported exactly one deleted key
     */
    private boolean deleteLock(String agentType) {
        final Boolean deleted = this.redisClientDelegate.withScriptingClient(client ->
                DEL_SUCCESS.equals(
                        client.eval(
                                DELETE_LOCK_KEY,
                                Arrays.asList(agentType),
                                Arrays.asList(this.nodeIdentity.getNodeIdentity()))));
        return deleted;
    }

    /**
     * Runs the {@code TTL_LOCK_KEY} script, which re-sets the agent's key with a new expiry
     * only when its value equals this node's identity.
     *
     * @param agentType key to update
     * @param newTtl new expiry in milliseconds
     * @return {@code true} when the script replied {@code OK}
     */
    private boolean ttlLock(String agentType, long newTtl) {
        final String ttlMillis = Long.toString(newTtl);
        final Boolean updated = this.redisClientDelegate.withScriptingClient(client ->
                SUCCESS_RESPONSE.equals(
                        client.eval(
                                TTL_LOCK_KEY,
                                Arrays.asList(agentType),
                                Arrays.asList(this.nodeIdentity.getNodeIdentity(), ttlMillis))));
        return updated;
    }

    /**
     * Releases this node's hold on an agent's run key so that it expires at {@code when}.
     *
     * <p>When less than {@code MIN_TTL_THRESHOLD} milliseconds remain until {@code when}
     * (including when it is already in the past), the key is deleted outright. Otherwise its
     * expiry is reset to the remaining time. Failures of either operation are only logged at
     * debug level.
     *
     * @param agentType key to release
     * @param when epoch milliseconds at which the key should stop existing
     */
    private void releaseRunKey(String agentType, long when) {
        long newTtl = when - System.currentTimeMillis();
        if (newTtl < MIN_TTL_THRESHOLD) {
            if (!this.deleteLock(agentType)) {
                logger.debug("Delete lock was unsuccessful for {}", agentType);
            }
            return;
        }
        if (!this.ttlLock(agentType, newTtl)) {
            logger.debug("Ttl lock was unsuccessful for {}", agentType);
        }
    }

    /**
     * Called when an agent job finishes: releases the agent's run key for the given next
     * execution time and then drops the agent from the active agents map.
     *
     * <p>The map entry is removed even when releasing the key throws.
     *
     * @param agentType type of the agent that finished
     * @param nextExecutionTime epoch milliseconds until which the run key should persist
     */
    private void agentCompleted(String agentType, long nextExecutionTime) {
        try {
            releaseRunKey(agentType, nextExecutionTime);
        } finally {
            activeAgents.remove(agentType);
        }
    }

    /**
     * Registers an agent with this scheduler, provided its lower-cased agent type matches
     * the enabled-agent pattern. Agents that do not match are ignored.
     *
     * <p>Agents implementing {@code AgentSchedulerAware} are handed a reference to this
     * scheduler. An existing registration for the same agent type is replaced.
     *
     * @param agent agent to register
     * @param agentExecution execution strategy invoked for the agent
     * @param executionInstrumentation receives started/completed/failed notifications
     */
    public void schedule(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
        // Lower-case with a fixed locale so matching does not depend on the JVM default locale
        // (for example, Turkish maps 'I' to a dotless 'ı').
        String normalizedAgentType = agent.getAgentType().toLowerCase(java.util.Locale.ROOT);
        if (!this.enabledAgentPattern.matcher(normalizedAgentType).matches()) {
            logger.debug(
                    "Agent is not enabled (agent: {}, agentType: {}, pattern: {})",
                    agent.getClass().getSimpleName(),
                    agent.getAgentType(),
                    this.enabledAgentPattern.pattern());
            return;
        }
        if (agent instanceof AgentSchedulerAware) {
            ((AgentSchedulerAware) agent).setAgentScheduler(this);
        }
        AgentExecutionAction agentExecutionAction = new AgentExecutionAction(agent, agentExecution, executionInstrumentation);
        this.agents.put(agent.getAgentType(), agentExecutionAction);
    }

    /**
     * Removes an agent from this scheduler. The agent's run key is released with a release
     * time of 0, and the agent is then removed from both the registered and the active
     * maps, even when releasing the key throws.
     *
     * @param agent agent to remove
     */
    public void unschedule(Agent agent) {
        try {
            releaseRunKey(agent.getAgentType(), 0L);
        } finally {
            final String agentType = agent.getAgentType();
            agents.remove(agentType);
            activeAgents.remove(agentType);
        }
    }

    /**
     * Returns the map of registered agents keyed by agent type. This is the scheduler's own
     * map, not a copy.
     */
    @Generated
    public Map<String, AgentExecutionAction> getAgents() {
        return agents;
    }

    /**
     * Returns the map of currently active agents keyed by agent type. This is the
     * scheduler's own map, not a copy.
     */
    @Generated
    public Map<String, NextAttempt> getActiveAgents() {
        return activeAgents;
    }

    /** Returns the Lua script used to delete a lock key held by this node. */
    @Generated
    public static String getDELETE_LOCK_KEY() {
        return ClusteredAgentScheduler.DELETE_LOCK_KEY;
    }

    /** Returns the Lua script used to reset the expiry of a lock key held by this node. */
    @Generated
    public static String getTTL_LOCK_KEY() {
        return ClusteredAgentScheduler.TTL_LOCK_KEY;
    }

    /** Bundles an agent with the execution strategy and instrumentation used to run it. */
    private static class AgentExecutionAction {
        private final Agent agent;
        private final AgentExecution agentExecution;
        private final ExecutionInstrumentation executionInstrumentation;

        public AgentExecutionAction(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
            this.agent = agent;
            this.agentExecution = agentExecution;
            this.executionInstrumentation = executionInstrumentation;
        }

        public Agent getAgent() {
            return agent;
        }

        /**
         * Runs the agent once, notifying the instrumentation of start and of completion or
         * failure together with the elapsed time.
         *
         * @return {@code SUCCESS} when the run and its notifications finished without
         *     throwing, otherwise {@code FAILURE}
         */
        Status execute() {
            final long startedAt = System.currentTimeMillis();
            Status outcome;
            try {
                executionInstrumentation.executionStarted(agent);
                agentExecution.executeAgent(agent);
                executionInstrumentation.executionCompleted(agent, ExecutionInstrumentation.elapsedTimeMs(startedAt));
                outcome = Status.SUCCESS;
            } catch (Throwable cause) {
                executionInstrumentation.executionFailed(agent, cause, ExecutionInstrumentation.elapsedTimeMs(startedAt));
                outcome = Status.FAILURE;
            }
            return outcome;
        }
    }

    /** Timing data captured when a run key is acquired. All values are in milliseconds. */
    private static class NextAttempt {
        private final long currentTime;
        private final long successInterval;
        private final long errorInterval;
        private final long timeout;

        public NextAttempt(long currentTime, long successInterval, long errorInterval, long timeout) {
            this.currentTime = currentTime;
            this.successInterval = successInterval;
            this.errorInterval = errorInterval;
            this.timeout = timeout;
        }

        /**
         * Computes the time derived from the acquisition time plus the interval that
         * corresponds to the outcome.
         *
         * @param status outcome of the run
         * @return acquisition time plus the success interval for {@code SUCCESS}, or plus
         *     the error interval for anything else
         */
        public long getNextTime(Status status) {
            final long delay = status == Status.SUCCESS ? successInterval : errorInterval;
            return currentTime + delay;
        }
    }

    /** Runnable that executes one agent and then reports completion back to the scheduler. */
    private static class AgentJob implements Runnable {
        private final NextAttempt lockReleaseTime;
        private final AgentExecutionAction action;
        private final ClusteredAgentScheduler scheduler;

        public AgentJob(NextAttempt lockReleaseTime, AgentExecutionAction action, ClusteredAgentScheduler scheduler) {
            this.lockReleaseTime = lockReleaseTime;
            this.action = action;
            this.scheduler = scheduler;
        }

        /**
         * Executes the action and always notifies the scheduler afterwards. The outcome is
         * treated as {@code FAILURE} unless the action returned normally with another status.
         */
        @Override
        public void run() {
            Status outcome = Status.FAILURE;
            try {
                outcome = action.execute();
            } finally {
                reportCompletion(outcome);
            }
        }

        /** Tells the scheduler this agent is done and when its run key should lapse. */
        private void reportCompletion(Status outcome) {
            scheduler.agentCompleted(action.getAgent().getAgentType(), lockReleaseTime.getNextTime(outcome));
        }
    }

    private static enum Status {
        SUCCESS,
        FAILURE;

    }
}

