/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadSimulationController {
    private static final Logger log = LoggerFactory.getLogger(LoadSimulationController.class);
    private static final String QUOTA_ROOT = "/loadbalance/resource-quota/namespace";
    private static final String BUNDLE_DATA_ROOT = "/loadbalance/bundle-data";
    private final DataInputStream[] inputStreams;
    private final DataOutputStream[] outputStreams;
    private final String[] clients;
    private final int clientPort;
    private final String cluster;
    private final Random random = new Random();
    private static final ExecutorService threadPool = Executors.newCachedThreadPool();

    public LoadSimulationController(MainArguments arguments) throws Exception {
        this.clientPort = arguments.clientPort;
        this.cluster = arguments.cluster;
        this.clients = arguments.clientHostNames.split(",");
        Socket[] sockets = new Socket[this.clients.length];
        this.inputStreams = new DataInputStream[this.clients.length];
        this.outputStreams = new DataOutputStream[this.clients.length];
        log.info("Found {} clients:", (Object)this.clients.length);
        for (int i = 0; i < this.clients.length; ++i) {
            sockets[i] = new Socket(this.clients[i], this.clientPort);
            this.inputStreams[i] = new DataInputStream(sockets[i].getInputStream());
            this.outputStreams[i] = new DataOutputStream(sockets[i].getOutputStream());
            log.info("Connected to {}", (Object)this.clients[i]);
        }
    }

    private boolean checkAppArgs(int numAppArgs, int numRequired) {
        if (numAppArgs != numRequired) {
            log.info("ERROR: Wrong number of application arguments (found {}, required {})", (Object)numAppArgs, (Object)numRequired);
            return false;
        }
        return true;
    }

    private void getResourceQuotas(String path, ZooKeeper zkClient, Map<String, ResourceQuota>[] threadLocalMaps) throws Exception {
        List children = zkClient.getChildren(path, false);
        if (children.isEmpty()) {
            threadLocalMaps[this.random.nextInt(this.clients.length)].put(path, (ResourceQuota)ObjectMapperFactory.getThreadLocal().readValue(zkClient.getData(path, false, null), ResourceQuota.class));
        } else {
            for (String child : children) {
                this.getResourceQuotas(String.format("%s/%s", path, child), zkClient, threadLocalMaps);
            }
        }
    }

    private BundleData initializeBundleData(ResourceQuota quota, ShellArguments arguments) {
        double messageRate = (quota.getMsgRateIn() + quota.getMsgRateOut()) / 2.0;
        int messageSize = (int)Math.ceil((quota.getBandwidthIn() + quota.getBandwidthOut()) / (2.0 * messageRate));
        arguments.rate = messageRate * arguments.rateMultiplier;
        arguments.size = messageSize;
        NamespaceBundleStats startingStats = new NamespaceBundleStats();
        double modifiedRate = messageRate * arguments.rateMultiplier;
        double modifiedBandwidth = (quota.getBandwidthIn() + quota.getBandwidthOut()) * arguments.rateMultiplier / 2.0;
        quota.setMsgRateIn(modifiedRate);
        quota.setMsgRateOut(modifiedRate);
        quota.setBandwidthIn(modifiedBandwidth);
        quota.setBandwidthOut(modifiedBandwidth);
        quota.setMemory(quota.getMemory() * arguments.rateMultiplier);
        startingStats.msgRateIn = quota.getMsgRateIn();
        startingStats.msgRateOut = quota.getMsgRateOut();
        startingStats.msgThroughputIn = quota.getBandwidthIn();
        startingStats.msgThroughputOut = quota.getBandwidthOut();
        BundleData bundleData = new BundleData(10, 1000, startingStats);
        bundleData.getLongTermData().setNumSamples(1000);
        bundleData.getShortTermData().setNumSamples(10);
        return bundleData;
    }

    private String makeTopic(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s/%s", tenant, this.cluster, namespace, topic);
    }

    private void writeProducerOptions(DataOutputStream outputStream, ShellArguments arguments, String topic) throws Exception {
        if (!arguments.rangeString.isEmpty()) {
            String[] splits = arguments.rangeString.split(",");
            if (splits.length != 2) {
                log.error("Argument to --rand-rate should be two comma-separated values");
                return;
            }
            double first = Double.parseDouble(splits[0]);
            double second = Double.parseDouble(splits[1]);
            double min = Math.min(first, second);
            double max = Math.max(first, second);
            arguments.rate = this.random.nextDouble() * (max - min) + min;
        }
        outputStream.writeUTF(topic);
        outputStream.writeInt(arguments.size);
        outputStream.writeDouble(arguments.rate);
    }

    private void change(ShellArguments arguments, String topic, int client) throws Exception {
        this.outputStreams[client].write(0);
        this.writeProducerOptions(this.outputStreams[client], arguments, topic);
        this.outputStreams[client].flush();
    }

    private int changeOrCreate(ShellArguments arguments, String topic) throws Exception {
        int client = this.find(topic);
        if (client == -1) {
            this.trade(arguments, topic, this.random.nextInt(this.clients.length));
        } else {
            this.change(arguments, topic, client);
        }
        return client;
    }

    private int changeIfExists(ShellArguments arguments, String topic) throws Exception {
        int client = this.find(topic);
        if (client != -1) {
            this.change(arguments, topic, client);
        }
        return client;
    }

    private int find(String topic) throws Exception {
        int i;
        int clientWithTopic = -1;
        for (i = 0; i < this.clients.length; ++i) {
            this.outputStreams[i].write(5);
            this.outputStreams[i].writeUTF(topic);
        }
        for (i = 0; i < this.clients.length; ++i) {
            if (!this.inputStreams[i].readBoolean()) continue;
            clientWithTopic = i;
        }
        return clientWithTopic;
    }

    private synchronized void trade(ShellArguments arguments, String topic, int client) throws Exception {
        this.outputStreams[client].write(2);
        this.writeProducerOptions(this.outputStreams[client], arguments, topic);
        this.outputStreams[client].flush();
    }

    private void handleChange(ShellArguments arguments) throws Exception {
        String topic;
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 3) && this.changeIfExists(arguments, topic = this.makeTopic(commandArguments.get(1), commandArguments.get(2), commandArguments.get(3))) == -1) {
            log.info("Topic {} not found", (Object)topic);
        }
    }

    private void handleCopy(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 3)) {
            String tenantName = commandArguments.get(1);
            String sourceZKConnectString = commandArguments.get(2);
            String targetZKConnectString = commandArguments.get(3);
            ZooKeeper sourceZKClient = new ZooKeeper(sourceZKConnectString, 5000, null);
            ZooKeeper targetZKClient = new ZooKeeper(targetZKConnectString, 5000, null);
            Map[] threadLocalMaps = new Map[this.clients.length];
            for (int i = 0; i < this.clients.length; ++i) {
                threadLocalMaps[i] = new HashMap();
            }
            this.getResourceQuotas(QUOTA_ROOT, sourceZKClient, threadLocalMaps);
            ArrayList futures = new ArrayList(this.clients.length);
            int i = 0;
            log.info("Copying...");
            for (Map bundleToQuota : threadLocalMaps) {
                int j = i++;
                futures.add(threadPool.submit(() -> {
                    for (Map.Entry entry : bundleToQuota.entrySet()) {
                        String bundle = (String)entry.getKey();
                        ResourceQuota quota = (ResourceQuota)entry.getValue();
                        int tenantStart = QUOTA_ROOT.length() + 1;
                        int clusterStart = bundle.indexOf(47, tenantStart) + 1;
                        String sourceTenant = bundle.substring(tenantStart, clusterStart - 1);
                        int namespaceStart = bundle.indexOf(47, clusterStart) + 1;
                        String sourceCluster = bundle.substring(clusterStart, namespaceStart - 1);
                        String namespace = bundle.substring(namespaceStart, bundle.lastIndexOf(47));
                        String keyRangeString = bundle.substring(bundle.lastIndexOf(47) + 1);
                        String manglePrefix = String.format("%s-%s-%s", sourceCluster, sourceTenant, keyRangeString);
                        String mangledNamespace = String.format("%s-%s", manglePrefix, namespace);
                        BundleData bundleData = this.initializeBundleData(quota, arguments);
                        String oldAPITargetPath = String.format("/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff", tenantName, this.cluster, mangledNamespace);
                        String newAPITargetPath = String.format("/loadbalance/bundle-data/%s/%s/%s/0x00000000_0xffffffff", tenantName, this.cluster, mangledNamespace);
                        try {
                            ZkUtils.createFullPathOptimistic((ZooKeeper)targetZKClient, (String)oldAPITargetPath, (byte[])ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)quota), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
                        }
                        catch (KeeperException.NodeExistsException nodeExistsException) {
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        try {
                            ZkUtils.createFullPathOptimistic((ZooKeeper)targetZKClient, (String)newAPITargetPath, (byte[])ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)bundleData), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
                        }
                        catch (KeeperException.NodeExistsException e) {
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        try {
                            this.trade(arguments, this.makeTopic(tenantName, mangledNamespace, "t"), j);
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }));
            }
            for (Future future : futures) {
                future.get();
            }
            sourceZKClient.close();
            targetZKClient.close();
        }
    }

    private void handleSimulate(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        this.checkAppArgs(commandArguments.size() - 1, 1);
        ZooKeeper zkClient = new ZooKeeper(commandArguments.get(1), 5000, null);
        Map[] threadLocalMaps = new Map[this.clients.length];
        for (int i = 0; i < this.clients.length; ++i) {
            threadLocalMaps[i] = new HashMap();
        }
        this.getResourceQuotas(QUOTA_ROOT, zkClient, threadLocalMaps);
        ArrayList futures = new ArrayList(this.clients.length);
        int i = 0;
        log.info("Simulating...");
        for (Map bundleToQuota : threadLocalMaps) {
            int j = i++;
            futures.add(threadPool.submit(() -> {
                for (Map.Entry entry : bundleToQuota.entrySet()) {
                    String bundle = (String)entry.getKey();
                    String newAPIPath = bundle.replace(QUOTA_ROOT, BUNDLE_DATA_ROOT);
                    ResourceQuota quota = (ResourceQuota)entry.getValue();
                    int tenantStart = QUOTA_ROOT.length() + 1;
                    String topic = String.format("persistent://%s/t", bundle.substring(tenantStart));
                    BundleData bundleData = this.initializeBundleData(quota, arguments);
                    try {
                        ZkUtils.createFullPathOptimistic((ZooKeeper)zkClient, (String)newAPIPath, (byte[])ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)bundleData), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, (CreateMode)CreateMode.PERSISTENT);
                    }
                    catch (KeeperException.NodeExistsException e) {
                        try {
                            zkClient.setData(newAPIPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes((Object)bundleData), -1);
                        }
                        catch (Exception ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    try {
                        this.trade(arguments, topic, j);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        zkClient.close();
    }

    private void handleStop(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 3)) {
            String topic = this.makeTopic(commandArguments.get(1), commandArguments.get(2), commandArguments.get(3));
            for (DataOutputStream outputStream : this.outputStreams) {
                outputStream.write(1);
                outputStream.writeUTF(topic);
                outputStream.flush();
            }
        }
    }

    private void handleStream(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 1)) {
            String zkConnectString = commandArguments.get(1);
            ZooKeeper zkClient = new ZooKeeper(zkConnectString, 5000, null);
            new BrokerWatcher(zkClient, arguments);
            while (true) {
                // Infinite loop
            }
        }
    }

    private void handleTrade(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 3)) {
            String topic = this.makeTopic(commandArguments.get(1), commandArguments.get(2), commandArguments.get(3));
            this.trade(arguments, topic, this.random.nextInt(this.clients.length));
        }
    }

    private void handleGroupChange(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 2)) {
            String tenant = commandArguments.get(1);
            String group = commandArguments.get(2);
            for (DataOutputStream outputStream : this.outputStreams) {
                outputStream.write(3);
                outputStream.writeUTF(tenant);
                outputStream.writeUTF(group);
                outputStream.writeInt(arguments.size);
                outputStream.writeDouble(arguments.rate);
                outputStream.flush();
            }
        }
    }

    private void handleGroupStop(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 2)) {
            String tenant = commandArguments.get(1);
            String group = commandArguments.get(2);
            for (DataOutputStream outputStream : this.outputStreams) {
                outputStream.write(4);
                outputStream.writeUTF(tenant);
                outputStream.writeUTF(group);
                outputStream.flush();
            }
        }
    }

    private void handleGroupTrade(ShellArguments arguments) throws Exception {
        List<String> commandArguments = arguments.commandArguments;
        if (this.checkAppArgs(commandArguments.size() - 1, 3)) {
            String tenant = commandArguments.get(1);
            String group = commandArguments.get(2);
            int numNamespaces = Integer.parseInt(commandArguments.get(3));
            for (int i = 0; i < numNamespaces; ++i) {
                for (int j = 0; j < arguments.topicsPerNamespace; ++j) {
                    String topic = this.makeTopic(tenant, String.format("%s-%d", group, i), Integer.toString(j));
                    this.trade(arguments, topic, this.random.nextInt(this.clients.length));
                    Thread.sleep(arguments.separation);
                }
            }
        }
    }

    private void read(String[] args) {
        if (!(args.length <= 0 || args.length == 1 && args[0].isEmpty())) {
            ShellArguments arguments = new ShellArguments();
            JCommander jc = new JCommander((Object)arguments);
            try {
                String command;
                jc.parse(args);
                switch (command = arguments.commandArguments.get(0)) {
                    case "trade": {
                        this.handleTrade(arguments);
                        break;
                    }
                    case "change": {
                        this.handleChange(arguments);
                        break;
                    }
                    case "stop": {
                        this.handleStop(arguments);
                        break;
                    }
                    case "trade_group": {
                        this.handleGroupTrade(arguments);
                        break;
                    }
                    case "change_group": {
                        this.handleGroupChange(arguments);
                        break;
                    }
                    case "stop_group": {
                        this.handleGroupStop(arguments);
                        break;
                    }
                    case "script": {
                        List<String> commandArguments = arguments.commandArguments;
                        this.checkAppArgs(commandArguments.size() - 1, 1);
                        String scriptName = commandArguments.get(1);
                        BufferedReader scriptReader = new BufferedReader(new InputStreamReader(new FileInputStream(Paths.get(scriptName, new String[0]).toFile())));
                        String line = scriptReader.readLine();
                        while (line != null) {
                            this.read(line.split("\\s+"));
                            line = scriptReader.readLine();
                        }
                        scriptReader.close();
                        break;
                    }
                    case "copy": {
                        this.handleCopy(arguments);
                        break;
                    }
                    case "stream": {
                        this.handleStream(arguments);
                        break;
                    }
                    case "simulate": {
                        this.handleSimulate(arguments);
                        break;
                    }
                    case "quit": 
                    case "exit": {
                        PerfClientUtils.exit(0);
                        break;
                    }
                    default: {
                        log.info("ERROR: Unknown command \"{}\"", (Object)command);
                        break;
                    }
                }
            }
            catch (ParameterException ex) {
                ex.printStackTrace();
                jc.usage();
            }
            catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public void run() throws Exception {
        BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.println();
            System.out.print("> ");
            this.read(inReader.readLine().split("\\s+"));
        }
    }

    public static void main(String[] args) throws Exception {
        MainArguments arguments = new MainArguments();
        JCommander jc = new JCommander((Object)arguments);
        jc.setProgramName("pulsar-perf simulation-controller");
        try {
            jc.parse(args);
        }
        catch (Exception ex) {
            System.out.println(ex.getMessage());
            jc.usage();
            PerfClientUtils.exit(-1);
        }
        new LoadSimulationController(arguments).run();
    }

    private class LoadReportWatcher
    implements Watcher {
        private final ZooKeeper zkClient;
        private final String path;
        private final ShellArguments arguments;

        public LoadReportWatcher(String path, ZooKeeper zkClient, ShellArguments arguments) {
            this.path = path;
            this.zkClient = zkClient;
            this.arguments = arguments;
            this.process(null);
        }

        public synchronized void process(WatchedEvent event) {
            try {
                LoadReport loadReport = (LoadReport)ObjectMapperFactory.getThreadLocal().readValue(this.zkClient.getData(this.path, (Watcher)this, null), LoadReport.class);
                for (Map.Entry entry : loadReport.getBundleStats().entrySet()) {
                    String bundle = (String)entry.getKey();
                    String namespace = bundle.substring(0, bundle.lastIndexOf(47));
                    String topic = String.format("%s/%s", namespace, "t");
                    NamespaceBundleStats stats = (NamespaceBundleStats)entry.getValue();
                    double messageRate = this.arguments.rateMultiplier * (stats.msgRateIn + stats.msgRateOut) / 2.0;
                    int messageSize = (int)Math.ceil(this.arguments.rateMultiplier * (stats.msgThroughputIn + stats.msgThroughputOut) / (2.0 * messageRate));
                    this.arguments.rate = messageRate;
                    this.arguments.size = messageSize;
                    LoadSimulationController.this.changeOrCreate(this.arguments, topic);
                }
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private class BrokerWatcher
    implements Watcher {
        private final ZooKeeper zkClient;
        private final Set<String> brokers;
        private final ShellArguments arguments;

        private BrokerWatcher(ZooKeeper zkClient, ShellArguments arguments) {
            this.zkClient = zkClient;
            this.arguments = arguments;
            this.brokers = new HashSet<String>();
            this.process(null);
        }

        public synchronized void process(WatchedEvent event) {
            try {
                List currentBrokers = this.zkClient.getChildren("/loadbalance/brokers", (Watcher)this);
                for (String broker : currentBrokers) {
                    if (this.brokers.contains(broker)) continue;
                    new LoadReportWatcher(String.format("%s/%s", "/loadbalance/brokers", broker), this.zkClient, this.arguments);
                    this.brokers.add(broker);
                }
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private static class ShellArguments {
        @Parameter(description="Command arguments:\ntrade tenant namespace topic\nchange tenant namespace topic\nstop tenant namespace topic\ntrade_group tenant group_name num_namespaces\nchange_group tenant group_name\nstop_group tenant group_name\nscript script_name\ncopy tenant_name source_zk target_zk\nstream source_zk\nsimulate zk\n", required=true)
        List<String> commandArguments;
        @Parameter(names={"--rand-rate"}, description="Choose message rate uniformly randomly from the next two comma separated values (overrides --rate)")
        String rangeString = "";
        @Parameter(names={"--rate"}, description="Messages per second")
        double rate = 1.0;
        @Parameter(names={"--rate-multiplier"}, description="Multiplier to use for copying or streaming rates")
        double rateMultiplier = 1.0;
        @Parameter(names={"--separation"}, description="Separation time in ms for trade_group actions (0 for no separation)")
        int separation = 0;
        @Parameter(names={"--size"}, description="Message size in bytes")
        int size = 1024;
        @Parameter(names={"--topics-per-namespace"}, description="Number of topics to create per namespace in trade_group (total number of topics is num_namespaces X num_topics)")
        int topicsPerNamespace = 1;

        private ShellArguments() {
        }
    }

    @Parameters(commandDescription="Provides a shell for the user to dictate how simulation clients should incur load.")
    private static class MainArguments {
        @Parameter(names={"-h", "--help"}, description="Help message", help=true)
        boolean help;
        @Parameter(names={"--cluster"}, description="Cluster to test on", required=true)
        String cluster;
        @Parameter(names={"--clients"}, description="Comma separated list of client hostnames", required=true)
        String clientHostNames;
        @Parameter(names={"--client-port"}, description="Port that the clients are listening on", required=true)
        int clientPort;

        private MainArguments() {
        }
    }
}

