/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.junit;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.junit.ClusterInstanceParameterResolver;
import kafka.test.junit.GenericParameterResolver;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.collection.JavaConverters;
import scala.compat.java8.OptionConverters;

public class ZkClusterInvocationContext
implements TestTemplateInvocationContext {
    private final ClusterConfig clusterConfig;
    private final AtomicReference<IntegrationTestHarness> clusterReference;

    public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
        this.clusterReference = new AtomicReference();
    }

    public String getDisplayName(int invocationIndex) {
        String clusterDesc = this.clusterConfig.nameTags().entrySet().stream().map(Object::toString).collect(Collectors.joining(", "));
        return String.format("[Zk %d] %s", invocationIndex, clusterDesc);
    }

    public List<Extension> getAdditionalExtensions() {
        if (this.clusterConfig.numControllers() != 1) {
            throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
        }
        ZkClusterInstance clusterShim = new ZkClusterInstance(this.clusterConfig, this.clusterReference);
        return Arrays.asList(context -> {
            IntegrationTestHarness cluster = new IntegrationTestHarness(){

                @Override
                public Properties serverConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.serverProperties();
                }

                @Override
                public Properties adminClientConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.adminClientProperties();
                }

                @Override
                public Properties consumerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.consumerProperties();
                }

                @Override
                public Properties producerConfig() {
                    return ZkClusterInvocationContext.this.clusterConfig.producerProperties();
                }

                @Override
                public SecurityProtocol securityProtocol() {
                    return ZkClusterInvocationContext.this.clusterConfig.securityProtocol();
                }

                @Override
                public ListenerName listenerName() {
                    return ZkClusterInvocationContext.this.clusterConfig.listenerName().map(ListenerName::normalised).orElseGet(() -> ListenerName.forSecurityProtocol((SecurityProtocol)this.securityProtocol()));
                }

                @Override
                public Option<Properties> serverSaslProperties() {
                    if (ZkClusterInvocationContext.this.clusterConfig.saslServerProperties().isEmpty()) {
                        return Option.empty();
                    }
                    return Option.apply((Object)ZkClusterInvocationContext.this.clusterConfig.saslServerProperties());
                }

                @Override
                public Option<Properties> clientSaslProperties() {
                    if (ZkClusterInvocationContext.this.clusterConfig.saslClientProperties().isEmpty()) {
                        return Option.empty();
                    }
                    return Option.apply((Object)ZkClusterInvocationContext.this.clusterConfig.saslClientProperties());
                }

                @Override
                public int brokerCount() {
                    return ZkClusterInvocationContext.this.clusterConfig.numBrokers();
                }

                @Override
                public Option<File> trustStoreFile() {
                    return OptionConverters.toScala(ZkClusterInvocationContext.this.clusterConfig.trustStoreFile());
                }
            };
            this.clusterReference.set(cluster);
            if (this.clusterConfig.isAutoStart()) {
                clusterShim.start();
            }
        }, context -> clusterShim.stop(), new ClusterInstanceParameterResolver(clusterShim), new GenericParameterResolver<ClusterConfig>(this.clusterConfig, ClusterConfig.class));
    }

    public static class ZkClusterInstance
    implements ClusterInstance {
        final AtomicReference<IntegrationTestHarness> clusterReference;
        final ClusterConfig config;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);

        ZkClusterInstance(ClusterConfig config, AtomicReference<IntegrationTestHarness> clusterReference) {
            this.config = config;
            this.clusterReference = clusterReference;
        }

        @Override
        public String bootstrapServers() {
            return TestUtils.bootstrapServers(this.clusterReference.get().servers(), this.clusterReference.get().listenerName());
        }

        @Override
        public Collection<SocketServer> brokerSocketServers() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream().map(KafkaServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public ListenerName clientListener() {
            return this.clusterReference.get().listenerName();
        }

        @Override
        public Collection<SocketServer> controllerSocketServers() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream().filter(broker -> broker.kafkaController().isActive()).map(KafkaServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public SocketServer anyBrokerSocketServer() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream().map(KafkaServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
        }

        @Override
        public SocketServer anyControllerSocketServer() {
            return JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream().filter(broker -> broker.kafkaController().isActive()).map(KafkaServer::socketServer).findFirst().orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
        }

        @Override
        public ClusterInstance.ClusterType clusterType() {
            return ClusterInstance.ClusterType.ZK;
        }

        @Override
        public ClusterConfig config() {
            return this.config;
        }

        @Override
        public IntegrationTestHarness getUnderlying() {
            return this.clusterReference.get();
        }

        @Override
        public Admin createAdminClient(Properties configOverrides) {
            return this.clusterReference.get().createAdminClient(configOverrides);
        }

        @Override
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                this.clusterReference.get().setUp();
            }
        }

        @Override
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.clusterReference.get().tearDown();
            }
        }
    }
}

