/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.manager;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.spi.balancer.HostRegexTableLoadBalancer;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.harness.AccumuloITBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SuspendedTabletsIT
extends ConfigurableMacBase {
    private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class);
    private static ExecutorService THREAD_POOL;
    public static final int TSERVERS = 3;
    public static final int TABLETS = 30;
    private ProcessReference metadataTserverProcess;
    private static final AtomicInteger threadCounter;

    @Override
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(5L);
    }

    @Override
    public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
        cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
        cfg.setNumTservers(1);
        cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "1ms");
        cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(), HostAndPortRegexTableLoadBalancer.class.getName());
    }

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();){
            InstanceOperations iops = client.instanceOperations();
            List tservers = iops.getTabletServers();
            while (tservers == null || tservers.size() < 1) {
                Thread.sleep(1000L);
                tservers = client.instanceOperations().getTabletServers();
            }
            HostAndPort metadataServer = HostAndPort.fromString((String)((String)tservers.get(0)));
            log.info("Configuring balancer to assign all metadata tablets to {}", (Object)metadataServer);
            iops.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME, metadataServer.toString());
            ClientContext ctx = (ClientContext)client;
            TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME);
            while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey((Object)metadataServer)) {
                log.info("Metadata tablets are not hosted on the correct server. Waiting for balancer...");
                Thread.sleep(1000L);
                tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME);
            }
            log.info("Metadata tablets are now hosted on {}", (Object)metadataServer);
        }
        Collection procs = (Collection)this.getCluster().getProcesses().get(ServerType.TABLET_SERVER);
        Assertions.assertEquals((int)1, (int)procs.size(), (String)"Expected a single tserver process");
        this.metadataTserverProcess = (ProcessReference)procs.iterator().next();
        this.getCluster().getConfig().setNumTservers(3);
        this.getCluster().start();
    }

    @Test
    public void crashAndResumeTserver() throws Exception {
        this.suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.RESUME);
    }

    @Test
    public void crashAndOffline() throws Exception {
        this.suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.OFFLINE);
    }

    @Test
    public void shutdownAndResumeTserver() throws Exception {
        this.suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.RESUME);
    }

    @Test
    public void shutdownAndOffline() throws Exception {
        this.suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.OFFLINE);
    }

    private void suspensionTestBody(TServerKiller serverStopper, AfterSuspendAction action) throws Exception {
        block16: {
            try (AccumuloClient client = (AccumuloClient)Accumulo.newClient().from(this.getClientProperties()).build();){
                ClientContext ctx = (ClientContext)client;
                String tableName = this.getUniqueNames(1)[0];
                TreeSet<Text> splitPoints = new TreeSet<Text>();
                for (int i = 1; i < 30; ++i) {
                    splitPoints.add(new Text("" + i));
                }
                log.info("Creating table " + tableName);
                NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
                ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), action.suspendTime));
                ctx.tableOperations().create(tableName, ntc);
                log.info("Waiting on hosting and balance");
                TabletLocations ds = TabletLocations.retrieve(ctx, tableName);
                while (ds.hostedCount != 30) {
                    Thread.sleep(1000L);
                    ds = TabletLocations.retrieve(ctx, tableName);
                }
                ctx.instanceOperations().waitForBalance();
                do {
                    Thread.sleep(1000L);
                    ds = TabletLocations.retrieve(ctx, tableName);
                } while (ds.hostedCount != 30 || ds.hosted.keySet().size() != 2);
                Assertions.assertEquals((int)2, (int)ds.hosted.keySet().size());
                TabletLocations beforeDeathState = ds;
                log.info("Eliminating tablet servers");
                serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);
                log.info("Waiting on suspended tablets");
                do {
                    Thread.sleep(1000L);
                    ds = TabletLocations.retrieve(ctx, tableName);
                } while (ds.suspended.keySet().size() != 2 || ds.suspendedCount + ds.hostedCount != 30);
                SetMultimap<HostAndPort, KeyExtent> deadTabletsByServer = ds.suspended;
                for (HostAndPort server : deadTabletsByServer.keySet()) {
                    Assertions.assertEquals((Object)beforeDeathState.hosted.get((Object)server), (Object)deadTabletsByServer.get((Object)server));
                }
                Assertions.assertEquals((int)30, (int)(ds.hostedCount + ds.suspendedCount));
                Assertions.assertTrue((ds.suspendedCount > 0 ? 1 : 0) != 0);
                if (action == AfterSuspendAction.OFFLINE) {
                    client.tableOperations().offline(tableName, true);
                    while (ds.suspendedCount > 0) {
                        Thread.sleep(1000L);
                        ds = TabletLocations.retrieve(ctx, tableName);
                        log.info("Waiting for suspended {}", ds.suspended);
                    }
                    break block16;
                }
                if (action == AfterSuspendAction.RESUME) {
                    HostAndPort restartedServer = (HostAndPort)deadTabletsByServer.keySet().iterator().next();
                    log.info("Restarting " + restartedServer);
                    this.getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), 1);
                    log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
                    while (ds.suspended.containsKey((Object)restartedServer) || ds.assignedCount != 0) {
                        Thread.sleep(1000L);
                        ds = TabletLocations.retrieve(ctx, tableName);
                    }
                    Assertions.assertEquals((Object)deadTabletsByServer.get((Object)restartedServer), (Object)ds.hosted.get((Object)restartedServer));
                    log.info("Awaiting tablet reassignment for remaining tablets");
                    while (ds.hostedCount != 30) {
                        Thread.sleep(1000L);
                        ds = TabletLocations.retrieve(ctx, tableName);
                    }
                    break block16;
                }
                throw new IllegalStateException("Unknown action " + action);
            }
        }
    }

    @BeforeAll
    public static void init() {
        THREAD_POOL = Executors.newCachedThreadPool(r -> new Thread(r, "Scanning deadline thread #" + threadCounter.incrementAndGet()));
    }

    @AfterAll
    public static void cleanup() {
        THREAD_POOL.shutdownNow();
    }

    static {
        threadCounter = new AtomicInteger(0);
    }

    public static class HostAndPortRegexTableLoadBalancer
    extends HostRegexTableLoadBalancer {
        private static final Logger LOG = LoggerFactory.getLogger((String)HostAndPortRegexTableLoadBalancer.class.getName());

        protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) {
            String host;
            String test = host = tabletServerId.getHost();
            if (!this.isIpBasedRegex()) {
                try {
                    test = this.getNameFromIp(host);
                }
                catch (UnknownHostException e1) {
                    LOG.error("Unable to determine host name for IP: " + host + ", setting to default pool", (Throwable)e1);
                    return Collections.singletonList("HostTableLoadBalancer.ALL");
                }
            }
            String hostString = test + ":" + tabletServerId.getPort();
            List<String> pools = this.getPoolNameToRegexPattern().entrySet().stream().filter(e -> ((Pattern)e.getValue()).matcher(hostString).matches()).map(Map.Entry::getKey).collect(Collectors.toList());
            if (pools.isEmpty()) {
                pools.add("HostTableLoadBalancer.ALL");
            }
            return pools;
        }

        public long balance(TabletBalancer.BalanceParameters params) {
            super.balance(params);
            return 1000L;
        }
    }

    private static class TabletLocations {
        public final Map<KeyExtent, TabletLocationState> locationStates = new HashMap<KeyExtent, TabletLocationState>();
        public final SetMultimap<HostAndPort, KeyExtent> hosted = HashMultimap.create();
        public final SetMultimap<HostAndPort, KeyExtent> suspended = HashMultimap.create();
        public int hostedCount = 0;
        public int assignedCount = 0;
        public int suspendedCount = 0;

        private TabletLocations() {
        }

        public static TabletLocations retrieve(ClientContext ctx, String tableName) throws Exception {
            return TabletLocations.retrieve(ctx, tableName, MetadataTable.NAME);
        }

        public static TabletLocations retrieve(ClientContext ctx, String tableName, String metaName) throws Exception {
            int sleepTime = 200;
            int remainingAttempts = 30;
            while (true) {
                try {
                    FutureTask<TabletLocations> tlsFuture = new FutureTask<TabletLocations>(() -> {
                        TabletLocations answer = new TabletLocations();
                        answer.scan(ctx, tableName, metaName);
                        return answer;
                    });
                    THREAD_POOL.execute(tlsFuture);
                    return tlsFuture.get(5L, TimeUnit.SECONDS);
                }
                catch (TimeoutException ex) {
                    log.debug("Retrieval timed out", (Throwable)ex);
                }
                catch (Exception ex) {
                    log.warn("Failed to scan metadata", (Throwable)ex);
                }
                sleepTime = Math.min(2 * sleepTime, 10000);
                Thread.sleep(sleepTime);
                if (--remainingAttempts != 0) continue;
                Assertions.fail((String)"Scanning of metadata failed, aborting");
            }
        }

        private void scan(ClientContext ctx, String tableName, String metaName) {
            Map idMap = ctx.tableOperations().tableIdMap();
            String tableId = Objects.requireNonNull((String)idMap.get(tableName));
            try (MetaDataTableScanner scanner = new MetaDataTableScanner(ctx, new Range(), metaName);){
                while (scanner.hasNext()) {
                    TabletLocationState tls = scanner.next();
                    if (!tls.extent.tableId().canonical().equals(tableId)) continue;
                    this.locationStates.put(tls.extent, tls);
                    if (tls.suspend != null) {
                        this.suspended.put((Object)tls.suspend.server, (Object)tls.extent);
                        ++this.suspendedCount;
                        continue;
                    }
                    if (tls.current != null) {
                        this.hosted.put((Object)tls.current.getHostAndPort(), (Object)tls.extent);
                        ++this.hostedCount;
                        continue;
                    }
                    if (tls.future == null) continue;
                    ++this.assignedCount;
                }
            }
        }
    }

    private class CrashTserverKiller
    implements TServerKiller {
        private CrashTserverKiller() {
        }

        @Override
        public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception {
            List procs = ((Collection)SuspendedTabletsIT.this.getCluster().getProcesses().get(ServerType.TABLET_SERVER)).stream().filter(p -> !SuspendedTabletsIT.this.metadataTserverProcess.equals(p)).collect(Collectors.toList());
            Collections.shuffle(procs, AccumuloITBase.random);
            Assertions.assertEquals((int)2, (int)procs.size(), (String)"Not enough tservers exist");
            Assertions.assertTrue((procs.size() >= count ? 1 : 0) != 0, (String)("Attempting to kill more tservers (" + count + ") than exist in the cluster (" + procs.size() + ")"));
            for (int i = 0; i < count; ++i) {
                ProcessReference pr = (ProcessReference)procs.get(i);
                log.info("Crashing {}", (Object)pr.getProcess());
                SuspendedTabletsIT.this.getCluster().killProcess(ServerType.TABLET_SERVER, pr);
            }
        }
    }

    static enum AfterSuspendAction {
        RESUME("80s"),
        OFFLINE("800s");

        public final String suspendTime;

        private AfterSuspendAction(String suspendTime) {
            this.suspendTime = suspendTime;
        }
    }

    private static interface TServerKiller {
        public void eliminateTabletServers(ClientContext var1, TabletLocations var2, int var3) throws Exception;
    }

    private class ShutdownTserverKiller
    implements TServerKiller {
        private ShutdownTserverKiller() {
        }

        @Override
        public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception {
            HashSet<TServerInstance> tserverSet = new HashSet<TServerInstance>();
            HashSet<TServerInstance> metadataServerSet = new HashSet<TServerInstance>();
            TabletLocator tl = TabletLocator.getLocator((ClientContext)ctx, (TableId)MetadataTable.ID);
            for (TabletLocationState tls : locs.locationStates.values()) {
                if (tls.current == null) continue;
                tserverSet.add(tls.current.getServerInstance());
                TabletLocator.TabletLocation tab = tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false);
                metadataServerSet.add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16).longValue()));
            }
            Assertions.assertEquals((int)1, (int)metadataServerSet.size(), (String)"Expecting a single tServer in metadataServerSet");
            tserverSet.removeAll(metadataServerSet);
            Assertions.assertEquals((int)2, (int)tserverSet.size(), (String)"Expecting 2 tServers in shutdown-list");
            ArrayList tserversList = new ArrayList(tserverSet);
            Collections.shuffle(tserversList, AccumuloITBase.random);
            for (int i1 = 0; i1 < count; ++i1) {
                String tserverName = ((TServerInstance)tserversList.get(i1)).getHostPortSession();
                ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
                    log.info("Sending shutdown command to {} via ManagerClientService", (Object)tserverName);
                    client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false);
                });
            }
            log.info("Waiting for tserver process{} to die", (Object)(count == 1 ? "" : "es"));
            for (int i2 = 0; i2 < 10; ++i2) {
                ArrayList<ProcessReference> deadProcs = new ArrayList<ProcessReference>();
                for (ProcessReference pr1 : (Collection)SuspendedTabletsIT.this.getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
                    Process p = pr1.getProcess();
                    if (p.isAlive()) continue;
                    deadProcs.add(pr1);
                }
                for (ProcessReference pr2 : deadProcs) {
                    log.info("Process {} is dead, informing cluster control about this", (Object)pr2.getProcess());
                    SuspendedTabletsIT.this.getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2);
                    --count;
                }
                if (count == 0) {
                    return;
                }
                Thread.sleep(TimeUnit.SECONDS.toMillis(2L));
            }
            throw new IllegalStateException("Tablet servers didn't die!");
        }
    }
}

