/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.leadership;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

/**
 * Integration test in the {@code high-availability} TestNG group.
 *
 * <p>It asks the router which coordinator and overlord are currently leading, runs the system
 * queries from {@link #SYSTEM_QUERIES_RESOURCE} against that state, restarts the leading
 * containers and repeats, asserting that the reported leaders change between rounds. It also
 * checks that discovered nodes answer {@code /status/selfDiscovered} with HTTP 200.
 */
@Test(groups = {"high-availability"})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITHighAvailabilityTest {
    private static final Logger LOG = new Logger(ITHighAvailabilityTest.class);
    private static final String SYSTEM_QUERIES_RESOURCE = "/queries/high_availability_sys.json";
    private static final int NUM_LEADERSHIP_SWAPS = 3;
    private static final int NUM_RETRIES = 120;
    private static final long RETRY_DELAY = TimeUnit.SECONDS.toMillis(5L);

    @Inject
    private IntegrationTestingConfig config;
    @Inject
    private DruidClusterAdminClient druidClusterAdminClient;
    @Inject
    ServerDiscoveryFactory factory;
    @Inject
    DruidNodeDiscoveryProvider druidNodeDiscovery;
    @Inject
    CoordinatorResourceTestClient coordinatorClient;
    @Inject
    SqlTestQueryHelper queryHelper;
    @Inject
    ObjectMapper jsonMapper;
    @Inject
    @TestClient
    HttpClient httpClient;

    /**
     * Repeatedly fetches the current leaders, verifies they differ from the previous round, runs
     * the templated system queries and then restarts the leaders.
     *
     * <p>Because this is a do/while with a post-incremented counter, the body executes
     * {@code NUM_LEADERSHIP_SWAPS + 1} times.
     *
     * @throws Exception if loading the query resource or running the queries fails
     */
    @Test
    public void testLeadershipChanges() throws Exception {
        int runCount = 0;
        String previousCoordinatorLeader = null;
        String previousOverlordLeader = null;
        do {
            final String coordinatorLeader = getLeader("coordinator");
            final String overlordLeader = getLeader("indexer");

            // On the first pass the "previous" values are null, so these only start to be
            // meaningful once a restart has happened.
            Assert.assertNotEquals(previousCoordinatorLeader, coordinatorLeader);
            Assert.assertNotEquals(previousOverlordLeader, overlordLeader);
            previousCoordinatorLeader = coordinatorLeader;
            previousOverlordLeader = overlordLeader;

            final String queries = fillTemplate(
                config,
                AbstractIndexerTest.getResourceAsString(SYSTEM_QUERIES_RESOURCE),
                overlordLeader,
                coordinatorLeader
            );
            queryHelper.testQueriesFromString(queries);

            swapLeadersAndWait(coordinatorLeader, overlordLeader);
        } while (runCount++ < NUM_LEADERSHIP_SWAPS);
    }

    /**
     * Retries until more than five nodes, summed over the standard node roles, have answered
     * their self-discovery endpoint with HTTP 200.
     */
    @Test
    public void testDiscoveryAndSelfDiscovery() {
        ITRetryUtil.retryUntil(
            () -> {
                try {
                    final ImmutableList<DruidNodeDiscovery> discoveries = ImmutableList.of(
                        druidNodeDiscovery.getForNodeRole(NodeRole.COORDINATOR),
                        druidNodeDiscovery.getForNodeRole(NodeRole.OVERLORD),
                        druidNodeDiscovery.getForNodeRole(NodeRole.HISTORICAL),
                        druidNodeDiscovery.getForNodeRole(NodeRole.MIDDLE_MANAGER),
                        druidNodeDiscovery.getForNodeRole(NodeRole.INDEXER),
                        druidNodeDiscovery.getForNodeRole(NodeRole.BROKER),
                        druidNodeDiscovery.getForNodeRole(NodeRole.ROUTER)
                    );

                    int servicesDiscovered = 0;
                    for (DruidNodeDiscovery discovery : discoveries) {
                        servicesDiscovered += testSelfDiscovery(discovery.getAllNodes());
                    }
                    return servicesDiscovered > 5;
                }
                catch (Throwable t) {
                    // Throwable (not Exception) so that an AssertionError raised by
                    // testSelfDiscovery also counts as "not ready yet" instead of aborting.
                    LOG.warn(t, "Standard service discovery attempt failed");
                    return false;
                }
            },
            true,
            RETRY_DELAY,
            NUM_RETRIES,
            "Standard services discovered"
        );
    }

    /**
     * Retries until at least one node registered under the {@code custom-node-role} role has
     * answered its self-discovery endpoint with HTTP 200.
     */
    @Test
    public void testCustomDiscovery() {
        ITRetryUtil.retryUntil(
            () -> {
                try {
                    final DruidNodeDiscovery customDisco =
                        druidNodeDiscovery.getForNodeRole(new NodeRole("custom-node-role"));
                    return testSelfDiscovery(customDisco.getAllNodes()) > 0;
                }
                catch (Throwable t) {
                    // Same reasoning as above: assertion failures mean "try again".
                    LOG.warn(t, "Custom service discovery attempt failed");
                    return false;
                }
            },
            true,
            RETRY_DELAY,
            NUM_RETRIES,
            "Custom service discovered"
        );
    }

    /**
     * Issues {@code GET /status/selfDiscovered} against every given node and asserts an HTTP 200
     * response for each.
     *
     * @param nodes nodes to probe
     * @return the number of nodes probed (all of which responded with 200, otherwise the
     *         assertion would have thrown)
     * @throws MalformedURLException if the constructed location is not a valid URL
     * @throws ExecutionException if the HTTP request fails
     * @throws InterruptedException if interrupted while waiting for the response
     */
    private int testSelfDiscovery(Collection<DiscoveryDruidNode> nodes)
        throws MalformedURLException, ExecutionException, InterruptedException {
        int count = 0;
        for (DiscoveryDruidNode node : nodes) {
            // When the test config reports a docker setup, the docker host is used in place of
            // the host name advertised by the node.
            final String host = config.isDocker() ? config.getDockerHost() : node.getDruidNode().getHost();
            final String location = StringUtils.format(
                "http://%s:%s/status/selfDiscovered",
                host,
                node.getDruidNode().getPlaintextPort()
            );
            LOG.info("testing self discovery %s", location);

            final StatusResponseHolder response = httpClient.go(
                new Request(HttpMethod.GET, new URL(location)),
                StatusResponseHandler.getInstance()
            ).get();
            LOG.info("%s responded with %s", location, response.getStatus().getCode());

            Assert.assertEquals(response.getStatus(), HttpResponseStatus.OK);
            ++count;
        }
        return count;
    }

    /**
     * Restarts the container of whichever coordinator and overlord is currently leading, then
     * blocks until both restarted services report ready.
     *
     * @param coordinatorLeader leader string as returned by {@link #getLeader(String)} for the coordinator
     * @param overlordLeader leader string as returned by {@link #getLeader(String)} for the overlord
     */
    private void swapLeadersAndWait(String coordinatorLeader, String overlordLeader) {
        final Runnable waitForCoordinator;
        final Runnable waitForOverlord;

        if (isCoordinatorOneLeader(config, coordinatorLeader)) {
            druidClusterAdminClient.restartCoordinatorContainer();
            waitForCoordinator = druidClusterAdminClient::waitUntilCoordinatorReady;
        } else {
            druidClusterAdminClient.restartCoordinatorTwoContainer();
            waitForCoordinator = druidClusterAdminClient::waitUntilCoordinatorTwoReady;
        }

        if (isOverlordOneLeader(config, overlordLeader)) {
            druidClusterAdminClient.restartOverlordContainer();
            // NOTE(review): the first overlord's readiness check is exposed as
            // waitUntilIndexerReady on the admin client - confirm this is the intended pairing.
            waitForOverlord = druidClusterAdminClient::waitUntilIndexerReady;
        } else {
            druidClusterAdminClient.restartOverlordTwoContainer();
            waitForOverlord = druidClusterAdminClient::waitUntilOverlordTwoReady;
        }

        // Both restarts are issued before waiting on either service.
        waitForCoordinator.run();
        waitForOverlord.run();
    }

    /**
     * Fetches the current leader of a service through the router.
     *
     * @param service path segment placed into {@code /druid/<service>/v1/leader}
     * @return the raw response body
     * @throws RuntimeException wrapping any failure, including a non-200 response (reported as
     *         an {@link ISE} cause)
     */
    private String getLeader(String service) {
        try {
            final URL leaderUrl = new URL(
                StringUtils.format("%s/druid/%s/v1/leader", config.getRouterUrl(), service)
            );
            final StatusResponseHolder response = httpClient.go(
                new Request(HttpMethod.GET, leaderUrl),
                StatusResponseHandler.getInstance()
            ).get();

            if (!response.getStatus().equals(HttpResponseStatus.OK)) {
                throw new ISE(
                    "Error while fetching leader from[%s] status[%s] content[%s]",
                    config.getRouterUrl(),
                    response.getStatus(),
                    response.getContent()
                );
            }
            return response.getContent();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Substitutes the {@code %%...%%} placeholders of a query template with host names from the
     * test config and with 1/0 leader flags derived from the given leader strings.
     *
     * @param config source of the internal host names
     * @param template template text containing the placeholders
     * @param overlordLeader current overlord leader string
     * @param coordinatorLeader current coordinator leader string
     * @return the template with all known placeholders replaced
     */
    private static String fillTemplate(
        IntegrationTestingConfig config,
        String template,
        String overlordLeader,
        String coordinatorLeader
    ) {
        final boolean overlordOneLeads = isOverlordOneLeader(config, overlordLeader);
        final boolean coordinatorOneLeads = isCoordinatorOneLeader(config, coordinatorLeader);

        String working = template;
        working = StringUtils.replace(working, "%%OVERLORD_ONE%%", config.getOverlordInternalHost());
        working = StringUtils.replace(working, "%%OVERLORD_TWO%%", config.getOverlordTwoInternalHost());
        working = StringUtils.replace(working, "%%COORDINATOR_ONE%%", config.getCoordinatorInternalHost());
        working = StringUtils.replace(working, "%%COORDINATOR_TWO%%", config.getCoordinatorTwoInternalHost());
        working = StringUtils.replace(working, "%%BROKER%%", config.getBrokerInternalHost());
        working = StringUtils.replace(working, "%%ROUTER%%", config.getRouterInternalHost());

        // Exactly one of each pair is flagged "1": if instance one is not the leader, instance
        // two is assumed to be.
        working = StringUtils.replace(working, "%%OVERLORD_ONE_LEADER%%", overlordOneLeads ? "1" : "0");
        working = StringUtils.replace(working, "%%OVERLORD_TWO_LEADER%%", overlordOneLeads ? "0" : "1");
        working = StringUtils.replace(working, "%%COORDINATOR_ONE_LEADER%%", coordinatorOneLeads ? "1" : "0");
        working = StringUtils.replace(working, "%%COORDINATOR_TWO_LEADER%%", coordinatorOneLeads ? "0" : "1");

        working = StringUtils.replace(working, "%%NON_LEADER%%", String.valueOf(NullHandling.defaultLongValue()));
        return working;
    }

    /**
     * @return true if the leader string contains the first coordinator's internal host followed by a colon
     */
    private static boolean isCoordinatorOneLeader(IntegrationTestingConfig config, String coordinatorLeader) {
        return coordinatorLeader.contains(transformHost(config.getCoordinatorInternalHost()));
    }

    /**
     * @return true if the leader string contains the first overlord's internal host followed by a colon
     */
    private static boolean isOverlordOneLeader(IntegrationTestingConfig config, String overlordLeader) {
        return overlordLeader.contains(transformHost(config.getOverlordInternalHost()));
    }

    /**
     * Appends a colon to the host name.
     *
     * <p>NOTE(review): presumably so that a {@code contains} match on "host:" cannot hit a longer
     * host name that merely starts with the same text - confirm against the configured host names.
     */
    private static String transformHost(String host) {
        return StringUtils.format("%s:", host);
    }
}

