/*
 * Decompiled with CFR 0.152.
 */
package io.goodforgod.testcontainers.extensions.kafka;

import io.goodforgod.testcontainers.extensions.AbstractTestcontainersExtension;
import io.goodforgod.testcontainers.extensions.ContainerContext;
import io.goodforgod.testcontainers.extensions.ContainerMode;
import io.goodforgod.testcontainers.extensions.kafka.ConnectionKafka;
import io.goodforgod.testcontainers.extensions.kafka.ContainerKafka;
import io.goodforgod.testcontainers.extensions.kafka.KafkaConnection;
import io.goodforgod.testcontainers.extensions.kafka.KafkaConnectionImpl;
import io.goodforgod.testcontainers.extensions.kafka.KafkaContext;
import io.goodforgod.testcontainers.extensions.kafka.KafkaMetadata;
import io.goodforgod.testcontainers.extensions.kafka.TestcontainersKafka;
import io.goodforgod.testcontainers.extensions.kafka.Topics;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

@ApiStatus.Internal
final class TestcontainersKafkaExtension
extends AbstractTestcontainersExtension<KafkaConnection, KafkaContainer, KafkaMetadata> {
    private static final String MIN_KRAFT_TAG = "7.0.0";
    private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create((Object[])new Object[]{TestcontainersKafkaExtension.class});

    TestcontainersKafkaExtension() {
    }

    protected Class<? extends Annotation> getContainerAnnotation() {
        return ContainerKafka.class;
    }

    protected Class<? extends Annotation> getConnectionAnnotation() {
        return ConnectionKafka.class;
    }

    protected Class<KafkaConnection> getConnectionType() {
        return KafkaConnection.class;
    }

    protected Class<KafkaContainer> getContainerType() {
        return KafkaContainer.class;
    }

    protected ExtensionContext.Namespace getNamespace() {
        return NAMESPACE;
    }

    protected KafkaContainer createContainerDefault(KafkaMetadata metadata) {
        DockerImageName image = DockerImageName.parse((String)metadata.image()).asCompatibleSubstituteFor(DockerImageName.parse((String)"confluentinc/cp-kafka"));
        KafkaContainer container = new KafkaContainer(image);
        String alias = Optional.ofNullable(metadata.networkAlias()).orElseGet(() -> "kafka-" + System.currentTimeMillis());
        container.withLogConsumer((Consumer)new Slf4jLogConsumer(LoggerFactory.getLogger(KafkaContainer.class)).withMdc("image", image.asCanonicalNameString()).withMdc("alias", alias));
        container.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false");
        container.withEnv("AUTO_CREATE_TOPICS", "true");
        container.withEnv("KAFKA_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.network=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=INFO,kafka.log=ERROR,kafka.server=ERROR,state.change.logger=ERROR");
        container.withEnv("ZOOKEEPER_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,org.kafka.zookeeper.server=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR");
        container.withExposedPorts(new Integer[]{9092, 9093});
        container.waitingFor((WaitStrategy)Wait.forListeningPort());
        container.withStartupTimeout(Duration.ofMinutes(5L));
        ComparableVersion actualVersion = new ComparableVersion(DockerImageName.parse((String)container.getDockerImageName()).getVersionPart());
        if (!actualVersion.isLessThan(MIN_KRAFT_TAG)) {
            Optional<Method> withKraft = Arrays.stream(KafkaContainer.class.getDeclaredMethods()).filter(m -> m.getName().equals("withKraft")).findFirst();
            if (withKraft.isPresent()) {
                withKraft.get().setAccessible(true);
                try {
                    withKraft.get().invoke((Object)this, new Object[0]);
                    LoggerFactory.getLogger(KafkaContainer.class).info("Kraft is enabled");
                }
                catch (IllegalAccessException | InvocationTargetException e) {
                    container.withEmbeddedZookeeper();
                }
            } else {
                container.withEmbeddedZookeeper();
            }
        }
        container.setNetworkAliases(new ArrayList<String>(List.of(alias)));
        if (metadata.networkShared()) {
            container.withNetwork(Network.SHARED);
        }
        return container;
    }

    @NotNull
    protected Optional<KafkaMetadata> findMetadata(@NotNull ExtensionContext context) {
        return this.findAnnotation(TestcontainersKafka.class, context).map(a -> new KafkaMetadata(a.network().shared(), a.network().alias(), a.image(), a.mode(), Set.of(a.topics().value()), a.topics().reset()));
    }

    protected void injectContextIntoField(ContainerContext<KafkaConnection> containerContext, Field field, Object testClassInstance) {
        try {
            KafkaConnectionImpl fieldKafkaConnection;
            ConnectionKafka annotation = field.getAnnotation(ConnectionKafka.class);
            if (annotation.properties().length == 0) {
                fieldKafkaConnection = (KafkaConnectionImpl)containerContext.connection();
            } else {
                if (annotation.properties().length % 2 != 0) {
                    throw new ExtensionConfigurationException("@ConnectionKafka#properties must have even number, properties expected as map of keys and values");
                }
                Properties fieldProperties = new Properties();
                fieldProperties.putAll((Map<?, ?>)((KafkaConnection)containerContext.connection()).params().properties());
                for (int i = 0; i < annotation.properties().length; i += 2) {
                    fieldProperties.put(annotation.properties()[i], annotation.properties()[i + 1]);
                }
                fieldKafkaConnection = (KafkaConnectionImpl)((KafkaConnection)containerContext.connection()).withProperties(fieldProperties);
                ((KafkaContext)containerContext).pool().add(fieldKafkaConnection);
            }
            field.setAccessible(true);
            field.set(testClassInstance, fieldKafkaConnection);
        }
        catch (IllegalAccessException e) {
            throw new IllegalStateException(String.format("Field '%s' annotated with @%s can't set kafka connection", field.getName(), this.getConnectionAnnotation().getSimpleName()), e);
        }
    }

    protected ContainerContext<KafkaConnection> createContainerContext(KafkaContainer container) {
        return new KafkaContext(container);
    }

    public void beforeAll(ExtensionContext context) {
        ContainerContext containerContext;
        super.beforeAll(context);
        KafkaMetadata metadata = (KafkaMetadata)this.getMetadata(context);
        if (!metadata.topics().isEmpty() && (containerContext = this.getContainerContext(context)) != null) {
            KafkaConnection connectionCurrent = (KafkaConnection)containerContext.connection();
            ExtensionContext.Store storage = this.getStorage(context);
            if (metadata.runMode() == ContainerMode.PER_RUN) {
                connectionCurrent.createTopics(metadata.topics());
                ((KafkaConnectionImpl)connectionCurrent).createTopicsIfNeeded(metadata.topics(), metadata.reset() != Topics.Mode.NONE);
                storage.put(Topics.class, (Object)metadata.reset());
            } else if (metadata.runMode() == ContainerMode.PER_CLASS) {
                ((KafkaConnectionImpl)connectionCurrent).createTopicsIfNeeded(metadata.topics(), false);
                storage.put(Topics.class, (Object)metadata.reset());
            }
        }
    }

    public void beforeEach(ExtensionContext context) {
        ContainerContext containerContext;
        KafkaMetadata metadata = (KafkaMetadata)this.getMetadata(context);
        if (metadata.runMode() == ContainerMode.PER_METHOD && metadata.reset() == Topics.Mode.PER_CLASS) {
            throw new ExtensionConfigurationException(String.format("@%s can't apply migration in Topics.Mode.PER_CLASS mode when ContainerMode.PER_METHOD is used", this.getContainerAnnotation().getSimpleName()));
        }
        super.beforeEach(context);
        if (!metadata.topics().isEmpty() && (containerContext = this.getContainerContext(context)) != null) {
            ExtensionContext.Store storage;
            Topics.Mode createdTopicsFlag;
            KafkaConnection connectionCurrent = (KafkaConnection)containerContext.connection();
            if (metadata.runMode() == ContainerMode.PER_METHOD) {
                ((KafkaConnectionImpl)connectionCurrent).createTopicsIfNeeded(metadata.topics(), false);
            } else if (metadata.reset() == Topics.Mode.PER_METHOD && (createdTopicsFlag = (Topics.Mode)((Object)(storage = this.getStorage(context)).get(Topics.class, Topics.Mode.class))) == null) {
                ((KafkaConnectionImpl)connectionCurrent).createTopicsIfNeeded(metadata.topics(), true);
            }
        }
    }

    public void afterEach(ExtensionContext context) {
        KafkaMetadata metadata = (KafkaMetadata)this.getMetadata(context);
        ExtensionContext.Store storage = this.getStorage(context);
        storage.remove(Topics.class);
        ContainerContext containerContext = this.getContainerContext(context);
        if (metadata.runMode() != ContainerMode.PER_METHOD) {
            ((KafkaContext)containerContext).pool().clear();
        }
        super.afterEach(context);
    }

    public void afterAll(ExtensionContext context) {
        super.afterAll(context);
    }

    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context) throws ParameterResolutionException {
        KafkaConnection connection = (KafkaConnection)super.resolveParameter(parameterContext, context);
        if (connection == null) {
            return null;
        }
        ConnectionKafka annotation = parameterContext.getParameter().getAnnotation(ConnectionKafka.class);
        if (annotation.properties().length == 0) {
            return connection;
        }
        if (annotation.properties().length % 2 != 0) {
            throw new ExtensionConfigurationException("@ConnectionKafka#properties must have even number, properties expected as map of keys and values");
        }
        Properties properties = connection.params().properties();
        for (int i = 0; i < annotation.properties().length; i += 2) {
            properties.put(annotation.properties()[i], annotation.properties()[i + 1]);
        }
        ContainerContext extensionContainer = this.getContainerContext(context);
        KafkaConnection paramConnection = connection.withProperties(properties);
        ((KafkaContext)extensionContainer).pool().add((KafkaConnectionImpl)paramConnection);
        return paramConnection;
    }
}

