/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.MoreCollectors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScanConsistencyIT
extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(ScanConsistencyIT.class);

    /**
     * Runs between 1 and 10 write tasks, between 1 and 10 scan tasks and one table-operations
     * task concurrently against a freshly created table for 60 seconds, then checks the
     * statistics each task reports.
     *
     * <p>Every write task must have added and deleted something. Every scan task must have
     * verified something and must have scanned more entries than it verified. After the tasks
     * stop, one plain scan, one isolated scan and one batch scan over the whole table must agree
     * with each other, and the plain scan must have verified every entry it scanned.
     *
     * @throws Exception if any task fails or any Accumulo operation fails
     */
    @Test
    @SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification="predictable random is ok for testing")
    public void testConcurrentScanConsistency() throws Exception {
        String table = this.getUniqueNames(1)[0];
        ExecutorService executor = Executors.newCachedThreadPool();
        try (AccumuloClient client = Accumulo.newClient().from(ScanConsistencyIT.getClientProps()).build()) {
            client.tableOperations().create(table);
            TestContext testContext = new TestContext(client, table, ScanConsistencyIT.getCluster().getFileSystem(), ScanConsistencyIT.getCluster().getTemporaryPath().toString());

            List<Future<WriteStats>> writeTasks = new ArrayList<Future<WriteStats>>();
            List<Future<ScanStats>> scanTasks = new ArrayList<Future<ScanStats>>();
            Random random = new Random();
            int numWriteTask = random.nextInt(10) + 1;
            int numsScanTask = random.nextInt(10) + 1;
            for (int i = 0; i < numWriteTask; ++i) {
                writeTasks.add(executor.submit(new WriteTask(testContext)));
            }
            for (int i = 0; i < numsScanTask; ++i) {
                scanTasks.add(executor.submit(new ScanTask(testContext)));
            }
            Future<String> tableOpsTask = executor.submit(new TableOpsTask(testContext));

            // Let the concurrent tasks run for a while, then ask them to stop.
            Thread.sleep(60000L);
            testContext.keepRunning.set(false);

            // Future.get() rethrows any failure raised inside a task, failing the test.
            for (Future<WriteStats> writeTask : writeTasks) {
                WriteStats stats = writeTask.get();
                log.debug(String.format("Wrote:%,d Bulk imported:%,d Deleted:%,d Bulk deleted:%,d", stats.written, stats.bulkImported, stats.deleted, stats.bulkDeleted));
                Assertions.assertTrue(stats.written + stats.bulkImported > 0L);
                Assertions.assertTrue(stats.deleted + stats.bulkDeleted > 0L);
            }
            for (Future<ScanStats> scanTask : scanTasks) {
                ScanStats stats = scanTask.get();
                log.debug(String.format("Scanned:%,d verified:%,d", stats.scanned, stats.verified));
                Assertions.assertTrue(stats.verified > 0L);
                Assertions.assertTrue(stats.scanned > stats.verified);
            }
            log.debug(tableOpsTask.get());

            // With all tasks stopped, the three scan styles must see exactly the same data.
            ScanStats plainScan = ScanConsistencyIT.scanData(testContext, random, new Range(), false);
            ScanStats isolatedScan = ScanConsistencyIT.scanData(testContext, random, new Range(), true);
            ScanStats batchScan = ScanConsistencyIT.batchScanData(testContext, new Range());
            log.debug(String.format("Final scan, scanned:%,d verified:%,d", plainScan.scanned, plainScan.verified));
            Assertions.assertTrue(plainScan.verified > 0L);
            Assertions.assertEquals(plainScan.scanned, plainScan.verified);
            Assertions.assertEquals(isolatedScan.scanned, plainScan.scanned);
            Assertions.assertEquals(isolatedScan.verified, plainScan.verified);
            Assertions.assertEquals(batchScan.scanned, plainScan.scanned);
            Assertions.assertEquals(batchScan.verified, plainScan.verified);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Converts a mutation into a stream of keys, one per column update.
     *
     * <p>Each key takes its row from the mutation and its family, qualifier, visibility and
     * delete flag from the update. The timestamp is always zero, which matches the way
     * {@code scan} zeroes the timestamp of scanned keys before comparing them.
     *
     * @param m the mutation to expand
     * @return one key per column update in {@code m}
     */
    private static Stream<Key> toKeys(Mutation m) {
        return m.getUpdates().stream().map(update -> {
            final long timestamp = 0L;
            final boolean copy = false;
            return new Key(m.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), timestamp, update.isDeleted(), copy);
        });
    }

    /**
     * Consumes every entry from a scan and checks it against the set of expected keys.
     *
     * <p>Each scanned key has its timestamp set to zero and is then removed from
     * {@code expected}; a successful removal counts as verified. Once the stream is exhausted
     * the method asserts that {@code expected} is empty, i.e. every expected key was seen.
     *
     * @param scanner the entries produced by the scan
     * @param expected the keys that must be seen; this set is emptied as keys are matched
     * @return the number of entries scanned and the number verified
     */
    private static ScanStats scan(Stream<Map.Entry<Key, Value>> scanner, Set<Key> expected) {
        ScanStats tally = new ScanStats();
        Iterator<Map.Entry<Key, Value>> entries = scanner.iterator();
        while (entries.hasNext()) {
            Key seen = entries.next().getKey();
            tally.scanned++;
            // Expected keys are built with timestamp 0, so normalize before the lookup.
            seen.setTimestamp(0L);
            boolean wasExpected = expected.remove(seen);
            if (wasExpected) {
                tally.verified++;
            }
        }
        Assertions.assertTrue(expected.isEmpty());
        return tally;
    }

    /**
     * Scans {@code range} with a batch scanner and verifies the result against the data sets
     * reserved for this scan.
     *
     * <p>The reservation is taken before the scanner is created and released after the scanner
     * is closed.
     *
     * @param tctx the shared test state (client, table and data tracker)
     * @param range the range to scan
     * @return the number of entries scanned and the number verified
     * @throws Exception if creating or running the scan fails
     */
    private static ScanStats batchScanData(TestContext tctx, Range range) throws Exception {
        try (ExpectedScanData expectedScanData = tctx.dataTracker.beginScan();
                BatchScanner scanner = tctx.client.createBatchScanner(tctx.table)) {
            Set<Key> expected = expectedScanData.getExpectedData(range).collect(Collectors.toSet());
            scanner.setRanges(List.of(range));
            return ScanConsistencyIT.scan(scanner.stream(), expected);
        }
    }

    /**
     * Scans {@code range} with a regular or an isolated scanner and verifies the result against
     * the data sets reserved for this scan.
     *
     * <p>When the range has a finite stop key, a coin flip decides whether to scan an
     * open-ended range starting at the same key and to stop consuming at the first key outside
     * {@code range}. That simulates a reader that does not read everything the scanner could
     * return.
     *
     * @param tctx the shared test state (client, table and data tracker)
     * @param random source of the coin flip
     * @param range the range whose data is verified
     * @param scanIsolated whether to wrap the scanner in an {@link IsolatedScanner}
     * @return the number of entries scanned and the number verified
     * @throws Exception if creating or running the scan fails
     */
    private static ScanStats scanData(TestContext tctx, Random random, Range range, boolean scanIsolated) throws Exception {
        try (ExpectedScanData expectedScanData = tctx.dataTracker.beginScan();
                Scanner scanner = scanIsolated ? new IsolatedScanner(tctx.client.createScanner(tctx.table)) : tctx.client.createScanner(tctx.table)) {
            Set<Key> expected = expectedScanData.getExpectedData(range).collect(Collectors.toSet());
            Stream<Map.Entry<Key, Value>> scanStream;
            if (!range.isInfiniteStopKey() && random.nextBoolean()) {
                // Scan past the end of the requested range but only consume entries inside it,
                // possibly leaving unread data behind on the scanner.
                Range openEndedRange = new Range(range.getStartKey(), range.isStartKeyInclusive(), null, true);
                scanner.setRange(openEndedRange);
                scanStream = scanner.stream().takeWhile(entry -> range.contains(entry.getKey()));
            } else {
                scanner.setRange(range);
                scanStream = scanner.stream();
            }
            return ScanConsistencyIT.scan(scanStream, expected);
        }
    }

    /**
     * Draws the next long from {@code random} and clears its sign bit, so the result is never
     * negative.
     *
     * @param random the generator to draw from
     * @return a value in the range {@code [0, Long.MAX_VALUE]}
     */
    private static long nextLongAbs(Random random) {
        final long signBit = Long.MIN_VALUE;
        final long drawn = random.nextLong();
        return drawn & ~signBit;
    }

    /**
     * Debugging aid: searches every {@code .rf} file under a directory on the local file system
     * for a row and prints the first entry found in each file that contains it.
     *
     * <p>Files are scanned without system iterators. For every file that has at least one entry
     * in the row, the file name and that first entry are written to standard output.
     *
     * @param row the row to look for
     * @param tableDir the directory to search recursively
     * @throws Exception if listing the directory or reading a file fails
     */
    public static void findRow(String row, String tableDir) throws Exception {
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(tableDir), true);
        while (files.hasNext()) {
            LocatedFileStatus f = files.next();
            if (!f.isFile() || !f.getPath().getName().endsWith(".rf")) {
                continue;
            }
            // try-with-resources closes the scanner without masking an exception from the body.
            try (Scanner scanner = RFile.newScanner().from(f.getPath().toString()).withFileSystem(fs).withoutSystemIterators().build()) {
                scanner.setRange(new Range(new Text(row)));
                Iterator<Map.Entry<Key, Value>> siter = scanner.iterator();
                if (siter.hasNext()) {
                    System.out.println("File " + f.getPath().getName());
                    Map.Entry<Key, Value> e = siter.next();
                    System.out.println("  " + e.getKey() + " " + e.getValue());
                }
            }
        }
    }

    /**
     * State shared by all tasks of one test run: the client and table under test, the file
     * system and temporary directory used for bulk imports, the tracker of expected data, the
     * flag that tells tasks to keep going, and the counter that hands out generation numbers.
     */
    private static class TestContext {
        final AccumuloClient client;
        final String table;
        final FileSystem fileSystem;
        private final String tmpDir;
        final DataTracker dataTracker;
        final AtomicBoolean keepRunning;
        final AtomicLong generationCounter;

        private TestContext(AccumuloClient client, String table, FileSystem fs, String tmpDir) {
            // Per-run state always starts out the same way.
            this.dataTracker = new DataTracker();
            this.keepRunning = new AtomicBoolean(true);
            this.generationCounter = new AtomicLong(0L);
            // Everything else is supplied by the test.
            this.client = client;
            this.table = table;
            this.fileSystem = fs;
            this.tmpDir = tmpDir;
        }
    }

    /**
     * Task that keeps adding data to the table and, once enough is tracked, deleting the oldest
     * data set again. Both additions and deletions go through either a batch writer or a bulk
     * import, chosen at random.
     */
    private static class WriteTask implements Callable<WriteStats> {
        private final TestContext tctx;

        private WriteTask(TestContext testContext) {
            this.tctx = testContext;
        }

        /**
         * Writes the keys of the given mutations to a single RFile in a fresh directory under
         * the temporary directory and bulk imports it, using one of the two
         * {@code importDirectory} variants chosen by a coin flip.
         *
         * <p>The bulk directory and the failure directory are deleted afterwards, whether or
         * not the import succeeded.
         *
         * @param random source of the directory name suffix and of the coin flip
         * @param mutations the mutations to import; nothing is done when empty
         * @return the number of keys written to the file, or 0 when {@code mutations} is empty
         * @throws Exception if writing the file or importing it fails
         */
        private long bulkImport(Random random, Collection<Mutation> mutations) throws Exception {
            if (mutations.isEmpty()) {
                return 0L;
            }
            String name = "/bulkimport_" + ScanConsistencyIT.nextLongAbs(random);
            Path bulkDir = new Path(this.tctx.tmpDir + name);
            Path failDir = new Path(this.tctx.tmpDir + name + "_failures");
            // Keys are sorted before being appended to the file.
            List<Key> keys = mutations.stream().flatMap(ScanConsistencyIT::toKeys).sorted().collect(Collectors.toList());
            Value val = new Value();
            try {
                this.tctx.fileSystem.mkdirs(bulkDir);
                try (RFileWriter writer = RFile.newWriter().to(bulkDir + "/f1.rf").withFileSystem(this.tctx.fileSystem).build()) {
                    writer.startDefaultLocalityGroup();
                    for (Key key : keys) {
                        writer.append(key, val);
                    }
                }
                if (random.nextBoolean()) {
                    // Import variant that takes a failure directory, which must stay empty.
                    this.tctx.fileSystem.mkdirs(failDir);
                    this.tctx.client.tableOperations().importDirectory(this.tctx.table, bulkDir.toString(), failDir.toString(), true);
                    Assertions.assertEquals(0, this.tctx.fileSystem.listStatus(failDir).length, "Failure dir was not empty " + failDir);
                } else {
                    // Fluent import variant.
                    this.tctx.client.tableOperations().importDirectory(bulkDir.toString()).to(this.tctx.table).tableTime(true).load();
                }
            }
            finally {
                this.tctx.fileSystem.delete(bulkDir, true);
                this.tctx.fileSystem.delete(failDir, true);
            }
            return keys.size();
        }

        /**
         * Sends the given mutations to the table through a batch writer that is closed before
         * returning.
         *
         * @param mutations the mutations to write
         * @return the sum of {@code Mutation.size()} over all mutations written
         * @throws Exception if the batch writer fails
         */
        private long write(Iterable<Mutation> mutations) throws Exception {
            long written = 0L;
            try (BatchWriter writer = this.tctx.client.createBatchWriter(this.tctx.table)) {
                for (Mutation m : mutations) {
                    writer.addMutation(m);
                    written += (long)m.size();
                }
            }
            return written;
        }

        /**
         * Loops until the test clears {@code keepRunning}. While the tracker estimates more than
         * 2000 rows, each iteration deletes the oldest tracked data set; otherwise it generates
         * up to 999 new rows for a fresh generation and adds them. In both cases roughly one
         * iteration in five uses a bulk import instead of a batch writer.
         *
         * @return counters of what this task wrote and deleted
         * @throws Exception if any write, flush or bulk import fails
         */
        @Override
        @SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification="predictable random is ok for testing")
        public WriteStats call() throws Exception {
            WriteStats stats = new WriteStats();
            Random random = new Random();
            while (this.tctx.keepRunning.get()) {
                if (this.tctx.dataTracker.estimatedRows() > 2000L) {
                    if (random.nextInt(5) == 0) {
                        Collection<Mutation> deletes = this.tctx.dataTracker.getDeletes();
                        // Flush and wait before bulk importing the deletes.
                        this.tctx.client.tableOperations().flush(this.tctx.table, null, null, true);
                        stats.bulkDeleted += this.bulkImport(random, deletes);
                    } else {
                        stats.deleted += this.write(this.tctx.dataTracker.getDeletes());
                    }
                } else {
                    List<Mutation> dataAdded = new ArrayList<Mutation>();
                    long generation = this.tctx.generationCounter.getAndIncrement();
                    int rowsToGenerate = random.nextInt(1000);
                    Set<Long> seen = new HashSet<Long>();
                    for (int i = 0; i < rowsToGenerate; ++i) {
                        dataAdded.add(this.generateMutation(random, generation, seen));
                    }
                    if (random.nextInt(5) == 0) {
                        stats.bulkImported += this.bulkImport(random, dataAdded);
                    } else {
                        stats.written += this.write(dataAdded);
                    }
                    // Only start expecting the data once it has been handed to the table.
                    this.tctx.dataTracker.addExpectedData(dataAdded);
                }
            }
            return stats;
        }

        /**
         * Builds a mutation for a new random row of the form
         * {@code <random long as 16 hex digits>:<generation as 16 hex digits>} with 1 to 100
         * columns. Column families are a random digit 0-9, qualifiers are the column index as
         * four hex digits, and values are empty.
         *
         * @param random source of the row number, column count and column families
         * @param generation the generation number embedded in the row
         * @param seen row numbers already used by the caller; the chosen one is added
         * @return the generated mutation
         */
        private Mutation generateMutation(Random random, long generation, Set<Long> seen) {
            int cols = random.nextInt(100) + 1;
            long nextRow = ScanConsistencyIT.nextLongAbs(random);
            // Redraw until the row number is new to the caller's set.
            while (!seen.add(nextRow)) {
                nextRow = ScanConsistencyIT.nextLongAbs(random);
            }
            String row = String.format("%016x:%016x", nextRow, generation);
            Mutation m = new Mutation((CharSequence)row);
            for (int i = 0; i < cols; ++i) {
                m.put((CharSequence)String.valueOf(random.nextInt(10)), (CharSequence)String.format("%04x", i), (CharSequence)"");
            }
            return m;
        }
    }

    /**
     * Task that repeatedly scans a random range of the table with a randomly chosen kind of
     * scanner and verifies each scan against the tracked data.
     */
    private static class ScanTask implements Callable<ScanStats> {
        private final TestContext tctx;

        private ScanTask(TestContext testContext) {
            this.tctx = testContext;
        }

        /**
         * Loops until the test clears {@code keepRunning}, choosing with equal chance between a
         * regular scan, an isolated scan and a batch scan on each iteration.
         *
         * @return the scan counters summed over all iterations
         * @throws Exception if any scan fails
         */
        @Override
        @SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification="predictable random is ok for testing")
        public ScanStats call() throws Exception {
            ScanStats totals = new ScanStats();
            Random random = new Random();
            while (this.tctx.keepRunning.get()) {
                Range range = ScanTask.pickRange(random);
                ScanStats pass;
                switch (random.nextInt(3)) {
                    case 0:
                        pass = ScanConsistencyIT.scanData(this.tctx, random, range, false);
                        break;
                    case 1:
                        pass = ScanConsistencyIT.scanData(this.tctx, random, range, true);
                        break;
                    default:
                        pass = ScanConsistencyIT.batchScanData(this.tctx, range);
                        break;
                }
                totals.add(pass);
            }
            return totals;
        }

        /**
         * Picks the range for one scan: the whole table one time in ten, otherwise a range
         * between two random rows where the end is strictly greater than the start.
         */
        private static Range pickRange(Random random) {
            if (random.nextInt(10) == 0) {
                return new Range();
            }
            long start = ScanConsistencyIT.nextLongAbs(random);
            long end;
            do {
                end = ScanConsistencyIT.nextLongAbs(random);
            } while (end <= start);
            return new Range(String.format("%016x", start), String.format("%016x", end));
        }
    }

    /**
     * Task that performs a random table operation about once per second while the writers and
     * scanners run: flush, compaction, adding splits, merge, or a filtering compaction that
     * removes one generation of data.
     */
    private static class TableOpsTask implements Callable<String> {
        private final TestContext tctx;

        private TableOpsTask(TestContext testContext) {
            this.tctx = testContext;
        }

        /**
         * Loops until the test clears {@code keepRunning}. Each iteration sleeps one second and
         * draws a number in [0, 100): below 10 flushes, below 15 compacts, below 20 adds up to
         * nine random splits, below 25 merges a random row range, below 30 runs a filter
         * compaction for the oldest tracked data set, and anything else does nothing.
         *
         * @return a summary line with the number of operations of each kind
         * @throws Exception if a table operation fails or the sleep is interrupted
         */
        @Override
        @SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification="predictable random is ok for testing")
        public String call() throws Exception {
            int flushCount = 0;
            int compactionCount = 0;
            int splitCount = 0;
            int mergeCount = 0;
            int filterCount = 0;
            Random random = new Random();
            while (this.tctx.keepRunning.get()) {
                Thread.sleep(1000L);
                int pick = random.nextInt(100);
                if (pick < 10) {
                    this.tctx.client.tableOperations().flush(this.tctx.table, null, null, random.nextBoolean());
                    flushCount++;
                } else if (pick < 15) {
                    this.tctx.client.tableOperations().compact(this.tctx.table, new CompactionConfig().setFlush(random.nextBoolean()).setWait(random.nextBoolean()));
                    compactionCount++;
                } else if (pick < 20) {
                    int splitsToAdd = random.nextInt(10);
                    TreeSet<Text> splits = new TreeSet<Text>();
                    for (int added = 0; added < splitsToAdd; added++) {
                        splits.add(new Text(String.format("%016x", ScanConsistencyIT.nextLongAbs(random))));
                    }
                    this.tctx.client.tableOperations().addSplits(this.tctx.table, splits);
                    // Counts the splits requested, not the distinct ones in the set.
                    splitCount += splitsToAdd;
                } else if (pick < 25) {
                    long start = ScanConsistencyIT.nextLongAbs(random);
                    long end;
                    do {
                        end = ScanConsistencyIT.nextLongAbs(random);
                    } while (end <= start);
                    this.tctx.client.tableOperations().merge(this.tctx.table, new Text(String.format("%016x", start)), new Text(String.format("%016x", end)));
                    mergeCount++;
                } else if (pick < 30) {
                    // The tracker is only touched in this branch, so a data set is never taken
                    // off the queue without being deleted.
                    Collection<Mutation> deletes = this.tctx.dataTracker.getDeletes();
                    if (!deletes.isEmpty()) {
                        // Rows look like <random long>:<generation>; all rows of one data set
                        // must share a single generation, otherwise onlyElement() throws.
                        String gen = deletes.stream().map(m -> new String(m.getRow(), StandardCharsets.UTF_8)).map(row -> row.split(":")[1]).distinct().collect(MoreCollectors.onlyElement());
                        IteratorSetting iterSetting = new IteratorSetting(100, "genfilter", GenerationFilter.class);
                        iterSetting.addOptions(Map.of("generation", gen));
                        // Flush and wait so the whole generation is gone when this returns.
                        this.tctx.client.tableOperations().compact(this.tctx.table, new CompactionConfig().setFlush(true).setWait(true).setIterators(List.of(iterSetting)));
                        filterCount++;
                    }
                }
            }
            return String.format("Flushes:%,d Compactions:%,d Splits added:%,d Merges:%,d Filter compactions:%,d", flushCount, compactionCount, splitCount, mergeCount, filterCount);
        }
    }

    /** Counters reported by a single {@code WriteTask}. */
    private static class WriteStats {
        /** Sum of mutation sizes added through a batch writer. */
        long written = 0L;
        /** Number of keys added through bulk import. */
        long bulkImported = 0L;
        /** Sum of delete-mutation sizes sent through a batch writer. */
        long deleted = 0L;
        /** Number of delete keys added through bulk import. */
        long bulkDeleted = 0L;

        private WriteStats() {
        }
    }

    /** Counters for one scan, or for the sum of several scans. */
    private static class ScanStats {
        /** Number of entries read from the scanner. */
        long scanned = 0L;
        /** Number of scanned entries that matched an expected key. */
        long verified = 0L;

        private ScanStats() {
        }

        /**
         * Adds the counters of another scan to this instance.
         *
         * @param stats the counters to add; not modified
         */
        public void add(ScanStats stats) {
            this.scanned = this.scanned + stats.scanned;
            this.verified = this.verified + stats.verified;
        }
    }

    /**
     * Keeps the queue of data sets that have been added to the table and not yet deleted, and
     * hands out reservations so scans and deletes of the same data set do not overlap.
     */
    private static class DataTracker {
        private final Queue<DataSet> dataSets = new ConcurrentLinkedQueue<DataSet>();

        private DataTracker() {
        }

        /**
         * Reserves every tracked data set that is not being deleted.
         *
         * @return the reservation; closing it releases the data sets again
         */
        public ExpectedScanData beginScan() {
            List<DataSet> reserved = new ArrayList<DataSet>();
            for (DataSet candidate : this.dataSets) {
                if (candidate.reserveForScan()) {
                    reserved.add(candidate);
                }
            }
            return new ExpectedScanData(reserved);
        }

        /**
         * Starts tracking a new data set, appended at the tail of the queue.
         *
         * @param data the mutations that were added to the table
         */
        public void addExpectedData(List<Mutation> data) {
            DataSet added = new DataSet(data);
            this.dataSets.add(added);
        }

        /**
         * Removes the data set at the head of the queue, waits until no scan holds it, and
         * returns mutations that delete all of its columns.
         *
         * @return a lazily transformed view producing one delete mutation per original
         *         mutation, or an empty list when nothing is tracked
         */
        public Collection<Mutation> getDeletes() {
            DataSet oldest = this.dataSets.poll();
            if (oldest != null) {
                oldest.reserveForDelete();
                return Collections2.transform(oldest.data, DataTracker::toDelete);
            }
            return List.of();
        }

        /** Builds a mutation that deletes every column the given mutation put. */
        private static Mutation toDelete(Mutation original) {
            Mutation delMutation = new Mutation(original.getRow());
            original.getUpdates().forEach(cu -> delMutation.putDelete(cu.getColumnFamily(), cu.getColumnQualifier()));
            return delMutation;
        }

        /**
         * @return the total number of mutations across all tracked data sets
         */
        public long estimatedRows() {
            long total = 0L;
            for (DataSet tracked : this.dataSets) {
                total += tracked.data.size();
            }
            return total;
        }
    }

    /**
     * The data sets reserved for one scan. Closing this object releases the scan reservation
     * on each of them.
     */
    private static class ExpectedScanData implements AutoCloseable {
        private final List<DataSet> reservedData;

        public ExpectedScanData(List<DataSet> reservedData) {
            this.reservedData = reservedData;
        }

        /**
         * @param range the range being scanned
         * @return the keys of all reserved data sets that fall inside {@code range}
         */
        Stream<Key> getExpectedData(Range range) {
            Stream<DataSet> reserved = this.reservedData.stream();
            return reserved.flatMap(dataSet -> dataSet.getExpectedData(range));
        }

        /** Releases the scan reservation on every reserved data set. */
        @Override
        public void close() {
            for (DataSet dataSet : this.reservedData) {
                dataSet.unreserveForScan();
            }
        }
    }

    /**
     * Filter that drops every key belonging to one generation. The generation to drop is read
     * from the {@code "generation"} iterator option and compared with the part of the row that
     * follows the first {@code ':'}.
     */
    public static class GenerationFilter extends Filter {
        private String generation;

        /**
         * Initializes the filter and remembers the value of the {@code "generation"} option.
         */
        public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
            super.init(source, options, env);
            this.generation = options.get("generation");
        }

        /**
         * @return {@code false} for keys whose row carries the configured generation,
         *         {@code true} for all other keys
         */
        public boolean accept(Key k, Value v) {
            String[] rowParts = k.getRowData().toString().split(":");
            boolean sameGeneration = this.generation.equals(rowParts[1]);
            return !sameGeneration;
        }
    }

    /**
     * One batch of mutations that was added to the table, together with the bookkeeping that
     * keeps scans and the eventual delete of that batch from overlapping: scans may only
     * reserve the set while it is not being deleted, and a delete waits until no scan holds a
     * reservation.
     */
    private static class DataSet {
        private final List<Mutation> data;
        private int activeScans = 0;
        private boolean deleting = false;

        public DataSet(List<Mutation> data) {
            this.data = data;
        }

        /**
         * Registers a scan on this data set unless a delete has already been started.
         *
         * @return {@code true} if the scan was registered, {@code false} if the set is being
         *         deleted
         */
        synchronized boolean reserveForScan() {
            if (this.deleting) {
                return false;
            }
            ++this.activeScans;
            return true;
        }

        /**
         * Releases one scan reservation and wakes a waiting deleter once no scans remain.
         *
         * @throws IllegalStateException if there was no reservation to release
         */
        synchronized void unreserveForScan() {
            --this.activeScans;
            Preconditions.checkState(this.activeScans >= 0);
            if (this.activeScans == 0) {
                this.notify();
            }
        }

        /**
         * Marks this data set as being deleted, which blocks new scan reservations, and then
         * waits until every existing scan reservation has been released.
         *
         * @throws IllegalStateException if the set was already marked as being deleted
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *         interrupted; the thread's interrupt status is restored first
         */
        synchronized void reserveForDelete() {
            Preconditions.checkState(!this.deleting);
            this.deleting = true;
            while (this.activeScans > 0) {
                try {
                    // Timed wait, so the condition is re-checked at least every 50 ms.
                    this.wait(50L);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * @param range the range being scanned
         * @return the keys of this data set's mutations that fall inside {@code range}
         */
        Stream<Key> getExpectedData(Range range) {
            return this.data.stream().flatMap(ScanConsistencyIT::toKeys).filter(range::contains);
        }
    }
}

