/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.testing.kafka.junit5ext;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import info.schnatterer.mobynamesgenerator.MobyNamesGenerator;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.api.KafkaClusterConstraint;
import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy;
import io.kroxylicious.testing.kafka.api.KroxyliciousTestInfo;
import io.kroxylicious.testing.kafka.common.ClientConfig;
import io.kroxylicious.testing.kafka.internal.AdminSource;
import io.kroxylicious.testing.kafka.junit5ext.AmbiguousKafkaClusterException;
import io.kroxylicious.testing.kafka.junit5ext.ConstraintsMethodSource;
import io.kroxylicious.testing.kafka.junit5ext.DimensionMethodSource;
import io.kroxylicious.testing.kafka.junit5ext.Injector;
import io.kroxylicious.testing.kafka.junit5ext.Name;
import io.kroxylicious.testing.kafka.junit5ext.Topic;
import io.kroxylicious.testing.kafka.junit5ext.TopicConfig;
import io.kroxylicious.testing.kafka.junit5ext.TopicPartitions;
import io.kroxylicious.testing.kafka.junit5ext.TopicReplicationFactor;
import java.lang.annotation.Annotation;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Executable;
import java.lang.reflect.Field;
import java.lang.reflect.Member;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.ListDeserializer;
import org.apache.kafka.common.serialization.ListSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.apache.kafka.common.serialization.VoidSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
import org.junit.platform.commons.support.AnnotationSupport;
import org.junit.platform.commons.support.HierarchyTraversalMode;
import org.junit.platform.commons.support.ReflectionSupport;
import org.junit.platform.commons.util.ExceptionUtils;
import org.junit.platform.commons.util.ReflectionUtils;

