/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableResultScanner;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class, ClientTests.class})
public class TestAsyncTableScannerCloseWhileSuspending {
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static TableName TABLE_NAME = TableName.valueOf((String)"async");
    private static byte[] FAMILY = Bytes.toBytes((String)"cf");
    private static byte[] CQ = Bytes.toBytes((String)"cq");
    private static AsyncConnection CONN;
    private static AsyncTable TABLE;

    @BeforeClass
    public static void setUp() throws Exception {
        TEST_UTIL.startMiniCluster(1);
        TEST_UTIL.createTable(TABLE_NAME, FAMILY);
        CONN = (AsyncConnection)ConnectionFactory.createAsyncConnection((Configuration)TEST_UTIL.getConfiguration()).get();
        TABLE = CONN.getTable(TABLE_NAME, (ExecutorService)ForkJoinPool.commonPool());
        TABLE.putAll(IntStream.range(0, 100).mapToObj(i -> new Put(Bytes.toBytes((String)String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes((int)i))).collect(Collectors.toList())).get();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        CONN.close();
        TEST_UTIL.shutdownMiniCluster();
    }

    private int getScannersCount() {
        return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()).sum();
    }

    @Test
    public void testCloseScannerWhileSuspending() throws Exception {
        try (final ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1L));){
            TEST_UTIL.waitFor(10000L, 100L, new Waiter.ExplainingPredicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return ((AsyncTableResultScanner)scanner).isSuspended();
                }

                public String explainFailure() throws Exception {
                    return "The given scanner has been suspended in time";
                }
            });
            Assert.assertEquals((long)1L, (long)this.getScannersCount());
        }
        TEST_UTIL.waitFor(10000L, 100L, new Waiter.ExplainingPredicate<Exception>(){

            public boolean evaluate() throws Exception {
                return TestAsyncTableScannerCloseWhileSuspending.this.getScannersCount() == 0;
            }

            public String explainFailure() throws Exception {
                return "Still have " + TestAsyncTableScannerCloseWhileSuspending.this.getScannersCount() + " scanners opened";
            }
        });
    }
}

