/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import com.google.common.collect.Iterators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test exercising {@link BatchWriter} flushing.
 *
 * <p>Three scenarios are covered:
 * <ul>
 * <li>an explicit {@code flush()} makes every previously added mutation visible to a scanner,</li>
 * <li>a writer configured with a max latency does not expose data before that latency has
 * elapsed but does expose it afterwards,</li>
 * <li>several threads sharing one writer can add and flush concurrently without losing or
 * inventing mutations.</li>
 * </ul>
 */
public class BatchWriterFlushIT
extends AccumuloClusterHarness {
    /** Number of mutations written per flush round / per writer thread. */
    private static final int NUM_TO_FLUSH = 100000;
    /** Number of concurrent writer threads (and of data partitions) in the multi-threaded test. */
    private static final int NUM_THREADS = 3;
    /** Number of write-flush-verify rounds performed by the flush test. */
    private static final int NUM_FLUSH_ROUNDS = 4;
    /** Number of random single-row lookups performed after each flush. */
    private static final int LOOKUPS_PER_ROUND = 10;

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofSeconds(90L);
    }

    /**
     * Creates two fresh tables and runs the explicit-flush scenario on the first and the
     * latency-flush scenario on the second.
     */
    @Test
    public void run() throws Exception {
        try (AccumuloClient c = Accumulo.newClient().from(BatchWriterFlushIT.getClientProps()).build()) {
            String[] tableNames = this.getUniqueNames(2);
            String bwft = tableNames[0];
            c.tableOperations().create(bwft);
            String bwlt = tableNames[1];
            c.tableOperations().create(bwlt);
            this.runFlushTest(c, bwft);
            this.runLatencyTest(c, bwlt);
        }
    }

    /**
     * Writes one mutation through a writer whose max latency is 1000 ms and checks that it is
     * not visible after 500 ms but is visible after a further 1500 ms.
     *
     * @param client client used to create the writer and the scanner
     * @param tableName existing table to write to and scan
     * @throws Exception if the mutation shows up too early or not at all
     */
    private void runLatencyTest(AccumuloClient client, String tableName) throws Exception {
        BatchWriterConfig config = new BatchWriterConfig().setMaxLatency(1000L, TimeUnit.MILLISECONDS);
        try (BatchWriter bw = client.createBatchWriter(tableName, config);
             Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
            Mutation m = new Mutation(BatchWriterFlushIT.rowText(1));
            m.put("cf", "cq", "1");
            bw.addMutation(m);

            // Half of the configured latency: nothing should have been sent yet.
            UtilWaitThread.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
            int count = Iterators.size(scanner.iterator());
            if (count != 0) {
                throw new Exception("Flushed too soon");
            }

            // Well past the configured latency: the mutation must now be visible.
            UtilWaitThread.sleepUninterruptibly(1500L, TimeUnit.MILLISECONDS);
            count = Iterators.size(scanner.iterator());
            if (count != 1) {
                throw new Exception("Did not flush");
            }
        }
    }

    /**
     * Runs {@link #NUM_FLUSH_ROUNDS} write-flush-verify rounds and then checks that a closed
     * writer rejects further mutations.
     *
     * <p>The writer is managed by try-with-resources so that it is released even when one of the
     * verification steps throws; the closed-writer check runs after that block has closed it.
     *
     * @param client client used to create the writer and the scanner
     * @param tableName existing table to write to and scan
     * @throws Exception if any verification step fails
     */
    private void runFlushTest(AccumuloClient client, String tableName) throws Exception {
        final BatchWriter bw = client.createBatchWriter(tableName);
        try (BatchWriter writer = bw;
             Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
            for (int round = 0; round < NUM_FLUSH_ROUNDS; ++round) {
                this.writeAndVerifyRound(writer, scanner, round);
            }
        }

        // The writer has been closed by the block above; adding to it must now fail.
        boolean caught = false;
        try {
            bw.addMutation(new Mutation(new Text("foobar")));
        }
        catch (IllegalStateException ise) {
            caught = true;
        }
        if (!caught) {
            throw new Exception("Adding to closed batch writer did not fail");
        }
    }

    /**
     * Writes {@link #NUM_TO_FLUSH} consecutive rows for the given round, flushes, and verifies
     * them both through random single-row lookups and through one full range scan.
     *
     * @param bw open writer to add the mutations to
     * @param scanner scanner on the same table; its range is overwritten by this method
     * @param round zero-based round index; determines the first row number of the round
     * @throws Exception if a row is missing, unexpected, or carries the wrong value
     */
    private void writeAndVerifyRound(BatchWriter bw, Scanner scanner, int round) throws Exception {
        int firstRow = round * NUM_TO_FLUSH;

        for (int j = 0; j < NUM_TO_FLUSH; ++j) {
            int row = firstRow + j;
            Mutation m = new Mutation(BatchWriterFlushIT.rowText(row));
            m.put("cf", "cq", Integer.toString(row));
            bw.addMutation(m);
        }
        bw.flush();

        // Spot-check random rows of this round; each lookup must return exactly one entry.
        // NOTE(review): `random` is not declared in this file - presumably inherited from the
        // test harness; confirm.
        for (int k = 0; k < LOOKUPS_PER_ROUND; ++k) {
            int rowToLookup = random.nextInt(NUM_TO_FLUSH) + firstRow;
            scanner.setRange(new Range(BatchWriterFlushIT.rowText(rowToLookup)));
            Iterator<Map.Entry<Key, Value>> lookup = scanner.iterator();
            if (!lookup.hasNext()) {
                throw new Exception(" row " + rowToLookup + " not found after flush");
            }
            Map.Entry<Key, Value> entry = lookup.next();
            if (lookup.hasNext()) {
                throw new Exception("Scanner returned too much");
            }
            this.verifyEntry(rowToLookup, entry);
        }

        // Scan the whole round [firstRow, firstRow + NUM_TO_FLUSH) and check every row in order.
        scanner.setRange(new Range(BatchWriterFlushIT.rowText(firstRow), true, BatchWriterFlushIT.rowText(firstRow + NUM_TO_FLUSH), false));
        Iterator<Map.Entry<Key, Value>> iter = scanner.iterator();
        for (int j = 0; j < NUM_TO_FLUSH; ++j) {
            int row = firstRow + j;
            if (!iter.hasNext()) {
                throw new Exception("Scan stopped prematurely at " + row);
            }
            this.verifyEntry(row, iter.next());
        }
        if (iter.hasNext()) {
            throw new Exception("Scanner returned too much");
        }
    }

    /**
     * Has {@link #NUM_THREADS} threads add and flush disjoint sets of mutations through one
     * shared writer, then scans the table and checks that exactly those mutations are present.
     */
    @Test
    public void runMultiThreadedBinningTest() throws Exception {
        try (AccumuloClient c = Accumulo.newClient().from(BatchWriterFlushIT.getClientProps()).build()) {
            String tableName = this.getUniqueNames(1)[0];
            c.tableOperations().create(tableName);
            for (int x = 0; x < NUM_THREADS; ++x) {
                c.tableOperations().addSplits(tableName, new TreeSet<Text>(Collections.singleton(new Text(Integer.toString(x * NUM_TO_FLUSH)))));
            }
            c.instanceOperations().waitForBalance();

            List<Set<Mutation>> allMuts = BatchWriterFlushIT.createShuffledPartitions();
            this.writeConcurrently(c, tableName, allMuts);
            this.verifyAllScanned(c, tableName, allMuts);
        }
    }

    /**
     * Builds {@code NUM_THREADS * NUM_TO_FLUSH} mutations, shuffles them, and cuts the shuffled
     * list into {@link #NUM_THREADS} sets of {@link #NUM_TO_FLUSH} mutations each.
     *
     * @return the partitions, one per writer thread
     */
    private static List<Set<Mutation>> createShuffledPartitions() {
        List<Mutation> data = new ArrayList<>();
        for (int i = 0; i < NUM_THREADS; ++i) {
            for (int j = 0; j < NUM_TO_FLUSH; ++j) {
                int row = i * NUM_TO_FLUSH + j;
                Mutation m = new Mutation(new Text(String.format("%10d", row)));
                m.put("cf" + i, "cq", "" + row);
                data.add(m);
            }
        }
        Assertions.assertEquals(NUM_THREADS * NUM_TO_FLUSH, data.size());
        Collections.shuffle(data);

        List<Set<Mutation>> partitions = new LinkedList<>();
        for (int start = 0; start < data.size(); start += NUM_TO_FLUSH) {
            partitions.add(new HashSet<>(data.subList(start, start + NUM_TO_FLUSH)));
        }
        return partitions;
    }

    /**
     * Submits one task per partition to a thread pool; each task adds its partition to a shared
     * writer and flushes it. Blocks until all tasks are done.
     *
     * <p>Tasks are submitted as {@link Callable}s and their {@link Future}s are inspected on the
     * calling thread, so a failure inside a worker (for example a
     * {@link MutationsRejectedException}) fails the test with its cause attached instead of
     * being raised on a pool thread only. The pool and the writer are released on every exit
     * path.
     *
     * @param c client used to create the writer
     * @param tableName existing table to write to
     * @param partitions one set of mutations per writer task
     * @throws Exception if a task fails or the tasks do not finish within three minutes
     */
    private void writeConcurrently(AccumuloClient c, String tableName, List<Set<Mutation>> partitions) throws Exception {
        ThreadPoolExecutor threads = ThreadPools.getServerThreadPools().getPoolBuilder("batch.writer.client.flush").numCoreThreads(NUM_THREADS).build();
        try {
            threads.allowCoreThreadTimeOut(false);
            threads.prestartAllCoreThreads();

            BatchWriterConfig cfg = new BatchWriterConfig();
            cfg.setMaxLatency(10L, TimeUnit.SECONDS);
            cfg.setMaxMemory(1024L * 1024L);
            cfg.setMaxWriteThreads(NUM_THREADS);

            try (BatchWriter bw = c.createBatchWriter(tableName, cfg)) {
                List<Future<Void>> pending = new ArrayList<>();
                for (Set<Mutation> muts : partitions) {
                    Callable<Void> task = () -> {
                        bw.addMutations(muts);
                        bw.flush();
                        return null;
                    };
                    pending.add(threads.submit(task));
                }
                threads.shutdown();
                Assertions.assertTrue(threads.awaitTermination(3L, TimeUnit.MINUTES), "Writer threads did not finish within 3 minutes");
                // All tasks have completed; get() rethrows whatever a task threw.
                for (Future<Void> result : pending) {
                    result.get();
                }
            }
        }
        finally {
            // No-op after a normal shutdown; stops lingering workers if we are bailing out early.
            threads.shutdownNow();
        }
    }

    /**
     * Scans the whole table and checks that every returned entry corresponds to exactly one
     * expected mutation and that no expected mutation is left over.
     *
     * @param c client used to create the scanner
     * @param tableName table to scan
     * @param expected the partitions that were written; matched mutations are removed from them
     */
    private void verifyAllScanned(AccumuloClient c, String tableName, List<Set<Mutation>> expected) throws Exception {
        try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
            for (Map.Entry<Key, Value> e : scanner) {
                Key key = e.getKey();
                Mutation m = new Mutation(key.getRow());
                m.put(key.getColumnFamily(), key.getColumnQualifier(), e.getValue());
                boolean found = false;
                for (Set<Mutation> muts : expected) {
                    if (muts.remove(m)) {
                        found = true;
                        break;
                    }
                }
                // Supplier form: the message is only built when the assertion actually fails.
                Assertions.assertTrue(found, () -> "Mutation not found: " + m);
            }
            for (Set<Mutation> muts : expected) {
                Assertions.assertEquals(0, muts.size());
            }
        }
    }

    /**
     * Checks that a scanned entry has the row id and the value expected for {@code row}.
     *
     * @param row row number the entry is expected to belong to
     * @param entry entry returned by the scanner
     * @throws Exception if the row id or the value does not match
     */
    private void verifyEntry(int row, Map.Entry<Key, Value> entry) throws Exception {
        if (!entry.getKey().getRow().toString().equals(BatchWriterFlushIT.rowId(row))) {
            throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());
        }
        if (!entry.getValue().toString().equals(Integer.toString(row))) {
            throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue());
        }
    }

    /** Returns the row id used by the flush and latency tests: {@code "r_"} plus the number right-aligned to width 10. */
    private static String rowId(int row) {
        return String.format("r_%10d", row);
    }

    /** Returns {@link #rowId(int)} wrapped in a {@link Text}. */
    private static Text rowText(int row) {
        return new Text(BatchWriterFlushIT.rowId(row));
    }
}

