/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.coordinator.CompactionCoordinator;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that starts compactor, scan server and tablet server processes in a shared
 * mini cluster and checks the {@code accumulo.server.idle} metric lines they publish to a
 * {@link TestStatsDSink}.
 *
 * <p>As used by the tests below (see their log messages), a metric value of {@code 1} is treated
 * as "idle" and {@code 0} as "not idle". The value {@code -1} is also accepted as legal by
 * {@link #testIdleStopMetrics()}; its exact meaning is not visible from this file.
 */
public class IdleProcessMetricsIT extends SharedMiniClusterBase {
    private static final Logger log = LoggerFactory.getLogger(IdleProcessMetricsIT.class);

    /** Prefix of the StatsD lines that carry the idle metric. */
    private static final String IDLE_METRIC_PREFIX = "accumulo.server.idle";

    /** Tag holding the name of the process that emitted a metric. */
    private static final String PROCESS_NAME_TAG = "process.name";

    /** Value configured for {@link Property#GENERAL_IDLE_PROCESS_INTERVAL}. */
    static final Duration idleProcessInterval = Duration.ofSeconds(10L);

    /** Sink that receives the StatsD lines; created before the cluster so its port can be configured. */
    private static TestStatsDSink sink;

    /**
     * Creates the metrics sink and then starts the mini cluster configured by
     * {@link IdleStopITConfig}. The sink must exist first because the cluster configuration reads
     * its port.
     *
     * @throws Exception if the sink or the cluster cannot be started
     */
    @BeforeAll
    public static void before() throws Exception {
        sink = new TestStatsDSink();
        SharedMiniClusterBase.startMiniClusterWithConfig(new IdleStopITConfig());
    }

    /**
     * Closes the metrics sink and stops the mini cluster.
     *
     * <p>The cluster is stopped in a {@code finally} block so that a failure while closing the sink
     * (or a sink that was never created because {@link #before()} failed early) does not leave the
     * cluster processes running.
     *
     * @throws Exception if closing the sink or stopping the cluster fails
     */
    @AfterAll
    public static void after() throws Exception {
        try {
            if (sink != null) {
                sink.close();
            }
        } finally {
            SharedMiniClusterBase.stopMiniCluster();
        }
    }

    /**
     * Starts a coordinator, a compactor, a scan server and a tablet server, then waits until an
     * idle metric has been observed for each: value {@code 0} for the tablet server and value
     * {@code 1} for both the scan server and the compactor. Every idle metric seen must have a
     * value of {@code 0}, {@code 1} or {@code -1}.
     *
     * @throws Exception if a process cannot be started or the wait fails
     */
    @Test
    public void testIdleStopMetrics() throws Exception {
        getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
        getCluster().getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");
        getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost");
        getCluster().getClusterControl().start(ServerType.TABLET_SERVER);

        // Give the processes one full idle interval before inspecting their metrics.
        Thread.sleep(idleProcessInterval.toMillis());

        // Flags are sticky across polls: each poll only sees the lines returned by that call to
        // getLines(), so a process seen in an earlier poll must stay recorded.
        AtomicBoolean sawCompactor = new AtomicBoolean(false);
        AtomicBoolean sawSServer = new AtomicBoolean(false);
        AtomicBoolean sawTServer = new AtomicBoolean(false);

        Wait.waitFor(() -> {
            List<String> statsDMetrics = sink.getLines();
            statsDMetrics.stream()
                .filter(line -> line.startsWith(IDLE_METRIC_PREFIX))
                .peek(log::info)
                .map(TestStatsDSink::parseStatsDMetric)
                .forEach(a -> {
                    String processName = a.getTags().get(PROCESS_NAME_TAG);
                    int value = Integer.parseInt(a.getValue());
                    Assertions.assertTrue(value == 0 || value == 1 || value == -1,
                        "Unexpected value " + value);
                    if ("tserver".equals(processName) && value == 0) {
                        sawTServer.set(true);
                    } else if ("sserver".equals(processName) && value == 1) {
                        sawSServer.set(true);
                    } else if ("compactor".equals(processName) && value == 1) {
                        sawCompactor.set(true);
                    }
                });
            return sawCompactor.get() && sawSServer.get() && sawTServer.get();
        });
    }

    /**
     * Verifies the compactor's idle metric goes 1 -> 0 -> 1 around an external compaction: idle
     * after start-up, not idle while a deliberately slowed compaction runs, idle again afterwards.
     *
     * @throws Exception if cluster control, table operations or any of the waits fail
     */
    @Test
    public void idleCompactorTest() throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
            getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
            getCluster().getClusterControl().startCompactors(Compactor.class, 1, "DCQ1");
            Thread.sleep(idleProcessInterval.toMillis());

            final String processName = "compactor";

            log.info("Waiting for compactor to go idle");
            waitForIdleMetricValueToBe(1, processName);

            String table1 = this.getUniqueNames(1)[0];
            ExternalCompactionTestUtils.createTable(client, table1, "cs1");
            ExternalCompactionTestUtils.writeData(client, table1);

            // Attach a slow iterator at major-compaction scope so the compaction lasts long enough
            // for a "not idle" metric to be published.
            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 5L);
            client.tableOperations().attachIterator(table1, setting,
                EnumSet.of(IteratorUtil.IteratorScope.majc));
            ExternalCompactionTestUtils.compact(client, table1, 2, "DCQ1", false);

            log.info("Waiting for compactor to be not idle after starting compaction");
            waitForIdleMetricValueToBe(0, processName);

            log.info("Waiting for compactor to go idle once compaction completes");
            waitForIdleMetricValueToBe(1, processName);

            ExternalCompactionTestUtils.verify(client, table1, 2);
        }
    }

    /**
     * Verifies the scan server's idle metric goes 1 -> 0 -> 1 around a scan: idle after start-up,
     * not idle once a deliberately slowed scan has been run, idle again afterwards.
     *
     * @throws Exception if cluster control, table operations or any of the waits fail
     */
    @Test
    public void idleScanServerTest() throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
            getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost");
            Thread.sleep(idleProcessInterval.toMillis());

            final String processName = "sserver";

            log.info("Waiting for sserver to go idle");
            waitForIdleMetricValueToBe(1, processName);

            String table1 = this.getUniqueNames(1)[0];
            ExternalCompactionTestUtils.createTable(client, table1, "cs1");
            ExternalCompactionTestUtils.writeData(client, table1);

            // Attach a slow iterator at scan scope so the scan performs measurable work.
            IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
            SlowIterator.setSleepTime(setting, 5L);
            client.tableOperations().attachIterator(table1, setting,
                EnumSet.of(IteratorUtil.IteratorScope.scan));

            try (Scanner scanner = client.createScanner(table1, Authorizations.EMPTY)) {
                // EVENTUAL consistency is requested; the assertions below expect this scan to
                // make the scan server report "not idle".
                scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
                // Presumably writeData() stores 1000 entries -- confirm against
                // ExternalCompactionTestUtils.
                Assertions.assertEquals(1000L, scanner.stream().count());
            }

            log.info("Waiting for sserver to be not idle after starting a scan");
            waitForIdleMetricValueToBe(0, processName);

            log.info("Waiting for sserver to go idle once scan completes");
            waitForIdleMetricValueToBe(1, processName);
        }
    }

    /**
     * Polls the sink until an idle metric tagged with the given process name reports the expected
     * value. Polling uses the arguments {@code 60_000} and {@code 2_000} passed to
     * {@code Wait.waitFor} (presumably a 60 second timeout and 2 second retry interval, in
     * milliseconds -- confirm against {@code Wait}).
     *
     * @param expectedValue idle metric value to wait for
     * @param processName value of the {@code process.name} tag to match; must not be null
     */
    private static void waitForIdleMetricValueToBe(int expectedValue, String processName) {
        Wait.waitFor(
            () -> sink.getLines().stream()
                .filter(line -> line.startsWith(IDLE_METRIC_PREFIX))
                .map(TestStatsDSink::parseStatsDMetric)
                // Compare from the known non-null side so a metric line without the tag is
                // skipped instead of throwing a NullPointerException inside the wait condition.
                .filter(a -> processName.equals(a.getTags().get(PROCESS_NAME_TAG)))
                .peek(a -> log.info("Idle metric: {}", a))
                .anyMatch(a -> Integer.parseInt(a.getValue()) == expectedValue),
            60_000L, 2_000L, "Idle metric did not reach the expected value " + expectedValue);
    }

    /**
     * Mini cluster configuration for this test: one compactor, one tablet server and one scan
     * server, the idle process interval set from {@link #idleProcessInterval}, and Micrometer
     * metrics enabled with {@link TestStatsDRegistryFactory} pointed at the test sink.
     */
    public static class IdleStopITConfig implements MiniClusterConfigurationCallback {

        /**
         * Applies the external compaction test defaults and then the idle-metric specific settings.
         *
         * @param cfg mini cluster configuration to modify
         * @param coreSite Hadoop configuration, passed through to
         *        {@link ExternalCompactionTestUtils#configureMiniCluster}
         */
        @Override
        public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
            ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);

            cfg.setNumCompactors(1);
            cfg.setNumTservers(1);
            cfg.setNumScanServers(1);

            cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL,
                idleProcessInterval.toSeconds() + "s");
            cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "1s");

            cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
            cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
                TestStatsDRegistryFactory.class.getName());

            // System properties handed to the cluster so the registry factory can locate the sink.
            Map<String, String> sysProps = Map.of(
                "test.meter.registry.host", "127.0.0.1",
                "test.meter.registry.port", Integer.toString(sink.getPort()));
            cfg.setSystemProperties(sysProps);
        }
    }
}

