/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MultiTableRecoveryIT
extends ConfigurableMacBase {
    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(5L);
    }

    @Override
    protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
        cfg.setProperty(Property.TSERV_WAL_SORT_FILE_PREFIX + "compress.type", "none");
    }

    @Test
    public void testRecoveryOverMultipleTables() throws Exception {
        int N = 3;
        try (AccumuloClient c = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();){
            String[] tables = this.getUniqueNames(3);
            BatchWriter[] writers = new BatchWriter[3];
            byte[][] values = new byte[3][];
            int i = 0;
            System.out.println("Creating tables");
            for (String tableName : tables) {
                c.tableOperations().create(tableName);
                values[i] = Integer.toString(i).getBytes();
                writers[i] = c.createBatchWriter(tableName);
                ++i;
            }
            System.out.println("Creating agitator");
            AtomicBoolean stop = new AtomicBoolean(false);
            Thread agitator = this.agitator(stop);
            agitator.start();
            System.out.println("writing");
            for (i = 0; i < 1000000; ++i) {
                long randomRow = random.nextLong() & Long.MAX_VALUE;
                Assertions.assertTrue((randomRow >= 0L ? 1 : 0) != 0);
                int table = (int)(randomRow % 3L);
                Mutation m = new Mutation((CharSequence)Long.toHexString(randomRow));
                m.put(new byte[0], new byte[0], values[table]);
                writers[table].addMutation(m);
                if (i % 10000 != 0) continue;
                System.out.println("flushing");
                for (int w = 0; w < 3; ++w) {
                    writers[w].flush();
                }
            }
            System.out.println("closing");
            for (int w = 0; w < 3; ++w) {
                writers[w].close();
            }
            System.out.println("stopping the agitator");
            stop.set(true);
            agitator.join();
            System.out.println("checking the data");
            long count = 0L;
            for (int w = 0; w < 3; ++w) {
                try (Scanner scanner = c.createScanner(tables[w], Authorizations.EMPTY);){
                    for (Map.Entry entry : scanner) {
                        int value = Integer.parseInt(((Value)entry.getValue()).toString());
                        Assertions.assertEquals((int)w, (int)value);
                        ++count;
                    }
                    continue;
                }
            }
            Assertions.assertEquals((long)1000000L, (long)count);
        }
    }

    private Thread agitator(AtomicBoolean stop) {
        return new Thread(() -> {
            try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();){
                int i = 0;
                while (!stop.get()) {
                    UtilWaitThread.sleepUninterruptibly((long)10L, (TimeUnit)TimeUnit.SECONDS);
                    System.out.println("Restarting");
                    this.getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
                    this.getCluster().start();
                    try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
                        scanner.forEach((k, v) -> {});
                    }
                    ++i;
                }
                System.out.println("Restarted " + i + " times");
            }
            catch (IOException | InterruptedException | TableNotFoundException ex) {
                log.error("{}", (Object)ex.getMessage(), (Object)ex);
            }
        });
    }
}

