/*
 * Decompiled with CFR 0.152.
 */
package com.github.myzhan.locust4j.runtime;

import com.github.myzhan.locust4j.AbstractTask;
import com.github.myzhan.locust4j.Locust;
import com.github.myzhan.locust4j.message.Message;
import com.github.myzhan.locust4j.rpc.Client;
import com.github.myzhan.locust4j.runtime.RunnerState;
import com.github.myzhan.locust4j.stats.Stats;
import com.github.myzhan.locust4j.utils.Utils;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker-side runner: talks to the master through a {@link Client}, spawns task threads on
 * request and reports stats and heartbeats back.
 *
 * <p>After {@link #getReady()} four background jobs run on an internal executor: a receiver that
 * feeds incoming messages into the state machine, a sender that forwards queued stats/heartbeat
 * payloads, a heartbeater that enqueues a heartbeat payload every second, and a listener that
 * calls {@link #quit()} when the master's heartbeat has been silent for too long.
 */
public class Runner {
    private static final Logger logger = LoggerFactory.getLogger(Runner.class);
    /** Seconds to wait for the master's "ack" message in {@link #getReady()}. */
    private static final int CONNECT_TIMEOUT = 5;
    private final Map<String, Object> remoteParams = new ConcurrentHashMap<String, Object>();
    private final AtomicInteger threadNumber = new AtomicInteger();
    private final AtomicBoolean heartbeatStopped = new AtomicBoolean(false);
    protected String nodeID;
    protected int numClients = 0;
    protected Map<String, Integer> userClassesCountFromMaster;
    // Written by the receiver thread, read by the sender and heartbeat threads.
    private volatile RunnerState state;
    private List<AbstractTask> tasks;
    /** Futures of the submitted task instances, keyed by task name. */
    private final HashMap<String, List<WeakReference<Future<?>>>> futures =
            new HashMap<String, List<WeakReference<Future<?>>>>();
    private final CountDownLatch waitForAck = new CountDownLatch(1);
    private boolean masterConnected = false;
    private int workerIndex = 0;
    private final AtomicLong lastMasterHeartbeatTimestamp = new AtomicLong(0L);
    private Client rpcClient;
    private ThreadPoolExecutor taskExecutor;
    private ExecutorService executor;
    private Stats stats;

    /** Creates a runner whose node id is obtained from {@link Utils#getNodeID()}. */
    public Runner() {
        this.nodeID = Utils.getNodeID();
    }

    /** @return {@code true} when the heartbeater has been told to skip sending heartbeats */
    protected boolean isHeartbeatStopped() {
        return this.heartbeatStopped.get();
    }

    /** @param value {@code true} to make the heartbeater skip sending heartbeats */
    protected void setHeartbeatStopped(boolean value) {
        this.heartbeatStopped.set(value);
    }

    /**
     * Tells whether the last heartbeat received from the master is older than {@code timeout}.
     *
     * @param timeout maximum allowed silence in milliseconds
     * @return {@code false} when no heartbeat has been received yet, otherwise whether the
     *         elapsed time since the last one exceeds {@code timeout}
     */
    protected boolean isMasterHeartbeatTimeout(long timeout) {
        // Read once so the zero-check and the subtraction use the same value.
        long last = this.lastMasterHeartbeatTimestamp.get();
        if (last == 0L) {
            return false;
        }
        return System.currentTimeMillis() - last > timeout;
    }

    /** @return the current runner state; {@code null} until {@link #getReady()} has run */
    public RunnerState getState() {
        return this.state;
    }

    /** @return this worker's node id */
    public String getNodeID() {
        return this.nodeID;
    }

    /** @param client the RPC client used for all communication with the master */
    public void setRPCClient(Client client) {
        this.rpcClient = client;
    }

    /** @return the live map of parameters received from the master (not a copy) */
    public Map<String, Object> getRemoteParams() {
        return this.remoteParams;
    }

    /** @param stats the stats instance whose message queue the sender and heartbeater use */
    public void setStats(Stats stats) {
        this.stats = stats;
    }

    /** @param tasks the tasks among which spawned users are distributed */
    public void setTasks(List<AbstractTask> tasks) {
        this.tasks = tasks;
    }

    /** @param taskExecutor the pool that runs the task instances */
    protected void setTaskExecutor(ThreadPoolExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Adjusts the number of running instances of every task so that they add up to
     * {@code spawnCount}, distributed by task weight (evenly when all weights are zero).
     * Finished or garbage-collected futures are pruned first; surplus instances are cancelled
     * with interruption and missing ones are submitted to the task executor.
     */
    private void spawnWorkers(int spawnCount) {
        // The executor does not exist yet (or any more) when zero users are requested.
        int activeCount = this.taskExecutor == null ? 0 : this.taskExecutor.getActiveCount();
        logger.debug("Required {} clients. Currently running {}.", spawnCount, activeCount);

        float weightSum = 0.0f;
        for (AbstractTask task : this.tasks) {
            weightSum += (float) task.getWeight();
        }

        // Accumulate locally and publish once so that the sender never reports a partial count.
        int totalClients = 0;
        for (AbstractTask task : this.tasks) {
            int amount;
            if (0.0f == weightSum) {
                amount = spawnCount / this.tasks.size();
            } else {
                float percent = (float) task.getWeight() / weightSum;
                amount = Math.round((float) spawnCount * percent);
            }

            List<WeakReference<Future<?>>> runningTasks = this.futures.get(task.getName());
            if (runningTasks == null) {
                runningTasks = new ArrayList<WeakReference<Future<?>>>();
            }

            // Drop entries whose future is gone or already completed.
            Iterator<WeakReference<Future<?>>> itr = runningTasks.iterator();
            while (itr.hasNext()) {
                Future<?> future = itr.next().get();
                if (future == null || future.isDone()) {
                    itr.remove();
                }
            }

            while (runningTasks.size() < amount) {
                runningTasks.add(new WeakReference<Future<?>>(this.taskExecutor.submit(task)));
                logger.debug("Adding thread to task, which name is {}", task.getName());
            }
            while (runningTasks.size() > amount) {
                Future<?> future = runningTasks.remove(0).get();
                if (future != null) {
                    future.cancel(true);
                }
                logger.debug("Removing thread from task, which name is {}", task.getName());
            }

            this.futures.put(task.getName(), runningTasks);
            logger.debug("Allocated {} threads to task, which name is {}", amount, task.getName());
            totalClients += runningTasks.size();
        }
        this.numClients = totalClients;
    }

    /**
     * Sizes (or creates) the task executor for {@code spawnCount} threads and spawns the workers.
     * A non-positive count scales every task down to zero instances.
     *
     * @param spawnCount total number of users requested by the master
     */
    protected void startSpawning(int spawnCount) {
        Stats.getInstance().wakeMeUp();
        if (spawnCount <= 0) {
            this.spawnWorkers(0);
            return;
        }
        if (this.taskExecutor == null) {
            this.setTaskExecutor(new ThreadPoolExecutor(spawnCount, spawnCount, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("locust4j-worker#" + Runner.this.threadNumber.getAndIncrement());
                    return thread;
                }
            }));
        } else if (spawnCount > this.taskExecutor.getMaximumPoolSize()) {
            // Growing: raise the maximum first so the core size never exceeds it.
            this.taskExecutor.setMaximumPoolSize(spawnCount);
            this.taskExecutor.setCorePoolSize(spawnCount);
        } else {
            // Shrinking (or equal): lower the core size first for the same reason.
            this.taskExecutor.setCorePoolSize(spawnCount);
            this.taskExecutor.setMaximumPoolSize(spawnCount);
        }
        this.spawnWorkers(spawnCount);
    }

    /** Sends "spawning_complete" with the current user count and per-class counts to the master. */
    protected void spawnComplete() {
        HashMap<String, Object> data = new HashMap<String, Object>(2);
        data.put("count", this.numClients);
        data.put("user_classes_count", this.userClassesCountFromMaster);
        try {
            this.rpcClient.send(new Message("spawning_complete", data, -1, this.nodeID));
        } catch (IOException ex) {
            logger.error("Error while sending a message about the completed spawn", ex);
        }
    }

    /**
     * Sends a "quit" message to the master and shuts the background executor down.
     * The executor is shut down even when the message cannot be sent.
     */
    public void quit() {
        try {
            this.rpcClient.send(new Message("quit", null, -1, this.nodeID));
        } catch (IOException ex) {
            logger.error("Error while sending a message about quiting", ex);
        } finally {
            // Must run on send failure too, otherwise the background jobs keep running.
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    /** Interrupts all task threads, waits up to one second for them, and discards the pool. */
    private void shutdownThreadPool() {
        ThreadPoolExecutor pool = this.taskExecutor;
        if (pool == null) {
            // Nothing was ever spawned (e.g. a spawn of zero users), so there is nothing to stop.
            return;
        }
        pool.shutdownNow();
        try {
            pool.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            logger.error("Error while waiting for termination", ex);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
        this.taskExecutor = null;
    }

    /** Stops all running task threads. */
    protected void stop() {
        this.shutdownThreadPool();
    }

    /**
     * Checks that a spawn message carries a {@code user_classes_count} map.
     *
     * @return {@code true} when the payload exists and its {@code user_classes_count} is a map
     */
    private boolean spawnMessageIsValid(Message message) {
        Map<String, Object> data = message.getData();
        if (data == null || !(data.get("user_classes_count") instanceof Map)) {
            logger.debug("Invalid spawn message without user_classes_count, you may use a newer but incompatible version of locust.");
            return false;
        }
        return true;
    }

    /**
     * Sums the per-class user counts of a spawn message and remembers the map in
     * {@link #userClassesCountFromMaster}. Any {@link Number} value is accepted.
     */
    private int sumUsersAmount(Message message) {
        Map<?, ?> userClassesCount = (Map<?, ?>) message.getData().get("user_classes_count");
        int amount = 0;
        for (Object value : userClassesCount.values()) {
            amount += ((Number) value).intValue();
        }
        @SuppressWarnings("unchecked") // payload shape is dictated by the master's protocol
        Map<String, Integer> typed = (Map<String, Integer>) userClassesCount;
        this.userClassesCountFromMaster = typed;
        return amount;
    }

    /**
     * Handles a validated spawn message: acknowledges with "spawning", records the remote
     * parameters, adjusts the workers and finally reports "spawning_complete".
     */
    private void onSpawnMessage(Message message) {
        Map<String, Object> data = message.getData();
        int numUsers = this.sumUsersAmount(message);
        try {
            this.rpcClient.send(new Message("spawning", null, -1, this.nodeID));
        } catch (IOException ex) {
            logger.error("Error while sending a message about spawning", ex);
        }
        this.remoteParams.put("user_classes_count", this.userClassesCountFromMaster);
        Object host = data.get("host");
        if (host != null) {
            this.remoteParams.put("host", host.toString());
        }
        this.startSpawning(numUsers);
        this.spawnComplete();
    }

    /** Runs one spawn cycle (Spawning -> Running), optionally starting the rate limiter. */
    private void handleSpawn(Message message, boolean startRateLimiter) {
        this.state = RunnerState.Spawning;
        this.onSpawnMessage(message);
        if (startRateLimiter && null != Locust.getInstance().getRateLimiter()) {
            Locust.getInstance().getRateLimiter().start();
        }
        this.state = RunnerState.Running;
    }

    /** Stops the workers and the rate limiter, then tells the master this client is ready again. */
    private void handleStop() {
        this.stop();
        if (null != Locust.getInstance().getRateLimiter()) {
            Locust.getInstance().getRateLimiter().stop();
        }
        this.state = RunnerState.Stopped;
        logger.debug("Recv stop message from master, all the workers are stopped");
        try {
            this.rpcClient.send(new Message("client_stopped", null, -1, this.nodeID));
            this.rpcClient.send(new Message("client_ready", null, -1, this.nodeID));
            this.state = RunnerState.Ready;
        } catch (IOException ex) {
            logger.error("Error while switching from the state stopped to ready", ex);
        }
    }

    /** Records the master's acknowledgement and releases {@link #getReady()}. */
    private void handleAck(Message message) {
        this.masterConnected = true;
        Map<String, Object> data = message.getData();
        if (data != null && data.containsKey("index")) {
            this.workerIndex = ((Number) data.get("index")).intValue();
        }
        // Count down last so the fields above are visible to the thread released by the latch.
        this.waitForAck.countDown();
    }

    /**
     * Dispatches a message from the master according to its type and the current state.
     * "heartbeat" only refreshes the timestamp, "quit" exits the JVM, unknown types are logged
     * and ignored; "ack", "spawn" and "stop" drive the state machine.
     */
    private void onMessage(Message message) {
        String type = message.getType();
        switch (type) {
            case "ack":
            case "spawn":
            case "stop":
                break;
            case "heartbeat":
                this.lastMasterHeartbeatTimestamp.set(System.currentTimeMillis());
                break;
            case "quit":
                logger.debug("Got quit message from master, shutting down...");
                System.exit(0);
                return;
            default:
                logger.error("Got {} message from master, which is not supported, please report an issue to locust4j.", type);
                return;
        }

        RunnerState current = this.state;
        if (current == RunnerState.Ready) {
            if ("spawn".equals(type) && this.spawnMessageIsValid(message)) {
                this.handleSpawn(message, true);
            } else if ("ack".equals(type)) {
                this.handleAck(message);
            }
        } else if (current == RunnerState.Spawning || current == RunnerState.Running) {
            if ("spawn".equals(type) && this.spawnMessageIsValid(message)) {
                // Already running: the rate limiter has been started by the first spawn.
                this.handleSpawn(message, false);
            } else if ("stop".equals(type)) {
                this.handleStop();
            }
        } else if (current == RunnerState.Stopped && "spawn".equals(type) && this.spawnMessageIsValid(message)) {
            this.handleSpawn(message, true);
        }
    }

    /**
     * Announces this worker to the master with "client_ready" and starts the background jobs.
     * The receiver and sender start immediately; the heartbeat jobs start after the master's
     * "ack" arrives or after {@code CONNECT_TIMEOUT} seconds, whichever comes first.
     */
    public void getReady() {
        this.executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
        this.state = RunnerState.Ready;
        try {
            this.rpcClient.send(new Message("client_ready", null, -1, this.nodeID));
        } catch (IOException ex) {
            logger.error("Error while sending a message that the system is ready", ex);
        }
        this.executor.submit(new Receiver(this));
        this.executor.submit(new Sender(this));
        try {
            // await() reports a timeout through its return value, not through an exception.
            if (!this.waitForAck.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
                logger.info("Timeout waiting for ack message from master, you may use a locust version before 2.10.0 or have a network issue");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for ack message from master");
        }
        this.executor.submit(new Heartbeater(this));
        this.executor.submit(new HeartbeatListener(this));
    }

    /** Receives messages from the master and hands them to {@link Runner#onMessage(Message)}. */
    private static class Receiver
    implements Runnable {
        private final Runner runner;

        private Receiver(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "receive-from-client");
            // Leave once the executor has been shut down instead of spinning on a failing recv().
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Message message = this.runner.rpcClient.recv();
                    this.runner.onMessage(message);
                } catch (Exception ex) {
                    logger.error("Error while receiving a message", ex);
                }
            }
        }
    }

    /**
     * Drains the stats message queue: payloads containing {@code current_cpu_usage} are sent as
     * "heartbeat", everything else as "stats" (dropped while the runner is Ready or Stopped).
     */
    private static class Sender
    implements Runnable {
        private final Runner runner;

        private Sender(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "send-to-client");
            while (true) {
                try {
                    Map<String, Object> data = this.runner.stats.getMessageToRunnerQueue().take();
                    if (data.containsKey("current_cpu_usage")) {
                        this.runner.rpcClient.send(new Message("heartbeat", data, -1, this.runner.nodeID));
                        continue;
                    }
                    RunnerState current = this.runner.state;
                    if (current == RunnerState.Ready || current == RunnerState.Stopped) {
                        continue;
                    }
                    data.put("user_count", this.runner.numClients);
                    data.put("user_classes_count", this.runner.userClassesCountFromMaster);
                    this.runner.rpcClient.send(new Message("stats", data, -1, this.runner.nodeID));
                } catch (InterruptedException ex) {
                    // Executor shutdown: end the job.
                    return;
                } catch (Exception ex) {
                    logger.error("Error in running the sender", ex);
                }
            }
        }
    }

    /** Enqueues a heartbeat payload (state and system CPU load) once per interval. */
    private static class Heartbeater
    implements Runnable {
        /** Milliseconds between two heartbeats. */
        private static final int HEARTBEAT_INTERVAL = 1000;
        private final Runner runner;
        private final OperatingSystemMXBean osBean = this.getOsBean();

        private Heartbeater(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(name + "heartbeat");
            while (true) {
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL);
                    if (this.runner.isHeartbeatStopped()) {
                        continue;
                    }
                    HashMap<String, Object> data = new HashMap<String, Object>(2);
                    data.put("state", this.runner.state.name().toLowerCase());
                    data.put("current_cpu_usage", this.getCpuUsage());
                    if (!this.runner.stats.getMessageToRunnerQueue().offer(data)) {
                        logger.error("Failed to insert heartbeat message to the queue");
                    }
                } catch (InterruptedException ex) {
                    // Executor shutdown: end the job.
                    return;
                } catch (Exception ex) {
                    logger.error("Error in running the heartbeat", ex);
                }
            }
        }

        /** @return the system CPU load reported by the OS bean, scaled by 100 */
        private double getCpuUsage() {
            return this.osBean.getSystemCpuLoad() * 100.0;
        }

        private OperatingSystemMXBean getOsBean() {
            return (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        }
    }

    /** Polls once per second and quits the runner when the master's heartbeat has timed out. */
    private static class HeartbeatListener
    implements Runnable {
        /** Milliseconds of master silence after which the runner quits. */
        private static final int MASTER_HEARTBEAT_TIMEOUT = 60000;
        private final Runner runner;

        private HeartbeatListener(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000L);
                    if (!this.runner.isMasterHeartbeatTimeout(MASTER_HEARTBEAT_TIMEOUT)) {
                        continue;
                    }
                    logger.error("Did't get heartbeat from master in over 60s, quitting");
                    // quit() shuts the executor down, which interrupts this thread's next sleep.
                    this.runner.quit();
                } catch (InterruptedException ex) {
                    return;
                } catch (Exception ex) {
                    logger.error("Error in running the heartbeat listener", ex);
                }
            }
        }
    }
}

