/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion.test;

import io.strimzi.test.container.StrimziKafkaContainer;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;

public class KafkaBrokerExtension
implements BeforeAllCallback,
BeforeEachCallback,
ParameterResolver,
ExtensionContext.Store.CloseableResource {
    public static final Logger LOGGER = Logger.getLogger((String)KafkaBrokerExtension.class.getName());
    public static final String KAFKA_VERSION = "3.3.2";
    protected StrimziKafkaContainer kafka;

    public void beforeAll(ExtensionContext context) {
        ExtensionContext.Store globalStore = context.getRoot().getStore(ExtensionContext.Namespace.GLOBAL);
        KafkaBrokerExtension extension = (KafkaBrokerExtension)globalStore.get(KafkaBrokerExtension.class);
        if (extension == null) {
            LOGGER.info((Object)"Starting Kafka broker");
            this.startKafkaBroker();
            globalStore.put(KafkaBrokerExtension.class, (Object)this);
        }
    }

    public void close() {
        LOGGER.info((Object)"Stopping Kafka broker");
        this.stopKafkaBroker();
    }

    public static StrimziKafkaContainer createKafkaContainer() {
        return KafkaBrokerExtension.configureKafkaContainer(new StrimziKafkaContainer());
    }

    public static <T extends StrimziKafkaContainer> T configureKafkaContainer(T container) {
        String kafkaVersion = System.getProperty("kafka-container-version", KAFKA_VERSION);
        container.withKafkaVersion(kafkaVersion);
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("log.cleaner.enable", "false");
        config.put("group.initial.rebalance.delay.ms", "0");
        container.withKafkaConfigurationMap(config);
        return container;
    }

    public void startKafkaBroker() {
        this.kafka = KafkaBrokerExtension.createKafkaContainer();
        this.kafka.start();
        LOGGER.info((Object)("Kafka broker started: " + this.kafka.getBootstrapServers() + " (" + this.kafka.getMappedPort(9092) + ")"));
        Awaitility.await().until(() -> this.kafka.isRunning());
    }

    public static StrimziKafkaContainer restart(StrimziKafkaContainer kafka, int gracePeriodInSecond) {
        int port = kafka.getMappedPort(9092);
        try {
            kafka.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        Awaitility.await().until(() -> !kafka.isRunning());
        KafkaBrokerExtension.sleep(Duration.ofSeconds(gracePeriodInSecond));
        return KafkaBrokerExtension.startKafkaBroker(port);
    }

    public static StrimziKafkaContainer startKafkaBroker(int port) {
        StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer().withPort(port);
        kafka.start();
        Awaitility.await().until(() -> ((StrimziKafkaContainer)kafka).isRunning());
        return kafka;
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stopKafkaBroker() {
        if (this.kafka != null) {
            try {
                this.kafka.stop();
            }
            catch (Exception exception) {
                // empty catch block
            }
            Awaitility.await().until(() -> !this.kafka.isRunning());
        }
    }

    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        return parameterContext.isAnnotated(KafkaBootstrapServers.class) && parameterContext.getParameter().getType().equals(String.class);
    }

    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        if (parameterContext.isAnnotated(KafkaBootstrapServers.class)) {
            ExtensionContext.Store globalStore = extensionContext.getRoot().getStore(ExtensionContext.Namespace.GLOBAL);
            KafkaBrokerExtension extension = (KafkaBrokerExtension)globalStore.get(KafkaBrokerExtension.class);
            if (extension.kafka != null) {
                return extension.kafka.getBootstrapServers();
            }
        }
        return null;
    }

    public void beforeEach(ExtensionContext context) throws Exception {
        LOGGER.infof("Running test %s (%s#%s)", (Object)context.getDisplayName(), (Object)context.getTestClass().map(Class::getName).orElse(""), (Object)context.getTestMethod().map(Method::getName).orElse(""));
        if (this.kafka != null) {
            for (int i = 0; i < 3; ++i) {
                try {
                    this.isBrokerHealthy();
                    return;
                }
                catch (ConditionTimeoutException e) {
                    LOGGER.warn((Object)"The Kafka broker is not healthy, restarting it");
                    KafkaBrokerExtension.restart(this.kafka, 0);
                    continue;
                }
            }
            throw new IllegalStateException("The Kafka broker is not unhealthy, despite 3 restarts");
        }
    }

    private void isBrokerHealthy() {
        Awaitility.await().until(() -> this.kafka.isRunning());
        Awaitility.await().catchUncaughtExceptions().until(() -> {
            HashMap<String, String> config = new HashMap<String, String>();
            config.put("bootstrap.servers", this.kafka.getBootstrapServers());
            config.put("client.id", "broker-healthy-admin");
            try (AdminClient admin = AdminClient.create(config);){
                Collection nodes = (Collection)admin.describeCluster().nodes().get();
                Boolean bl = nodes.size() == 1 && ((Node)nodes.iterator().next()).id() >= 0;
                return bl;
            }
        });
    }

    @Target(value={ElementType.FIELD, ElementType.PARAMETER})
    @Retention(value=RetentionPolicy.RUNTIME)
    public static @interface KafkaBootstrapServers {
    }
}

