/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.CallQueueInfo;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@Category(value={RPCTests.class, SmallTests.class})
public class TestSimpleRpcScheduler {
    // JUnit rule built by CategoryBasedTimeout from this test class, with the
    // "looking for stuck thread" option switched on.
    @Rule
    public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).withLookingForStuckThread(true).build();
    private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
    // Scheduler context passed to init() by the tests below; it reports an
    // unresolved 127.0.0.1:1000 address as the listener address.
    private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context(){

        public InetSocketAddress getListenerAddress() {
            return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
        }
    };
    // Configuration recreated by setUp() before every test.
    private Configuration conf;

    /** Gives each test its own freshly created HBase configuration. */
    @Before
    public void setUp() {
        Configuration freshConf = HBaseConfiguration.create();
        this.conf = freshConf;
    }

    /**
     * Dispatches one mocked call to a started scheduler and verifies that the
     * call's {@code run()} is invoked within one second.
     *
     * @throws IOException if dispatching the call fails
     * @throws InterruptedException if dispatching the call is interrupted
     */
    @Test
    public void testBasic() throws IOException, InterruptedException {
        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(this.conf, 10, 0, 0, qosFunction, 0);
        scheduler.init(this.CONTEXT);
        scheduler.start();
        try {
            CallRunner task = this.createMockTask();
            task.setStatus(new MonitoredRPCHandlerImpl());
            scheduler.dispatch(task);
            Mockito.verify(task, Mockito.timeout(1000L)).run();
        } finally {
            // Always stop the scheduler so a failed verification does not leave it running.
            scheduler.stop();
        }
    }

    /**
     * Uses reflection to overwrite three fields of the scheduler's {@code callExecutor}:
     * {@code handlerCount} is set to 0, {@code numCallQueues} to 1 and
     * {@code currentQueueLimit} to 100. The fields are looked up on the executor's
     * grandparent class.
     *
     * <p>Reflection failures are logged (with their stack trace) and otherwise ignored,
     * so the scheduler is returned even when it could not be modified.
     *
     * @param scheduler the scheduler whose executor is modified in place
     * @return the same {@code scheduler} instance that was passed in
     */
    private RpcScheduler disableHandlers(RpcScheduler scheduler) {
        try {
            Field executorField = scheduler.getClass().getDeclaredField("callExecutor");
            executorField.setAccessible(true);
            RpcExecutor rpcExecutor = (RpcExecutor) executorField.get(scheduler);
            // All three fields are declared two levels above the concrete executor class.
            Class<?> declaringClass = rpcExecutor.getClass().getSuperclass().getSuperclass();

            Field handlerCountField = declaringClass.getDeclaredField("handlerCount");
            handlerCountField.setAccessible(true);
            handlerCountField.set(rpcExecutor, 0);

            Field numCallQueuesField = declaringClass.getDeclaredField("numCallQueues");
            numCallQueuesField.setAccessible(true);
            numCallQueuesField.set(rpcExecutor, 1);

            Field currentQueueLimitField = declaringClass.getDeclaredField("currentQueueLimit");
            currentQueueLimitField.setAccessible(true);
            currentQueueLimitField.set(rpcExecutor, 100);
        } catch (NoSuchFieldException e) {
            // Pass the exception as the cause so the stack trace reaches the log.
            LOG.error("No such field exception", e);
        } catch (IllegalAccessException e) {
            LOG.error("Illegal access exception", e);
        }
        return scheduler;
    }

    /**
     * Dispatches ten mocked calls to a scheduler that was passed through
     * {@link #disableHandlers(RpcScheduler)} and checks that every method name reported
     * by {@code getCallQueueInfo()} has a count of ten in each call queue.
     *
     * @throws IOException if dispatching a call fails
     * @throws InterruptedException if dispatching a call is interrupted
     */
    @Test
    public void testCallQueueInfo() throws IOException, InterruptedException {
        final int totalCallMethods = 10;
        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
        // Declared as RpcScheduler because that is what disableHandlers() returns.
        RpcScheduler scheduler = new SimpleRpcScheduler(this.conf, 0, 0, 0, qosFunction, 0);
        scheduler.init(this.CONTEXT);
        scheduler = this.disableHandlers(scheduler);
        scheduler.start();
        try {
            for (int i = 0; i < totalCallMethods; i++) {
                CallRunner task = this.createMockTask();
                task.setStatus(new MonitoredRPCHandlerImpl());
                scheduler.dispatch(task);
            }
            CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
            for (String callQueueName : callQueueInfo.getCallQueueNames()) {
                for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
                    // JUnit order is (expected, actual), so a failure message reads correctly.
                    Assert.assertEquals(totalCallMethods, callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
                }
            }
        } finally {
            // Stop the scheduler even when an assertion above fails.
            scheduler.stop();
        }
    }

    /**
     * Dispatches three mocked calls whose priority function results are 0, 201 and 5
     * and asserts that the three calls were run by three different threads.
     *
     * <p>Each call's {@code run()} is stubbed to record the executing thread and count
     * down a latch; the test waits on the latch before inspecting the recorded threads.
     *
     * @throws IOException if dispatching a call fails
     * @throws InterruptedException if dispatching or waiting is interrupted
     */
    @Test
    public void testHandlerIsolation() throws IOException, InterruptedException {
        CallRunner generalTask = this.createMockTask();
        CallRunner priorityTask = this.createMockTask();
        CallRunner replicationTask = this.createMockTask();
        ImmutableList<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
        ImmutableMap<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask, 201, replicationTask, 5);
        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
        final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
        Answer<Void> answerToRun = new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                // Several threads may record their task at once, so guard the map.
                synchronized (handlerThreads) {
                    handlerThreads.put((CallRunner) invocationOnMock.getMock(), Thread.currentThread());
                }
                countDownLatch.countDown();
                return null;
            }
        };
        for (CallRunner task : tasks) {
            task.setStatus(new MonitoredRPCHandlerImpl());
            Mockito.doAnswer(answerToRun).when(task).run();
        }
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(this.conf, 1, 1, 1, qosFunction, 200);
        scheduler.init(this.CONTEXT);
        scheduler.start();
        try {
            for (CallRunner task : tasks) {
                // Re-stub the priority right before each dispatch so this task gets its own value.
                Mockito.when(qosFunction.getPriority((RPCProtos.RequestHeader) Matchers.anyObject(), (Message) Matchers.anyObject(), (User) Matchers.anyObject())).thenReturn(qos.get(task));
                scheduler.dispatch(task);
            }
            for (CallRunner task : tasks) {
                Mockito.verify(task, Mockito.timeout(1000L)).run();
            }
        } finally {
            // Stop the scheduler even when dispatch or verification fails.
            scheduler.stop();
        }
        // The latch makes the handler threads' map writes visible to this thread.
        countDownLatch.await();
        Assert.assertEquals(3L, (long) ImmutableSet.copyOf(handlerThreads.values()).size());
    }

    /**
     * Builds a mocked {@link CallRunner} whose {@code getRpcCall()} returns a mocked
     * {@link ServerCall}; that call reports an empty request user.
     *
     * @return the mocked call runner
     */
    private CallRunner createMockTask() {
        CallRunner runner = Mockito.mock(CallRunner.class);
        ServerCall rpcCall = Mockito.mock(ServerCall.class);
        Mockito.when(rpcCall.getRequestUser()).thenReturn(Optional.empty());
        Mockito.when(runner.getRpcCall()).thenReturn(rpcCall);
        return runner;
    }

    /**
     * Runs the parameterised scheduler check once for the "deadline" call-queue type
     * and then once for the "fifo" type.
     */
    @Test
    public void testRpcScheduler() throws Exception {
        String[] queueTypes = {"deadline", "fifo"};
        for (String queueType : queueTypes) {
            this.testRpcScheduler(queueType);
        }
    }

    /**
     * Dispatches eight calls (six "small", one "huge", one "large") to a scheduler
     * configured with the given call-queue type and checks the order they ran in.
     *
     * <p>Each call records a value (10, 50 or 100) when run and then sleeps 250 ms.
     * The test sums the running prefix sums of the recorded values. That total is 530
     * when the values are recorded in ascending order (10 x6, 50, 100) and 930 when they
     * are recorded in dispatch order (10, 10, 10, 100, 10, 50, 10, 10).
     *
     * @param queueType value written to {@code hbase.ipc.server.callqueue.type};
     *                  only "deadline" and "fifo" have an assertion attached
     * @throws Exception if scheduling, dispatching or waiting fails
     */
    private void testRpcScheduler(String queueType) throws Exception {
        Configuration schedConf = HBaseConfiguration.create();
        schedConf.set("hbase.ipc.server.callqueue.type", queueType);
        PriorityFunction priority = Mockito.mock(PriorityFunction.class);
        Mockito.when(priority.getPriority(Matchers.any(RPCProtos.RequestHeader.class), Matchers.any(Message.class), Matchers.any(User.class))).thenReturn(0);
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, 10);
        try {
            scheduler.start();

            CallRunner smallCallTask = Mockito.mock(CallRunner.class);
            ServerCall smallCall = Mockito.mock(ServerCall.class);
            RPCProtos.RequestHeader smallHead = RPCProtos.RequestHeader.newBuilder().setCallId(1).build();
            Mockito.when(smallCallTask.getRpcCall()).thenReturn(smallCall);
            Mockito.when(smallCall.getHeader()).thenReturn(smallHead);
            Mockito.when(smallCall.getRequestUser()).thenReturn(Optional.empty());

            CallRunner largeCallTask = Mockito.mock(CallRunner.class);
            ServerCall largeCall = Mockito.mock(ServerCall.class);
            RPCProtos.RequestHeader largeHead = RPCProtos.RequestHeader.newBuilder().setCallId(50).build();
            Mockito.when(largeCallTask.getRpcCall()).thenReturn(largeCall);
            Mockito.when(largeCall.getHeader()).thenReturn(largeHead);
            Mockito.when(largeCall.getRequestUser()).thenReturn(Optional.empty());

            CallRunner hugeCallTask = Mockito.mock(CallRunner.class);
            ServerCall hugeCall = Mockito.mock(ServerCall.class);
            RPCProtos.RequestHeader hugeHead = RPCProtos.RequestHeader.newBuilder().setCallId(100).build();
            Mockito.when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
            Mockito.when(hugeCall.getHeader()).thenReturn(hugeHead);
            Mockito.when(hugeCall.getRequestUser()).thenReturn(Optional.empty());

            // Deadlines reported by the priority function for each header.
            Mockito.when(priority.getDeadline(Matchers.eq(smallHead), Matchers.any(Message.class))).thenReturn(0L);
            Mockito.when(priority.getDeadline(Matchers.eq(largeHead), Matchers.any(Message.class))).thenReturn(50L);
            Mockito.when(priority.getDeadline(Matchers.eq(hugeHead), Matchers.any(Message.class))).thenReturn(100L);

            final ArrayList<Integer> work = new ArrayList<Integer>();
            this.doAnswerTaskExecution(smallCallTask, work, 10, 250);
            this.doAnswerTaskExecution(largeCallTask, work, 50, 250);
            this.doAnswerTaskExecution(hugeCallTask, work, 100, 250);

            scheduler.dispatch(smallCallTask);
            scheduler.dispatch(smallCallTask);
            scheduler.dispatch(smallCallTask);
            scheduler.dispatch(hugeCallTask);
            scheduler.dispatch(smallCallTask);
            scheduler.dispatch(largeCallTask);
            scheduler.dispatch(smallCallTask);
            scheduler.dispatch(smallCallTask);

            // The answers append to 'work' under its monitor, so read it under the same
            // monitor; an unsynchronised ArrayList read from this thread would be a data race.
            while (true) {
                synchronized (work) {
                    if (work.size() >= 8) {
                        break;
                    }
                }
                Thread.sleep(100L);
            }
            ArrayList<Integer> executed;
            synchronized (work) {
                executed = new ArrayList<Integer>(work);
            }

            int seqSum = 0;
            int totalTime = 0;
            for (int i = 0; i < executed.size(); ++i) {
                LOG.debug("Request i=" + i + " value=" + executed.get(i));
                seqSum += executed.get(i).intValue();
                totalTime += seqSum;
            }
            LOG.debug("Total Time: " + totalTime);
            if ("deadline".equals(queueType)) {
                Assert.assertEquals(530L, (long) totalTime);
            } else if ("fifo".equals(queueType)) {
                Assert.assertEquals(930L, (long) totalTime);
            }
        } finally {
            scheduler.stop();
        }
    }

    /**
     * Checks that a {@link SimpleRpcScheduler} can be constructed when the scan ratio
     * is configured as 0 (handler factor 1.0, read ratio 0.5).
     *
     * @throws Exception if constructing the scheduler fails
     */
    @Test
    public void testScanQueueWithZeroScanRatio() throws Exception {
        Configuration schedConf = HBaseConfiguration.create();
        schedConf.setFloat("hbase.ipc.server.callqueue.handler.factor", 1.0f);
        schedConf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
        schedConf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.0f);
        PriorityFunction priority = Mockito.mock(PriorityFunction.class);
        Mockito.when(priority.getPriority(Matchers.any(RPCProtos.RequestHeader.class), Matchers.any(Message.class), Matchers.any(User.class))).thenReturn(0);
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority, 10);
        // The real check is that the constructor above did not throw.
        Assert.assertNotNull(scheduler);
    }

    /**
     * Dispatches three "mutate", three "get" and three "scan" calls to a scheduler
     * configured with handler factor 1.0, read ratio 0.7 and scan ratio 0.5, then checks
     * that within each consecutive group of three recorded executions the values are
     * pairwise different.
     *
     * <p>Each call type records its own value (1, 2 or 3) when run and then sleeps for
     * one second. The test waits until at least six executions have been recorded.
     *
     * @throws Exception if scheduling, dispatching or waiting fails
     */
    @Test
    public void testScanQueues() throws Exception {
        Configuration schedConf = HBaseConfiguration.create();
        schedConf.setFloat("hbase.ipc.server.callqueue.handler.factor", 1.0f);
        schedConf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.7f);
        schedConf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f);
        PriorityFunction priority = Mockito.mock(PriorityFunction.class);
        Mockito.when(priority.getPriority(Matchers.any(RPCProtos.RequestHeader.class), Matchers.any(Message.class), Matchers.any(User.class))).thenReturn(0);
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, 10);
        try {
            scheduler.start();

            CallRunner putCallTask = Mockito.mock(CallRunner.class);
            ServerCall putCall = Mockito.mock(ServerCall.class);
            putCall.param = RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
            RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
            Mockito.when(putCallTask.getRpcCall()).thenReturn(putCall);
            Mockito.when(putCall.getHeader()).thenReturn(putHead);
            Mockito.when(putCall.getParam()).thenReturn(putCall.param);
            Mockito.when(putCall.getRequestUser()).thenReturn(Optional.empty());

            CallRunner getCallTask = Mockito.mock(CallRunner.class);
            ServerCall getCall = Mockito.mock(ServerCall.class);
            RPCProtos.RequestHeader getHead = RPCProtos.RequestHeader.newBuilder().setMethodName("get").build();
            Mockito.when(getCallTask.getRpcCall()).thenReturn(getCall);
            Mockito.when(getCall.getHeader()).thenReturn(getHead);
            Mockito.when(getCall.getRequestUser()).thenReturn(Optional.empty());

            CallRunner scanCallTask = Mockito.mock(CallRunner.class);
            ServerCall scanCall = Mockito.mock(ServerCall.class);
            scanCall.param = ClientProtos.ScanRequest.newBuilder().setScannerId(1L).build();
            RPCProtos.RequestHeader scanHead = RPCProtos.RequestHeader.newBuilder().setMethodName("scan").build();
            Mockito.when(scanCallTask.getRpcCall()).thenReturn(scanCall);
            Mockito.when(scanCall.getHeader()).thenReturn(scanHead);
            Mockito.when(scanCall.getParam()).thenReturn(scanCall.param);
            Mockito.when(scanCall.getRequestUser()).thenReturn(Optional.empty());

            final ArrayList<Integer> work = new ArrayList<Integer>();
            this.doAnswerTaskExecution(putCallTask, work, 1, 1000);
            this.doAnswerTaskExecution(getCallTask, work, 2, 1000);
            this.doAnswerTaskExecution(scanCallTask, work, 3, 1000);

            scheduler.dispatch(putCallTask);
            scheduler.dispatch(putCallTask);
            scheduler.dispatch(putCallTask);
            scheduler.dispatch(getCallTask);
            scheduler.dispatch(getCallTask);
            scheduler.dispatch(getCallTask);
            scheduler.dispatch(scanCallTask);
            scheduler.dispatch(scanCallTask);
            scheduler.dispatch(scanCallTask);

            // Handlers append to 'work' under its monitor and may still be appending after
            // the sixth entry (nine calls were dispatched), so poll and copy under the same
            // monitor instead of reading the live list.
            while (true) {
                synchronized (work) {
                    if (work.size() >= 6) {
                        break;
                    }
                }
                Thread.sleep(100L);
            }
            ArrayList<Integer> executed;
            synchronized (work) {
                executed = new ArrayList<Integer>(work);
            }

            for (int i = 0; i < executed.size() - 2; i += 3) {
                Assert.assertNotEquals(executed.get(i), executed.get(i + 1));
                Assert.assertNotEquals(executed.get(i), executed.get(i + 2));
                Assert.assertNotEquals(executed.get(i + 1), executed.get(i + 2));
            }
        } finally {
            scheduler.stop();
        }
    }

    /**
     * Stubs {@code callTask.run()} so that each invocation appends {@code value} to
     * {@code results} (while holding the list's monitor) and then sleeps for
     * {@code sleepInterval} milliseconds.
     *
     * @param callTask mocked call runner to stub
     * @param results shared list that receives one entry per execution
     * @param value the entry appended on each execution
     * @param sleepInterval how long each execution sleeps after recording, in milliseconds
     */
    private void doAnswerTaskExecution(CallRunner callTask, final ArrayList<Integer> results, final int value, final int sleepInterval) {
        callTask.setStatus(new MonitoredRPCHandlerImpl());
        Answer<Object> recordThenSleep = invocation -> {
            synchronized (results) {
                results.add(value);
            }
            Threads.sleepWithoutInterrupt(sleepInterval);
            return null;
        };
        Mockito.doAnswer(recordThenSleep).when(callTask).run();
    }

    /**
     * Blocks, polling every 100 ms, until the scheduler reports a general queue length
     * that is not greater than zero.
     *
     * @param scheduler the scheduler whose general queue is polled
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) throws InterruptedException {
        for (int pending = scheduler.getGeneralQueueLength(); pending > 0; pending = scheduler.getGeneralQueueLength()) {
            Thread.sleep(100L);
        }
    }

    /**
     * Checks that changing {@code hbase.ipc.server.max.callqueue.length} through
     * {@code onConfigurationChange} changes whether {@code dispatch} accepts a call:
     * accepted with a limit of 5, rejected after the limit is lowered to 0, and accepted
     * again with a limit of 1 once the general queue has drained.
     *
     * @throws Exception if scheduling, dispatching or waiting fails
     */
    @Test
    public void testSoftAndHardQueueLimits() throws Exception {
        final String maxQueueLengthKey = "hbase.ipc.server.max.callqueue.length";

        Configuration schedConf = HBaseConfiguration.create();
        schedConf.setInt("hbase.regionserver.handler.count", 0);
        schedConf.setInt(maxQueueLengthKey, 5);

        PriorityFunction priority = Mockito.mock(PriorityFunction.class);
        Mockito.when(priority.getPriority(Matchers.any(RPCProtos.RequestHeader.class), Matchers.any(Message.class), Matchers.any(User.class))).thenReturn(0);

        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, 10);
        try {
            scheduler.start();

            ServerCall mutateCall = Mockito.mock(ServerCall.class);
            mutateCall.param = RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
            RPCProtos.RequestHeader mutateHeader = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
            CallRunner mutateTask = Mockito.mock(CallRunner.class);
            Mockito.when(mutateTask.getRpcCall()).thenReturn(mutateCall);
            Mockito.when(mutateCall.getHeader()).thenReturn(mutateHeader);
            Mockito.when(mutateCall.getRequestUser()).thenReturn(Optional.empty());

            // Limit 5: the first call is accepted.
            boolean acceptedAtInitialLimit = scheduler.dispatch(mutateTask);
            Assert.assertTrue(acceptedAtInitialLimit);

            // Limit 0: the next call is rejected.
            schedConf.setInt(maxQueueLengthKey, 0);
            scheduler.onConfigurationChange(schedConf);
            boolean acceptedAtZeroLimit = scheduler.dispatch(mutateTask);
            Assert.assertFalse(acceptedAtZeroLimit);

            // Limit 1, after the queue has drained: calls are accepted again.
            TestSimpleRpcScheduler.waitUntilQueueEmpty(scheduler);
            schedConf.setInt(maxQueueLengthKey, 1);
            scheduler.onConfigurationChange(schedConf);
            boolean acceptedAtRaisedLimit = scheduler.dispatch(mutateTask);
            Assert.assertTrue(acceptedAtRaisedLimit);
        } finally {
            scheduler.stop();
        }
    }

    /**
     * Exercises the "codel" call-queue type with an injected clock and checks how many
     * general calls the scheduler reports as dropped.
     *
     * <p>Three batches are dispatched, each followed by a wait for the general queue to
     * drain plus a further 100 ms:
     * <ol>
     *   <li>100 calls sleeping 2 ms with a clock offset of 5: no drops expected;</li>
     *   <li>20 calls sleeping 2 ms with a clock offset of 151: still no drops expected;</li>
     *   <li>60 calls sleeping 100 ms with a clock offset of 2000: more than 12 drops
     *       expected.</li>
     * </ol>
     *
     * @throws Exception if scheduling, dispatching or waiting fails
     */
    @Test
    public void testCoDelScheduling() throws Exception {
        CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
        envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
        Configuration schedConf = HBaseConfiguration.create();
        schedConf.setInt("hbase.ipc.server.max.callqueue.length", 250);
        schedConf.set("hbase.ipc.server.callqueue.type", "codel");
        PriorityFunction priority = Mockito.mock(PriorityFunction.class);
        Mockito.when(priority.getPriority(Matchers.any(RPCProtos.RequestHeader.class), Matchers.any(Message.class), Matchers.any(User.class))).thenReturn(0);
        SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, 10);
        try {
            // Result deliberately discarded. NOTE(review): presumably a warm-up so the first
            // timed call does not pay a one-off construction cost; confirm.
            this.getMockedCallRunner(System.currentTimeMillis(), 2L);
            scheduler.start();
            EnvironmentEdgeManager.injectEdge(envEdge);

            envEdge.offset = 5L;
            this.dispatchCoDelBatch(scheduler, envEdge, 100, 2L);
            TestSimpleRpcScheduler.waitUntilQueueEmpty(scheduler);
            Thread.sleep(100L);
            Assert.assertEquals("None of these calls should have been discarded", 0L, scheduler.getNumGeneralCallsDropped());

            envEdge.offset = 151L;
            this.dispatchCoDelBatch(scheduler, envEdge, 20, 2L);
            TestSimpleRpcScheduler.waitUntilQueueEmpty(scheduler);
            Thread.sleep(100L);
            Assert.assertEquals("None of these calls should have been discarded", 0L, scheduler.getNumGeneralCallsDropped());

            envEdge.offset = 2000L;
            this.dispatchCoDelBatch(scheduler, envEdge, 60, 100L);
            TestSimpleRpcScheduler.waitUntilQueueEmpty(scheduler);
            Thread.sleep(100L);
            // NOTE(review): the message says "at least 12" but the check is strictly greater than 12.
            long dropped = scheduler.getNumGeneralCallsDropped();
            Assert.assertTrue("There should have been at least 12 calls dropped however there were " + dropped, dropped > 12L);
        } finally {
            try {
                scheduler.stop();
            } finally {
                // Restore the default clock so the fake edge does not leak into later tests.
                EnvironmentEdgeManager.reset();
            }
        }
    }

    /**
     * Dispatches {@code count} calls; for each one the current wall-clock time is queued
     * on the fake clock and also used as the call's timestamp.
     */
    private void dispatchCoDelBatch(SimpleRpcScheduler scheduler, CoDelEnvironmentEdge envEdge, int count, long sleepTime) throws IOException, InterruptedException {
        for (int i = 0; i < count; ++i) {
            long time = System.currentTimeMillis();
            envEdge.timeQ.put(time);
            scheduler.dispatch(this.getMockedCallRunner(time, sleepTime));
        }
    }

    /**
     * Creates a {@link CallRunner} around a "mutate" {@link ServerCall} built with the
     * given timestamp. The runner's {@code run()} only sleeps, its {@code drop()} does
     * nothing, and the call's {@code sendResponseIfReady()} does nothing.
     *
     * @param timestamp value passed to the {@link ServerCall} constructor
     * @param sleepTime how long {@code run()} sleeps, in milliseconds; values of zero or
     *                  less make {@code run()} return immediately
     * @return the stub call runner
     * @throws IOException if building the mutate request fails
     */
    private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
        final ServerCall putCall = new ServerCall(1, null, null, RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), (Message)RequestConverter.buildMutateRequest((byte[])Bytes.toBytes((String)"abc"), (Put)new Put(Bytes.toBytes((String)"row"))), null, null, 9L, null, null, timestamp, 0, null, null, null){

            @Override
            public void sendResponseIfReady() throws IOException {
                // No response is sent in this test.
            }
        };
        return new CallRunner(null, (RpcCall)putCall){

            @Override
            public void run() {
                if (sleepTime <= 0L) {
                    return;
                }
                try {
                    LOG.warn("Sleeping for " + sleepTime);
                    Thread.sleep(sleepTime);
                    LOG.warn("Done Sleeping for " + sleepTime);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the thread running this call
                    // instead of silently discarding it.
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public RpcCall getRpcCall() {
                return putCall;
            }

            @Override
            public void drop() {
                // Dropping is a no-op for this stub.
            }
        };
    }

    private static final class CoDelEnvironmentEdge
    implements EnvironmentEdge {
        private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<Long>();
        private long offset;
        private final Set<String> threadNamePrefixs = new HashSet<String>();

        private CoDelEnvironmentEdge() {
        }

        public long currentTime() {
            for (String threadNamePrefix : this.threadNamePrefixs) {
                String threadName = Thread.currentThread().getName();
                if (!threadName.startsWith(threadNamePrefix)) continue;
                return (Long)this.timeQ.poll() + this.offset;
            }
            return System.currentTimeMillis();
        }
    }
}