public class KafkaClusterExtension
implements ParameterResolver,
BeforeEachCallback,
BeforeAllCallback,
TestTemplateInvocationContextProvider {
    // Platform logger named after this extension class.
    private static final System.Logger LOGGER = System.getLogger(KafkaClusterExtension.class.getName());
    // JUnit store namespace keyed by (this extension, KafkaCluster); findClusterFromContext looks clusters up in the store for this namespace.
    private static final ExtensionContext.Namespace CLUSTER_NAMESPACE = ExtensionContext.Namespace.create((Object[])new Object[]{KafkaClusterExtension.class, KafkaCluster.class});
    // JUnit store namespace keyed by (this extension, Admin) — presumably holds injected Admin clients; usage is outside this view.
    private static final ExtensionContext.Namespace ADMIN_NAMESPACE = ExtensionContext.Namespace.create((Object[])new Object[]{KafkaClusterExtension.class, Admin.class});
    // JUnit store namespace keyed by (this extension, Producer) — presumably holds injected producers; usage is outside this view.
    private static final ExtensionContext.Namespace PRODUCER_NAMESPACE = ExtensionContext.Namespace.create((Object[])new Object[]{KafkaClusterExtension.class, Producer.class});
    // JUnit store namespace keyed by (this extension, Consumer) — presumably holds injected consumers; usage is outside this view.
    private static final ExtensionContext.Namespace CONSUMER_NAMESPACE = ExtensionContext.Namespace.create((Object[])new Object[]{KafkaClusterExtension.class, Consumer.class});
    // 22-character URL-safe Base64 seed passed to uuidsFrom(); findClusterFromContext also compares the last used cluster id against it.
    public static final String STARTING_PREFIX = "WY9Br5K1vAfov_8jjJ3KUA";

    /**
     * Tells JUnit whether this extension can provide invocation contexts for the
     * test template method of the given context.
     *
     * @param context the extension context of the test template method
     * @return {@code true} when every parameter of the required test method is
     *         supported by this extension (vacuously true for no parameters)
     */
    public boolean supportsTestTemplate(ExtensionContext context) {
        return Arrays.stream(context.getRequiredTestMethod().getParameters())
                .allMatch(parameter -> KafkaClusterExtension.supportsParameter(parameter));
    }

    /**
     * Computes the cartesian product of the given domains.
     *
     * @param domains the value lists to combine; must not be empty
     * @return one list per combination, each holding one value from every domain
     * @throws IllegalArgumentException if {@code domains} is empty
     */
    private static List<? extends List<? extends Object>> cartesianProduct(List<List<?>> domains) {
        if (!domains.isEmpty()) {
            return KafkaClusterExtension._cartesianProduct(0, domains);
        }
        throw new IllegalArgumentException();
    }

    /**
     * Recursive worker for {@code cartesianProduct}: builds every combination of
     * the domains from position {@code index} to the end.
     *
     * @param index   position of the first domain to include
     * @param domains all domains
     * @return a mutable list of mutable tuples; a single empty tuple when
     *         {@code index} equals the number of domains
     */
    private static List<? extends List<? extends Object>> _cartesianProduct(int index, List<List<?>> domains) {
        ArrayList<List<Object>> product = new ArrayList<List<Object>>();
        if (index == domains.size()) {
            // Base case: exactly one (empty) combination of zero domains.
            product.add(new ArrayList<Object>(domains.size()));
            return product;
        }
        for (Object head : domains.get(index)) {
            // The tails are recomputed for every head because each tail is mutated in place below.
            for (List<? extends Object> tail : KafkaClusterExtension._cartesianProduct(index + 1, domains)) {
                @SuppressWarnings("unchecked")
                List<Object> tuple = (List<Object>) tail;
                tuple.add(0, head);
                product.add(tuple);
            }
        }
        return product;
    }

    /**
     * Produces one invocation context per set of additional cluster constraints
     * for a test template method.
     *
     * <p>The constraint sets are the cartesian product of all
     * {@code @DimensionMethodSource} results, followed by the lists returned by the
     * {@code @ConstraintsMethodSource} method, both read from the first parameter
     * whose type is assignable to {@link KafkaCluster}. Each context registers a
     * {@link ParameterResolver} that resolves parameters with that context's
     * additional constraints.
     *
     * @param context the extension context of the test template method
     * @return the stream of invocation contexts, possibly empty
     * @throws ExtensionConfigurationException if the test template method has no
     *         {@link KafkaCluster} parameter
     */
    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context) {
        Method testTemplateMethod = context.getRequiredTestMethod();
        // supportsTestTemplate() does not guarantee a KafkaCluster parameter exists, so fail with a clear message.
        Parameter parameter = Arrays.stream(testTemplateMethod.getParameters())
                .filter(p -> KafkaCluster.class.isAssignableFrom(p.getType()))
                .findFirst()
                .orElseThrow(() -> new ExtensionConfigurationException("Test template method " + testTemplateMethod.getName()
                        + "() must declare a parameter of type " + KafkaCluster.class.getSimpleName()));

        List<List<?>> dimensions = new ArrayList<>();
        for (DimensionMethodSource methodSource : parameter.getAnnotationsByType(DimensionMethodSource.class)) {
            dimensions.add(KafkaClusterExtension.invokeDimensionMethodSource(context, methodSource));
        }
        List<? extends List<?>> cartesianProduct;
        if (dimensions.isEmpty()) {
            cartesianProduct = List.of();
        } else {
            cartesianProduct = KafkaClusterExtension.cartesianProduct(dimensions);
        }

        ConstraintsMethodSource annotation = parameter.getAnnotation(ConstraintsMethodSource.class);
        List<? extends List<?>> constraints;
        if (annotation != null) {
            constraints = KafkaClusterExtension.invokeConstraintsMethodSource(context, annotation);
        } else {
            constraints = List.of();
        }

        return Stream.<List<?>>concat(cartesianProduct.stream(), constraints.stream())
                .<TestTemplateInvocationContext>map(additional -> {
                    // Elements were filtered down to constraint annotations by the method-source helpers.
                    @SuppressWarnings("unchecked")
                    final List<Annotation> additionalConstraints = (List<Annotation>) additional;
                    return new TestTemplateInvocationContext() {

                        public String getDisplayName(int invocationIndex) {
                            // Use the captured list rather than indexing by invocationIndex, so the name
                            // always matches the constraints this context actually applies.
                            return additionalConstraints.toString();
                        }

                        public List<Extension> getAdditionalExtensions() {
                            return List.<Extension>of(new ParameterResolver() {

                                public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
                                    return KafkaClusterExtension.supportsParameter(parameterContext.getParameter());
                                }

                                public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
                                    return KafkaClusterExtension.resolveParameter(parameterContext, extensionContext, additionalConstraints);
                                }
                            });
                        }
                    };
                });
    }

    /**
     * Invokes the method named by a {@code @ConstraintsMethodSource} annotation and
     * converts its result into lists of cluster constraint annotations.
     *
     * <p>The source method must be static, take no parameters and return a
     * {@code Stream}, a {@code Collection} or an array. Each element of the result
     * is passed through {@code filterAnnotations} with
     * {@link KafkaClusterConstraint}.
     *
     * @param context      the extension context of the test template method
     * @param methodSource the annotation naming the source method and, optionally, its class
     * @return one list of annotations per element returned by the source method
     * @throws ParameterResolutionException if the source method is missing, has the
     *         wrong shape, or fails when invoked
     */
    @NonNull
    private static List<? extends List<Annotation>> invokeConstraintsMethodSource(ExtensionContext context, ConstraintsMethodSource methodSource) {
        Object source;
        Method testTemplateMethod = context.getRequiredTestMethod();
        Class<?> requiredTestClass = context.getRequiredTestClass();
        // Error messages must name the annotation the user actually wrote.
        String annotationName = ConstraintsMethodSource.class.getSimpleName();
        try {
            Method sourceMethod = KafkaClusterExtension.getTargetMethod(requiredTestClass, methodSource.clazz(), methodSource.value());
            if (ReflectionUtils.isNotStatic(sourceMethod)) {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + annotationName + " on " + requiredTestClass + " must be static");
            }
            if (sourceMethod.getParameters().length != 0) {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + annotationName + " on " + requiredTestClass + " cannot have any parameters");
            }
            Class<?> returnType = sourceMethod.getReturnType();
            Type genericReturnType = sourceMethod.getGenericReturnType();
            if (Stream.class.isAssignableFrom(returnType)) {
                // Only a directly declared Stream<SomeClass> can be rejected here; other shapes are checked at coercion time.
                if (genericReturnType instanceof ParameterizedType pt
                        && Stream.class.equals(pt.getRawType())
                        && pt.getActualTypeArguments()[0] instanceof Class<?> elementType
                        && !elementType.isAnnotation()) {
                    throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodSource.value(), ConstraintsMethodSource.class, requiredTestClass);
                }
            } else if (Collection.class.isAssignableFrom(returnType)) {
                if (genericReturnType instanceof ParameterizedType pt
                        && Collection.class.equals(pt.getRawType())
                        && pt.getActualTypeArguments()[0] instanceof Class<?> elementType
                        && !elementType.isAnnotation()) {
                    throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodSource.value(), ConstraintsMethodSource.class, requiredTestClass);
                }
            } else if (returnType.isArray()) {
                if (!returnType.getComponentType().isAnnotation()) {
                    throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodSource.value(), ConstraintsMethodSource.class, requiredTestClass);
                }
            } else {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + annotationName + " on " + requiredTestClass + " must return a Stream, a Collection, or an array withAnnotation type");
            }
            source = ReflectionUtils.makeAccessible(sourceMethod).invoke(null);
        }
        catch (ReflectiveOperationException e) {
            throw new ParameterResolutionException("Error invoking method " + methodSource.value() + " given in @" + annotationName + " on " + requiredTestClass, e);
        }
        return KafkaClusterExtension.<List<Annotation>>coerceToList(methodSource.value(), ConstraintsMethodSource.class, testTemplateMethod, requiredTestClass, source)
                .stream()
                .map(list -> KafkaClusterExtension.filterAnnotations(list, KafkaClusterConstraint.class))
                .toList();
    }

    /**
     * Invokes the method named by a {@code @DimensionMethodSource} annotation and
     * returns its result as a list of cluster constraint annotations.
     *
     * <p>The source method must be static, take no parameters and return a
     * {@code Stream}, a {@code Collection} or an array. The coerced result is
     * passed through {@code filterAnnotations} with {@link KafkaClusterConstraint}.
     *
     * @param context      the extension context of the test template method
     * @param methodSource the annotation naming the source method and, optionally, its class
     * @return the filtered annotations produced by the source method
     * @throws ParameterResolutionException if the source method is missing, has the
     *         wrong shape, or fails when invoked
     */
    @NonNull
    private static List<Annotation> invokeDimensionMethodSource(ExtensionContext context, DimensionMethodSource methodSource) {
        Method testTemplateMethod = context.getRequiredTestMethod();
        Class<?> requiredTestClass = context.getRequiredTestClass();
        Object source;
        try {
            Method sourceMethod = KafkaClusterExtension.getTargetMethod(requiredTestClass, methodSource.clazz(), methodSource.value());
            if (ReflectionUtils.isNotStatic(sourceMethod)) {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + DimensionMethodSource.class.getSimpleName() + " on " + requiredTestClass + " must be static");
            }
            if (sourceMethod.getParameters().length != 0) {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + DimensionMethodSource.class.getSimpleName() + " on " + requiredTestClass + " cannot have any parameters");
            }
            Class<?> returnType = sourceMethod.getReturnType();
            boolean returnsStream = Stream.class.isAssignableFrom(returnType);
            boolean returnsCollection = !returnsStream && Collection.class.isAssignableFrom(returnType);
            if (returnsStream || returnsCollection) {
                // Reject only when the declared type is exactly Stream<X>/Collection<X> with X a non-annotation class.
                Class<?> expectedRawType = returnsStream ? Stream.class : Collection.class;
                if (sourceMethod.getGenericReturnType() instanceof ParameterizedType pt
                        && expectedRawType.equals(pt.getRawType())
                        && pt.getActualTypeArguments()[0] instanceof Class<?> elementType
                        && !elementType.isAnnotation()) {
                    throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodSource.value(), DimensionMethodSource.class, requiredTestClass);
                }
            } else if (!returnType.isArray()) {
                throw new ParameterResolutionException("Method " + methodSource.value() + " given in @" + DimensionMethodSource.class.getSimpleName() + " on " + requiredTestClass + " must return a Stream, a Collection, or an array withAnnotation type");
            } else if (!returnType.getComponentType().isAnnotation()) {
                throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodSource.value(), DimensionMethodSource.class, requiredTestClass);
            }
            source = sourceMethod.invoke(null);
        }
        catch (ReflectiveOperationException e) {
            throw new ParameterResolutionException("Error invoking method " + methodSource.value() + " given in @" + DimensionMethodSource.class.getSimpleName() + " on " + requiredTestClass, e);
        }
        return KafkaClusterExtension.filterAnnotations(
                KafkaClusterExtension.coerceToList(methodSource.value(), DimensionMethodSource.class, testTemplateMethod, requiredTestClass, source),
                KafkaClusterConstraint.class);
    }

    /**
     * Looks up a no-argument method by name and makes it accessible.
     *
     * @param clazz       the class searched when {@code methodClazz} is {@code null} or {@code Void.class}
     * @param methodClazz the class to search instead, or {@code null}/{@code Void.class} for none
     * @param methodName  the name of the declared no-argument method
     * @return the accessible method
     * @throws NoSuchMethodException if the chosen class declares no such method
     */
    @NonNull
    private static Method getTargetMethod(Class<?> clazz, Class<?> methodClazz, String methodName) throws NoSuchMethodException {
        Class<?> owner = clazz;
        if (methodClazz != null && methodClazz != Void.class) {
            owner = methodClazz;
        }
        Method declared = owner.getDeclaredMethod(methodName);
        return ReflectionUtils.makeAccessible(declared);
    }

    /**
     * Converts the value returned by a method source into a list.
     *
     * <p>A {@code Stream} is collected, a {@code List} is returned as is, any other
     * {@code Collection} is copied into a new list, and an object array is wrapped
     * with {@link Arrays#asList}.
     *
     * @param methodName         the source method name, used in the error message
     * @param annotationType     the method-source annotation type, used in the error message
     * @param testTemplateMethod the test template method, used in the error message
     * @param requiredTestClass  the test class, used in the error message
     * @param source             the value returned by the source method
     * @param <T>                the element type the caller expects (unchecked)
     * @return the elements of {@code source} as a list
     * @throws ParameterResolutionException if {@code source} is none of the supported kinds
     */
    @NonNull
    private static <T> List<T> coerceToList(String methodName, Class<? extends Annotation> annotationType, Method testTemplateMethod, Class<?> requiredTestClass, Object source) {
        List<?> list;
        if (source instanceof Stream<?> stream) {
            list = stream.toList();
        } else if (source instanceof List<?> sourceList) {
            // Accept any List implementation (List.of, Arrays.asList, ...), not only ArrayList.
            list = sourceList;
        } else if (source instanceof Collection<?> collection) {
            list = new ArrayList<Object>(collection);
        } else if (source instanceof Object[] array) {
            list = Arrays.asList(array);
        } else {
            throw KafkaClusterExtension.returnTypeError(testTemplateMethod, methodName, annotationType, requiredTestClass);
        }
        // The element type cannot be verified at runtime; callers choose T.
        @SuppressWarnings("unchecked")
        List<T> typed = (List<T>) list;
        return typed;
    }

    /**
     * Builds the exception reported when a method source has an unsupported return type.
     *
     * @param testTemplateMethod the test template method carrying the annotation
     * @param methodName         the name of the offending source method
     * @param annotationType     the method-source annotation type
     * @param requiredTestClass  the test class
     * @return a new exception describing the problem; the caller throws it
     */
    @NonNull
    private static ParameterResolutionException returnTypeError(Method testTemplateMethod, String methodName, Class<? extends Annotation> annotationType, Class<?> requiredTestClass) {
        StringBuilder message = new StringBuilder("Method ")
                .append(methodName)
                .append(" given in @")
                .append(annotationType.getSimpleName())
                .append(" on ")
                .append(testTemplateMethod.getName())
                .append("() of ")
                .append(requiredTestClass)
                .append(" must return a Stream, a Collection, or an array withAnnotation type");
        return new ParameterResolutionException(message.toString());
    }

    /**
     * Decides whether this extension resolves the given parameter.
     *
     * <p>Parameters of test template methods that carry method-source
     * configuration are declined here; the per-invocation resolver registered by
     * {@code provideTestTemplateInvocationContexts} handles them instead.
     *
     * @param parameterContext the parameter to check
     * @param extensionContext the current extension context (unused)
     * @return {@code true} if this extension should resolve the parameter
     */
    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        if (this.hasTestTemplateConfiguration(parameterContext.getDeclaringExecutable())) {
            return false;
        }
        return KafkaClusterExtension.supportsParameter(parameterContext.getParameter());
    }

    /**
     * Checks whether an executable is a {@code @TestTemplate} with at least one
     * parameter annotated with {@code @DimensionMethodSource} or
     * {@code @ConstraintsMethodSource}.
     *
     * @param executable the method or constructor to inspect
     * @return {@code true} if such test template configuration is present
     */
    private boolean hasTestTemplateConfiguration(Executable executable) {
        if (!executable.isAnnotationPresent(TestTemplate.class)) {
            return false;
        }
        for (Parameter candidate : executable.getParameters()) {
            if (candidate.getAnnotationsByType(DimensionMethodSource.class).length > 0
                    || candidate.getAnnotationsByType(ConstraintsMethodSource.class).length > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Resolves a parameter without any additional cluster constraints.
     *
     * @param parameterContext the parameter to resolve
     * @param extensionContext the current extension context
     * @return the resolved value
     */
    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        List<Annotation> noExtraConstraints = List.of();
        return KafkaClusterExtension.resolveParameter(parameterContext, extensionContext, noExtraConstraints);
    }

    /**
     * Resolves a parameter by its declared type: {@link KafkaCluster},
     * {@link Admin}, {@link Producer}, {@link Consumer} or {@link Topic},
     * checked in that order.
     *
     * @param parameterContext the parameter to resolve
     * @param extensionContext the current extension context
     * @param extraConstraints constraints added to those declared on a {@link KafkaCluster} parameter
     * @return the resolved value
     * @throws ExtensionConfigurationException if the parameter type is none of the supported kinds
     */
    public static Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext, List<Annotation> extraConstraints) throws ParameterResolutionException {
        Parameter parameter = parameterContext.getParameter();
        Class<?> type = parameter.getType();
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: Resolving parameter ({1} {2})", extensionContext.getUniqueId(), type.getSimpleName(), parameter.getName());

        if (KafkaCluster.class.isAssignableFrom(type)) {
            ArrayList<Annotation> constraints = KafkaClusterExtension.getConstraintAnnotations(parameter, KafkaClusterConstraint.class);
            constraints.addAll(extraConstraints);
            return KafkaClusterExtension.getCluster(parameter, type.asSubclass(KafkaCluster.class), constraints, extensionContext);
        }

        String description = "parameter " + parameter.getName();
        if (Admin.class.isAssignableFrom(type)) {
            return KafkaClusterExtension.createAdmin(description, parameter, type.asSubclass(Admin.class), extensionContext);
        }
        if (Producer.class.isAssignableFrom(type)) {
            // The generic type carries the key/value type arguments.
            Type genericType = parameterContext.getDeclaringExecutable().getGenericParameterTypes()[parameterContext.getIndex()];
            return KafkaClusterExtension.createProducer(description, parameter, type.asSubclass(Producer.class), genericType, extensionContext);
        }
        if (Consumer.class.isAssignableFrom(type)) {
            Type genericType = parameterContext.getDeclaringExecutable().getGenericParameterTypes()[parameterContext.getIndex()];
            return KafkaClusterExtension.createConsumer(description, parameter, type.asSubclass(Consumer.class), genericType, extensionContext);
        }
        if (Topic.class.isAssignableFrom(type)) {
            return KafkaClusterExtension.createTopic(description, parameter, type.asSubclass(Topic.class), type, extensionContext);
        }
        throw new ExtensionConfigurationException("Could not resolve " + parameterContext);
    }

    /**
     * Injects the static fields of the test class before any test runs.
     *
     * @param context the class-level extension context
     */
    public void beforeAll(ExtensionContext context) throws Exception {
        Class<?> testClass = context.getRequiredTestClass();
        this.injectStaticFields(context, testClass);
    }

    /**
     * Injects the instance fields of every test instance before each test.
     *
     * @param context the method-level extension context
     */
    public void beforeEach(ExtensionContext context) throws Exception {
        for (Object instance : context.getRequiredTestInstances().getAllInstances()) {
            this.injectInstanceFields(context, instance);
        }
    }

    /**
     * Checks whether a parameter is of a kind this extension can resolve.
     *
     * @param parameter the parameter to inspect
     * @return {@code true} for any {@link KafkaCluster} parameter, or for a Kafka
     *         client or {@link Topic} parameter that is also an injection candidate
     */
    private static boolean supportsParameter(Parameter parameter) {
        Class<?> type = parameter.getType();
        if (KafkaCluster.class.isAssignableFrom(type)) {
            return true;
        }
        boolean clientOrTopic = KafkaClusterExtension.isKafkaClient(type) || KafkaClusterExtension.isKafkaTopic(parameter.getType());
        return clientOrTopic && KafkaClusterExtension.isCandidate(parameter);
    }

    /**
     * Checks whether an element is eligible for injection based on its annotations.
     *
     * @param annotatedElement the field or parameter to inspect
     * @return {@code true} if it has no annotations or only supported ones
     */
    private static boolean isCandidate(AnnotatedElement annotatedElement) {
        if (KafkaClusterExtension.noAnnotations(annotatedElement)) {
            return true;
        }
        return KafkaClusterExtension.hasOnlySupportedAnnotations(annotatedElement);
    }

    /**
     * Checks whether a type is a Kafka client type.
     *
     * @param type the type to inspect
     * @return {@code true} if it is assignable to {@link Admin}, {@link Producer} or {@link Consumer}
     */
    private static boolean isKafkaClient(Class<?> type) {
        if (Admin.class.isAssignableFrom(type)) {
            return true;
        }
        if (Producer.class.isAssignableFrom(type)) {
            return true;
        }
        return Consumer.class.isAssignableFrom(type);
    }

    /**
     * Checks whether a type is assignable to {@link Topic}.
     *
     * @param type the type to inspect
     * @return {@code true} if it is a {@link Topic} type
     */
    private static boolean isKafkaTopic(Class<?> type) {
        boolean topicType = Topic.class.isAssignableFrom(type);
        return topicType;
    }

    /**
     * Checks whether an element carries no annotations at all.
     *
     * @param parameter the element to inspect
     * @return {@code true} if {@code getAnnotations()} is empty
     */
    private static boolean noAnnotations(AnnotatedElement parameter) {
        Annotation[] present = parameter.getAnnotations();
        return present.length < 1;
    }

    /**
     * Checks that every annotation on an element comes from a supported package.
     *
     * @param parameter the element to inspect
     * @return {@code true} if each annotation type's canonical name starts with
     *         {@code io.kroxylicious}, {@code org.junit} or {@code java.lang}
     *         (vacuously true when there are no annotations)
     */
    private static boolean hasOnlySupportedAnnotations(AnnotatedElement parameter) {
        return Arrays.stream(parameter.getAnnotations())
                .map(annotation -> annotation.annotationType().getCanonicalName())
                .allMatch(name -> name.startsWith("io.kroxylicious")
                        || name.startsWith("org.junit")
                        || name.startsWith("java.lang"));
    }

    /**
     * Injects the non-static fields of a test instance.
     *
     * @param context  the current extension context
     * @param instance the test instance whose fields are injected
     */
    private void injectInstanceFields(ExtensionContext context, Object instance) {
        Class<?> instanceClass = instance.getClass();
        this.injectFields(context, instance, instanceClass, field -> ReflectionUtils.isNotStatic(field));
    }

    /**
     * Injects the static fields of a test class.
     *
     * @param context   the current extension context
     * @param testClass the test class whose static fields are injected
     */
    private void injectStaticFields(ExtensionContext context, Class<?> testClass) {
        // Static fields have no receiver, hence the null test instance.
        this.injectFields(context, null, testClass, field -> ReflectionUtils.isStatic(field));
    }

    /**
     * Injects clusters, clients and topics into the fields selected by {@code predicate}.
     *
     * <p>{@link KafkaCluster} fields are set first; then {@link Admin},
     * {@link Producer}, {@link Consumer} and {@link Topic} fields that are
     * injection candidates are filled, in that order.
     *
     * @param context      the current extension context
     * @param testInstance the receiver for instance fields, or {@code null} for static fields
     * @param testClass    the class whose hierarchy is searched bottom-up
     * @param predicate    selects the fields to consider (static or non-static)
     */
    private void injectFields(ExtensionContext context, Object testInstance, Class<?> testClass, Predicate<Field> predicate) {
        Predicate<Field> isClusterField = field -> predicate.test(field) && KafkaCluster.class.isAssignableFrom(field.getType());
        for (Field field : ReflectionSupport.findFields(testClass, isClusterField, HierarchyTraversalMode.BOTTOM_UP)) {
            this.assertSupportedType("field", field.getType());
            try {
                Field accessibleField = ReflectionUtils.makeAccessible(field);
                ArrayList<Annotation> constraints = KafkaClusterExtension.getConstraintAnnotations(accessibleField, KafkaClusterConstraint.class);
                accessibleField.set(testInstance, KafkaClusterExtension.getCluster(accessibleField, accessibleField.getType().asSubclass(KafkaCluster.class), constraints, context));
            }
            catch (Throwable t) {
                ExceptionUtils.throwAsUncheckedException(t);
            }
        }

        Predicate<Field> isInjectable = field -> predicate.test(field) && KafkaClusterExtension.isCandidate(field);
        Map<Class<?>, List<Field>> fieldsByType = ReflectionSupport.findFields(testClass, isInjectable, HierarchyTraversalMode.BOTTOM_UP)
                .stream()
                .collect(Collectors.groupingBy(Field::getType));
        KafkaClusterExtension.injectField(Admin.class, KafkaClusterExtension::createAdmin, context, testInstance, fieldsByType);
        KafkaClusterExtension.injectField(Producer.class, KafkaClusterExtension::createProducer, context, testInstance, fieldsByType);
        KafkaClusterExtension.injectField(Consumer.class, KafkaClusterExtension::createConsumer, context, testInstance, fieldsByType);
        KafkaClusterExtension.injectField(Topic.class, KafkaClusterExtension::createTopic, context, testInstance, fieldsByType);
    }

    /**
     * Fills every currently-null field whose type is assignable to
     * {@code clientType} with a value produced by {@code injector}.
     *
     * @param clientType   the base type of the fields to inject
     * @param injector     creates the value for a field
     * @param context      the current extension context
     * @param testInstance the receiver for instance fields, or {@code null} for static fields
     * @param fieldsByType candidate fields grouped by their declared type
     * @param <T>          the base type
     * @param <X>          the concrete injected type
     */
    private static <T, X extends T> void injectField(Class<T> clientType, Injector<T, X> injector, ExtensionContext context, Object testInstance, Map<Class<?>, List<Field>> fieldsByType) {
        for (Map.Entry<Class<?>, List<Field>> entry : fieldsByType.entrySet()) {
            if (!clientType.isAssignableFrom(entry.getKey())) {
                continue;
            }
            for (Field field : entry.getValue()) {
                // Fields that already hold a value are left untouched.
                boolean unset;
                try {
                    unset = ReflectionUtils.makeAccessible(field).get(testInstance) == null;
                }
                catch (IllegalAccessException e) {
                    ExceptionUtils.throwAsUncheckedException(e);
                    unset = false;
                }
                if (!unset) {
                    continue;
                }
                try {
                    ReflectionUtils.makeAccessible(field).set(testInstance, injector.inject("field " + field.getName(), field, field.getType().asSubclass(clientType), field.getGenericType(), context));
                }
                catch (Exception e) {
                    ExceptionUtils.throwAsUncheckedException(e);
                }
            }
        }
    }

    /**
     * Picks a serializer for one type argument of a parameterized {@link Producer} type.
     *
     * @param type              the generic type of the field or parameter
     * @param typeArgumentIndex index of the type argument to inspect
     * @return the serializer for that type argument, or {@code null} if
     *         {@code type} is not a parameterized {@link Producer} type or no
     *         serializer matches
     */
    @Nullable
    private static Serializer<?> getSerializerFromGenericType(Type type, int typeArgumentIndex) {
        if (type instanceof ParameterizedType pt
                && pt.getRawType() instanceof Class<?> rawType
                && Producer.class.isAssignableFrom(rawType)) {
            Type keyOrValue = pt.getActualTypeArguments()[typeArgumentIndex];
            return KafkaClusterExtension.getSerializerFromType(keyOrValue);
        }
        return null;
    }

    /**
     * Maps a key or value type to the matching Kafka serializer.
     *
     * <p>Supported classes are {@code String}, {@code Integer}, {@code Long},
     * {@code UUID}, {@code Float}, {@code Double}, {@code byte[]},
     * {@code ByteBuffer}, {@code Bytes} and {@code Void}. A parameterized
     * {@code List<E>} yields a {@link ListSerializer} wrapping the serializer for
     * {@code E}.
     *
     * @param keyOrValueType the key or value type to serialize
     * @return a new serializer, or {@code null} if the type (or a list's element
     *         type) is not supported
     */
    private static Serializer<?> getSerializerFromType(Type keyOrValueType) {
        if (keyOrValueType instanceof ParameterizedType pt) {
            if (List.class != pt.getRawType()) {
                return null;
            }
            // Recurse on the element type, not on the list type itself, otherwise this never terminates.
            Serializer<?> inner = KafkaClusterExtension.getSerializerFromType(pt.getActualTypeArguments()[0]);
            // ListSerializer rejects a null inner serializer, so report "unsupported" instead.
            return inner == null ? null : new ListSerializer<>(inner);
        }
        if (!(keyOrValueType instanceof Class)) {
            return null;
        }
        Serializer<?> serializer = null;
        if (keyOrValueType == String.class) {
            serializer = new StringSerializer();
        } else if (keyOrValueType == Integer.class) {
            serializer = new IntegerSerializer();
        } else if (keyOrValueType == Long.class) {
            serializer = new LongSerializer();
        } else if (keyOrValueType == UUID.class) {
            serializer = new UUIDSerializer();
        } else if (keyOrValueType == Float.class) {
            serializer = new FloatSerializer();
        } else if (keyOrValueType == Double.class) {
            serializer = new DoubleSerializer();
        } else if (keyOrValueType == byte[].class) {
            serializer = new ByteArraySerializer();
        } else if (keyOrValueType == ByteBuffer.class) {
            serializer = new ByteBufferSerializer();
        } else if (keyOrValueType == Bytes.class) {
            serializer = new BytesSerializer();
        } else if (keyOrValueType == Void.class) {
            serializer = new VoidSerializer();
        }
        return serializer;
    }

    /**
     * Picks a deserializer for one type argument of a parameterized {@link Consumer} type.
     *
     * @param type              the generic type of the field or parameter
     * @param typeArgumentIndex index of the type argument to inspect
     * @return the deserializer for that type argument, or {@code null} if
     *         {@code type} is not a parameterized {@link Consumer} type or no
     *         deserializer matches
     */
    @Nullable
    private static Deserializer<?> getDeserializerFromGenericType(Type type, int typeArgumentIndex) {
        if (type instanceof ParameterizedType pt
                && pt.getRawType() instanceof Class<?> rawType
                && Consumer.class.isAssignableFrom(rawType)) {
            Type keyOrValue = pt.getActualTypeArguments()[typeArgumentIndex];
            return KafkaClusterExtension.getDeserializerFromType(keyOrValue);
        }
        return null;
    }

    /**
     * Maps a key or value type to the matching Kafka deserializer.
     *
     * <p>Supported classes are {@code String}, {@code Integer}, {@code Long},
     * {@code UUID}, {@code Float}, {@code Double}, {@code byte[]},
     * {@code ByteBuffer}, {@code Bytes} and {@code Void}. A parameterized
     * {@code List<E>} yields a {@link ListDeserializer} that builds
     * {@link ArrayList} instances and wraps the deserializer for {@code E}.
     *
     * @param keyOrValueType the key or value type to deserialize
     * @return a new deserializer, or {@code null} if the type (or a list's
     *         element type) is not supported
     */
    private static Deserializer<?> getDeserializerFromType(Type keyOrValueType) {
        if (keyOrValueType instanceof ParameterizedType pt) {
            if (List.class != pt.getRawType()) {
                return null;
            }
            // Recurse on the element type, not on the list type itself, otherwise this never terminates.
            Deserializer<?> inner = KafkaClusterExtension.getDeserializerFromType(pt.getActualTypeArguments()[0]);
            if (inner == null) {
                // ListDeserializer rejects a null inner deserializer, so report "unsupported" instead.
                return null;
            }
            // The first constructor argument is the List implementation to instantiate, not the element class.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Deserializer<?> listDeserializer = new ListDeserializer((Class) ArrayList.class, inner);
            return listDeserializer;
        }
        if (!(keyOrValueType instanceof Class)) {
            return null;
        }
        Deserializer<?> deserializer = null;
        if (keyOrValueType == String.class) {
            deserializer = new StringDeserializer();
        } else if (keyOrValueType == Integer.class) {
            deserializer = new IntegerDeserializer();
        } else if (keyOrValueType == Long.class) {
            deserializer = new LongDeserializer();
        } else if (keyOrValueType == UUID.class) {
            deserializer = new UUIDDeserializer();
        } else if (keyOrValueType == Float.class) {
            deserializer = new FloatDeserializer();
        } else if (keyOrValueType == Double.class) {
            deserializer = new DoubleDeserializer();
        } else if (keyOrValueType == byte[].class) {
            deserializer = new ByteArrayDeserializer();
        } else if (keyOrValueType == ByteBuffer.class) {
            deserializer = new ByteBufferDeserializer();
        } else if (keyOrValueType == Bytes.class) {
            deserializer = new BytesDeserializer();
        } else if (keyOrValueType == Void.class) {
            deserializer = new VoidDeserializer();
        }
        return deserializer;
    }

    /**
     * Creates an endless sequence of 22-character URL-safe Base64 ids, starting
     * at the 128-bit value encoded by {@code startingPrefix} and counting upwards.
     *
     * <p>A prefix shorter than 22 characters is right-padded with {@code '0'}
     * characters before decoding. Every iterator obtained from the returned
     * iterable restarts at the starting value.
     *
     * @param startingPrefix URL-safe Base64 text of at most 22 characters
     * @return an iterable whose iterators never run out of ids
     * @throws IllegalArgumentException if the prefix is longer than 22 characters
     *         or is not valid URL-safe Base64
     */
    private static Iterable<String> uuidsFrom(String startingPrefix) {
        if (startingPrefix.length() > 22) {
            throw new IllegalArgumentException("startingPrefix is too long to be a Base64-encoded UUID prefix");
        }
        String padded = startingPrefix + "0".repeat(22 - startingPrefix.length());
        ByteBuffer seed = ByteBuffer.wrap(Base64.getUrlDecoder().decode(padded));
        final long msb = seed.getLong();
        final long lsb = seed.getLong();
        return () -> new Iterator<String>() {
            private long most = msb;
            private long least = lsb;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public String next() {
                // Encode into a fresh buffer so iterators do not share mutable state.
                byte[] current = ByteBuffer.allocate(16).putLong(this.most).putLong(this.least).array();
                ++this.least;
                if (this.least == 0L) {
                    // The low word wrapped around (as an unsigned value): carry into the high word.
                    ++this.most;
                }
                return Base64.getUrlEncoder().withoutPadding().encodeToString(current);
            }
        };
    }

    /**
     * Looks up the already-registered cluster that a client/topic declaration should use.
     *
     * <p>When the element carries a non-empty {@link Name}, that name is the lookup key. Otherwise
     * the last used generated cluster id is taken, and it must be exactly {@code STARTING_PREFIX};
     * any other outcome is reported as ambiguous.
     *
     * @param element the annotated declaration being resolved
     * @param extensionContext the context whose cluster store is searched
     * @param type the declared type of the element (not used by the lookup)
     * @param description human-readable description of the element, used in error messages
     * @return the cluster held in the store under the resolved name
     * @throws AmbiguousKafkaClusterException if the element is unnamed and the last used generated id is not {@code STARTING_PREFIX}
     * @throws NullPointerException if the store holds no cluster under the resolved name
     */
    private static KafkaCluster findClusterFromContext(AnnotatedElement element, ExtensionContext extensionContext, Class<?> type, String description) {
        ExtensionContext.Store store = extensionContext.getStore(CLUSTER_NAMESPACE);
        Name name = element.getAnnotation(Name.class);
        String clusterName;
        if (name != null && !name.value().isEmpty()) {
            clusterName = name.value();
        } else {
            clusterName = findLastUsedClusterId(store, uuidsFrom(STARTING_PREFIX));
            if (clusterName == null || !clusterName.equals(STARTING_PREFIX)) {
                throw new AmbiguousKafkaClusterException("KafkaCluster to associate with " + description + " is ambiguous, use @Name on the intended cluster and this element to disambiguate");
            }
        }
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: Looking up cluster {2}", extensionContext.getUniqueId(), element, clusterName);
        Closeable last = store.get(clusterName, Closeable.class);
        // Same exception type as before, but say which cluster was missing instead of a bare NPE.
        Objects.requireNonNull(last, "No KafkaCluster named '" + clusterName + "' was found to associate with " + description);
        return (KafkaCluster) last.get();
    }

    /**
     * Returns the cluster for a {@link KafkaCluster}-typed declaration, creating it on first use.
     *
     * <p>A non-empty {@link Name} on the declaration is the store key; otherwise the first
     * generated id with no entry in the store is used. The cluster is fetched from the store,
     * or built with {@code createCluster} and stored when absent.
     *
     * @param sourceElement the annotated declaration
     * @param type the declared cluster type
     * @param constraints the constraint annotations to provision with
     * @param extensionContext the context whose cluster store is used
     * @return the stored or newly created cluster
     * @throws ExtensionConfigurationException if a cluster with the given name already exists and
     *         this declaration supplies constraints
     */
    private static KafkaCluster getCluster(AnnotatedElement sourceElement, Class<? extends KafkaCluster> type, List<Annotation> constraints, ExtensionContext extensionContext) {
        final ExtensionContext.Store clusters = extensionContext.getStore(CLUSTER_NAMESPACE);
        final Name nameAnnotation = sourceElement.getAnnotation(Name.class);
        final boolean explicitlyNamed = nameAnnotation != null && !nameAnnotation.value().isEmpty();
        final String clusterName = explicitlyNamed
                ? nameAnnotation.value()
                : findFirstUnusedClusterId(clusters, uuidsFrom(STARTING_PREFIX));
        if (explicitlyNamed && clusters.get(clusterName) != null && !constraints.isEmpty()) {
            throw new ExtensionConfigurationException("A " + KafkaCluster.class.getSimpleName() + "-typed declaration with @Name(\"" + clusterName + "\") already exists, we cannot apply new constraints");
        }

        final String testId = extensionContext.getUniqueId();
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: cluster ''{2}'': Looking up cluster", testId, sourceElement, clusterName);
        Closeable holder = clusters.getOrComputeIfAbsent(
                (Object) clusterName,
                key -> createCluster(extensionContext, clusterName, type, sourceElement, constraints),
                Closeable.class);
        Objects.requireNonNull(holder);
        KafkaCluster resolved = (KafkaCluster) holder.get();
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: cluster ''{2}'': Starting", extensionContext.getUniqueId(), sourceElement, clusterName);
        return resolved;
    }

    /**
     * Walks the candidate ids in order and returns the first one with no entry in the store.
     *
     * @param store the store to probe
     * @param clusterIdIter the candidate ids, in the order they should be tried
     * @return the first candidate for which {@code store.get} returns {@code null}
     */
    private static String findFirstUnusedClusterId(ExtensionContext.Store store, Iterable<String> clusterIdIter) {
        Iterator<String> candidates = clusterIdIter.iterator();
        String candidate = candidates.next();
        while (store.get(candidate) != null) {
            candidate = candidates.next();
        }
        return candidate;
    }

    /**
     * Walks the candidate ids in order while each has an entry in the store, and returns the last
     * one that did.
     *
     * @param store the store to probe
     * @param clusterIdIter the candidate ids, in the order they should be tried
     * @return the last candidate before the first gap, or {@code null} if the very first candidate has no entry
     */
    private static String findLastUsedClusterId(ExtensionContext.Store store, Iterable<String> clusterIdIter) {
        Iterator<String> candidates = clusterIdIter.iterator();
        String lastUsed = null;
        for (String candidate = candidates.next(); store.get(candidate) != null; candidate = candidates.next()) {
            lastUsed = candidate;
        }
        return lastUsed;
    }

    /**
     * Convenience overload of {@code createAdmin} for callers with no generic type information;
     * {@code Void.class} is passed in its place.
     *
     * @param description human-readable description of the declaration, used in error messages
     * @param sourceElement the annotated declaration
     * @param type the declared admin type
     * @param extensionContext the current extension context
     * @return the admin client for the declaration
     */
    private static Admin createAdmin(String description, AnnotatedElement sourceElement, Class<? extends Admin> type, ExtensionContext extensionContext) {
        final Type noGenericType = Void.class;
        return createAdmin(description, sourceElement, type, noGenericType, extensionContext);
    }

    /**
     * Returns the {@link Admin} client for a declaration, creating it at most once per source element.
     *
     * <p>The target cluster is resolved first. The client is then fetched from the admin store
     * keyed by {@code sourceElement}, or created with the configuration from {@code buildConfig}
     * when absent.
     *
     * @param description human-readable description of the declaration, used in error messages
     * @param sourceElement the annotated declaration; also the store key
     * @param type the declared admin type
     * @param genericType the generic declared type (not used here)
     * @param extensionContext the current extension context
     * @return the stored or newly created admin client
     */
    private static Admin createAdmin(String description, AnnotatedElement sourceElement, Class<? extends Admin> type, Type genericType, ExtensionContext extensionContext) {
        KafkaCluster cluster = findClusterFromContext(sourceElement, extensionContext, type, description);
        ExtensionContext.Store admins = extensionContext.getStore(ADMIN_NAMESPACE);
        Closeable holder = admins.getOrComputeIfAbsent((Object) sourceElement, key -> {
            LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: Creating Admin", extensionContext.getUniqueId(), sourceElement);
            return new Closeable<Admin>(sourceElement, cluster.getClusterId(), Admin.create(buildConfig(sourceElement, cluster)));
        }, Closeable.class);
        return (Admin) holder.get();
    }

    /**
     * Returns the {@link Producer} for a declaration, creating it at most once per source element.
     *
     * <p>Key and value serializers are derived from type arguments 0 and 1 of {@code genericType}.
     * The producer is fetched from the producer store keyed by {@code sourceElement}, or created
     * as a {@link KafkaProducer} with the configuration from {@code buildConfig} when absent.
     *
     * @param description human-readable description of the declaration, used in error messages
     * @param sourceElement the annotated declaration; also the store key
     * @param type the declared producer type
     * @param genericType the generic declared type supplying the key and value types
     * @param extensionContext the current extension context
     * @return the stored or newly created producer
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private static Producer<?, ?> createProducer(String description, AnnotatedElement sourceElement, Class<? extends Producer<?, ?>> type, Type genericType, ExtensionContext extensionContext) {
        final Serializer<?> keySerializer = getSerializerFromGenericType(genericType, 0);
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: key serializer {2}", extensionContext.getUniqueId(), sourceElement, keySerializer);
        final Serializer<?> valueSerializer = getSerializerFromGenericType(genericType, 1);
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: value serializer {2}", extensionContext.getUniqueId(), sourceElement, valueSerializer);

        final KafkaCluster cluster = findClusterFromContext(sourceElement, extensionContext, type, description);
        ExtensionContext.Store producers = extensionContext.getStore(PRODUCER_NAMESPACE);
        Closeable holder = producers.getOrComputeIfAbsent((Object) sourceElement, key -> {
            LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: Creating KafkaProducer<>", extensionContext.getUniqueId(), sourceElement);
            return new Closeable<KafkaProducer>(sourceElement, cluster.getClusterId(), new KafkaProducer(buildConfig(sourceElement, cluster), keySerializer, valueSerializer));
        }, Closeable.class);
        return (Producer) holder.get();
    }

    /**
     * Returns the {@link Consumer} for a declaration, creating it at most once per source element.
     *
     * <p>Key and value deserializers are derived from type arguments 0 and 1 of {@code genericType}.
     * The consumer is fetched from the consumer store keyed by {@code sourceElement}, or created
     * as a {@link KafkaConsumer} with the configuration from {@code buildConfig} when absent.
     *
     * @param description human-readable description of the declaration, used in error messages
     * @param sourceElement the annotated declaration; also the store key
     * @param type the declared consumer type
     * @param genericType the generic declared type supplying the key and value types
     * @param extensionContext the current extension context
     * @return the stored or newly created consumer
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private static Consumer<?, ?> createConsumer(String description, AnnotatedElement sourceElement, Class<? extends Consumer<?, ?>> type, Type genericType, ExtensionContext extensionContext) {
        final Deserializer<?> keyDeserializer = getDeserializerFromGenericType(genericType, 0);
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: key deserializer {2}", extensionContext.getUniqueId(), sourceElement, keyDeserializer);
        final Deserializer<?> valueDeserializer = getDeserializerFromGenericType(genericType, 1);
        LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: value deserializer {2}", extensionContext.getUniqueId(), sourceElement, valueDeserializer);

        final KafkaCluster cluster = findClusterFromContext(sourceElement, extensionContext, type, description);
        ExtensionContext.Store consumers = extensionContext.getStore(CONSUMER_NAMESPACE);
        Closeable holder = consumers.getOrComputeIfAbsent((Object) sourceElement, key -> {
            LOGGER.log(System.Logger.Level.TRACE, "test {0}: decl {1}: Creating KafkaConsumer<>", extensionContext.getUniqueId(), sourceElement);
            return new Closeable<KafkaConsumer>(sourceElement, cluster.getClusterId(), new KafkaConsumer(buildConfig(sourceElement, cluster), keyDeserializer, valueDeserializer));
        }, Closeable.class);
        return (Consumer) holder.get();
    }

    /**
     * Creates a topic with a randomly generated name on the cluster associated with the declaration.
     *
     * <p>Partition count, replication factor and topic configuration come from the
     * {@link TopicPartitions}, {@link TopicReplicationFactor} and {@link TopicConfig} annotations
     * on the element, where present. The method then polls the topic listing for at most five
     * seconds until the new name appears, giving up early if the create request completed
     * exceptionally. The admin client used for this is closed before returning.
     *
     * @param description human-readable description of the declaration, used in error messages
     * @param sourceElement the annotated declaration
     * @param type the declared topic type
     * @param genericType the generic declared type (not used here)
     * @param extensionContext the current extension context
     * @return a {@link Topic} yielding the generated topic name
     * @throws UnsupportedOperationException if the cluster is not an {@link AdminSource}
     */
    @NonNull
    private static Topic createTopic(String description, AnnotatedElement sourceElement, Class<? extends Topic> type, Type genericType, ExtensionContext extensionContext) {
        KafkaCluster cluster = findClusterFromContext(sourceElement, extensionContext, type, description);
        if (!(cluster instanceof AdminSource)) {
            throw new UnsupportedOperationException("Kafka cluster " + cluster.getClass() + " does not support producing an anonymous admin client.");
        }
        try (Admin admin = ((AdminSource) cluster).createAdmin()) {
            final String topicName = MobyNamesGenerator.getRandomName();
            Optional<Integer> numPartitions = Optional.ofNullable(sourceElement.getAnnotation(TopicPartitions.class))
                    .map(partitions -> partitions.value());
            Optional<Short> replicationFactor = Optional.ofNullable(sourceElement.getAnnotation(TopicReplicationFactor.class))
                    .map(factor -> factor.value());
            NewTopic topicDef = new NewTopic(topicName, numPartitions, replicationFactor)
                    .configs(buildTopicConfig(sourceElement));
            final KafkaFuture<?> createFuture = admin.createTopics(List.of(topicDef)).all();
            Awaitility.await()
                    .failFast(() -> createFuture.isCompletedExceptionally())
                    .atMost(Duration.ofSeconds(5L))
                    .until(() -> (Map) admin.listTopics().namesToListings().get(), listings -> listings.containsKey(topicName));
            return () -> topicName;
        }
    }

    /**
     * Builds the client configuration for a declaration: the cluster's client configuration with
     * every {@link ClientConfig} on the element applied on top, in annotation order.
     *
     * <p>The entries are written into the map returned by
     * {@code cluster.getKafkaClientConfiguration()}, and that same map is returned.
     *
     * @param sourceElement the annotated declaration
     * @param cluster the cluster supplying the base configuration
     * @return the base configuration map with the annotation overrides applied
     */
    private static Map<String, Object> buildConfig(AnnotatedElement sourceElement, KafkaCluster cluster) {
        Map<String, Object> clientConfig = cluster.getKafkaClientConfiguration();
        for (Annotation declared : sourceElement.getAnnotations()) {
            if (declared instanceof ClientConfig.List) {
                // Container annotation: apply each contained @ClientConfig in declaration order.
                for (ClientConfig entry : ((ClientConfig.List) declared).value()) {
                    clientConfig.put(entry.name(), entry.value());
                }
            } else if (declared instanceof ClientConfig) {
                ClientConfig entry = (ClientConfig) declared;
                clientConfig.put(entry.name(), entry.value());
            }
        }
        return clientConfig;
    }

    /**
     * Collects the {@link TopicConfig} name/value pairs declared on an element, including those
     * wrapped in a {@link TopicConfig.List}. A later annotation overwrites an earlier one with the
     * same name.
     *
     * @param sourceElement the annotated declaration
     * @return a new map of topic configuration; empty when the element declares none
     */
    private static Map<String, String> buildTopicConfig(AnnotatedElement sourceElement) {
        Map<String, String> topicConfig = new HashMap<>();
        for (Annotation declared : sourceElement.getAnnotations()) {
            if (declared instanceof TopicConfig.List) {
                // Container annotation: apply each contained @TopicConfig in declaration order.
                for (TopicConfig entry : ((TopicConfig.List) declared).value()) {
                    topicConfig.put(entry.name(), entry.value());
                }
            } else if (declared instanceof TopicConfig) {
                TopicConfig entry = (TopicConfig) declared;
                topicConfig.put(entry.name(), entry.value());
            }
        }
        return topicConfig;
    }

    /**
     * Provisions and starts a new cluster for a declaration.
     *
     * <p>The provisioning strategy is chosen with {@code findBestProvisioningStrategy}. The cluster
     * it creates is started and then wrapped in a {@link Closeable} for storage in the extension store.
     *
     * @param extensionContext the current extension context
     * @param clusterName the name the cluster is registered under
     * @param type the declared cluster type
     * @param sourceElement the annotated declaration
     * @param constraints the constraint annotations to provision with
     * @return the started cluster wrapped for storage
     */
    private static Closeable<KafkaCluster> createCluster(ExtensionContext extensionContext, String clusterName, Class<? extends KafkaCluster> type, AnnotatedElement sourceElement, List<Annotation> constraints) {
        final System.Logger.Level trace = System.Logger.Level.TRACE;
        LOGGER.log(trace, "test {0}: decl: {1}: cluster ''{2}'': Creating new cluster", extensionContext.getUniqueId(), sourceElement, clusterName);
        LOGGER.log(trace, "test {0}: decl: {1}: cluster ''{2}'': Constraints {3}", extensionContext.getUniqueId(), sourceElement, clusterName, constraints);

        KafkaClusterProvisioningStrategy strategy = findBestProvisioningStrategy(constraints, type);
        LOGGER.log(trace, "test {0}: decl: {1}: cluster ''{2}'': Chosen provisioning strategy: {3}", extensionContext.getUniqueId(), sourceElement, clusterName, strategy);

        KafkaCluster cluster = strategy.create(constraints, type, (TestInfo) generateTestInfo(extensionContext));
        LOGGER.log(trace, "test {0}: decl: {1}: cluster ''{2}'': Created", extensionContext.getUniqueId(), sourceElement, clusterName);

        cluster.start();
        return new Closeable<KafkaCluster>(sourceElement, clusterName, cluster);
    }

    /**
     * Builds a {@link KroxyliciousTestInfo} from the display name, test class, test method and
     * tags reported by the extension context.
     *
     * @param extensionContext the context to read from
     * @return a new test info object
     */
    @NonNull
    private static KroxyliciousTestInfo generateTestInfo(ExtensionContext extensionContext) {
        return new KroxyliciousTestInfo(
                extensionContext.getDisplayName(),
                extensionContext.getTestClass(),
                extensionContext.getTestMethod(),
                extensionContext.getTags());
    }

    /**
     * Returns the annotations on an element whose annotation type carries the given meta-annotation.
     *
     * <p>The element's annotations are only inspected when
     * {@code AnnotationSupport.isAnnotated} reports the meta-annotation for the element.
     *
     * @param sourceElement the annotated declaration
     * @param metaAnnotationType the meta-annotation that marks a constraint annotation
     * @return a new, possibly empty, list of matching annotations
     */
    @NonNull
    private static ArrayList<Annotation> getConstraintAnnotations(AnnotatedElement sourceElement, Class<? extends Annotation> metaAnnotationType) {
        if (!AnnotationSupport.isAnnotated((AnnotatedElement) sourceElement, metaAnnotationType)) {
            return new ArrayList<Annotation>();
        }
        return filterAnnotations(sourceElement.getAnnotations(), metaAnnotationType);
    }

    /**
     * List overload: keeps the annotations whose annotation type carries the given meta-annotation.
     *
     * @param annotations the annotations to filter
     * @param metaAnnotationType the meta-annotation to look for
     * @return a new list of the matching annotations, in encounter order
     */
    @NonNull
    private static ArrayList<Annotation> filterAnnotations(List<Annotation> annotations, Class<? extends Annotation> metaAnnotationType) {
        Stream<Annotation> candidates = annotations.stream();
        return filterAnnotations(candidates, metaAnnotationType);
    }

    /**
     * Array overload: keeps the annotations whose annotation type carries the given meta-annotation.
     *
     * @param annotations the annotations to filter
     * @param metaAnnotationType the meta-annotation to look for
     * @return a new list of the matching annotations, in array order
     */
    @NonNull
    private static ArrayList<Annotation> filterAnnotations(Annotation[] annotations, Class<? extends Annotation> metaAnnotationType) {
        Stream<Annotation> candidates = Arrays.stream(annotations);
        return filterAnnotations(candidates, metaAnnotationType);
    }

    /**
     * Stream overload: keeps the annotations whose annotation type carries the given meta-annotation.
     *
     * @param annotations the annotations to filter; the stream is consumed
     * @param metaAnnotationType the meta-annotation to look for
     * @return a new list of the matching annotations, in encounter order
     */
    @NonNull
    private static ArrayList<Annotation> filterAnnotations(Stream<Annotation> annotations, Class<? extends Annotation> metaAnnotationType) {
        return annotations
                .filter(candidate -> candidate.annotationType().isAnnotationPresent(metaAnnotationType))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Chooses the provisioning strategy to use for a cluster declaration.
     *
     * <p>Strategies are discovered with {@link ServiceLoader}. Those that do not support the
     * declaration type, or do not support every constraint annotation, are excluded. Of the rest,
     * the one with the smallest {@code estimatedProvisioningTimeMs} is returned.
     *
     * @param constraints the constraint annotations the strategy must support
     * @param declarationType the declared cluster type the strategy must support
     * @return the eligible strategy with the smallest estimated provisioning time
     * @throws ExtensionConfigurationException if no discovered strategy is eligible
     */
    static KafkaClusterProvisioningStrategy findBestProvisioningStrategy(List<Annotation> constraints, Class<? extends KafkaCluster> declarationType) {
        Predicate<KafkaClusterProvisioningStrategy> supportsDeclaredType = candidate -> {
            if (candidate.supportsType(declarationType)) {
                return true;
            }
            LOGGER.log(System.Logger.Level.TRACE, "Excluding {0} because it is not compatible with declaration of type {1}", candidate, declarationType.getName());
            return false;
        };
        Predicate<KafkaClusterProvisioningStrategy> supportsEveryConstraint = candidate -> {
            for (Annotation constraint : constraints) {
                if (!candidate.supportsAnnotation(constraint)) {
                    // Only the first unsupported constraint is logged.
                    LOGGER.log(System.Logger.Level.TRACE, "Excluding {0} because doesn't support {1}", candidate, constraint);
                    return false;
                }
            }
            return true;
        };
        return ServiceLoader.load(KafkaClusterProvisioningStrategy.class).stream()
                .map(ServiceLoader.Provider::get)
                .filter(supportsDeclaredType)
                .filter(supportsEveryConstraint)
                .min(Comparator.comparing(candidate -> candidate.estimatedProvisioningTimeMs(constraints, declarationType)))
                .orElseThrow(() -> {
                    // Reload the providers just to list what was tried in the error message.
                    List<Class<? extends KafkaClusterProvisioningStrategy>> tried = ServiceLoader.load(KafkaClusterProvisioningStrategy.class).stream()
                            .map(ServiceLoader.Provider::type)
                            .toList();
                    return new ExtensionConfigurationException("No provisioning strategy for a declaration of type " + declarationType.getName() + " and supporting all of " + constraints + " was found (tried: " + classNames(tried) + ")");
                });
    }

    /**
     * Returns the names of the given classes, sorted in natural string order.
     *
     * @param constraints the classes to name
     * @return the sorted class names, as produced by {@code Stream.toList()}
     */
    @NonNull
    private static List<String> classNames(Collection<? extends Class<?>> constraints) {
        return constraints.stream()
                .map(cls -> cls.getName())
                .sorted()
                .toList();
    }

    /**
     * Verifies that a declaration's type is {@link KafkaCluster} or a subtype of it.
     *
     * @param target description of the declaration, used in the error message
     * @param type the declared type to check
     * @throws ExtensionConfigurationException if {@code type} is not assignable to {@link KafkaCluster}
     */
    private void assertSupportedType(String target, Class<?> type) {
        if (KafkaCluster.class.isAssignableFrom(type)) {
            return;
        }
        throw new ExtensionConfigurationException("Can only resolve declarations of type " + KafkaCluster.class + " but " + target + " has type " + type.getName());
    }

    static class Closeable<T extends AutoCloseable>
    implements ExtensionContext.Store.CloseableResource {
        private final String clusterName;
        private final T resource;
        private final AnnotatedElement sourceElement;

        public Closeable(AnnotatedElement sourceElement, String clusterName, T resource) {
            this.sourceElement = sourceElement;
            this.clusterName = clusterName;
            this.resource = resource;
        }

        public T get() {
            return this.resource;
        }

        public void close() throws Throwable {
            LOGGER.log(System.Logger.Level.TRACE, "Stopping '{0}' with cluster name '{1}' for {2}", this.resource, this.clusterName, this.sourceElement);
            this.resource.close();
        }
    }
}

