/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.ScanServerIT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="MiniClusterOnly")
public class ScanServerMultipleScansIT
extends SharedMiniClusterBase {
    /** Logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(ScanServerMultipleScansIT.class);
    /**
     * NOTE(review): presumably the number of concurrent scan tasks the multi-scan tests are meant
     * to launch; confirm against the loops in the test methods.
     */
    private static final int NUM_SCANS = 4;
    /** Thread pool used to run scan tasks; created in {@link #before()} and shut down in {@link #after()}. */
    private ExecutorService executor;

    /**
     * Starts the shared mini cluster using {@link ScanServerITConfiguration}, starts a
     * {@code SCAN_SERVER} process on {@code localhost}, and then polls ZooKeeper every 500 ms
     * until the {@code <zookeeper root>/sservers} node has at least one child.
     *
     * <p>The polling loop has no upper bound; it only ends once a child appears or the call throws.
     *
     * @throws Exception if cluster start-up, the ZooKeeper lookup, or the sleep fails
     */
    @BeforeAll
    public static void start() throws Exception {
        SharedMiniClusterBase.startMiniClusterWithConfig(new ScanServerITConfiguration());
        SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost");

        String scanServerPath = ScanServerMultipleScansIT.getCluster().getServerContext().getZooKeeperRoot() + "/sservers";
        ZooReaderWriter zooKeeper = ScanServerMultipleScansIT.getCluster().getServerContext().getZooReaderWriter();
        // Wait for something to show up under the sservers node before any test runs.
        while (zooKeeper.getChildren(scanServerPath).isEmpty()) {
            Thread.sleep(500L);
        }
    }

    /**
     * Stops the shared mini cluster once all tests in this class have run.
     *
     * @throws Exception if the cluster cannot be stopped
     */
    @AfterAll
    public static void stop() throws Exception {
        stopMiniCluster();
    }

    /**
     * Creates a fresh cached thread pool for the test that is about to run.
     *
     * @throws Exception declared for the test framework; nothing here throws a checked exception
     */
    @BeforeEach
    public void before() throws Exception {
        executor = Executors.newCachedThreadPool();
    }

    /**
     * Requests an orderly shutdown of the per-test thread pool. Already submitted tasks are not
     * cancelled and this method does not wait for them to finish.
     *
     * @throws Exception declared for the test framework; nothing here throws a checked exception
     */
    @AfterEach
    public void after() throws Exception {
        executor.shutdown();
    }

    /**
     * Runs {@link #NUM_SCANS} concurrent full-table {@link Scanner} scans against a table created
     * without a {@link NewTableConfiguration}, and checks that every scan returns exactly the
     * number of entries reported by the ingest helper.
     *
     * <p>Each scan sets {@code ConsistencyLevel.EVENTUAL} before iterating.
     *
     * @throws Exception if table creation, ingest, or any scan task fails; a failure inside a task
     *         surfaces from {@link Future#get()}
     */
    @Test
    public void testMultipleScansSameTablet() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");

            // All tasks block on this latch and are released together once every task is submitted.
            CountDownLatch latch = new CountDownLatch(1);
            Collection<Future<?>> futures = new ArrayList<>(NUM_SCANS);
            for (int i = 0; i < NUM_SCANS; i++) {
                Future<?> future = executor.submit(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Assertions.fail("InterruptedException waiting for latch");
                    }
                    try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
                        scanner.setRange(new Range());
                        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                        Assertions.assertEquals(ingestedEntryCount, Iterables.size(scanner));
                    } catch (TableNotFoundException e) {
                        Assertions.fail("Table not found");
                    }
                });
                futures.add(future);
            }
            latch.countDown();

            // get() propagates any assertion failure raised inside a task to the test thread.
            for (Future<?> future : futures) {
                future.get();
            }
        }
    }

    /**
     * Creates a table with three requested split points, ingests data, and checks that a single
     * full-table {@link Scanner} using {@code ConsistencyLevel.EVENTUAL} returns exactly the
     * number of entries reported by the ingest helper.
     *
     * @throws Exception if table creation, ingest, or the scan fails
     */
    @Test
    public void testSingleScanDifferentTablets() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            SortedSet<Text> splits = getSplits("row_0000000002\\0", "row_0000000005\\0", "row_0000000008\\0");
            NewTableConfiguration tableConfig = new NewTableConfiguration().withSplits(splits);
            int expectedEntries = ScanServerIT.createTableAndIngest(client, tableName, tableConfig, 10, 10, "colf");

            try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
                scanner.setRange(new Range());
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                int scannedEntries = Iterables.size(scanner);
                Assertions.assertEquals(expectedEntries, scannedEntries);
            }
        }
    }

    /**
     * Creates a table with three requested split points, verifies that the table reports exactly
     * those splits, and then runs {@link #NUM_SCANS} concurrent {@link Scanner} scans, each over a
     * different row range. The entry counts of all scans are summed and must equal the number of
     * entries reported by the ingest helper.
     *
     * <p>Row ranges by task index: 0 scans rows 0-2, 1 scans rows 3-5, 2 scans rows 6-8 and
     * 3 scans row 9. Any other index fails the test.
     *
     * @throws Exception if table creation, ingest, or any scan task fails; a failure inside a task
     *         surfaces from {@link Future#get()}
     */
    @Test
    public void testMultipleScansDifferentTablets() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            SortedSet<Text> splitPoints = getSplits("row_0000000002\\0", "row_0000000005\\0", "row_0000000008\\0");
            NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, ntc, 10, 10, "colf");

            Collection<Text> splitsFound = client.tableOperations().listSplits(tableName);
            Assertions.assertEquals(splitPoints, new TreeSet<Text>(splitsFound));
            log.debug("Splits found: {}", splitsFound);

            // All tasks block on this latch and are released together once every task is submitted.
            CountDownLatch latch = new CountDownLatch(1);
            AtomicInteger counter = new AtomicInteger(0);
            Collection<Future<?>> futures = new ArrayList<>(NUM_SCANS);
            for (int i = 0; i < NUM_SCANS; i++) {
                // The lambda needs an effectively final copy of the loop index.
                final int threadNum = i;
                Future<?> future = executor.submit(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Assertions.fail("InterruptedException waiting for latch");
                    }
                    try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
                        switch (threadNum) {
                            case 0:
                                scanner.setRange(new Range("row_0000000000", "row_0000000002"));
                                break;
                            case 1:
                                scanner.setRange(new Range("row_0000000003", "row_0000000005"));
                                break;
                            case 2:
                                scanner.setRange(new Range("row_0000000006", "row_0000000008"));
                                break;
                            case 3:
                                scanner.setRange(new Range("row_0000000009"));
                                break;
                            default:
                                Assertions.fail("Invalid threadNum");
                        }
                        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                        counter.addAndGet(Iterables.size(scanner));
                    } catch (TableNotFoundException e) {
                        Assertions.fail("Table not found");
                    }
                });
                futures.add(future);
            }
            latch.countDown();

            // get() propagates any assertion failure raised inside a task to the test thread.
            for (Future<?> future : futures) {
                future.get();
            }
            Assertions.assertEquals(ingestedEntryCount, counter.get());
        }
    }

    /**
     * Runs {@link #NUM_SCANS} concurrent full-table {@link BatchScanner} scans against a table
     * created without a {@link NewTableConfiguration}, and checks that every scan returns exactly
     * the number of entries reported by the ingest helper.
     *
     * <p>Each scan sets {@code ConsistencyLevel.EVENTUAL} before iterating.
     *
     * @throws Exception if table creation, ingest, or any scan task fails; a failure inside a task
     *         surfaces from {@link Future#get()}
     */
    @Test
    public void testMultipleBatchScansSameTablet() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");

            // All tasks block on this latch and are released together once every task is submitted.
            CountDownLatch latch = new CountDownLatch(1);
            Collection<Future<?>> futures = new ArrayList<>(NUM_SCANS);
            for (int i = 0; i < NUM_SCANS; i++) {
                Future<?> future = executor.submit(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Assertions.fail("InterruptedException waiting for latch");
                    }
                    try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
                        scanner.setRanges(Collections.singletonList(new Range()));
                        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                        Assertions.assertEquals(ingestedEntryCount, Iterables.size(scanner));
                    } catch (TableNotFoundException e) {
                        Assertions.fail("Table not found");
                    }
                });
                futures.add(future);
            }
            latch.countDown();

            // get() propagates any assertion failure raised inside a task to the test thread.
            for (Future<?> future : futures) {
                future.get();
            }
        }
    }

    /**
     * Creates a table with three requested split points, ingests data, and checks that a single
     * full-table {@link BatchScanner} using {@code ConsistencyLevel.EVENTUAL} returns exactly the
     * number of entries reported by the ingest helper.
     *
     * @throws Exception if table creation, ingest, or the scan fails
     */
    @Test
    public void testSingleBatchScanDifferentTablets() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            SortedSet<Text> splits = getSplits("row_0000000002\\0", "row_0000000005\\0", "row_0000000008\\0");
            NewTableConfiguration tableConfig = new NewTableConfiguration().withSplits(splits);
            int expectedEntries = ScanServerIT.createTableAndIngest(client, tableName, tableConfig, 10, 10, "colf");

            // The third argument is the batch scanner's query-thread count.
            try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY, 4)) {
                scanner.setRanges(Collections.singletonList(new Range()));
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                int scannedEntries = Iterables.size(scanner);
                Assertions.assertEquals(expectedEntries, scannedEntries);
            }
        }
    }

    /**
     * Creates a table with three requested split points, verifies that the table reports exactly
     * those splits, and then runs {@link #NUM_SCANS} concurrent {@link BatchScanner} scans, each
     * over a different row range. The entry counts of all scans are summed and must equal the
     * number of entries reported by the ingest helper.
     *
     * <p>Row ranges by task index: 0 scans rows 0-2, 1 scans rows 3-5, 2 scans rows 6-8 and
     * 3 scans row 9. Any other index fails the test.
     *
     * @throws Exception if table creation, ingest, or any scan task fails; a failure inside a task
     *         surfaces from {@link Future#get()}
     */
    @Test
    public void testMultipleBatchScansDifferentTablets() throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            String tableName = getUniqueNames(1)[0];
            SortedSet<Text> splitPoints = getSplits("row_0000000002\\0", "row_0000000005\\0", "row_0000000008\\0");
            NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, ntc, 10, 10, "colf");

            Collection<Text> splitsFound = client.tableOperations().listSplits(tableName);
            Assertions.assertEquals(splitPoints, new TreeSet<Text>(splitsFound));
            log.debug("Splits found: {}", splitsFound);

            // All tasks block on this latch and are released together once every task is submitted.
            CountDownLatch latch = new CountDownLatch(1);
            AtomicInteger counter = new AtomicInteger(0);
            Collection<Future<?>> futures = new ArrayList<>(NUM_SCANS);
            for (int i = 0; i < NUM_SCANS; i++) {
                // The lambda needs an effectively final copy of the loop index.
                final int threadNum = i;
                Future<?> future = executor.submit(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Assertions.fail("InterruptedException waiting for latch");
                    }
                    try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
                        switch (threadNum) {
                            case 0:
                                scanner.setRanges(Collections.singletonList(new Range("row_0000000000", "row_0000000002")));
                                break;
                            case 1:
                                scanner.setRanges(Collections.singletonList(new Range("row_0000000003", "row_0000000005")));
                                break;
                            case 2:
                                scanner.setRanges(Collections.singletonList(new Range("row_0000000006", "row_0000000008")));
                                break;
                            case 3:
                                scanner.setRanges(Collections.singletonList(new Range("row_0000000009")));
                                break;
                            default:
                                Assertions.fail("Invalid threadNum");
                        }
                        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                        counter.addAndGet(Iterables.size(scanner));
                    } catch (TableNotFoundException e) {
                        Assertions.fail("Table not found");
                    }
                });
                futures.add(future);
            }
            latch.countDown();

            // get() propagates any assertion failure raised inside a task to the test thread.
            for (Future<?> future : futures) {
                future.get();
            }
            Assertions.assertEquals(ingestedEntryCount, counter.get());
        }
    }

    /**
     * Wraps each given row string in a {@link Text} and collects the results into a sorted set.
     *
     * @param rows row strings to convert; may be empty
     * @return a new {@link TreeSet} holding one {@link Text} per distinct row
     */
    private SortedSet<Text> getSplits(String ... rows) {
        SortedSet<Text> splits = new TreeSet<>();
        for (String row : rows) {
            splits.add(new Text(row));
        }
        return splits;
    }

    /**
     * Mini cluster configuration callback used by this test class: requests one scan server and
     * sets {@code Property.TSERV_SESSION_MAXIDLE} to {@code "3s"}.
     */
    private static class ScanServerITConfiguration implements MiniClusterConfigurationCallback {

        /**
         * Applies the scan-server settings to the mini cluster configuration.
         *
         * @param cfg mini cluster configuration to modify
         * @param coreSite Hadoop configuration; not used here
         */
        @Override
        public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
            cfg.setNumScanServers(1);
            cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
        }
    }
}

