/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.compaction;

import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.coordinator.CompactionCoordinator;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalCompactionProgressIT
extends AccumuloClusterHarness {
    /** Logger used by the tests and by the background checker threads. */
    private static final Logger log = LoggerFactory.getLogger(ExternalCompactionProgressIT.class);

    /** Row count constant; equal to the number of rows (10000) the tests write to each table. */
    private static final int ROWS = 10000;

    /** Interval in milliseconds; equal to the pause the checker threads take between polls. */
    public static final int CHECKER_THREAD_SLEEP_MS = 1000;

    /** Most recent compaction info seen for each external compaction id (see checkRunning). */
    Map<String, RunningCompactionInfo> runningMap = new HashMap<>();

    /** Progress buckets recorded by checkRunning and asserted on by verifyProgress. */
    List<EC_PROGRESS> progressList = new ArrayList<>();

    /** Shared stop flag for the checker threads; reset to false before each test. */
    private static final AtomicBoolean stopCheckerThread = new AtomicBoolean(false);

    /** StatsD sink created once for the class; its port is handed to the mini cluster. */
    private static TestStatsDSink sink;

    /**
     * Creates the StatsD sink shared by every test in this class.
     *
     * @throws Exception if the sink cannot be created
     */
    @BeforeAll
    public static void before() throws Exception {
        ExternalCompactionProgressIT.sink = new TestStatsDSink();
    }

    /**
     * Closes the shared StatsD sink, if one was created.
     *
     * @throws Exception if closing the sink fails
     */
    @AfterAll
    public static void after() throws Exception {
        if (sink == null) {
            // before() never assigned a sink, so there is nothing to release.
            return;
        }
        sink.close();
    }

    /**
     * Re-arms the checker threads for the next test. The stop flag is static, so it is still
     * {@code true} after any test that stopped its checker thread.
     */
    @BeforeEach
    public void setup() {
        ExternalCompactionProgressIT.stopCheckerThread.set(false);
    }

    /**
     * Applies the common external-compaction cluster configuration, then enables Micrometer
     * metrics and points the test StatsD registry at this class's sink.
     *
     * @param cfg mini cluster configuration to modify
     * @param coreSite Hadoop configuration, passed through to the shared test utility
     */
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
        ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);

        // Turn metrics on and select the StatsD-backed registry factory used by the tests.
        cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
        cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName());

        // The registry is given the sink's address through JVM system properties.
        String sinkPort = Integer.toString(sink.getPort());
        cfg.setSystemProperties(Map.of(
                "test.meter.registry.host", "127.0.0.1",
                "test.meter.registry.port", sinkPort));
    }

    /**
     * Checks that the duration reported for a running external compaction keeps counting across
     * a coordinator restart.
     *
     * <p>The test starts a slow compaction, waits until it is reported as running, stops the
     * coordinator for a few seconds and restarts it. It then asserts that (a) the reported
     * compaction duration is longer than the time since the coordinator was restarted and (b) the
     * reported duration is within a fixed tolerance of the locally measured elapsed time.
     *
     * @throws Exception if cluster control, the compaction request, or one of the waits fails
     */
    @Test
    public void testCompactionDurationContinuesAfterCoordinatorStop() throws Exception {
        String table = this.getUniqueNames(1)[0];
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ExternalCompactionProgressIT.getCluster().getClientProperties()).build()) {
            ExternalCompactionTestUtils.createTable(client, table, "cs1");
            ExternalCompactionTestUtils.writeData(client, table, ROWS);
            cluster.getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");
            cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);

            // Slow the compaction down so it is still running after the coordinator restart.
            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 5L);
            client.tableOperations().attachIterator(table, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
            log.info("Compacting table");
            // NOTE(review): last argument is false here (true in the other tests) - presumably
            // "do not wait for completion"; confirm against ExternalCompactionTestUtils.compact.
            ExternalCompactionTestUtils.compact(client, table, 2, "DCQ1", false);

            // Block until at least one running compaction is reported. The predicate must be
            // true only when the map is present AND non-empty; otherwise the wait would succeed
            // immediately and the start timestamp below would precede the compaction.
            Wait.waitFor(() -> {
                Map<String, TExternalCompaction> compactions = ExternalCompactionTestUtils.getRunningCompactions((ClientContext)ExternalCompactionProgressIT.getCluster().getServerContext()).getCompactions();
                return compactions != null && !compactions.isEmpty();
            }, 30000L, 100L, "Compaction did not start within the expected time");
            long compactionStartTime = System.nanoTime();

            UtilWaitThread.sleepUninterruptibly(6L, TimeUnit.SECONDS);
            log.info("Stopping the coordinator");
            cluster.getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
            UtilWaitThread.sleepUninterruptibly(5L, TimeUnit.SECONDS);
            log.info("Restarting the coordinator");
            cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
            long coordinatorRestartTime = System.nanoTime();

            // Poll until the restarted coordinator answers; a TException means it is not ready.
            Map<String, TExternalCompaction> metrics = null;
            while (metrics == null) {
                try {
                    metrics = ExternalCompactionTestUtils.getRunningCompactions((ClientContext)ExternalCompactionProgressIT.getCluster().getServerContext()).getCompactions();
                }
                catch (TException e) {
                    UtilWaitThread.sleep(250L);
                }
            }

            // Let some time pass on the new coordinator before sampling the reported duration.
            UtilWaitThread.sleepUninterruptibly(6L, TimeUnit.SECONDS);
            TExternalCompaction updatedCompaction = ExternalCompactionTestUtils.getRunningCompactions((ClientContext)ExternalCompactionProgressIT.getCluster().getServerContext()).getCompactions().values().iterator().next();
            RunningCompactionInfo updatedCompactionInfo = new RunningCompactionInfo(updatedCompaction);

            Duration reportedCompactionDuration = Duration.ofNanos(updatedCompactionInfo.duration);
            Duration measuredCompactionDuration = Duration.ofNanos(System.nanoTime() - compactionStartTime);
            Duration coordinatorAge = Duration.ofNanos(System.nanoTime() - coordinatorRestartTime);
            log.info("Coordinator age: {}s. Measured compaction duration: {}s. Reported compaction duration: {}s", coordinatorAge.toSeconds(), measuredCompactionDuration.toSeconds(), reportedCompactionDuration.toSeconds());

            // The compaction predates the restarted coordinator, so it must report a longer age.
            Assertions.assertTrue(coordinatorAge.compareTo(reportedCompactionDuration) < 0, "Reported compaction age should be greater than the coordinator age");

            Duration tolerance = Duration.ofSeconds(7L);
            long reportedVsMeasuredDiff = Math.abs(reportedCompactionDuration.minus(measuredCompactionDuration).toNanos());
            Assertions.assertTrue(reportedVsMeasuredDiff <= tolerance.toNanos(), String.format("Reported duration (%s) and elapsed time (%s) differ by more than the tolerance (%s)", reportedCompactionDuration.toSeconds(), measuredCompactionDuration.toSeconds(), tolerance.toSeconds()));
        }
        finally {
            ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
            ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
        }
    }

    /**
     * Runs a slowed-down external compaction and checks, through the StatsD metrics emitted by
     * the compactor, that the summed "entries read" and "entries written" values reach the
     * expected totals.
     *
     * @throws Exception if cluster control, the compaction, or the wait fails
     */
    @Test
    public void testProgressViaMetrics() throws Exception {
        final String table = this.getUniqueNames(1)[0];
        final long expectedEntriesRead = 9216L;
        final long expectedEntriesWritten = 4096L;
        final AtomicLong totalEntriesRead = new AtomicLong(0L);
        final AtomicLong totalEntriesWritten = new AtomicLong(0L);
        final Thread checkerThread = ExternalCompactionProgressIT.getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten);

        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ExternalCompactionProgressIT.getCluster().getClientProperties()).build()) {
            ExternalCompactionTestUtils.createTable(client, table, "cs1");
            ExternalCompactionTestUtils.writeData(client, table, ROWS);
            cluster.getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");
            cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);

            // Begin tailing metrics before the compaction is requested.
            checkerThread.start();

            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 1L);
            client.tableOperations().attachIterator(table, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));

            log.info("Compacting table");
            ExternalCompactionTestUtils.compact(client, table, 2, "DCQ1", true);

            // The tailer thread accumulates totals asynchronously, so poll until both match.
            Wait.waitFor(() -> {
                long readSoFar = totalEntriesRead.get();
                long writtenSoFar = totalEntriesWritten.get();
                boolean totalsMatch = readSoFar == expectedEntriesRead && writtenSoFar == expectedEntriesWritten;
                if (!totalsMatch) {
                    log.info("Waiting for entries read to be {} (currently {}) and entries written to be {} (currently {})", expectedEntriesRead, readSoFar, expectedEntriesWritten, writtenSoFar);
                }
                return totalsMatch;
            }, 30000L, CHECKER_THREAD_SLEEP_MS, "Entries read and written metrics values did not match expected values");

            log.info("Done Compacting table");
            ExternalCompactionTestUtils.verify(client, table, 2, ROWS);
        }
        finally {
            // Stop the tailer first, then shut down the compaction services.
            stopCheckerThread.set(true);
            checkerThread.join();
            ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
            ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
        }
    }

    /**
     * Builds (but does not start) a thread that tails the StatsD sink and adds up the values of
     * the compactor "entries read" and "entries written" metrics.
     *
     * <p>The thread runs until the shared stop flag is set, pausing between passes over the
     * sink's lines.
     *
     * @param totalEntriesRead accumulator for {@code accumulo.compactor.entries.read} values
     * @param totalEntriesWritten accumulator for {@code accumulo.compactor.entries.written} values
     * @return the not-yet-started tailer thread
     */
    private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead, AtomicLong totalEntriesWritten) {
        Runnable tailer = () -> {
            log.info("Starting metric tailer");
            // Drop whatever the sink collected before this thread began.
            sink.getLines().clear();

            polling:
            while (!stopCheckerThread.get()) {
                for (String line : sink.getLines()) {
                    if (stopCheckerThread.get()) {
                        // Leave both loops promptly once asked to stop.
                        break polling;
                    }
                    TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line);
                    String metricName = metric.getName();
                    if (metricName.startsWith("accumulo.compactor.")) {
                        int value = Integer.parseInt(metric.getValue());
                        log.debug("Found metric: {} with value: {}", metricName, value);
                        if ("accumulo.compactor.entries.read".equals(metricName)) {
                            totalEntriesRead.addAndGet(value);
                        } else if ("accumulo.compactor.entries.written".equals(metricName)) {
                            totalEntriesWritten.addAndGet(value);
                        }
                    }
                }
                UtilWaitThread.sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
            }
            log.info("Metric tailer thread finished");
        };
        return Threads.createThread("metric-tailer", tailer);
    }

    /**
     * Runs a slowed-down external compaction while a background thread polls the running
     * compactions, then asserts that every progress bucket was observed and that no invalid
     * progress value was seen.
     *
     * <p>The checker thread is always stopped and joined in the {@code finally} block, so a
     * failure in the test body cannot leave it running into the next test (where the shared stop
     * flag is reset to {@code false}).
     *
     * @throws Exception if cluster control, the compaction, or verification fails
     */
    @Test
    public void testProgress() throws Exception {
        String table1 = this.getUniqueNames(1)[0];
        // Created up front so the finally block can always stop and join it; joining a thread
        // that was never started returns immediately.
        Thread checkerThread = this.startChecker();
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ExternalCompactionProgressIT.getCluster().getClientProperties()).build()) {
            ExternalCompactionTestUtils.createTable(client, table1, "cs1");
            ExternalCompactionTestUtils.writeData(client, table1, ROWS);
            cluster.getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");
            cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
            checkerThread.start();

            // Slow the compaction so the checker has time to sample intermediate progress.
            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 1L);
            client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorUtil.IteratorScope.majc));
            log.info("Compacting table");
            ExternalCompactionTestUtils.compact(client, table1, 2, "DCQ1", true);
            ExternalCompactionTestUtils.verify(client, table1, 2, ROWS);
            log.info("Done Compacting table");

            // The checker must be finished before its recorded progress is inspected.
            stopCheckerThread.set(true);
            checkerThread.join();
            this.verifyProgress();
        }
        finally {
            try {
                // Idempotent on the success path; on failure this prevents a leaked checker.
                stopCheckerThread.set(true);
                checkerThread.join();
            }
            finally {
                ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
                ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
            }
        }
    }

    /**
     * Same progress check as {@code testProgress}, but the compacted table also contains files
     * that were bulk imported from another table's directory.
     *
     * <p>Two tables are created and filled, the files of the first are bulk imported into the
     * second, and the second is then compacted with a slow iterator while a background thread
     * samples compaction progress. The checker thread is always stopped and joined in the
     * {@code finally} block so it cannot outlive a failed test.
     *
     * @throws Exception if cluster control, the bulk import, the compaction, or verification fails
     */
    @Test
    public void testProgressWithBulkImport() throws Exception {
        String[] tableNames = this.getUniqueNames(2);
        String tableName1 = tableNames[0];
        String tableName2 = tableNames[1];
        // Created up front so the finally block can always stop and join it; joining a thread
        // that was never started returns immediately.
        Thread checkerThread = this.startChecker();
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(ExternalCompactionProgressIT.getClientProps()).build()) {
            log.info("Creating table {}", tableName1);
            ExternalCompactionTestUtils.createTable(client, tableName1, "cs1");
            log.info("Creating table {}", tableName2);
            ExternalCompactionTestUtils.createTable(client, tableName2, "cs1");
            log.info("Writing 10000 rows to table {}", tableName1);
            ExternalCompactionTestUtils.writeData(client, tableName1, ROWS);
            log.info("Writing 10000 rows to table {}", tableName2);
            ExternalCompactionTestUtils.writeData(client, tableName2, ROWS);

            // Set the major compaction ratio to 1000 on both tables before the import.
            // NOTE(review): presumably to keep system compactions from running - confirm.
            client.tableOperations().setProperty(tableName1, Property.TABLE_MAJC_RATIO.getKey(), "1000");
            client.tableOperations().setProperty(tableName2, Property.TABLE_MAJC_RATIO.getKey(), "1000");

            ExternalCompactionProgressIT.getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
            ExternalCompactionProgressIT.getCluster().getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");

            String dir = this.getDir(client, tableName1);
            log.info("Bulk importing files in dir {} to table {}", dir, tableName2);
            client.tableOperations().importDirectory(dir).to(tableName2).load();
            log.info("Finished bulk import");

            log.info("Starting a compaction progress checker thread");
            checkerThread.start();

            log.info("Attaching a slow iterator to table {}", tableName2);
            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 1L);

            log.info("Compacting table {}", tableName2);
            client.tableOperations().compact(tableName2, new CompactionConfig().setWait(true).setIterators(List.of(setting)));
            log.info("Finished compacting table {}", tableName2);

            // The checker must be finished before its recorded progress is inspected.
            stopCheckerThread.set(true);
            log.info("Waiting on progress checker thread");
            checkerThread.join();
            this.verifyProgress();
        }
        finally {
            try {
                // Idempotent on the success path; on failure this prevents a leaked checker.
                stopCheckerThread.set(true);
                checkerThread.join();
            }
            finally {
                ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
                ExternalCompactionProgressIT.getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
            }
        }
    }

    /**
     * Returns the raw URI path of the directory holding the first file of the first tablet of
     * the given table, as read from the tablet metadata.
     *
     * @param client client used to resolve the table id and read tablet metadata
     * @param tableName name of the table to inspect
     * @return raw path of the parent directory of the first file found
     */
    private String getDir(AccumuloClient client, String tableName) {
        String rawTableId = client.tableOperations().tableIdMap().get(tableName);
        TableId tableId = TableId.of(rawTableId);
        try (TabletsMetadata tabletsMeta = TabletsMetadata.builder(client).forTable(tableId).fetch(TabletMetadata.ColumnType.FILES).build()) {
            // Only the first tablet and its first file are consulted.
            TabletMetadata firstTablet = tabletsMeta.iterator().next();
            StoredTabletFile firstFile = firstTablet.getFiles().iterator().next();
            return firstFile.getPath().getParent().toUri().getRawPath();
        }
    }

    /**
     * Builds the thread that repeatedly samples the running compactions until the shared stop
     * flag is set. Despite the name, the returned thread is NOT started; callers start it.
     *
     * <p>A {@link TException} from a sample is logged and ends the thread.
     *
     * @return the not-yet-started checker thread
     */
    public Thread startChecker() {
        Runnable poller = () -> {
            try {
                while (!stopCheckerThread.get()) {
                    this.checkRunning();
                    UtilWaitThread.sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
                }
            }
            catch (TException e) {
                log.warn("{}", e.getMessage(), e);
            }
        };
        return Threads.createThread("RC checker", poller);
    }

    /**
     * Takes one sample of the running external compactions and records how each has progressed
     * since the previous sample.
     *
     * <p>For every compaction id the latest info replaces the previous one in {@code runningMap}.
     * When a compaction was seen before and its progress increased, the bucket the new progress
     * falls into is appended to {@code progressList}; a value outside (0, 100] is recorded as
     * {@link EC_PROGRESS#INVALID} and logged.
     *
     * @throws TException if the running compactions cannot be fetched
     */
    private void checkRunning() throws TException {
        TExternalCompactionList ecList = ExternalCompactionTestUtils.getRunningCompactions((ClientContext)ExternalCompactionProgressIT.getCluster().getServerContext());
        // Typed map: with a raw Map the lambda parameters below would be Object.
        Map<String, TExternalCompaction> ecMap = ecList.getCompactions();
        if (ecMap == null) {
            return;
        }
        ecMap.forEach((ecid, ec) -> {
            RunningCompactionInfo rci = new RunningCompactionInfo(ec);
            // put() returns null when this id has not been seen before.
            RunningCompactionInfo previousRci = this.runningMap.put(ecid, rci);
            log.debug("ECID {} has been running for {} seconds", ecid, TimeUnit.NANOSECONDS.toSeconds(rci.duration));
            if (previousRci == null) {
                log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
                return;
            }
            if (rci.progress <= previousRci.progress) {
                log.warn("{} did not progress. It went from {} to {}", ecid, previousRci.progress, rci.progress);
            } else {
                log.debug("{} progressed from {} to {}", ecid, previousRci.progress, rci.progress);
                EC_PROGRESS bucket = ExternalCompactionProgressIT.classifyProgress(rci.progress);
                this.progressList.add(bucket);
                if (bucket == EC_PROGRESS.INVALID) {
                    log.warn("An invalid progress {} has been seen. This should never occur.", rci.progress);
                }
            }
            if (!rci.status.equals(TCompactionState.IN_PROGRESS.name())) {
                log.debug("Saw status other than IN_PROGRESS: {}", rci.status);
            }
        });
    }

    /**
     * Maps a progress percentage to its bucket: (0, 25] STARTED, (25, 50] QUARTER, (50, 75] HALF,
     * (75, 100] THREE_QUARTERS; anything else (including NaN) is INVALID.
     */
    private static EC_PROGRESS classifyProgress(float progress) {
        if (progress > 0.0f && progress <= 25.0f) {
            return EC_PROGRESS.STARTED;
        }
        if (progress > 25.0f && progress <= 50.0f) {
            return EC_PROGRESS.QUARTER;
        }
        if (progress > 50.0f && progress <= 75.0f) {
            return EC_PROGRESS.HALF;
        }
        if (progress > 75.0f && progress <= 100.0f) {
            return EC_PROGRESS.THREE_QUARTERS;
        }
        return EC_PROGRESS.INVALID;
    }

    /**
     * Asserts that the checker recorded every progress bucket at least once and never recorded
     * an invalid progress value. All five checks are evaluated and reported together.
     */
    private void verifyProgress() {
        log.info("Verify Progress.");
        Assertions.assertAll(
                () -> Assertions.assertTrue(this.progressList.contains(EC_PROGRESS.STARTED), "Missing start of progress"),
                () -> Assertions.assertTrue(this.progressList.contains(EC_PROGRESS.QUARTER), "Missing quarter progress"),
                () -> Assertions.assertTrue(this.progressList.contains(EC_PROGRESS.HALF), "Missing half progress"),
                () -> Assertions.assertTrue(this.progressList.contains(EC_PROGRESS.THREE_QUARTERS), "Missing three quarters progress"),
                () -> Assertions.assertFalse(this.progressList.contains(EC_PROGRESS.INVALID), "Invalid progress seen"));
    }

    /** Progress buckets a running external compaction can be observed in. */
    enum EC_PROGRESS {
        /** Progress in (0, 25]. */
        STARTED,
        /** Progress in (25, 50]. */
        QUARTER,
        /** Progress in (50, 75]. */
        HALF,
        /** Progress in (75, 100]. */
        THREE_QUARTERS,
        /** Progress outside (0, 100]; the tests treat seeing this as a failure. */
        INVALID
    }
}

