/*
 * Decompiled with CFR 0.152.
 */
package io.strimzi.test.container;

import com.groupcdg.pitest.annotations.DoNotMutate;
import io.strimzi.test.container.KafkaContainer;
import io.strimzi.test.container.KafkaVersionService;
import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.container.Utils;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

/**
 * A group of {@link StrimziKafkaContainer} brokers that share one {@link Network} and are
 * started and stopped together.
 *
 * <p>Every broker is configured with a {@code controller.quorum.voters} entry listing all
 * brokers as {@code <id>@broker-<id>:9094}. {@link #start()} returns only after each broker's
 * {@code kafka-metadata-quorum.sh describe --status} output reports a leader id.
 *
 * <p>Instances are created through {@link StrimziKafkaClusterBuilder}.
 */
public class StrimziKafkaCluster
implements KafkaContainer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);
    /** Extracts the numeric leader id from the metadata quorum status output; compiled once. */
    private static final Pattern LEADER_ID_PATTERN = Pattern.compile("LeaderId:\\s+(\\d+)");
    private final int brokersNum;
    private final int internalTopicReplicationFactor;
    private final Map<String, String> additionalKafkaConfiguration;
    private final ToxiproxyContainer proxyContainer;
    private final boolean enableSharedNetwork;
    private final String kafkaVersion;
    // Copied from the builder but not read anywhere in this class.
    private final boolean enableKraft;
    private final Network network;
    private Collection<KafkaContainer> brokers;
    private final String clusterId;

    /**
     * Creates the cluster and prepares (but does not start) its broker containers.
     *
     * @param builder source of all settings
     * @throws IllegalArgumentException if the broker count is not positive, or the internal
     *         topic replication factor is outside {@code [1, brokersNum]}
     */
    private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
        this.brokersNum = builder.brokersNum;
        // A replication factor of 0 means "not set": default to the number of brokers.
        this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
        // Validate before selecting a network or building containers so bad input fails fast.
        this.validateBrokerNum(this.brokersNum);
        this.validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor, this.brokersNum);
        this.enableSharedNetwork = builder.enableSharedNetwork;
        this.network = this.enableSharedNetwork ? Network.SHARED : Network.newNetwork();
        // Defensive copy: configureQuorumVoters() writes into this map, and the builder's own
        // map must not be modified (the builder may be reused for another build()).
        this.additionalKafkaConfiguration = new HashMap<>(builder.additionalKafkaConfiguration);
        this.proxyContainer = builder.proxyContainer;
        this.kafkaVersion = builder.kafkaVersion;
        this.enableKraft = builder.enableKRaft;
        this.clusterId = builder.clusterId;
        if (this.proxyContainer != null) {
            this.proxyContainer.setNetwork(this.network);
        }
        this.prepareKafkaCluster(this.additionalKafkaConfiguration, this.kafkaVersion);
    }

    /**
     * Builds one {@link StrimziKafkaContainer} per broker id in {@code [0, brokersNum)} and
     * stores them in {@link #brokers}. Containers are configured here, not started.
     *
     * @param additionalKafkaConfiguration non-null, mutable map of extra broker settings; the
     *        quorum voters entry is added to it, and its entries override the multi-node defaults
     * @param kafkaVersion Kafka version to run, or {@code null} to use the latest release
     *        reported by {@link KafkaVersionService}
     */
    private void prepareKafkaCluster(Map<String, String> additionalKafkaConfiguration, String kafkaVersion) {
        String replicationFactor = String.valueOf(this.internalTopicReplicationFactor);
        Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
        defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", replicationFactor);
        defaultKafkaConfigurationForMultiNode.put("num.partitions", replicationFactor);
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", replicationFactor);
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", replicationFactor);
        this.configureQuorumVoters(additionalKafkaConfiguration);
        // Applied last so caller-supplied settings (and the quorum voters) win over defaults.
        defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
        // Resolve the version once rather than once per broker.
        String resolvedKafkaVersion = kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion;
        this.brokers = IntStream.range(0, this.brokersNum).mapToObj(brokerId -> {
            LOGGER.info("Starting broker with id {}", brokerId);
            StrimziKafkaContainer kafkaContainer = ((StrimziKafkaContainer)new StrimziKafkaContainer()
                    .withBrokerId(brokerId)
                    .withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
                    .withNetwork(this.network))
                    .withProxyContainer(this.proxyContainer)
                    .withKafkaVersion(resolvedKafkaVersion)
                    .withNodeId(brokerId)
                    .withClusterId(this.clusterId)
                    .waitForRunning();
            LOGGER.info("Started broker with id: {}", kafkaContainer);
            return kafkaContainer;
        }).collect(Collectors.toList());
    }

    /**
     * @param brokersNum requested number of brokers
     * @throws IllegalArgumentException if {@code brokersNum} is zero or negative
     */
    private void validateBrokerNum(int brokersNum) {
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
    }

    /**
     * @param internalTopicReplicationFactor effective replication factor for internal topics
     * @param brokersNum number of brokers in the cluster
     * @throws IllegalArgumentException if the factor is below 1 or above {@code brokersNum}
     */
    private void validateInternalTopicReplicationFactor(int internalTopicReplicationFactor, int brokersNum) {
        if (internalTopicReplicationFactor < 1 || internalTopicReplicationFactor > brokersNum) {
            throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be between 1 and " + brokersNum);
        }
    }

    /**
     * @return the broker containers of this cluster (the internal collection, not a copy)
     */
    public Collection<KafkaContainer> getBrokers() {
        return this.brokers;
    }

    /**
     * @return the comma-separated {@code getNetworkBootstrapServers()} values of all brokers
     */
    @DoNotMutate
    public String getNetworkBootstrapServers() {
        return this.brokers.stream().map(broker -> ((StrimziKafkaContainer)broker).getNetworkBootstrapServers()).collect(Collectors.joining(","));
    }

    /**
     * @return the comma-separated bootstrap servers of all brokers
     */
    @Override
    public String getBootstrapServers() {
        return this.brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    /** @return the effective replication factor applied to internal topics */
    int getInternalTopicReplicationFactor() {
        return this.internalTopicReplicationFactor;
    }

    /** @return {@code true} if the cluster uses {@link Network#SHARED} */
    boolean isSharedNetworkEnabled() {
        return this.enableSharedNetwork;
    }

    /**
     * @return this cluster's additional Kafka configuration, including the
     *         {@code controller.quorum.voters} entry added during preparation
     */
    Map<String, String> getAdditionalKafkaConfiguration() {
        return this.additionalKafkaConfiguration;
    }

    /**
     * Stores the {@code controller.quorum.voters} entry, listing every broker as
     * {@code <id>@broker-<id>:9094}, in the given map, replacing any existing value.
     *
     * @param additionalKafkaConfiguration non-null, mutable map to receive the entry
     */
    private void configureQuorumVoters(Map<String, String> additionalKafkaConfiguration) {
        String quorumVoters = IntStream.range(0, this.brokersNum).mapToObj(brokerId -> String.format("%d@broker-%d:9094", brokerId, brokerId)).collect(Collectors.joining(","));
        additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters);
    }

    /**
     * Starts all brokers, waiting up to 60 seconds for the containers to start, and then
     * polls every second for up to one minute until every broker reports a quorum leader.
     *
     * @throws RuntimeException if container start-up fails, times out or is interrupted, or
     *         if the quorum status command cannot be executed in a container
     */
    @Override
    @DoNotMutate
    public void start() {
        Stream<KafkaContainer> startables = this.brokers.stream();
        try {
            Startables.deepStart(startables).get(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while starting Kafka containers", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Failed to start Kafka containers", e);
        }
        catch (TimeoutException e) {
            throw new RuntimeException("Timed out while starting Kafka containers", e);
        }
        Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(1L), Duration.ofMinutes(1L), () -> this.allBrokersReportQuorumLeader());
    }

    /**
     * Runs the metadata quorum status command in every broker container and checks that each
     * one reports a leader id.
     *
     * @return {@code true} only if every broker's output contains a parseable, non-negative
     *         {@code LeaderId}; {@code false} as soon as one broker does not
     * @throws RuntimeException if the command cannot be executed or the thread is interrupted
     */
    @DoNotMutate
    private boolean allBrokersReportQuorumLeader() {
        try {
            for (KafkaContainer kafkaContainer : this.brokers) {
                StrimziKafkaContainer broker = (StrimziKafkaContainer)kafkaContainer;
                Container.ExecResult result = broker.execInContainer(new String[]{"bash", "-c", "bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9091 describe --status"});
                String output = result.getStdout();
                LOGGER.info("Metadata quorum status from broker {}: {}", broker.getBrokerId(), output);
                if (output == null || output.isEmpty()) {
                    return false;
                }
                Matcher leaderIdMatcher = LEADER_ID_PATTERN.matcher(output);
                if (!leaderIdMatcher.find()) {
                    return false;
                }
                try {
                    if (Integer.parseInt(leaderIdMatcher.group(1)) < 0) {
                        return false;
                    }
                }
                catch (NumberFormatException e) {
                    // The digits did not fit into an int; treat as "no valid leader yet".
                    return false;
                }
            }
            return true;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag only when this thread was actually interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to execute command in Kafka container", e);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to execute command in Kafka container", e);
        }
    }

    /**
     * Stops all brokers, using a parallel stream over the broker collection.
     */
    @Override
    @DoNotMutate
    public void stop() {
        this.brokers.stream().parallel().forEach(Startable::stop);
    }

    /** @return the network shared by all brokers of this cluster */
    protected Network getNetwork() {
        return this.network;
    }

    /**
     * Fluent builder for {@link StrimziKafkaCluster}.
     *
     * <p>Unset values keep their Java defaults: a broker count of 0 is rejected by
     * {@link #build()}, and a replication factor of 0 defaults to the broker count.
     */
    public static class StrimziKafkaClusterBuilder {
        private int brokersNum;
        private int internalTopicReplicationFactor;
        private Map<String, String> additionalKafkaConfiguration = new HashMap<>();
        private ToxiproxyContainer proxyContainer;
        private boolean enableSharedNetwork;
        private String kafkaVersion;
        // No builder method in this class sets this flag, so it stays false.
        private boolean enableKRaft;
        private String clusterId;

        /**
         * @param brokersNum number of brokers; must be greater than 0 at {@link #build()} time
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
            this.brokersNum = brokersNum;
            return this;
        }

        /**
         * @param internalTopicReplicationFactor replication factor for internal topics; 0 means
         *        "use the number of brokers"
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
            this.internalTopicReplicationFactor = internalTopicReplicationFactor;
            return this;
        }

        /**
         * Adds the given entries to the extra broker configuration; {@code null} is ignored.
         *
         * @param additionalKafkaConfiguration entries to add, may be {@code null}
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
            if (additionalKafkaConfiguration != null) {
                this.additionalKafkaConfiguration.putAll(additionalKafkaConfiguration);
            }
            return this;
        }

        /**
         * @param proxyContainer proxy container to attach to the cluster network and pass to
         *        every broker; may be {@code null}
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
            this.proxyContainer = proxyContainer;
            return this;
        }

        /**
         * Makes the cluster use {@link Network#SHARED} instead of a new network.
         *
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withSharedNetwork() {
            this.enableSharedNetwork = true;
            return this;
        }

        /**
         * @param kafkaVersion Kafka version for all brokers; {@code null} selects the latest
         *        release known to {@link KafkaVersionService}
         * @return this builder
         */
        public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
            this.kafkaVersion = kafkaVersion;
            return this;
        }

        /**
         * Generates a fresh random cluster id and creates the cluster.
         *
         * @return a new, not yet started cluster
         * @throws IllegalArgumentException if the broker count or replication factor is invalid
         */
        public StrimziKafkaCluster build() {
            this.clusterId = UUID.randomUUID().toString();
            return new StrimziKafkaCluster(this);
        }
    }
}

