/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that starts several concurrent scanners against a split table with a slow
 * iterator attached, checks that the tablet servers report at least one active scan id per
 * scanner, and then checks that no active scans remain once the scanners are closed.
 *
 * <p>The coordination state ({@link #pool}, {@link #testInProgress}, {@link #resultsByWorker}) is
 * static and single-use: the pool is shut down and the flag is cleared when the test finishes.
 */
public class ScanIdIT extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);

    /** Number of concurrent scanner workers, and therefore the minimum expected scan id count. */
    private static final int NUM_SCANNERS = 8;

    /** Number of rows written by {@link #generateSampleData}. */
    private static final int NUM_DATA_ROWS = 100;

    private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);

    /** Cleared by the test thread to tell the workers to stop iterating. */
    private static final AtomicBoolean testInProgress = new AtomicBoolean(true);

    /** Most recent value seen by each worker, keyed by worker index. */
    private static final Map<Integer, Value> resultsByWorker = new ConcurrentHashMap<>();

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(1L);
    }

    /**
     * Runs {@link #NUM_SCANNERS} slow scans concurrently, asserts that at least that many distinct
     * scan ids are reported, closes the scanners and waits until no scan ids are reported.
     *
     * @throws Exception if any Accumulo operation fails or the thread is interrupted
     */
    @Test
    public void testScanId() throws Exception {
        final String tableName = getUniqueNames(1)[0];
        try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
            client.tableOperations().create(tableName);
            addSplits(client, tableName);
            log.info("Splits added");
            generateSampleData(client, tableName);
            log.info("Generated data for {}", tableName);
            attachSlowIterator(client, tableName);

            try {
                List<ScannerThread> scanThreadsToClose = startScanners(client, tableName);
                awaitResultFromEveryWorker();

                // Every worker has reported at least one result, so each should own a scan id.
                Set<Long> scanIds = getScanIds(client);
                Assertions.assertTrue(scanIds.size() >= NUM_SCANNERS,
                        "Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size());

                for (ScannerThread st : scanThreadsToClose) {
                    // Single volatile read: the field is assigned on a pool thread.
                    Scanner workerScanner = st.scanner;
                    if (workerScanner != null) {
                        workerScanner.close();
                    }
                }

                while (!(scanIds = getScanIds(client)).isEmpty()) {
                    log.debug("Waiting for active scans to stop...");
                    Thread.sleep(200L);
                }
                Assertions.assertEquals(0, scanIds.size(), "Expected no scanIds after closing scanners");
            } finally {
                // Stop the workers before the client is closed, including when an assertion failed,
                // and release the pool threads so they do not outlive the test.
                testInProgress.set(false);
                pool.shutdownNow();
            }
        }
    }

    /**
     * Submits one {@link ScannerThread} per worker index to the pool.
     *
     * @return the submitted workers, so the caller can close their scanners later
     */
    private List<ScannerThread> startScanners(AccumuloClient client, String tableName) {
        CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
        List<ScannerThread> workers = new ArrayList<>(NUM_SCANNERS);
        for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; ++scannerIndex) {
            ScannerThread worker = new ScannerThread(client, scannerIndex, tableName, latch);
            workers.add(worker);
            pool.execute(worker);
        }
        return workers;
    }

    /**
     * Polls until every worker has recorded at least one result, then clears
     * {@link #testInProgress} and pauses briefly before returning.
     */
    private void awaitResultFromEveryWorker() {
        while (testInProgress.get()) {
            if (resultsByWorker.size() < NUM_SCANNERS) {
                log.trace("Results reported {}", resultsByWorker.size());
                UtilWaitThread.sleepUninterruptibly(750L, TimeUnit.MILLISECONDS);
                continue;
            }
            testInProgress.set(false);
            log.debug("Final result count {}", resultsByWorker.size());
            // Pause so the workers have a chance to observe the cleared flag.
            UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        }
    }

    /**
     * Collects the ids of all active scans on every tablet server, ignoring scans on tables whose
     * name starts with the {@code accumulo} namespace prefix.
     *
     * <p>Fetching the active scans of a server is attempted up to 10 times when the failure is
     * caused by a {@link TableNotFoundException}; any other failure is rethrown immediately.
     *
     * @param client client used to query the instance
     * @return the distinct scan ids currently reported (possibly empty, never {@code null})
     * @throws AccumuloSecurityException if the instance operation is not permitted
     * @throws InterruptedException if interrupted while waiting between retries
     * @throws AccumuloException on any failure not caused by a {@link TableNotFoundException}
     */
    private Set<Long> getScanIds(AccumuloClient client) throws AccumuloSecurityException, InterruptedException, AccumuloException {
        Set<Long> scanIds = new HashSet<>();
        List<String> tservers = client.instanceOperations().getTabletServers();
        log.debug("tablet servers {}", tservers);
        for (String tserver : tservers) {
            List<ActiveScan> activeScans = null;
            for (int attempt = 0; attempt < 10; ++attempt) {
                try {
                    activeScans = client.instanceOperations().getActiveScans(tserver);
                    break;
                } catch (AccumuloException e) {
                    if (!(e.getCause() instanceof TableNotFoundException)) {
                        throw e;
                    }
                    log.debug("Got TableNotFoundException, will retry");
                    Thread.sleep(200L);
                }
            }
            Assertions.assertNotNull(activeScans, "Repeatedly got exception trying to active scans");
            // Only scans on user tables are of interest.
            activeScans.removeIf(scan -> scan.getTable().startsWith(Namespace.ACCUMULO.name() + "."));
            log.debug("TServer {} has {} active non-metadata scans", tserver, activeScans.size());
            for (ActiveScan scan : activeScans) {
                log.debug("Tserver {} scan id {} ({})", tserver, scan.getScanid(), scan.getTable());
                scanIds.add(scan.getScanid());
            }
        }
        return scanIds;
    }

    /**
     * Adds the split points from {@link #createSplits()} to the table, then takes the table
     * offline and back online (waiting for each transition) and logs the resulting splits.
     *
     * @throws IllegalStateException if any of the table operations fails
     */
    private void addSplits(AccumuloClient client, String tableName) {
        SortedSet<Text> splits = createSplits();
        try {
            client.tableOperations().addSplits(tableName, splits);
            client.tableOperations().offline(tableName, true);
            UtilWaitThread.sleepUninterruptibly(2L, TimeUnit.SECONDS);
            client.tableOperations().online(tableName, true);
            for (Text split : client.tableOperations().listSplits(tableName)) {
                log.trace("Split {}", split);
            }
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
        }
    }

    /**
     * Builds the split points {@code "0"} through {@code "9"}.
     *
     * @return a sorted set of ten single-digit split points
     */
    private SortedSet<Text> createSplits() {
        SortedSet<Text> splits = new TreeSet<>();
        for (int split = 0; split < 10; ++split) {
            splits.add(new Text(Integer.toString(split)));
        }
        return splits;
    }

    /**
     * Writes {@link #NUM_DATA_ROWS} mutations to the table. Each row id is a random multiple of
     * 100 (0 to 900) plus the loop index, and each row gets a {@code fam1:count} entry plus
     * {@code fam1:positive} and {@code fam1:negative} entries with visibility {@code public}.
     *
     * @throws IllegalStateException if the writer cannot be created or mutations are rejected
     */
    private void generateSampleData(AccumuloClient accumuloClient, String tablename) {
        try (BatchWriter bw = accumuloClient.createBatchWriter(tablename)) {
            ColumnVisibility vis = new ColumnVisibility("public");
            for (int i = 0; i < NUM_DATA_ROWS; ++i) {
                Text rowId = new Text(String.format("%d", random.nextInt(10) * 100 + i));
                Mutation m = new Mutation(rowId);
                m.put("fam1", "count", Integer.toString(i));
                m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i)));
                m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS)));
                log.trace("Added row {}", rowId);
                bw.addMutation(m);
            }
        } catch (MutationsRejectedException | TableNotFoundException ex) {
            throw new IllegalStateException("Initialization failed. Could not create test data", ex);
        }
    }

    /**
     * Attaches the {@code SlowIterator} to the table at scan scope with priority 50, so that
     * scans stay active long enough to be observed.
     *
     * @throws IllegalStateException if the iterator cannot be attached
     */
    private void attachSlowIterator(AccumuloClient accumuloClient, String tablename) {
        try {
            IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
            // Presumably milliseconds -- confirm against SlowIterator.
            slowIter.addOption("sleepTime", "200");
            slowIter.addOption("seekSleepTime", "200");
            accumuloClient.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException ex) {
            throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
        }
    }

    /**
     * Worker that waits until all workers have started, then scans the table from its own worker
     * index up to row {@code "9"}, recording the latest value it has seen in
     * {@link #resultsByWorker} until {@link #testInProgress} is cleared.
     *
     * <p>NOTE(review): assertion failures raised inside {@link #run()} happen on a pool thread and
     * are not propagated to the test thread by this class.
     */
    private static class ScannerThread implements Runnable {
        private final AccumuloClient accumuloClient;
        /** Assigned on the pool thread and closed from the test thread, hence volatile. */
        private volatile Scanner scanner = null;
        private final int workerIndex;
        private final String tablename;
        private final CountDownLatch latch;

        /**
         * @param accumuloClient client used to create the scanner
         * @param workerIndex index of this worker; also the start row of its scan range
         * @param tablename table to scan
         * @param latch start barrier shared by all workers
         */
        public ScannerThread(AccumuloClient accumuloClient, int workerIndex, String tablename, CountDownLatch latch) {
            this.accumuloClient = accumuloClient;
            this.workerIndex = workerIndex;
            this.tablename = tablename;
            this.latch = latch;
        }

        @Override
        public void run() {
            // Signal readiness, then wait so that all workers begin scanning together.
            latch.countDown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                log.error("Thread interrupted with id {}", workerIndex);
                Thread.currentThread().interrupt();
                return;
            }
            log.debug("Creating scanner in worker thread {}", workerIndex);
            try {
                Scanner localScanner = accumuloClient.createScanner(tablename, new Authorizations());
                // Publish immediately so the test thread can close it.
                scanner = localScanner;
                localScanner.setReadaheadThreshold(Long.MAX_VALUE);
                localScanner.setBatchSize(1);
                localScanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
                localScanner.fetchColumnFamily(new Text("fam1"));
                for (Map.Entry<Key, Value> entry : localScanner) {
                    if (!testInProgress.get()) {
                        localScanner.clearScanIterators();
                        return;
                    }
                    Text row = entry.getKey().getRow();
                    log.debug("worker {}, row {}", workerIndex, row);
                    Value current = entry.getValue();
                    if (current == null) {
                        log.info("Scanner returned null");
                        Assertions.fail("Scanner returned unexpected null value");
                    }
                    Value prevValue = resultsByWorker.put(workerIndex, current);
                    if (prevValue == null) {
                        continue;
                    }
                    log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, current));
                    Assertions.assertTrue(prevValue.compareTo(current) > 0);
                }
                log.debug("Scanner ran out of data. (info only, not an error) ");
            } catch (TableNotFoundException e) {
                throw new IllegalStateException("Initialization failure. Could not create scanner", e);
            }
        }
    }
}

