/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.junit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterInstanceParameterResolver;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Function0;
import scala.Option;
import scala.compat.java8.OptionConverters;

public class RaftClusterInvocationContext
implements TestTemplateInvocationContext {
    private final String baseDisplayName;
    private final ClusterConfig clusterConfig;
    private final boolean isCombined;

    public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
        this.baseDisplayName = baseDisplayName;
        this.clusterConfig = clusterConfig;
        this.isCombined = isCombined;
    }

    public String getDisplayName(int invocationIndex) {
        return String.format("%s [%d] Type=Raft-%s, %s", this.baseDisplayName, invocationIndex, this.isCombined ? "Combined" : "Isolated", String.join((CharSequence)",", this.clusterConfig.displayTags()));
    }

    public List<Extension> getAdditionalExtensions() {
        RaftClusterInstance clusterInstance = new RaftClusterInstance(this.clusterConfig, this.isCombined);
        return Arrays.asList(context -> {
            clusterInstance.format();
            if (this.clusterConfig.isAutoStart()) {
                clusterInstance.start();
            }
        }, context -> clusterInstance.stop(), new ClusterInstanceParameterResolver(clusterInstance));
    }

    public static class RaftClusterInstance
    implements ClusterInstance {
        private final ClusterConfig clusterConfig;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);
        final AtomicBoolean formated = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue();
        private EmbeddedZookeeper embeddedZookeeper;
        private KafkaClusterTestKit clusterTestKit;
        private final boolean isCombined;

        RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) {
            this.clusterConfig = clusterConfig;
            this.isCombined = isCombined;
        }

        @Override
        public String bootstrapServers() {
            return this.clusterTestKit.bootstrapServers();
        }

        @Override
        public String bootstrapControllers() {
            return this.clusterTestKit.bootstrapControllers();
        }

        @Override
        public ListenerName clientListener() {
            return ListenerName.normalised((String)"EXTERNAL");
        }

        @Override
        public Optional<ListenerName> controllerListenerName() {
            return this.controllers().values().stream().findAny().flatMap(s -> OptionConverters.toJava((Option)s.config().controllerListenerNames().headOption())).map(ListenerName::new);
        }

        @Override
        public Collection<SocketServer> controllerSocketServers() {
            return this.controllers().values().stream().map(ControllerServer::socketServer).collect(Collectors.toList());
        }

        @Override
        public String clusterId() {
            return Stream.concat(this.controllers().values().stream().map(ControllerServer::clusterId), this.brokers().values().stream().map(KafkaBroker::clusterId)).findFirst().orElseThrow(() -> new RuntimeException("No controllers or brokers!"));
        }

        @Override
        public Type type() {
            return this.isCombined ? Type.CO_KRAFT : Type.KRAFT;
        }

        @Override
        public ClusterConfig config() {
            return this.clusterConfig;
        }

        @Override
        public Set<Integer> controllerIds() {
            return this.controllers().keySet();
        }

        @Override
        public KafkaClusterTestKit getUnderlying() {
            return this.clusterTestKit;
        }

        @Override
        public Admin createAdminClient(Properties configOverrides) {
            Admin admin = Admin.create((Properties)this.clusterTestKit.newClientPropertiesBuilder(configOverrides).build());
            this.admins.add(admin);
            return admin;
        }

        @Override
        public void start() {
            try {
                this.format();
                if (this.started.compareAndSet(false, true)) {
                    this.clusterTestKit.startup();
                    TestUtils.waitUntilTrue((Function0<Object>)((Function0)() -> this.clusterTestKit.brokers().values().stream().allMatch(brokers -> brokers.brokerState() == BrokerState.RUNNING)), (Function0<String>)((Function0)() -> "Broker never made it to RUNNING state."), 15000L, 100L);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to start Raft server", e);
            }
        }

        @Override
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.admins.forEach(admin -> Utils.closeQuietly((AutoCloseable)admin, (String)"admin"));
                this.admins.clear();
                Utils.closeQuietly((AutoCloseable)this.clusterTestKit, (String)"cluster");
                if (this.embeddedZookeeper != null) {
                    Utils.closeQuietly((AutoCloseable)this.embeddedZookeeper, (String)"zk");
                }
            }
        }

        @Override
        public void shutdownBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).shutdown();
        }

        @Override
        public void startBroker(int brokerId) {
            this.findBrokerOrThrow(brokerId).startup();
        }

        @Override
        public void waitForReadyBrokers() throws InterruptedException {
            try {
                this.clusterTestKit.waitForReadyBrokers();
            }
            catch (ExecutionException e) {
                throw new AssertionError("Failed while waiting for brokers to become ready", e);
            }
        }

        @Override
        public Map<Integer, KafkaBroker> brokers() {
            return this.clusterTestKit.brokers().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        @Override
        public Map<Integer, ControllerServer> controllers() {
            return Collections.unmodifiableMap(this.clusterTestKit.controllers());
        }

        public void format() throws Exception {
            if (this.formated.compareAndSet(false, true)) {
                ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
                records.add(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(this.clusterConfig.metadataVersion().featureLevel()), 0));
                this.clusterConfig.features().forEach((feature, version) -> records.add(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName(feature.featureName()).setFeatureLevel(version.shortValue()), 0)));
                TestKitNodes nodes = new TestKitNodes.Builder().setBootstrapMetadata(BootstrapMetadata.fromRecords(records, (String)"testkit")).setCombined(this.isCombined).setNumBrokerNodes(this.clusterConfig.numBrokers()).setNumDisksPerBroker(this.clusterConfig.numDisksPerBroker()).setPerServerProperties(this.clusterConfig.perServerOverrideProperties()).setNumControllerNodes(this.clusterConfig.numControllers()).build();
                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
                if (Boolean.parseBoolean(this.clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                    this.embeddedZookeeper = new EmbeddedZookeeper();
                    builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", this.embeddedZookeeper.port()));
                }
                this.clusterConfig.serverProperties().forEach(builder::setConfigProp);
                this.clusterTestKit = builder.build();
                this.clusterTestKit.format();
            }
        }

        private BrokerServer findBrokerOrThrow(int brokerId) {
            return Optional.ofNullable(this.clusterTestKit.brokers().get(brokerId)).orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
        }
    }
}

