/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AbstractResponse;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AsyncProcess;
import org.apache.hadoop.hbase.client.AsyncProcessTask;
import org.apache.hadoop.hbase.client.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.AsyncRequestFutureImpl;
import org.apache.hadoop.hbase.client.BufferedMutatorImpl;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.CancellableRegionServerCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.ConnectionImplementation;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.MultiServerCallable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RetryingCallerInterceptorFactory;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl;
import org.apache.hadoop.hbase.client.ServerStatisticTracker;
import org.apache.hadoop.hbase.client.SimpleRequestController;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClientTests.class, LargeTests.class})
public class TestAsyncProcess {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncProcess.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
    private static final TableName DUMMY_TABLE = TableName.valueOf((String)"DUMMY_TABLE");
    private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes((String)"DUMMY_BYTES_1");
    private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes((String)"DUMMY_BYTES_2");
    private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes((String)"DUMMY_BYTES_3");
    private static final byte[] FAILS = Bytes.toBytes((String)"FAILS");
    private Configuration CONF;
    private ConnectionConfiguration CONNECTION_CONFIG;
    private static final ServerName sn = ServerName.valueOf((String)"s1,1,1");
    private static final ServerName sn2 = ServerName.valueOf((String)"s2,2,2");
    private static final ServerName sn3 = ServerName.valueOf((String)"s3,3,3");
    private static final HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1L);
    private static final HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2L);
    private static final HRegionInfo hri3 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3L);
    private static final HRegionLocation loc1 = new HRegionLocation((RegionInfo)hri1, sn);
    private static final HRegionLocation loc2 = new HRegionLocation((RegionInfo)hri2, sn);
    private static final HRegionLocation loc3 = new HRegionLocation((RegionInfo)hri3, sn2);
    private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica((RegionInfo)hri1, (int)1);
    private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica((RegionInfo)hri1, (int)2);
    private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica((RegionInfo)hri2, (int)1);
    private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation[]{new HRegionLocation((RegionInfo)hri1, sn), new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)});
    private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation[]{new HRegionLocation((RegionInfo)hri2, sn2), new HRegionLocation(hri2r1, sn3)});
    private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation[]{new HRegionLocation((RegionInfo)hri3, sn3), null});
    private static final String success = "success";
    private static Exception failure = new Exception("failure");
    private static final int NB_RETRIES = 3;
    private int RPC_TIMEOUT;
    private int OPERATION_TIMEOUT;

    @Before
    public void beforeEach() {
        this.CONF = new Configuration();
        this.CONF.setInt("hbase.client.retries.number", 3);
        this.CONNECTION_CONFIG = new ConnectionConfiguration(this.CONF);
        this.RPC_TIMEOUT = this.CONF.getInt("hbase.rpc.timeout", 60000);
        this.OPERATION_TIMEOUT = this.CONF.getInt("hbase.client.operation.timeout", 1200000);
    }

    static MultiResponse createMultiResponse(MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
        MultiResponse mr = new MultiResponse();
        nbMultiResponse.incrementAndGet();
        for (Map.Entry entry : multi.actions.entrySet()) {
            byte[] regionName = (byte[])entry.getKey();
            for (Action a : (List)entry.getValue()) {
                nbActions.incrementAndGet();
                gen.addResponse(mr, regionName, a);
            }
        }
        return mr;
    }

    @Test
    public void testListRowAccess() {
        String v;
        int count = 10;
        ArrayList<String> values = new ArrayList<String>();
        for (int i = 0; i != count; ++i) {
            values.add(String.valueOf(i));
        }
        AsyncProcessTask.ListRowAccess taker = new AsyncProcessTask.ListRowAccess(values);
        Assert.assertEquals((long)count, (long)taker.size());
        int takeCount = 0;
        Iterator it = taker.iterator();
        while (it.hasNext()) {
            v = (String)it.next();
            Assert.assertEquals((Object)String.valueOf(takeCount), (Object)v);
            ++takeCount;
            it.remove();
            if (!(Math.random() >= 0.5)) continue;
            break;
        }
        Assert.assertEquals((long)count, (long)(taker.size() + takeCount));
        it = taker.iterator();
        while (it.hasNext()) {
            v = (String)it.next();
            Assert.assertEquals((Object)String.valueOf(takeCount), (Object)v);
            ++takeCount;
            it.remove();
        }
        Assert.assertEquals((long)0L, (long)taker.size());
        Assert.assertEquals((long)count, (long)takeCount);
    }

    private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
        if (putSizePerServer <= maxHeapSizePerRequest) {
            return 1L;
        }
        if (putSizePerServer % maxHeapSizePerRequest == 0L) {
            return putSizePerServer / maxHeapSizePerRequest;
        }
        return putSizePerServer / maxHeapSizePerRequest + 1L;
    }

    @Test
    public void testSubmitSameSizeOfRequest() throws Exception {
        long writeBuffer;
        long putsHeapSize = writeBuffer = 0x200000L;
        this.doSubmitRequest(writeBuffer, putsHeapSize);
    }

    @Test
    public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
        long maxHeapSizePerRequest = Long.MAX_VALUE;
        long putsHeapSize = 0x200000L;
        this.doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
    }

    @Test
    public void testSubmitRandomSizeRequest() throws Exception {
        Random rn = new Random();
        long limit = 0xA00000L;
        int requestCount = 1 + (int)(rn.nextDouble() * 3.0);
        long n = rn.nextLong();
        if (n < 0L) {
            n = -n;
        } else if (n == 0L) {
            n = 1L;
        }
        long putsHeapSize = n % 0xA00000L;
        long maxHeapSizePerRequest = putsHeapSize / (long)requestCount;
        LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + ", putsHeapSize=" + putsHeapSize);
        this.doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
    }

    @Test
    public void testSubmitSmallRequest() throws Exception {
        long maxHeapSizePerRequest = 0x200000L;
        long putsHeapSize = 100L;
        this.doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
    }

    @Test
    public void testSubmitLargeRequest() throws Exception {
        long maxHeapSizePerRequest = 0x200000L;
        long putsHeapSize = maxHeapSizePerRequest * 2L;
        this.doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
    }

    private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
        ClusterConnection conn = this.createHConnection();
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        long defaultHeapSizePerRequest = conn.getConfiguration().getLong("hbase.client.max.perrequest.heapsize", 0x400000L);
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        conn.getConfiguration().setLong("hbase.client.max.perrequest.heapsize", maxHeapSizePerRequest);
        long putSizeSN = 0L;
        long putSizeSN2 = 0L;
        ArrayList<Put> puts = new ArrayList<Put>();
        while (putSizeSN + putSizeSN2 <= putsHeapSize) {
            Put put1 = new Put(DUMMY_BYTES_1);
            put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
            Put put2 = new Put(DUMMY_BYTES_2);
            put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
            Put put3 = new Put(DUMMY_BYTES_3);
            put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
            putSizeSN += put1.heapSize() + put2.heapSize();
            putSizeSN2 += put3.heapSize();
            puts.add(put1);
            puts.add(put2);
            puts.add(put3);
        }
        int minCountSnRequest = (int)TestAsyncProcess.calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
        int minCountSn2Request = (int)TestAsyncProcess.calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
        LOG.info("Total put count:" + puts.size() + ", putSizeSN:" + putSizeSN + ", putSizeSN2:" + putSizeSN2 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request);
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);){
            mutator.mutate(puts);
            mutator.flush();
            List<AsyncRequestFuture> reqs = ap.allReqs;
            int actualSnReqCount = 0;
            int actualSn2ReqCount = 0;
            for (AsyncRequestFuture req : reqs) {
                if (!(req instanceof AsyncRequestFutureImpl)) continue;
                MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl)req;
                if (ars.getRequestHeapSize().containsKey(sn)) {
                    ++actualSnReqCount;
                }
                if (!ars.getRequestHeapSize().containsKey(sn2)) continue;
                ++actualSn2ReqCount;
            }
            Assert.assertEquals((Object)true, (Object)(minCountSnRequest <= actualSnReqCount ? 1 : 0));
            Assert.assertEquals((Object)true, (Object)(minCountSn2Request <= actualSn2ReqCount ? 1 : 0));
            HashMap<ServerName, Long> sizePerServers = new HashMap<ServerName, Long>();
            for (AsyncRequestFuture req : reqs) {
                if (!(req instanceof AsyncRequestFutureImpl)) continue;
                MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl)req;
                Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
                for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
                    long sum = 0L;
                    for (long size : entry.getValue()) {
                        Assert.assertEquals((Object)true, (Object)(size <= maxHeapSizePerRequest ? 1 : 0));
                        sum += size;
                    }
                    Assert.assertEquals((Object)true, (Object)(sum <= maxHeapSizePerRequest ? 1 : 0));
                    long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
                    sizePerServers.put(entry.getKey(), value + sum);
                }
            }
            Assert.assertEquals((Object)true, (Object)sizePerServers.containsKey(sn));
            Assert.assertEquals((Object)true, (Object)sizePerServers.containsKey(sn2));
            Assert.assertEquals((Object)false, (Object)sizePerServers.containsKey(sn3));
            Assert.assertEquals((long)putSizeSN, (long)((Long)sizePerServers.get(sn)));
            Assert.assertEquals((long)putSizeSN2, (long)((Long)sizePerServers.get(sn2)));
        }
        conn.getConfiguration().setLong("hbase.client.max.perrequest.heapsize", defaultHeapSizePerRequest);
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    @Test
    public void testSubmit() throws Exception {
        ClusterConnection hc = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(1);
        puts.add(this.createPut(1, true));
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testSubmitWithCB() throws Exception {
        ClusterConnection hc = this.createHConnection();
        final AtomicInteger updateCalled = new AtomicInteger(0);
        Batch.Callback<Object> cb = new Batch.Callback<Object>(){

            public void update(byte[] region, byte[] row, Object result) {
                updateCalled.incrementAndGet();
            }
        };
        MyAsyncProcess ap = new MyAsyncProcess(hc, this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(1);
        puts.add(this.createPut(1, true));
        AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        ars.waitUntilDone();
        Assert.assertEquals((long)1L, (long)updateCalled.get());
    }

    @Test
    public void testSubmitBusyRegion() throws Exception {
        ClusterConnection conn = this.createHConnection();
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        SimpleRequestController controller = (SimpleRequestController)ap.requestController;
        ArrayList<Put> puts = new ArrayList<Put>(1);
        puts.add(this.createPut(1, true));
        for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
            ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
        }
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((long)puts.size(), (long)1L);
        ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((long)0L, (long)puts.size());
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    @Test
    public void testSubmitBusyRegionServer() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        SimpleRequestController controller = (SimpleRequestController)ap.requestController;
        controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer));
        ArrayList<Put> puts = new ArrayList<Put>(4);
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(3, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(2, true));
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertEquals((String)(" puts=" + puts), (long)1L, (long)puts.size());
        controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    @Test
    public void testFail() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(this.createHConnection(), this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(1);
        Put p = this.createPut(1, false);
        puts.add(p);
        AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
        Assert.assertEquals((long)0L, (long)puts.size());
        ars.waitUntilDone();
        this.verifyResult(ars, false);
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
        Assert.assertEquals((long)1L, (long)ars.getErrors().exceptions.size());
        Assert.assertTrue((String)("was: " + ars.getErrors().exceptions.get(0)), (boolean)failure.equals(ars.getErrors().exceptions.get(0)));
        Assert.assertTrue((String)("was: " + ars.getErrors().exceptions.get(0)), (boolean)failure.equals(ars.getErrors().exceptions.get(0)));
        Assert.assertEquals((long)1L, (long)ars.getFailedOperations().size());
        Assert.assertTrue((String)("was: " + ars.getFailedOperations().get(0)), (boolean)p.equals(ars.getFailedOperations().get(0)));
    }

    @Test
    public void testSubmitTrue() throws IOException {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        final SimpleRequestController controller = (SimpleRequestController)ap.requestController;
        controller.tasksInProgress.incrementAndGet();
        final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
        controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
        final AtomicBoolean checkPoint = new AtomicBoolean(false);
        final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)1000L);
                Assert.assertFalse((boolean)checkPoint.get());
                ai.decrementAndGet();
                controller.tasksInProgress.decrementAndGet();
                checkPoint2.set(true);
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>(1);
        Put p = this.createPut(1, true);
        puts.add(p);
        ap.submit(null, DUMMY_TABLE, puts, false, null, false);
        Assert.assertFalse((boolean)puts.isEmpty());
        t.start();
        ap.submit(null, DUMMY_TABLE, puts, true, null, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        checkPoint.set(true);
        while (!checkPoint2.get()) {
            Threads.sleep((long)1L);
        }
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    @Test
    public void testFailAndSuccess() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(this.createHConnection(), this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(3);
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
        Assert.assertTrue((boolean)puts.isEmpty());
        ars.waitUntilDone();
        this.verifyResult(ars, false, true, true);
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
        ap.callsCt.set(0);
        Assert.assertEquals((long)1L, (long)ars.getErrors().actions.size());
        puts.add(this.createPut(1, true));
        ap.waitForMaximumCurrentTasks(0, null);
        ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
        Assert.assertEquals((long)0L, (long)puts.size());
        ars.waitUntilDone();
        Assert.assertEquals((long)1L, (long)ap.callsCt.get());
        this.verifyResult(ars, true);
    }

    @Test
    public void testFlush() throws Exception {
        MyAsyncProcess ap = new MyAsyncProcess(this.createHConnection(), this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(3);
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
        ars.waitUntilDone();
        this.verifyResult(ars, false, true, true);
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
        Assert.assertEquals((long)1L, (long)ars.getFailedOperations().size());
    }

    @Test
    public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
        ClusterConnection hc = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, this.CONF);
        this.testTaskCount(ap);
    }

    @Test
    public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
        Configuration copyConf = new Configuration(this.CONF);
        copyConf.setBoolean("hbase.client.backpressure.enabled", true);
        MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
        ClusterConnection conn = this.createHConnection();
        Mockito.when((Object)conn.getConfiguration()).thenReturn((Object)copyConf);
        Mockito.when((Object)conn.getStatisticsTracker()).thenReturn((Object)ServerStatisticTracker.create((Configuration)copyConf));
        Mockito.when((Object)conn.getBackoffPolicy()).thenReturn((Object)bp);
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
        this.testTaskCount(ap);
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException {
        SimpleRequestController controller = (SimpleRequestController)ap.requestController;
        ArrayList<Put> puts = new ArrayList<Put>();
        for (int i = 0; i != 3; ++i) {
            puts.add(this.createPut(1, true));
            puts.add(this.createPut(2, true));
            puts.add(this.createPut(3, true));
        }
        ap.submit(null, DUMMY_TABLE, puts, true, null, false);
        ap.waitForMaximumCurrentTasks(0, null);
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertEquals((long)0L, (long)controller.tasksInProgress.get());
        for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
            Assert.assertEquals((long)0L, (long)count.get());
        }
        for (AtomicInteger count : controller.taskCounterPerServer.values()) {
            Assert.assertEquals((long)0L, (long)count.get());
        }
    }

    @Test
    public void testMaxTask() throws Exception {
        ClusterConnection conn = this.createHConnection();
        String defaultClazz = conn.getConfiguration().get("hbase.client.request.controller.impl");
        conn.getConfiguration().set("hbase.client.request.controller.impl", SimpleRequestController.class.getName());
        final MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        final SimpleRequestController controller = (SimpleRequestController)ap.requestController;
        for (int i = 0; i < 1000; ++i) {
            ap.incTaskCounters(Collections.singleton(Bytes.toBytes((String)"dummy")), sn);
        }
        final Thread myThread = Thread.currentThread();
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                myThread.interrupt();
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>(1);
        puts.add(this.createPut(1, true));
        t.start();
        try {
            ap.submit(null, DUMMY_TABLE, puts, false, null, false);
            Assert.fail((String)"We should have been interrupted.");
        }
        catch (InterruptedIOException interruptedIOException) {
            // empty catch block
        }
        long sleepTime = 2000L;
        Thread t2 = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                while (controller.tasksInProgress.get() > 0L) {
                    ap.decTaskCounters(Collections.singleton(Bytes.toBytes((String)"dummy")), sn);
                }
            }
        };
        t2.start();
        long start = EnvironmentEdgeManager.currentTime();
        ap.submit(null, DUMMY_TABLE, new ArrayList(), false, null, false);
        long end = EnvironmentEdgeManager.currentTime();
        Assert.assertTrue((start + 100L + 2000L > end ? 1 : 0) != 0);
        if (defaultClazz != null) {
            conn.getConfiguration().set("hbase.client.request.controller.impl", defaultClazz);
        }
    }

    private ClusterConnection createHConnection() throws IOException {
        return this.createHConnection(this.CONNECTION_CONFIG);
    }

    private ClusterConnection createHConnection(ConnectionConfiguration configuration) throws IOException {
        ClusterConnection hc = this.createHConnectionCommon(configuration);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(new HRegionLocation[]{loc1}));
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(new HRegionLocation[]{loc2}));
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(new HRegionLocation[]{loc3}));
        Mockito.when((Object)hc.locateRegions((TableName)Mockito.eq((Object)DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3));
        TestAsyncProcess.setMockLocation(hc, FAILS, new RegionLocations(new HRegionLocation[]{loc2}));
        return hc;
    }

    private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration) throws IOException {
        ClusterConnection hc = this.createHConnectionCommon(configuration);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_1, hrls1);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_2, hrls2);
        TestAsyncProcess.setMockLocation(hc, DUMMY_BYTES_3, hrls3);
        ArrayList<HRegionLocation> locations = new ArrayList<HRegionLocation>();
        for (HRegionLocation loc : hrls1.getRegionLocations()) {
            locations.add(loc);
        }
        for (HRegionLocation loc : hrls2.getRegionLocations()) {
            locations.add(loc);
        }
        for (HRegionLocation loc : hrls3.getRegionLocations()) {
            locations.add(loc);
        }
        Mockito.when((Object)hc.locateRegions((TableName)Mockito.eq((Object)DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(locations);
        return hc;
    }

    private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) throws IOException {
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn((Object)result);
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)row), Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn((Object)result);
    }

    private ClusterConnection createHConnectionCommon(ConnectionConfiguration connectionConfiguration) {
        ClusterConnection hc = (ClusterConnection)Mockito.mock(ClusterConnection.class);
        NonceGenerator ng = (NonceGenerator)Mockito.mock(NonceGenerator.class);
        Mockito.when((Object)ng.getNonceGroup()).thenReturn((Object)0L);
        Mockito.when((Object)hc.getNonceGenerator()).thenReturn((Object)ng);
        Mockito.when((Object)hc.getConfiguration()).thenReturn((Object)this.CONF);
        Mockito.when((Object)hc.getConnectionConfiguration()).thenReturn((Object)connectionConfiguration);
        return hc;
    }

    @Test
    public void testHTablePutSuccess() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        Put put = this.createPut(1, true);
        Assert.assertEquals((long)conn.getConnectionConfiguration().getWriteBufferSize(), (long)ht.getWriteBufferSize());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
        ht.mutate((Mutation)put);
        ht.flush();
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
    }

    @Test
    public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        this.checkPeriodicFlushParameters(conn, ap, 1234L, 1234L, 1234L, 1234L);
        this.checkPeriodicFlushParameters(conn, ap, 0L, 0L, 0L, 100L);
        this.checkPeriodicFlushParameters(conn, ap, -1234L, 0L, -1234L, 100L);
        this.checkPeriodicFlushParameters(conn, ap, 1L, 1L, 1L, 100L);
    }

    private void checkPeriodicFlushParameters(ClusterConnection conn, MyAsyncProcess ap, long setTO, long expectTO, long setTT, long expectTT) {
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
        bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
        Assert.assertEquals((long)setTO, (long)bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
        Assert.assertEquals((long)setTT, (long)bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
        BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        Assert.assertEquals((long)expectTO, (long)ht1.getWriteBufferPeriodicFlushTimeoutMs());
        Assert.assertEquals((long)expectTT, (long)ht1.getWriteBufferPeriodicFlushTimerTickMs());
        BufferedMutatorImpl ht2 = new BufferedMutatorImpl(conn, this.createBufferedMutatorParams(ap, DUMMY_TABLE), (AsyncProcess)ap);
        ht2.setWriteBufferPeriodicFlush(setTO, setTT);
        Assert.assertEquals((long)expectTO, (long)ht2.getWriteBufferPeriodicFlushTimeoutMs());
        Assert.assertEquals((long)expectTT, (long)ht2.getWriteBufferPeriodicFlushTimerTickMs());
    }

    @Test
    public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1L);
        bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1L);
        bufferParam.writeBufferSize(10000L);
        BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        Assert.assertEquals((long)10000L, (long)ht.getWriteBufferSize());
        Assert.assertEquals((long)1L, (long)ht.getWriteBufferPeriodicFlushTimeoutMs());
        Assert.assertEquals((long)100L, (long)ht.getWriteBufferPeriodicFlushTimerTickMs());
        Put put = this.createPut(1, true);
        Assert.assertEquals((long)0L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
        ht.mutate((Mutation)put);
        ht.flush();
        Thread.sleep(1000L);
        Assert.assertEquals((long)0L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
        ht.mutate((Mutation)put);
        Assert.assertEquals((long)0L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertTrue((ht.getCurrentWriteBufferSize() > 0L ? 1 : 0) != 0);
        Thread.sleep(200L);
        Assert.assertEquals((long)1L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
        Thread.sleep(200L);
        Assert.assertEquals((long)1L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
        ht.disableWriteBufferPeriodicFlush();
        ht.mutate((Mutation)put);
        Assert.assertEquals((long)1L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertTrue((ht.getCurrentWriteBufferSize() > 0L ? 1 : 0) != 0);
        Thread.sleep(200L);
        Assert.assertEquals((long)1L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertTrue((ht.getCurrentWriteBufferSize() > 0L ? 1 : 0) != 0);
        ht.setWriteBufferPeriodicFlush(1L, 100L);
        Thread.sleep(2000L);
        Assert.assertEquals((long)2L, (long)ht.getExecutedWriteBufferPeriodicFlushes());
        Assert.assertEquals((long)0L, (long)ht.getCurrentWriteBufferSize());
    }

    @Test
    public void testBufferedMutatorImplWithSharedPool() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        ht.close();
        Assert.assertFalse((boolean)ap.service.isShutdown());
    }

    @Test
    public void testFailedPutAndNewPut() throws Exception {
        ClusterConnection conn = this.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(conn, this.CONF);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE).writeBufferSize(0L);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        Put p = this.createPut(1, false);
        try {
            mutator.mutate((Mutation)p);
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        ap.waitForMaximumCurrentTasks(0, null);
        Assert.assertEquals((long)0L, (long)mutator.size());
        mutator.mutate((Mutation)this.createPut(1, true));
        Assert.assertEquals((String)"the put should not been inserted.", (long)0L, (long)mutator.size());
    }

    @Test
    public void testAction() {
        Action action_0 = new Action((Row)new Put(Bytes.toBytes((String)"abc")), 10);
        Action action_1 = new Action((Row)new Put(Bytes.toBytes((String)"ccc")), 10);
        Action action_2 = new Action((Row)new Put(Bytes.toBytes((String)"ccc")), 10);
        Action action_3 = new Action((Row)new Delete(Bytes.toBytes((String)"ccc")), 10);
        Assert.assertFalse((boolean)action_0.equals((Object)action_1));
        Assert.assertTrue((boolean)action_0.equals((Object)action_0));
        Assert.assertTrue((boolean)action_1.equals((Object)action_2));
        Assert.assertTrue((boolean)action_2.equals((Object)action_1));
        Assert.assertFalse((boolean)action_0.equals((Object)new Put(Bytes.toBytes((String)"abc"))));
        Assert.assertTrue((boolean)action_2.equals((Object)action_3));
        Assert.assertFalse((boolean)action_0.equals((Object)action_3));
        Assert.assertEquals((long)0L, (long)action_0.compareTo(action_0));
        Assert.assertTrue((action_0.compareTo(action_1) < 0 ? 1 : 0) != 0);
        Assert.assertTrue((action_1.compareTo(action_0) > 0 ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)action_1.compareTo(action_2));
    }

    @Test
    public void testBatch() throws IOException, InterruptedException {
        MyConnectionImpl conn = new MyConnectionImpl(this.CONF);
        HTable ht = (HTable)conn.getTable(DUMMY_TABLE);
        ht.multiAp = new MyAsyncProcess((ClusterConnection)conn, this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(7);
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, false));
        puts.add(this.createPut(1, true));
        puts.add(this.createPut(1, false));
        Object[] res = new Object[puts.size()];
        try {
            ht.batch(puts, res);
            Assert.fail();
        }
        catch (RetriesExhaustedException retriesExhaustedException) {
            // empty catch block
        }
        Assert.assertEquals((Object)success, (Object)res[0]);
        Assert.assertEquals((Object)success, (Object)res[1]);
        Assert.assertEquals((Object)success, (Object)res[2]);
        Assert.assertEquals((Object)success, (Object)res[3]);
        Assert.assertEquals((Object)failure, (Object)res[4]);
        Assert.assertEquals((Object)success, (Object)res[5]);
        Assert.assertEquals((Object)failure, (Object)res[6]);
    }

    @Test
    public void testErrorsServers() throws IOException {
        Configuration configuration = new Configuration(this.CONF);
        MyConnectionImpl conn = new MyConnectionImpl(configuration);
        MyAsyncProcess ap = new MyAsyncProcess((ClusterConnection)conn, configuration);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl((ClusterConnection)conn, bufferParam, (AsyncProcess)ap);
        configuration.setBoolean("hbase.client.retries.by.server", true);
        Assert.assertNotNull((Object)ap.createServerErrorTracker());
        Assert.assertTrue((ap.serverTrackerTimeout > 200L ? 1 : 0) != 0);
        ap.serverTrackerTimeout = 1L;
        Put p = this.createPut(1, false);
        mutator.mutate((Mutation)p);
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
    }

    @Test
    public void testReadAndWriteTimeout() throws IOException {
        long readTimeout = 10000L;
        long writeTimeout = 20000L;
        Configuration copyConf = new Configuration(this.CONF);
        copyConf.setLong("hbase.rpc.read.timeout", 10000L);
        copyConf.setLong("hbase.rpc.write.timeout", 20000L);
        MyConnectionImpl conn = new MyConnectionImpl(copyConf);
        MyAsyncProcess ap = new MyAsyncProcess((ClusterConnection)conn, copyConf);
        try (HTable ht = (HTable)conn.getTable(DUMMY_TABLE);){
            ht.multiAp = ap;
            ArrayList<Get> gets = new ArrayList<Get>();
            gets.add(new Get(DUMMY_BYTES_1));
            gets.add(new Get(DUMMY_BYTES_2));
            try {
                ht.get(gets);
            }
            catch (ClassCastException classCastException) {
                // empty catch block
            }
            Assert.assertEquals((long)10000L, (long)ap.previousTimeout);
            ap.previousTimeout = -1L;
            try {
                ht.existsAll(gets);
            }
            catch (ClassCastException classCastException) {
                // empty catch block
            }
            Assert.assertEquals((long)10000L, (long)ap.previousTimeout);
            ap.previousTimeout = -1L;
            ArrayList<Delete> deletes = new ArrayList<Delete>();
            deletes.add(new Delete(DUMMY_BYTES_1));
            deletes.add(new Delete(DUMMY_BYTES_2));
            ht.delete(deletes);
            Assert.assertEquals((long)20000L, (long)ap.previousTimeout);
        }
    }

    @Test
    public void testErrors() throws IOException {
        MyConnectionImpl conn = new MyConnectionImpl(this.CONF);
        AsyncProcessWithFailure ap = new AsyncProcessWithFailure((ClusterConnection)conn, this.CONF, new IOException("test"));
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl((ClusterConnection)conn, bufferParam, (AsyncProcess)ap);
        Assert.assertNotNull((Object)ap.createServerErrorTracker());
        Put p = this.createPut(1, true);
        mutator.mutate((Mutation)p);
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
    }

    @Test
    public void testCallQueueTooLarge() throws IOException {
        MyConnectionImpl conn = new MyConnectionImpl(this.CONF);
        AsyncProcessWithFailure ap = new AsyncProcessWithFailure((ClusterConnection)conn, this.CONF, (IOException)new CallQueueTooBigException());
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl((ClusterConnection)conn, bufferParam, (AsyncProcess)ap);
        Assert.assertNotNull((Object)ap.createServerErrorTracker());
        Put p = this.createPut(1, true);
        mutator.mutate((Mutation)p);
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)4L, (long)ap.callsCt.get());
    }

    @Test
    public void testThreadCreation() throws Exception {
        int NB_REGS = 100;
        ArrayList<HRegionLocation> hrls = new ArrayList<HRegionLocation>(100);
        ArrayList<Get> gets = new ArrayList<Get>(100);
        for (int i = 0; i < 100; ++i) {
            HRegionInfo hri = new HRegionInfo(DUMMY_TABLE, Bytes.toBytes((long)((long)i * 10L)), Bytes.toBytes((long)((long)i * 10L + 9L)), false, (long)i);
            HRegionLocation hrl = new HRegionLocation((RegionInfo)hri, i % 2 == 0 ? sn : sn2);
            hrls.add(hrl);
            Get get = new Get(Bytes.toBytes((long)((long)i * 10L)));
            gets.add(get);
        }
        MyConnectionImpl2 con = new MyConnectionImpl2(hrls, this.CONF);
        MyAsyncProcess ap = new MyAsyncProcess((ClusterConnection)con, this.CONF, con.nbThreads);
        HTable ht = (HTable)con.getTable(DUMMY_TABLE, ap.service);
        ht.multiAp = ap;
        ht.batch(gets, null);
        Assert.assertEquals((long)100L, (long)ap.nbActions.get());
        Assert.assertEquals((String)"1 multi response per server", (long)2L, (long)ap.nbMultiResponse.get());
        Assert.assertEquals((String)"1 thread per server", (long)2L, (long)con.nbThreads.get());
        int nbReg = 0;
        for (int i = 0; i < 100; ++i) {
            if (!con.usedRegions[i]) continue;
            ++nbReg;
        }
        Assert.assertEquals((String)("nbReg=" + nbReg), (long)100L, (long)nbReg);
    }

    @Test
    public void testReplicaReplicaSuccess() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(10, 1000, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
        Assert.assertEquals((long)2L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 10, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
        Assert.assertEquals((long)0L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaParallelCallsSucceed() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(0, 0, 0);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
        long replicaCalls = ap.getReplicaCallCount();
        Assert.assertTrue((replicaCalls >= 0L ? 1 : 0) != 0);
        Assert.assertTrue((replicaCalls <= 2L ? 1 : 0) != 0);
    }

    @Test
    public void testReplicaPartialReplicaCall() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 0, 0);
        ap.setPrimaryCallDelay(sn2, 2000L);
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
        Assert.assertEquals((long)1L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(1000, 0, 0, 0);
        ap.addFailures(new RegionInfo[]{hri1, hri2});
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
        Assert.assertEquals((long)0L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(0, 1000, 1000, 0);
        ap.addFailures(new RegionInfo[]{hri1, hri1r2, hri2});
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
        Assert.assertEquals((long)2L, (long)ap.getReplicaCallCount());
    }

    @Test
    public void testReplicaAllCallsFailForOneRegion() throws Exception {
        MyAsyncProcessWithReplicas ap = this.createReplicaAp(500, 1000, 0, 0);
        ap.addFailures(new RegionInfo[]{hri1, hri1r1, hri1r2, hri2r1});
        List<Get> rows = TestAsyncProcess.makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
        AsyncRequestFuture ars = ap.submit(task);
        this.verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
        Assert.assertEquals((long)3L, (long)ars.getErrors().getNumExceptions());
        for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
            Assert.assertArrayEquals((byte[])DUMMY_BYTES_1, (byte[])ars.getErrors().getRow(i).getRow());
        }
    }

    private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
        return this.createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
    }

    private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("hbase.client.primaryCallTimeout.multiget", replicaAfterMs * 1000);
        if (retries >= 0) {
            conf.setInt("hbase.client.retries.number", retries);
        }
        ClusterConnection conn = this.createHConnectionWithReplicas(new ConnectionConfiguration(conf));
        MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
        ap.setCallDelays(primaryMs, replicaMs);
        return ap;
    }

    private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
        return new BufferedMutatorParams(name).pool(ap.service).rpcTimeout(this.RPC_TIMEOUT).opertationTimeout(this.OPERATION_TIMEOUT);
    }

    private static List<Get> makeTimelineGets(byte[] ... rows) {
        ArrayList<Get> result = new ArrayList<Get>(rows.length);
        for (byte[] row : rows) {
            Get get = new Get(row);
            get.setConsistency(Consistency.TIMELINE);
            result.add(get);
        }
        return result;
    }

    private void verifyResult(AsyncRequestFuture ars, boolean ... expected) throws Exception {
        Object[] actual = ars.getResults();
        Assert.assertEquals((long)expected.length, (long)actual.length);
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals((Object)expected[i], (Object)(!(actual[i] instanceof Throwable) ? 1 : 0));
        }
    }

    private void verifyReplicaResult(AsyncRequestFuture ars, RR ... expecteds) throws Exception {
        Object[] actuals = ars.getResults();
        Assert.assertEquals((long)expecteds.length, (long)actuals.length);
        for (int i = 0; i < expecteds.length; ++i) {
            Object actual = actuals[i];
            RR expected = expecteds[i];
            Assert.assertEquals((String)actual.toString(), (Object)(expected == RR.FAILED ? 1 : 0), (Object)(actual instanceof Throwable));
            if (expected == RR.FAILED || expected == RR.DONT_CARE) continue;
            Assert.assertEquals((Object)(expected == RR.TRUE ? 1 : 0), (Object)((Result)actual).isStale());
        }
    }

    private Put createPut(int regCnt, boolean success) {
        Put p;
        if (!success) {
            p = new Put(FAILS);
        } else {
            switch (regCnt) {
                case 1: {
                    p = new Put(DUMMY_BYTES_1);
                    break;
                }
                case 2: {
                    p = new Put(DUMMY_BYTES_2);
                    break;
                }
                case 3: {
                    p = new Put(DUMMY_BYTES_3);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("unknown " + regCnt);
                }
            }
        }
        p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
        return p;
    }

    @Test
    public void testUncheckedException() throws Exception {
        ClusterConnection hc = this.createHConnection();
        MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(200));
        AsyncProcessForThrowableCheck ap = new AsyncProcessForThrowableCheck(hc, this.CONF);
        ArrayList<Put> puts = new ArrayList<Put>(1);
        puts.add(this.createPut(1, true));
        AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool((ExecutorService)myPool).setRpcTimeout(this.RPC_TIMEOUT).setOperationTimeout(this.OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(puts).setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL).build();
        ap.submit(task);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception {
        this.testRetryPauseWhenServerIsOverloaded((HBaseServerException)new CallQueueTooBigException());
    }

    @Test
    public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception {
        this.testRetryPauseWhenServerIsOverloaded((HBaseServerException)new CallDroppedException());
    }

    private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception) throws IOException {
        Configuration conf = new Configuration(this.CONF);
        long specialPause = 500L;
        boolean retries = true;
        conf.setLong("hbase.client.pause.server.overloaded", 500L);
        conf.setInt("hbase.client.retries.number", 1);
        MyConnectionImpl conn = new MyConnectionImpl(conf);
        AsyncProcessWithFailure ap = new AsyncProcessWithFailure((ClusterConnection)conn, conf, (IOException)exception);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl((ClusterConnection)conn, bufferParam, (AsyncProcess)ap);
        Assert.assertNotNull((Object)mutator.getAsyncProcess().createServerErrorTracker());
        Put p = this.createPut(1, true);
        mutator.mutate((Mutation)p);
        long startTime = EnvironmentEdgeManager.currentTime();
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
        long expectedSleep = 0L;
        for (int i = 0; i < 1; ++i) {
            expectedSleep += ConnectionUtils.getPauseTime((long)500L, (int)i);
            actualSleep += 5L;
        }
        LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
        Assert.assertTrue((String)("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms"), (actualSleep >= expectedSleep ? 1 : 0) != 0);
        long normalPause = conf.getLong("hbase.client.pause", 100L);
        ap = new AsyncProcessWithFailure((ClusterConnection)conn, conf, new IOException());
        bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        mutator = new BufferedMutatorImpl((ClusterConnection)conn, bufferParam, (AsyncProcess)ap);
        Assert.assertNotNull((Object)mutator.getAsyncProcess().createServerErrorTracker());
        mutator.mutate((Mutation)p);
        startTime = EnvironmentEdgeManager.currentTime();
        try {
            mutator.flush();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            Assert.assertEquals((long)1L, (long)expected.getNumExceptions());
            Assert.assertTrue((expected.getRow(0) == p ? 1 : 0) != 0);
        }
        actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
        expectedSleep = 0L;
        for (int i = 0; i < 1; ++i) {
            expectedSleep += ConnectionUtils.getPauseTime((long)normalPause, (int)i);
        }
        LOG.debug("Expected to sleep " + (expectedSleep += normalPause) + "ms, actually slept " + actualSleep + "ms");
        Assert.assertTrue((String)("Slept for too long: " + actualSleep + "ms"), (actualSleep <= expectedSleep ? 1 : 0) != 0);
    }

    @Test
    public void testRetryWithExceptionClearsMetaCacheUsingRegionException() throws Exception {
        this.testRetryWithExceptionClearsMetaCache(true);
    }

    @Test
    public void testRetryWithExceptionClearsMetaCacheUsingServerException() throws Exception {
        this.testRetryWithExceptionClearsMetaCache(false);
    }

    private void testRetryWithExceptionClearsMetaCache(boolean useRegionException) throws IOException {
        Configuration myConf = new Configuration(this.CONF);
        myConf.setInt("hbase.client.retries.number", 1);
        ClusterConnection conn = this.createHConnection(new ConnectionConfiguration(myConf));
        AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, (IOException)new RegionOpeningException("test"), loc1.getServerName(), useRegionException);
        BufferedMutatorParams bufferParam = this.createBufferedMutatorParams(ap, DUMMY_TABLE);
        BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, (AsyncProcess)ap);
        Assert.assertNotNull((Object)mutator.getAsyncProcess().createServerErrorTracker());
        Assert.assertEquals((Object)conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), (Object)new RegionLocations(new HRegionLocation[]{loc1}).toString());
        ((ClusterConnection)Mockito.doAnswer(invocation -> {
            TestAsyncProcess.setMockLocation(conn, DUMMY_BYTES_1, new RegionLocations(new HRegionLocation[]{loc3}));
            return null;
        }).when((Object)conn)).updateCachedLocations((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)loc1.getRegion().getRegionName()), (byte[])Mockito.eq((Object)DUMMY_BYTES_1), Mockito.any(), (ServerName)Mockito.eq((Object)loc1.getServerName()));
        ((ClusterConnection)Mockito.verify((Object)conn, (VerificationMode)Mockito.times((int)0))).updateCachedLocations((TableName)Mockito.any(), (byte[])Mockito.any(), (byte[])Mockito.any(), Mockito.any(), (ServerName)Mockito.any());
        Put p = this.createPut(1, true);
        mutator.mutate((Mutation)p);
        mutator.flush();
        Assert.assertEquals((Object)conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), (Object)new RegionLocations(new HRegionLocation[]{loc3}).toString());
        ((ClusterConnection)Mockito.verify((Object)conn, (VerificationMode)Mockito.atLeastOnce())).updateCachedLocations((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)loc1.getRegion().getRegionName()), (byte[])Mockito.eq((Object)DUMMY_BYTES_1), Mockito.any(), (ServerName)Mockito.eq((Object)loc1.getServerName()));
    }

    @Test
    public void testQueueRowAccess() throws Exception {
        ClusterConnection conn = this.createHConnection();
        BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000L));
        Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
        Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
        mutator.mutate((Mutation)p0);
        BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
        Assert.assertEquals((long)0L, (long)mutator.size());
        mutator.mutate((Mutation)p1);
        Assert.assertEquals((long)1L, (long)mutator.size());
        BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
        Assert.assertEquals((long)0L, (long)mutator.size());
        Assert.assertEquals((long)1L, (long)ra0.size());
        Assert.assertEquals((long)1L, (long)ra1.size());
        Iterator iter0 = ra0.iterator();
        Iterator iter1 = ra1.iterator();
        Assert.assertTrue((boolean)iter0.hasNext());
        Assert.assertTrue((boolean)iter1.hasNext());
        Assert.assertTrue((iter0.next() == p0 ? 1 : 0) != 0);
        Assert.assertEquals((long)1L, (long)mutator.getUnflushedSize());
        Assert.assertEquals((long)p1.heapSize(), (long)mutator.getCurrentWriteBufferSize());
        Assert.assertTrue((iter1.next() == p1 ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)mutator.getUnflushedSize());
        Assert.assertEquals((long)0L, (long)mutator.getCurrentWriteBufferSize());
        Assert.assertFalse((boolean)iter0.hasNext());
        Assert.assertFalse((boolean)iter1.hasNext());
        iter0.remove();
        ra0.close();
        Assert.assertEquals((long)0L, (long)mutator.size());
        Assert.assertEquals((long)0L, (long)mutator.getUnflushedSize());
        Assert.assertEquals((long)0L, (long)mutator.getCurrentWriteBufferSize());
        ra1.close();
        Assert.assertEquals((long)1L, (long)mutator.size());
        Assert.assertEquals((long)1L, (long)mutator.getUnflushedSize());
        Assert.assertEquals((long)p1.heapSize(), (long)mutator.getCurrentWriteBufferSize());
    }

    static class AsyncProcessForThrowableCheck
    extends AsyncProcess {
        public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
            super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), new RpcControllerFactory(conf));
        }
    }

    static class MyThreadPoolExecutor
    extends ThreadPoolExecutor {
        public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
            super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
        }

        public Future submit(Runnable runnable) {
            throw new OutOfMemoryError("OutOfMemory error thrown by means");
        }
    }

    private static enum RR {
        TRUE,
        FALSE,
        DONT_CARE,
        FAILED;

    }

    static class MyConnectionImpl2
    extends MyConnectionImpl {
        List<HRegionLocation> hrl;
        final boolean[] usedRegions;

        protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
            super(conf);
            this.hrl = hrl;
            this.usedRegions = new boolean[hrl.size()];
        }

        @Override
        public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
            int i = 0;
            for (HRegionLocation hr : this.hrl) {
                if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
                    this.usedRegions[i] = true;
                    return new RegionLocations(new HRegionLocation[]{hr});
                }
                ++i;
            }
            return null;
        }
    }

    static class MyConnectionImpl
    extends ConnectionImplementation {
        final AtomicInteger nbThreads = new AtomicInteger(0);

        protected MyConnectionImpl(Configuration conf) throws IOException {
            super(MyConnectionImpl.setupConf(conf), null, null);
        }

        private static Configuration setupConf(Configuration conf) {
            conf.setClass("hbase.client.registry.impl", TestRegistry.class, ConnectionRegistry.class);
            return conf;
        }

        public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
            return new RegionLocations(new HRegionLocation[]{loc1});
        }

        public boolean hasCellBlockSupport() {
            return false;
        }

        public static class TestRegistry
        extends DoNothingConnectionRegistry {
            public TestRegistry(Configuration conf) {
                super(conf);
            }

            @Override
            public CompletableFuture<String> getClusterId() {
                return CompletableFuture.completedFuture("testClusterId");
            }
        }
    }

    private static interface ResponseGenerator {
        public void addResponse(MultiResponse var1, byte[] var2, Action var3);
    }

    static class MyAsyncProcessWithReplicas
    extends MyAsyncProcess {
        private Set<byte[]> failures = new TreeSet<byte[]>((Comparator<byte[]>)new Bytes.ByteArrayComparator());
        private long primarySleepMs = 0L;
        private long replicaSleepMs = 0L;
        private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
        private final AtomicLong replicaCalls = new AtomicLong(0L);

        public void addFailures(RegionInfo ... hris) {
            for (RegionInfo hri : hris) {
                this.failures.add(hri.getRegionName());
            }
        }

        public long getReplicaCallCount() {
            return this.replicaCalls.get();
        }

        public void setPrimaryCallDelay(ServerName server, long primaryMs) {
            this.customPrimarySleepMs.put(server, primaryMs);
        }

        public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
            super(hc, conf);
        }

        public void setCallDelays(long primaryMs, long replicaMs) {
            this.primarySleepMs = primaryMs;
            this.replicaSleepMs = replicaMs;
        }

        @Override
        protected RpcRetryingCaller<AbstractResponse> createCaller(CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
            MultiServerCallable callable = (MultiServerCallable)payloadCallable;
            final MultiResponse mr = TestAsyncProcess.createMultiResponse(callable.getMulti(), this.nbMultiResponse, this.nbActions, new ResponseGenerator(){

                @Override
                public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
                    if (failures.contains(regionName)) {
                        mr.add(regionName, a.getOriginalIndex(), (Object)failure);
                    } else {
                        boolean isStale = !RegionReplicaUtil.isDefaultReplica((int)a.getReplicaId());
                        mr.add(regionName, a.getOriginalIndex(), (Object)Result.create((Cell[])new Cell[0], null, (boolean)isStale));
                    }
                }
            });
            final boolean isDefault = RegionReplicaUtil.isDefaultReplica((int)((Action)((List)callable.getMulti().actions.values().iterator().next()).iterator().next()).getReplicaId());
            final ServerName server = callable.getServerName();
            String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " + callable.getMulti().actions.size() + " entries: ";
            for (byte[] region : callable.getMulti().actions.keySet()) {
                debugMsg = debugMsg + "[" + Bytes.toStringBinary((byte[])region) + "], ";
            }
            LOG.debug(debugMsg);
            if (!isDefault) {
                this.replicaCalls.incrementAndGet();
            }
            return new RpcRetryingCallerImpl<AbstractResponse>(100L, 500L, 10, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null){

                public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, int callTimeout) throws IOException, RuntimeException {
                    Long customSleep;
                    long sleep = -1L;
                    sleep = isDefault ? ((customSleep = (Long)customPrimarySleepMs.get(server)) == null ? primarySleepMs : customSleep) : replicaSleepMs;
                    if (sleep != 0L) {
                        try {
                            Thread.sleep(sleep);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    return mr;
                }
            };
        }
    }

    static class MyClientBackoffPolicy
    implements ClientBackoffPolicy {
        private final Map<ServerName, AtomicInteger> count = new HashMap<ServerName, AtomicInteger>();

        MyClientBackoffPolicy() {
        }

        public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
            AtomicInteger inc = this.count.get(serverName);
            if (inc == null) {
                inc = new AtomicInteger(0);
                this.count.put(serverName, inc);
            }
            return inc.getAndIncrement();
        }
    }

    static class AsyncProcessWithFailure
    extends MyAsyncProcess {
        private final IOException ioe;
        private final ServerName failingServer;
        private final boolean returnAsRegionException;

        public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe, ServerName failingServer, boolean returnAsRegionException) {
            super(hc, myConf);
            this.ioe = ioe;
            this.failingServer = failingServer;
            this.returnAsRegionException = returnAsRegionException;
            this.serverTrackerTimeout = 1L;
        }

        public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe) {
            this(hc, myConf, ioe, null, false);
        }

        @Override
        protected RpcRetryingCaller<AbstractResponse> createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
            MultiServerCallable msc = (MultiServerCallable)callable;
            if (this.failingServer != null && !msc.getServerName().equals((Object)this.failingServer)) {
                return super.createCaller(callable, rpcTimeout);
            }
            if (this.returnAsRegionException) {
                return new CallerWithRegionException(this.ioe, msc.getMulti());
            }
            this.callsCt.incrementAndGet();
            return new CallerWithFailure(this.ioe);
        }
    }

    static class CallerWithRegionException
    extends RpcRetryingCallerImpl<AbstractResponse> {
        private final IOException e;
        private MultiAction multi;

        public CallerWithRegionException(IOException e, MultiAction multi) {
            super(100L, 500L, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
            this.e = e;
            this.multi = multi;
        }

        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, int callTimeout) throws IOException, RuntimeException {
            MultiResponse response = new MultiResponse();
            for (Map.Entry entry : this.multi.actions.entrySet()) {
                response.addException((byte[])entry.getKey(), (Throwable)this.e);
            }
            return response;
        }
    }

    static class CallerWithFailure
    extends RpcRetryingCallerImpl<AbstractResponse> {
        private final IOException e;

        public CallerWithFailure(IOException e) {
            super(100L, 500L, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
            this.e = e;
        }

        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, int callTimeout) throws IOException, RuntimeException {
            throw this.e;
        }
    }

    static class MyAsyncRequestFutureImpl<Res>
    extends AsyncRequestFutureImpl<Res> {
        private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<ServerName, List<Long>>();

        public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup, AsyncProcess asyncProcess) {
            super(task, actions, nonceGroup, asyncProcess);
        }

        protected void updateStats(ServerName server, MultiResponse resp) {
        }

        Map<ServerName, List<Long>> getRequestHeapSize() {
            return this.heapSizesByServer;
        }

        AsyncRequestFutureImpl.SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
            AsyncRequestFutureImpl.SingleServerRequestRunnable rq = new AsyncRequestFutureImpl.SingleServerRequestRunnable((AsyncRequestFutureImpl)this, multiAction, numAttempt, server, callsInProgress);
            List<Long> heapCount = this.heapSizesByServer.get(server);
            if (heapCount == null) {
                heapCount = new ArrayList<Long>();
                this.heapSizesByServer.put(server, heapCount);
            }
            heapCount.add(this.heapSizeOf(multiAction));
            return rq;
        }

        private long heapSizeOf(MultiAction multiAction) {
            return multiAction.actions.values().stream().flatMap(v -> v.stream()).map(action -> action.getAction()).filter(row -> row instanceof Mutation).mapToLong(row -> ((Mutation)row).heapSize()).sum();
        }
    }

    static class MyAsyncProcess
    extends AsyncProcess {
        final AtomicInteger nbMultiResponse = new AtomicInteger();
        final AtomicInteger nbActions = new AtomicInteger();
        public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
        public AtomicInteger callsCt = new AtomicInteger();
        private Configuration conf;
        private long previousTimeout = -1L;
        final ExecutorService service;

        protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(AsyncProcessTask task, List<Action> actions, long nonceGroup) {
            AsyncProcessTask wrap = new AsyncProcessTask(task){

                public TableName getTableName() {
                    return DUMMY_TABLE;
                }
            };
            MyAsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl(wrap, actions, nonceGroup, this);
            this.allReqs.add((AsyncRequestFuture)r);
            return r;
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
            super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), new RpcControllerFactory(conf));
            this.service = Executors.newFixedThreadPool(5);
            this.conf = conf;
        }

        public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
            super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), new RpcControllerFactory(conf));
            this.service = new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads));
        }

        public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException {
            AsyncProcessTask task = AsyncProcessTask.newBuilder(callback).setPool(pool == null ? this.service : pool).setTableName(tableName).setRowAccess(rows).setSubmittedRows(atLeastOne ? AsyncProcessTask.SubmittedRows.AT_LEAST_ONE : AsyncProcessTask.SubmittedRows.NORMAL).setNeedResults(needResults).setRpcTimeout(this.conf.getInt("hbase.rpc.read.timeout", 60000)).setOperationTimeout(this.conf.getInt("hbase.client.operation.timeout", 1200000)).build();
            return this.submit(task);
        }

        public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException {
            return this.submit(null, tableName, rows, atLeastOne, callback, needResults);
        }

        public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) throws InterruptedIOException {
            this.previousTimeout = task.getRpcTimeout();
            AsyncProcessTask wrap = new AsyncProcessTask<Res>(task){

                public boolean getNeedResults() {
                    return true;
                }
            };
            return super.submit(wrap);
        }

        protected RpcRetryingCaller<AbstractResponse> createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
            this.callsCt.incrementAndGet();
            MultiServerCallable callable1 = (MultiServerCallable)callable;
            final MultiResponse mr = TestAsyncProcess.createMultiResponse(callable1.getMulti(), this.nbMultiResponse, this.nbActions, new ResponseGenerator(){

                @Override
                public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
                    if (Arrays.equals(FAILS, a.getAction().getRow())) {
                        mr.add(regionName, a.getOriginalIndex(), (Object)failure);
                    } else {
                        mr.add(regionName, a.getOriginalIndex(), (Object)TestAsyncProcess.success);
                    }
                }
            });
            return new RpcRetryingCallerImpl<AbstractResponse>(100L, 500L, 10, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null){

                public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, int callTimeout) throws IOException, RuntimeException {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    return mr;
                }
            };
        }
    }

    static class CountingThreadFactory
    implements ThreadFactory {
        final AtomicInteger nbThreads;
        ThreadFactory realFactory = new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d").setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();

        @Override
        public Thread newThread(Runnable r) {
            this.nbThreads.incrementAndGet();
            return this.realFactory.newThread(r);
        }

        CountingThreadFactory(AtomicInteger nbThreads) {
            this.nbThreads = nbThreads;
        }
    }
}

