/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.shaded.org.apache.commons.cli.CommandLine;
import org.apache.hadoop.shaded.org.apache.commons.cli.GnuParser;
import org.apache.hadoop.shaded.org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.shaded.org.apache.commons.cli.OptionBuilder;
import org.apache.hadoop.shaded.org.apache.commons.cli.Options;
import org.apache.hadoop.shaded.org.apache.commons.cli.ParseException;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RPCCallBenchmark
extends TestRpcBase
implements Tool {
    private Configuration conf;
    private AtomicLong callCount = new AtomicLong(0L);
    private static ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

    private RPC.Server startServer(MyOptions opts) throws IOException {
        if (opts.serverThreads <= 0) {
            return null;
        }
        this.conf.setInt("ipc.server.read.threadpool.size", opts.serverReaderThreads);
        if (opts.rpcEngine != ProtobufRpcEngine.class) {
            throw new RuntimeException("Bad engine: " + opts.rpcEngine);
        }
        TestRpcBase.PBServerImpl serverImpl = new TestRpcBase.PBServerImpl();
        BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService((TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface)serverImpl);
        RPC.Server server = new RPC.Builder(this.conf).setProtocol(TestRpcBase.TestRpcService.class).setInstance((Object)service).setBindAddress(opts.host).setPort(opts.getPort()).setNumHandlers(opts.serverThreads).setVerbose(false).build();
        server.start();
        return server;
    }

    private long getTotalCpuTime(Iterable<? extends Thread> threads) {
        long total = 0L;
        for (Thread thread : threads) {
            long tid = thread.getId();
            total += threadBean.getThreadCpuTime(tid);
        }
        return total;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int run(String[] args) throws Exception {
        block12: {
            MyOptions opts = new MyOptions(args);
            if (opts.failed) {
                return -1;
            }
            RPC.setProtocolEngine((Configuration)this.conf, TestRpcBase.TestRpcService.class, opts.rpcEngine);
            RPC.Server server = this.startServer(opts);
            try {
                MultithreadedTestUtil.TestContext ctx = this.setupClientTestContext(opts);
                if (ctx != null) {
                    long totalCalls = 0L;
                    ctx.startThreads();
                    long veryStart = System.nanoTime();
                    for (int i = 0; i < opts.secondsToRun; ++i) {
                        long st = System.nanoTime();
                        ctx.waitFor(1000L);
                        long et = System.nanoTime();
                        long ct = this.callCount.getAndSet(0L);
                        totalCalls += ct;
                        double callsPerSec = ct * 1000000000L / (et - st);
                        System.out.println("Calls per second: " + callsPerSec);
                    }
                    if (totalCalls > 0L) {
                        long veryEnd = System.nanoTime();
                        double callsPerSec = totalCalls * 1000000000L / (veryEnd - veryStart);
                        long cpuNanosClient = this.getTotalCpuTime(ctx.getTestThreads());
                        long cpuNanosServer = -1L;
                        if (server != null) {
                            cpuNanosServer = this.getTotalCpuTime(server.getHandlers());
                        }
                        System.out.println("====== Results ======");
                        System.out.println("Options:\n" + opts);
                        System.out.println("Total calls per second: " + callsPerSec);
                        System.out.println("CPU time per call on client: " + cpuNanosClient / totalCalls + " ns");
                        if (server != null) {
                            System.out.println("CPU time per call on server: " + cpuNanosServer / totalCalls + " ns");
                        }
                    } else {
                        System.out.println("No calls!");
                    }
                    ctx.stop();
                    break block12;
                }
                while (true) {
                    Thread.sleep(10000L);
                }
            }
            finally {
                if (server != null) {
                    server.stop();
                }
            }
        }
        return 0;
    }

    private MultithreadedTestUtil.TestContext setupClientTestContext(final MyOptions opts) throws IOException, InterruptedException {
        if (opts.clientThreads <= 0) {
            return null;
        }
        int numProxies = opts.clientThreads;
        RpcServiceWrapper[] proxies = new RpcServiceWrapper[numProxies];
        for (int i = 0; i < numProxies; ++i) {
            proxies[i] = (RpcServiceWrapper)UserGroupInformation.createUserForTesting((String)("proxy-" + i), (String[])new String[0]).doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<RpcServiceWrapper>(){

                @Override
                public RpcServiceWrapper run() throws Exception {
                    return RPCCallBenchmark.this.createRpcClient(opts);
                }
            });
        }
        StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
        for (int c = 0; c < opts.msgSize; ++c) {
            msgBuilder.append('x');
        }
        final String echoMessage = msgBuilder.toString();
        MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
        for (int i = 0; i < opts.clientThreads; ++i) {
            final RpcServiceWrapper proxy = proxies[i % numProxies];
            ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx){

                @Override
                public void doAnAction() throws Exception {
                    proxy.doEcho(echoMessage);
                    RPCCallBenchmark.this.callCount.incrementAndGet();
                }
            });
        }
        return ctx;
    }

    private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
        InetSocketAddress addr = NetUtils.createSocketAddr((String)opts.host, (int)opts.getPort());
        if (opts.rpcEngine == ProtobufRpcEngine.class) {
            final TestRpcBase.TestRpcService proxy = (TestRpcBase.TestRpcService)RPC.getProxy(TestRpcBase.TestRpcService.class, (long)0L, (InetSocketAddress)addr, (Configuration)this.conf);
            return new RpcServiceWrapper(){

                @Override
                public String doEcho(String msg) throws Exception {
                    TestProtos.EchoRequestProto req = TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build();
                    TestProtos.EchoResponseProto responseProto = proxy.echo(null, req);
                    return responseProto.getMessage();
                }
            };
        }
        throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
    }

    public static void main(String[] args) throws Exception {
        int rc = ToolRunner.run((Tool)new RPCCallBenchmark(), (String[])args);
        System.exit(rc);
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return this.conf;
    }

    private static interface RpcServiceWrapper {
        public String doEcho(String var1) throws Exception;
    }

    private static class MyOptions {
        private boolean failed = false;
        private int serverThreads = 0;
        private int serverReaderThreads = 1;
        private int clientThreads = 0;
        private String host = "0.0.0.0";
        private int port = 0;
        public int secondsToRun = 15;
        private int msgSize = 1024;
        public Class<? extends RpcEngine> rpcEngine = ProtobufRpcEngine.class;

        private MyOptions(String[] args) {
            try {
                Options opts = this.buildOptions();
                GnuParser parser = new GnuParser();
                CommandLine line = parser.parse(opts, args, true);
                this.processOptions(line, opts);
                this.validateOptions();
            }
            catch (ParseException e) {
                System.err.println(e.getMessage());
                System.err.println("Try \"--help\" option for details.");
                this.failed = true;
            }
        }

        private void validateOptions() throws ParseException {
            if (this.serverThreads <= 0 && this.clientThreads <= 0) {
                throw new ParseException("Must specify at least -c or -s");
            }
        }

        private Options buildOptions() {
            Options opts = new Options();
            OptionBuilder.withLongOpt((String)"serverThreads");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"numthreads");
            OptionBuilder.withDescription((String)"number of server threads (handlers) to run (or 0 to not run server)");
            opts.addOption(OptionBuilder.create((String)"s"));
            OptionBuilder.withLongOpt((String)"serverReaderThreads");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"threads");
            OptionBuilder.withDescription((String)"number of server reader threads to run");
            opts.addOption(OptionBuilder.create((String)"r"));
            OptionBuilder.withLongOpt((String)"clientThreads");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"numthreads");
            OptionBuilder.withDescription((String)"number of client threads to run (or 0 to not run client)");
            opts.addOption(OptionBuilder.create((String)"c"));
            OptionBuilder.withLongOpt((String)"messageSize");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"bytes");
            OptionBuilder.withDescription((String)"size of call parameter in bytes");
            opts.addOption(OptionBuilder.create((String)"m"));
            OptionBuilder.withLongOpt((String)"time");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"seconds");
            OptionBuilder.withDescription((String)"number of seconds to run clients for");
            opts.addOption(OptionBuilder.create((String)"t"));
            OptionBuilder.withLongOpt((String)"port");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"port");
            OptionBuilder.withDescription((String)"port to listen or connect on");
            opts.addOption(OptionBuilder.create((String)"p"));
            OptionBuilder.withLongOpt((String)"host");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"addr");
            OptionBuilder.withDescription((String)"host to listen or connect on");
            opts.addOption(OptionBuilder.create((char)'h'));
            OptionBuilder.withLongOpt((String)"engine");
            OptionBuilder.hasArg((boolean)true);
            OptionBuilder.withArgName((String)"protobuf");
            OptionBuilder.withDescription((String)"engine to use");
            opts.addOption(OptionBuilder.create((char)'e'));
            OptionBuilder.withLongOpt((String)"help");
            OptionBuilder.hasArg((boolean)false);
            OptionBuilder.withDescription((String)"show this screen");
            opts.addOption(OptionBuilder.create((char)'?'));
            return opts;
        }

        private void processOptions(CommandLine line, Options opts) throws ParseException {
            Object[] remainingArgs;
            if (line.hasOption("help") || line.hasOption('?')) {
                HelpFormatter formatter = new HelpFormatter();
                System.out.println("Protobuf IPC benchmark.");
                System.out.println();
                formatter.printHelp(100, "java ... PBRPCBenchmark [options]", "\nSupported options:", opts, "");
                return;
            }
            if (line.hasOption('s')) {
                this.serverThreads = Integer.parseInt(line.getOptionValue('s'));
            }
            if (line.hasOption('r')) {
                this.serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
            }
            if (line.hasOption('c')) {
                this.clientThreads = Integer.parseInt(line.getOptionValue('c'));
            }
            if (line.hasOption('t')) {
                this.secondsToRun = Integer.parseInt(line.getOptionValue('t'));
            }
            if (line.hasOption('m')) {
                this.msgSize = Integer.parseInt(line.getOptionValue('m'));
            }
            if (line.hasOption('p')) {
                this.port = Integer.parseInt(line.getOptionValue('p'));
            }
            if (line.hasOption('h')) {
                this.host = line.getOptionValue('h');
            }
            if (line.hasOption('e')) {
                String eng = line.getOptionValue('e');
                if ("protobuf".equals(eng)) {
                    this.rpcEngine = ProtobufRpcEngine.class;
                } else {
                    throw new ParseException("invalid engine: " + eng);
                }
            }
            if ((remainingArgs = line.getArgs()).length != 0) {
                throw new ParseException("Extra arguments: " + Joiner.on((String)" ").join(remainingArgs));
            }
        }

        public int getPort() {
            if (this.port == 0) {
                this.port = NetUtils.getFreeSocketPort();
                if (this.port == 0) {
                    throw new RuntimeException("Could not find a free port");
                }
            }
            return this.port;
        }

        public String toString() {
            return "rpcEngine=" + this.rpcEngine + "\nserverThreads=" + this.serverThreads + "\nserverReaderThreads=" + this.serverReaderThreads + "\nclientThreads=" + this.clientThreads + "\nhost=" + this.host + "\nport=" + this.getPort() + "\nsecondsToRun=" + this.secondsToRun + "\nmsgSize=" + this.msgSize;
        }
    }
}

