/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.DevNull;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.compaction.CompactionExecutorIT;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT;
import org.apache.accumulo.test.functional.ErrorThrowingIterator;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.apache.accumulo.test.functional.ReadWriteIT;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionIT
extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
    private static final int MAX_DATA = 1000;

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(4L);
    }

    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4");
        cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
        cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1");
        hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }

    @Test
    public void testBadSelector() throws Exception {
        try (AccumuloClient c = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            NewTableConfiguration tc = new NewTableConfiguration();
            tc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10.0"));
            c.tableOperations().create(tableName, tc);
            try (BatchWriter bw = c.createBatchWriter(tableName);){
                for (int i = 1; i <= 4; ++i) {
                    Mutation m = new Mutation((CharSequence)Integer.toString(i));
                    m.put((CharSequence)"cf", (CharSequence)"cq", new Value());
                    bw.addMutation(m);
                    bw.flush();
                    c.tableOperations().flush(tableName, null, null, true);
                }
            }
            List<String> files = FunctionalTestUtils.getRFilePaths(c, tableName);
            Assertions.assertEquals((int)4, (int)files.size());
            String subset = files.get(0).substring(files.get(0).lastIndexOf(47) + 1) + "," + files.get(3).substring(files.get(3).lastIndexOf(47) + 1);
            CompactionConfig config = new CompactionConfig().setSelector(new PluginConfig(RandomErrorThrowingSelector.class.getName(), Map.of("filesToCompact", subset))).setWait(true);
            c.tableOperations().compact(tableName, config);
            List<String> filesAfterCompact = FunctionalTestUtils.getRFilePaths(c, tableName);
            Assertions.assertFalse((boolean)filesAfterCompact.contains(files.get(0)));
            Assertions.assertTrue((boolean)filesAfterCompact.contains(files.get(1)));
            Assertions.assertTrue((boolean)filesAfterCompact.contains(files.get(2)));
            Assertions.assertFalse((boolean)filesAfterCompact.contains(files.get(3)));
            ArrayList rows = new ArrayList();
            c.createScanner(tableName).forEach((k, v) -> rows.add(k.getRow().toString()));
            Assertions.assertEquals(List.of("1", "2", "3", "4"), rows);
        }
    }

    @Test
    public void testCompactionWithTableIterator() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(table1);
            try (BatchWriter bw = client.createBatchWriter(table1);){
                for (int i = 1; i <= 4; ++i) {
                    Mutation m = new Mutation((CharSequence)Integer.toString(i));
                    m.put((CharSequence)"cf", (CharSequence)"cq", new Value());
                    bw.addMutation(m);
                    bw.flush();
                    client.tableOperations().flush(table1, null, null, true);
                }
            }
            IteratorSetting setting = new IteratorSetting(50, "delete", DevNull.class);
            client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
            client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
            try (Scanner s = client.createScanner(table1);){
                Assertions.assertFalse((boolean)s.iterator().hasNext());
            }
        }
    }

    @Test
    public void testUserCompactionCancellation() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(table1);
            try (BatchWriter bw = client.createBatchWriter(table1);){
                for (int i = 1; i <= 1000; ++i) {
                    Mutation m = new Mutation((CharSequence)Integer.toString(i));
                    m.put((CharSequence)"cf", (CharSequence)"cq", new Value());
                    bw.addMutation(m);
                    bw.flush();
                    client.tableOperations().flush(table1, null, null, true);
                }
            }
            AtomicReference error = new AtomicReference();
            AtomicBoolean started = new AtomicBoolean(false);
            Thread t = new Thread(() -> {
                try {
                    started.set(true);
                    IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class);
                    setting.addOption("sleepTime", "3000");
                    setting.addOption("seekSleepTime", "3000");
                    client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
                    client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
                }
                catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
                    error.set(e);
                }
            });
            t.start();
            while (!started.get()) {
                Thread.sleep(1000L);
            }
            client.tableOperations().cancelCompaction(table1);
            t.join();
            Exception e = (Exception)error.get();
            Assertions.assertNotNull((Object)e);
            Assertions.assertEquals((Object)"Compaction canceled", (Object)e.getMessage());
        }
    }

    @Test
    public void testErrorDuringUserCompaction() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(table1);
            client.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "1001");
            client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001");
            TableId tid = TableId.of((String)((String)client.tableOperations().tableIdMap().get(table1)));
            ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 1);
            Ample ample = ((ClientContext)client).getAmple();
            TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(new TabletMetadata.ColumnType[]{TabletMetadata.ColumnType.FILES}).build();
            TabletMetadata tm = (TabletMetadata)tms.iterator().next();
            Assertions.assertEquals((int)1000, (int)tm.getFiles().size());
            IteratorSetting setting = new IteratorSetting(50, "error", ErrorThrowingIterator.class);
            setting.addOption("error.throwing.iterator.times", "3");
            client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
            client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
            tms = ample.readTablets().forTable(tid).fetch(new TabletMetadata.ColumnType[]{TabletMetadata.ColumnType.FILES}).build();
            tm = (TabletMetadata)tms.iterator().next();
            Assertions.assertEquals((int)1, (int)tm.getFiles().size());
            ReadWriteIT.verify(client, 1000, 1, 1, 0, table1);
        }
    }

    @Test
    public void testErrorDuringCompactionNoOutput() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(table1);
            client.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "1001");
            client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "51");
            TableId tid = TableId.of((String)((String)client.tableOperations().tableIdMap().get(table1)));
            ReadWriteIT.ingest(client, 50, 1, 1, 0, "colf", table1, 1);
            ReadWriteIT.verify(client, 50, 1, 1, 0, table1);
            Ample ample = ((ClientContext)client).getAmple();
            TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(new TabletMetadata.ColumnType[]{TabletMetadata.ColumnType.FILES}).build();
            TabletMetadata tm = (TabletMetadata)tms.iterator().next();
            Assertions.assertEquals((int)50, (int)tm.getFiles().size());
            IteratorSetting setting = new IteratorSetting(50, "ageoff", AgeOffFilter.class);
            setting.addOption("ttl", "0");
            setting.addOption("currentTime", Long.toString(System.currentTimeMillis() + 86400L));
            client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
            IteratorSetting setting2 = new IteratorSetting(51, "error", ErrorThrowingIterator.class);
            setting2.addOption("error.throwing.iterator.times", "3");
            client.tableOperations().attachIterator(table1, setting2, EnumSet.of(IteratorUtil.IteratorScope.majc));
            client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
            Assertions.assertThrows(NoSuchElementException.class, () -> ample.readTablets().forTable(tid).fetch(new TabletMetadata.ColumnType[]{TabletMetadata.ColumnType.FILES}).build().iterator().next());
            Assertions.assertEquals((long)0L, (long)client.createScanner(table1).stream().count());
        }
    }

    @Test
    public void testTableDeletedDuringUserCompaction() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(table1);
            try (BatchWriter bw = client.createBatchWriter(table1);){
                for (int i = 1; i <= 1000; ++i) {
                    Mutation m = new Mutation((CharSequence)Integer.toString(i));
                    m.put((CharSequence)"cf", (CharSequence)"cq", new Value());
                    bw.addMutation(m);
                    bw.flush();
                    client.tableOperations().flush(table1, null, null, true);
                }
            }
            AtomicReference error = new AtomicReference();
            AtomicBoolean started = new AtomicBoolean(false);
            Thread t = new Thread(() -> {
                try {
                    started.set(true);
                    IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class);
                    setting.addOption("sleepTime", "3000");
                    setting.addOption("seekSleepTime", "3000");
                    client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
                    client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
                }
                catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
                    error.set(e);
                }
            });
            t.start();
            while (!started.get()) {
                Thread.sleep(1000L);
            }
            client.tableOperations().delete(table1);
            t.join();
            Exception e = (Exception)error.get();
            Assertions.assertNotNull((Object)e);
            Assertions.assertEquals((Object)"Compaction canceled", (Object)e.getMessage());
        }
    }

    @Test
    public void testPartialCompaction() throws Exception {
        String tableName = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            client.tableOperations().create(tableName);
            try (BatchWriter bw = client.createBatchWriter(tableName);){
                for (int i = 0; i < 1000; ++i) {
                    Mutation m = new Mutation((CharSequence)String.format("r:%04d", i));
                    m.put((CharSequence)"", (CharSequence)"", (CharSequence)("" + i));
                    bw.addMutation(m);
                }
            }
            client.tableOperations().flush(tableName);
            IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
            iterSetting.addOption("modulus", "17");
            CompactionConfig config = new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
            client.tableOperations().compact(tableName, config);
            try (BatchWriter bw = client.createBatchWriter(tableName);){
                for (int i = 1000; i < 2000; ++i) {
                    Mutation m = new Mutation((CharSequence)String.format("r:%04d", i));
                    m.put((CharSequence)"", (CharSequence)"", (CharSequence)("" + i));
                    bw.addMutation(m);
                }
            }
            client.tableOperations().flush(tableName);
            iterSetting = new IteratorSetting(100, TestFilter.class);
            iterSetting.addOption("pmodulus", "19");
            config = new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true).setSelector(new PluginConfig(ExternalCompaction_1_IT.FSelector.class.getName()));
            client.tableOperations().compact(tableName, config);
            try (Scanner scanner = client.createScanner(tableName);){
                int count = 0;
                for (Map.Entry entry : scanner) {
                    int v = Integer.parseInt(((Value)entry.getValue()).toString());
                    int modulus = v < 1000 ? 17 : 19;
                    Assertions.assertEquals((int)0, (int)(Integer.parseInt(((Value)entry.getValue()).toString()) % modulus), (String)String.format("%s %s %d != 0", entry.getValue(), "%", modulus));
                    ++count;
                }
                int expectedCount = 0;
                for (int i = 0; i < 2000; ++i) {
                    int modulus;
                    int n = modulus = i < 1000 ? 17 : 19;
                    if (i % modulus != 0) continue;
                    ++expectedCount;
                }
                Assertions.assertEquals((int)expectedCount, (int)count);
            }
        }
    }

    @Test
    public void testConfigurer() throws Exception {
        String tableName = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            Map<String, String> props = Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
            NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
            client.tableOperations().create(tableName, ntc);
            byte[] data = new byte[100000];
            Arrays.fill(data, (byte)65);
            try (BatchWriter writer = client.createBatchWriter(tableName);){
                for (int row = 0; row < 10; ++row) {
                    Mutation m = new Mutation((CharSequence)("" + row));
                    m.at().family((CharSequence)"big").qualifier((CharSequence)"stuff").put(data);
                    writer.addMutation(m);
                }
            }
            client.tableOperations().flush(tableName, null, null, true);
            long sizes = CompactionExecutorIT.getFileSizes(client, tableName);
            Assertions.assertTrue((sizes > (long)(data.length * 10) && sizes < (long)(data.length * 11) ? 1 : 0) != 0, (String)("Unexpected files sizes : " + sizes));
            client.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(), Map.of("large.compress.type", "gz", "large.compress.threshold", "" + data.length))));
            sizes = CompactionExecutorIT.getFileSizes(client, tableName);
            Assertions.assertTrue((sizes < (long)data.length ? 1 : 0) != 0, (String)("Unexpected files sizes: data: " + data.length + ", file:" + sizes));
            client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
            sizes = CompactionExecutorIT.getFileSizes(client, tableName);
            Assertions.assertTrue((sizes > (long)(data.length * 10) && sizes < (long)(data.length * 11) ? 1 : 0) != 0, (String)("Unexpected files sizes : " + sizes));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSuccessfulCompaction() throws Exception {
        try (AccumuloClient c = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            c.tableOperations().create(tableName);
            c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
            FileSystem fs = CompactionIT.getFileSystem();
            Path root = new Path(cluster.getTemporaryPath(), this.getClass().getName());
            fs.deleteOnExit(root);
            Path testrf = new Path(root, "testrf");
            fs.deleteOnExit(testrf);
            FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);
            c.tableOperations().importDirectory(testrf.toString()).to(tableName).load();
            int beforeCount = this.countFiles(c, tableName);
            AtomicBoolean fail = new AtomicBoolean(false);
            int THREADS = 5;
            for (int count = 0; count < 5; ++count) {
                ExecutorService executor = Executors.newFixedThreadPool(5);
                int span = 8474;
                for (int i = 0; i < 500000; i += 8474) {
                    int finalI = i;
                    Runnable r = () -> {
                        try {
                            VerifyIngest.VerifyParams params = new VerifyIngest.VerifyParams(CompactionIT.getClientProps(), tableName, 8474);
                            params.startRow = finalI;
                            params.random = 56;
                            params.dataSize = 50;
                            params.cols = 1;
                            VerifyIngest.verifyIngest(c, params);
                        }
                        catch (Exception ex) {
                            log.warn("Got exception verifying data", (Throwable)ex);
                            fail.set(true);
                        }
                    };
                    executor.execute(r);
                }
                executor.shutdown();
                executor.awaitTermination(this.defaultTimeout().toSeconds(), TimeUnit.SECONDS);
                Assertions.assertFalse((boolean)fail.get(), (String)"Failed to successfully run all threads, Check the test output for error");
            }
            int finalCount = this.countFiles(c, tableName);
            Assertions.assertTrue((finalCount < beforeCount ? 1 : 0) != 0);
            try {
                CompactionIT.getClusterControl().adminStopAll();
            }
            finally {
                CompactionIT.getCluster().stop();
                if (CompactionIT.getClusterType() == AccumuloClusterHarness.ClusterType.STANDALONE) {
                    CompactionIT.getCluster().start();
                }
            }
        }
    }

    @Test
    public void testMultiStepCompactionThatDeletesAll() throws Exception {
        try (AccumuloClient c = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            String tableName = this.getUniqueNames(1)[0];
            c.tableOperations().create(tableName);
            c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1001");
            c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100.0");
            int beforeCount = this.countFiles(c, tableName);
            int NUM_ENTRIES_AND_FILES = 60;
            try (BatchWriter writer = c.createBatchWriter(tableName);){
                for (int i = 0; i < 60; ++i) {
                    Mutation m = new Mutation((CharSequence)("r" + i));
                    m.put((CharSequence)"f1", (CharSequence)"q1", (CharSequence)("v" + i));
                    writer.addMutation(m);
                    writer.flush();
                    c.tableOperations().flush(tableName, null, null, true);
                }
            }
            try (Scanner scanner = c.createScanner(tableName);){
                Assertions.assertEquals((long)60L, (long)scanner.stream().count());
            }
            int afterCount = this.countFiles(c, tableName);
            Assertions.assertTrue((afterCount >= beforeCount + 60 ? 1 : 0) != 0);
            CompactionConfig comactionConfig = new CompactionConfig();
            IteratorSetting iter = new IteratorSetting(100, GrepIterator.class);
            GrepIterator.setTerm((IteratorSetting)iter, (String)"keep");
            comactionConfig.setIterators(List.of(iter));
            comactionConfig.setWait(true);
            c.tableOperations().compact(tableName, comactionConfig);
            try (Scanner scanner = c.createScanner(tableName);){
                Assertions.assertEquals((long)0L, (long)scanner.stream().count());
            }
            int finalCount = this.countFiles(c, tableName);
            Assertions.assertTrue((finalCount <= beforeCount ? 1 : 0) != 0);
        }
    }

    @Test
    public void testDeleteCompactionService() throws Exception {
        try (AccumuloClient c = (AccumuloClient)Accumulo.newClient().from(CompactionIT.getClientProps()).build();){
            String[] uniqueNames = this.getUniqueNames(2);
            String table1 = uniqueNames[0];
            String table2 = uniqueNames[1];
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner", DefaultCompactionPlanner.class.getName());
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors", "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner", DefaultCompactionPlanner.class.getName());
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors", "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
            HashMap<Object, String> props = new HashMap<Object, String>();
            props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), SimpleCompactionDispatcher.class.getName());
            props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme");
            c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props));
            props.clear();
            props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), SimpleCompactionDispatcher.class.getName());
            props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme");
            c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props));
            try (BatchWriter writer1 = c.createBatchWriter(table1);
                 BatchWriter writer2 = c.createBatchWriter(table2);){
                for (int i = 0; i < 10; ++i) {
                    Mutation m = new Mutation((CharSequence)("" + i));
                    m.put((CharSequence)"f", (CharSequence)"q", (CharSequence)("" + i));
                    writer1.addMutation(m);
                    writer2.addMutation(m);
                }
            }
            c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
            c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
            c.instanceOperations().removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
            c.instanceOperations().removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner", DefaultCompactionPlanner.class.getName());
            c.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
            c.tableOperations().setProperty(table1, Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs");
            for (int i = 0; i < 10; ++i) {
                c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
                c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
                try (Scanner scanner = c.createScanner(table1);){
                    Assertions.assertEquals((int)45, (int)scanner.stream().map(Map.Entry::getValue).mapToInt(v -> Integer.parseInt(v.toString())).sum());
                }
                scanner = c.createScanner(table2);
                try {
                    Assertions.assertEquals((int)45, (int)scanner.stream().map(Map.Entry::getValue).mapToInt(v -> Integer.parseInt(v.toString())).sum());
                }
                finally {
                    if (scanner != null) {
                        scanner.close();
                    }
                }
                Thread.sleep(100L);
            }
        }
    }

    private int countFiles(AccumuloClient c, String tableName) throws Exception {
        TableId tableId = CompactionIT.getCluster().getServerContext().getTableId(tableName);
        try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);){
            s.setRange(MetadataSchema.TabletsSection.getRange((TableId)tableId));
            MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch((ScannerBase)s);
            s.fetchColumnFamily(new Text(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME));
            int n = Iterators.size((Iterator)s.iterator());
            return n;
        }
    }

    public static class RandomErrorThrowingSelector
    implements CompactionSelector {
        public static final String FILE_LIST_PARAM = "filesToCompact";
        private static Boolean ERROR_THROWN = Boolean.FALSE;
        private List<String> filesToCompact;

        public void init(CompactionSelector.InitParameters iparams) {
            String files = (String)iparams.getOptions().get(FILE_LIST_PARAM);
            Objects.requireNonNull(files);
            Object[] f = files.split(",");
            this.filesToCompact = Lists.newArrayList((Object[])f);
        }

        public CompactionSelector.Selection select(CompactionSelector.SelectionParameters sparams) {
            if (!ERROR_THROWN.booleanValue()) {
                ERROR_THROWN = Boolean.TRUE;
                throw new RuntimeException("Exception for test");
            }
            ArrayList matches = new ArrayList();
            sparams.getAvailableFiles().forEach(cf -> {
                if (this.filesToCompact.contains(cf.getFileName())) {
                    matches.add(cf);
                }
            });
            return new CompactionSelector.Selection(matches);
        }
    }

    public static class TestFilter
    extends Filter {
        int modulus = 1;

        public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
            super.init(source, options, env);
            if (options.containsKey("modulus")) {
                Preconditions.checkArgument((!options.containsKey("pmodulus") ? 1 : 0) != 0);
                this.modulus = Integer.parseInt(options.get("modulus"));
            }
            if (options.containsKey("pmodulus")) {
                Preconditions.checkArgument((!options.containsKey("modulus") ? 1 : 0) != 0);
                this.modulus = Integer.parseInt(options.get("pmodulus"));
            }
        }

        public boolean accept(Key k, Value v) {
            return Integer.parseInt(v.toString()) % this.modulus == 0;
        }
    }
}

