/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test;

import com.google.common.collect.Iterables;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ReadWriteIT;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag(value="MiniClusterOnly")
public class ScanServerIT
extends SharedMiniClusterBase {
    @BeforeAll
    public static void start() throws Exception {
        ScanServerITConfiguration c = new ScanServerITConfiguration();
        SharedMiniClusterBase.startMiniClusterWithConfig(c);
        SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost");
        String zooRoot = ScanServerIT.getCluster().getServerContext().getZooKeeperRoot();
        ZooReaderWriter zrw = ScanServerIT.getCluster().getServerContext().getZooReaderWriter();
        String scanServerRoot = zooRoot + "/sservers";
        while (zrw.getChildren(scanServerRoot).size() == 0) {
            Thread.sleep(500L);
        }
    }

    @AfterAll
    public static void stop() throws Exception {
        SharedMiniClusterBase.stopMiniCluster();
    }

    @Test
    public void testScan() throws Exception {
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ScanServerIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");
            try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);){
                scanner.setRange(new Range());
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                Assertions.assertEquals((int)ingestedEntryCount, (int)Iterables.size((Iterable)scanner), (String)"The scan server scanner should have seen all ingested and flushed entries");
                int additionalIngestedEntryCount = ScanServerIT.ingest(client, tableName, 10, 10, 10, "colf", false);
                Assertions.assertEquals((int)ingestedEntryCount, (int)Iterables.size((Iterable)scanner), (String)"The scan server scanner should have seen all ingested and flushed entries");
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.IMMEDIATE);
                Assertions.assertEquals((int)(ingestedEntryCount + additionalIngestedEntryCount), (int)Iterables.size((Iterable)scanner), (String)"Scanning against tserver should have resulted in seeing all ingested entries");
            }
        }
    }

    @Test
    public void testBatchScan() throws Exception {
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ScanServerIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");
            try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY);){
                scanner.setRanges(Collections.singletonList(new Range()));
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                Assertions.assertEquals((int)ingestedEntryCount, (int)Iterables.size((Iterable)scanner), (String)"The scan server scanner should have seen all ingested and flushed entries");
                int additionalIngestedEntryCount = ScanServerIT.ingest(client, tableName, 10, 10, 10, "colf", false);
                Assertions.assertEquals((int)ingestedEntryCount, (int)Iterables.size((Iterable)scanner), (String)"The scan server scanner should have seen all ingested and flushed entries");
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.IMMEDIATE);
                Assertions.assertEquals((int)(ingestedEntryCount + additionalIngestedEntryCount), (int)Iterables.size((Iterable)scanner), (String)"Scanning against tserver should have resulted in seeing all ingested entries");
            }
        }
    }

    @Test
    public void testScanOfflineTable() throws Exception {
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ScanServerIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");
            client.tableOperations().offline(tableName, true);
            Assertions.assertThrows(TableOfflineException.class, () -> {
                try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);){
                    scanner.setRange(new Range());
                    scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                    Assertions.assertEquals((int)100, (int)Iterables.size((Iterable)scanner));
                }
            });
        }
    }

    @Test
    @Timeout(value=20L)
    public void testBatchScannerTimeout() throws Exception {
        Properties props = ScanServerIT.getClientProps();
        String profiles = "[{'isDefault':true,'maxBusyTimeout':'1s', 'busyTimeoutMultiplier':8, 'attemptPlans':[{'servers':'3', 'busyTimeout':'100ms'},{'servers':'100%', 'busyTimeout':'100ms'}]}]";
        props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles", profiles);
        String tableName = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(props).build();){
            ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf");
            try (BatchScanner bs = client.createBatchScanner(tableName);){
                bs.setRanges(Collections.singletonList(new Range()));
                bs.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                bs.stream().forEach(entry -> Assertions.assertNotNull(entry.getKey()));
                bs.setTimeout(5L, TimeUnit.SECONDS);
                IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
                iterSetting.addOption("sleepTime", "2000");
                bs.addScanIterator(iterSetting);
                Assertions.assertThrows(TimedOutException.class, () -> bs.iterator().next(), (String)"batch scanner did not time out");
            }
        }
    }

    public static int createTableAndIngest(AccumuloClient client, String tableName, NewTableConfiguration ntc, int rowCount, int colCount, String colf) throws Exception {
        if (Objects.isNull(ntc)) {
            ntc = new NewTableConfiguration();
        }
        client.tableOperations().create(tableName, ntc);
        return ScanServerIT.ingest(client, tableName, rowCount, colCount, 0, colf, true);
    }

    public static int ingest(AccumuloClient client, String tableName, int rowCount, int colCount, int offset, String colf, boolean shouldFlush) throws Exception {
        ReadWriteIT.ingest(client, colCount, rowCount, 50, offset, colf, tableName);
        int ingestedEntriesCount = colCount * rowCount;
        if (shouldFlush) {
            client.tableOperations().flush(tableName, null, null, true);
        }
        return ingestedEntriesCount;
    }

    private static class ScanServerITConfiguration
    implements MiniClusterConfigurationCallback {
        private ScanServerITConfiguration() {
        }

        @Override
        public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
            cfg.setNumScanServers(1);
            cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
            cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
        }
    }
}

