/*
 * Decompiled with CFR 0.152.
 */
package io.strimzi.test.container;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import io.strimzi.test.container.KafkaContainer;
import io.strimzi.test.container.KafkaVersionService;
import io.strimzi.test.container.UnsupportedKraftKafkaVersionException;
import java.lang.invoke.CallSite;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.MountableFile;

public class StrimziKafkaContainer
extends GenericContainer<StrimziKafkaContainer>
implements KafkaContainer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaContainer.class);
    public static final String STARTER_SCRIPT = "/testcontainers_start.sh";
    public static final int KAFKA_PORT = 9092;
    private final CompletableFuture<String> imageNameProvider;
    private int kafkaExposedPort;
    private int internalZookeeperExposedPort;
    private Map<String, String> kafkaConfigurationMap;
    private String externalZookeeperConnect;
    private int brokerId;
    private String kafkaVersion;
    private boolean useKraft;
    private Function<StrimziKafkaContainer, String> bootstrapServersProvider = c -> String.format("PLAINTEXT://%s:%s", this.getHost(), this.kafkaExposedPort);
    private String clusterId;
    private ToxiproxyContainer proxyContainer;
    private ToxiproxyContainer.ContainerProxy proxy;

    public StrimziKafkaContainer() {
        this(new CompletableFuture<String>());
    }

    public StrimziKafkaContainer(String dockerImageName) {
        this(CompletableFuture.completedFuture(dockerImageName));
    }

    private StrimziKafkaContainer(CompletableFuture<String> imageName) {
        super(imageName);
        this.imageNameProvider = imageName;
        super.setNetwork(Network.SHARED);
        super.setExposedPorts(Collections.singletonList(9092));
        super.addEnv("LOG_DIR", "/tmp");
    }

    protected void doStart() {
        if (this.proxyContainer != null && !this.proxyContainer.isRunning()) {
            this.proxyContainer.start();
        }
        if (!this.imageNameProvider.isDone()) {
            this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion));
        }
        try {
            if (this.useKraft && (this.kafkaVersion != null && this.kafkaVersion.startsWith("2.") || this.imageNameProvider.get().contains("2.8.2"))) {
                throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode.");
            }
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Error occurred during retrieving of image name provider", (Throwable)e);
            throw new RuntimeException(e);
        }
        if (!this.hasKraftOrExternalZooKeeperConfigured()) {
            super.addExposedPort(Integer.valueOf(2181));
        }
        super.setCommand(new String[]{"sh", "-c", this.runStarterScript()});
        super.doStart();
    }

    public void stop() {
        if (this.proxyContainer != null && this.proxyContainer.isRunning()) {
            this.proxyContainer.stop();
        }
        super.stop();
    }

    protected String runStarterScript() {
        return "while [ ! -x /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh";
    }

    public StrimziKafkaContainer waitForRunning() {
        if (this.useKraft) {
            super.waitingFor((WaitStrategy)Wait.forLogMessage((String)".*Transitioning from RECOVERY to RUNNING.*", (int)1));
        } else {
            super.waitingFor((WaitStrategy)Wait.forLogMessage((String)".*Recorded new controller, from now on will use [node|broker].*", (int)1));
        }
        return this;
    }

    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
        super.containerIsStarting(containerInfo, reused);
        this.kafkaExposedPort = this.getMappedPort(9092);
        if (!this.hasKraftOrExternalZooKeeperConfigured()) {
            this.internalZookeeperExposedPort = this.getMappedPort(2181);
        }
        LOGGER.info("Mapped port: {}", (Object)this.kafkaExposedPort);
        String bootstrapServers = this.getBootstrapServers();
        String bsListenerName = this.extractListenerName(bootstrapServers);
        StringBuilder advertisedListeners = new StringBuilder(bootstrapServers);
        Collection cns = containerInfo.getNetworkSettings().getNetworks().values();
        int advertisedListenerNumber = 1;
        ArrayList<CallSite> advertisedListenersNames = new ArrayList<CallSite>();
        for (ContainerNetwork cn : cns) {
            String advertisedName = "BROKER" + advertisedListenerNumber;
            advertisedListeners.append(",").append(advertisedName).append("://").append(cn.getIpAddress()).append(":9093");
            advertisedListenersNames.add((CallSite)((Object)advertisedName));
            ++advertisedListenerNumber;
        }
        LOGGER.info("This is all advertised listeners for Kafka {}", (Object)advertisedListeners.toString());
        StringBuilder kafkaListeners = new StringBuilder();
        StringBuilder kafkaListenerSecurityProtocol = new StringBuilder();
        advertisedListenersNames.forEach(name -> {
            kafkaListeners.append((String)name).append("://0.0.0.0:9093").append(",");
            kafkaListenerSecurityProtocol.append((String)name).append(":PLAINTEXT").append(",");
        });
        kafkaListeners.append(bsListenerName).append("://0.0.0.0:").append(9092);
        kafkaListenerSecurityProtocol.append("PLAINTEXT:PLAINTEXT");
        if (!bsListenerName.equals("PLAINTEXT")) {
            kafkaListenerSecurityProtocol.append(",").append(bsListenerName).append(":").append(bsListenerName);
        }
        if (this.useKraft) {
            kafkaListeners.append(",").append("CONTROLLER://localhost:9094");
            kafkaListenerSecurityProtocol.append(",").append("CONTROLLER:PLAINTEXT");
        }
        HashMap<String, String> kafkaConfiguration = new HashMap<String, String>();
        kafkaConfiguration.put("listeners", kafkaListeners.toString());
        kafkaConfiguration.put("advertised.listeners", advertisedListeners.toString());
        kafkaConfiguration.put("listener.security.protocol.map", kafkaListenerSecurityProtocol.toString());
        kafkaConfiguration.put("inter.broker.listener.name", "BROKER1");
        kafkaConfiguration.put("broker.id", String.valueOf(this.brokerId));
        if (this.useKraft) {
            kafkaConfiguration.put("controller.quorum.voters", this.brokerId + "@localhost:9094");
            kafkaConfiguration.put("controller.listener.names", "CONTROLLER");
        } else if (this.externalZookeeperConnect != null) {
            LOGGER.info("Using external ZooKeeper 'zookeeper.connect={}'.", (Object)this.externalZookeeperConnect);
            kafkaConfiguration.put("zookeeper.connect", this.externalZookeeperConnect);
        } else {
            LOGGER.info("Using internal ZooKeeper 'zookeeper.connect={}.'", (Object)"localhost:2181");
            kafkaConfiguration.put("zookeeper.connect", "localhost:2181");
        }
        if (this.kafkaConfigurationMap != null) {
            kafkaConfiguration.putAll(this.kafkaConfigurationMap);
        }
        String kafkaConfigurationOverride = StrimziKafkaContainer.writeOverrideString(kafkaConfiguration);
        Object command = "#!/bin/bash \n";
        if (!this.useKraft) {
            if (this.externalZookeeperConnect != null) {
                this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
            } else {
                command = (String)command + "bin/zookeeper-server-start.sh config/zookeeper.properties &\n";
            }
            command = (String)command + "bin/kafka-server-start.sh config/server.properties" + kafkaConfigurationOverride;
        } else {
            this.clusterId = this.randomUuid();
            command = (String)command + "bin/kafka-storage.sh format -t \"" + this.clusterId + "\" -c config/kraft/server.properties \n";
            command = (String)command + "bin/kafka-server-start.sh config/kraft/server.properties" + kafkaConfigurationOverride;
        }
        LOGGER.info("Copying command to 'STARTER_SCRIPT' script.");
        this.copyFileToContainer(Transferable.of((byte[])((String)command).getBytes(StandardCharsets.UTF_8), (int)700), STARTER_SCRIPT);
    }

    @Override
    public boolean hasKraftOrExternalZooKeeperConfigured() {
        return this.useKraft || this.externalZookeeperConnect != null;
    }

    private String extractListenerName(String bootstrapServers) {
        String[] strings = bootstrapServers.split(":");
        if (strings.length < 3) {
            throw new IllegalArgumentException("The configured boostrap servers '" + bootstrapServers + "' must be prefixed with a listener name.");
        }
        return strings[0];
    }

    static String writeOverrideString(Map<String, String> kafkaConfigurationMap) {
        StringBuilder kafkaConfiguration = new StringBuilder();
        kafkaConfigurationMap.forEach((configName, configValue) -> kafkaConfiguration.append(" --override ").append('\'').append(configName.replace("'", "'\"'\"'")).append("=").append(configValue.replace("'", "'\"'\"'")).append('\''));
        return kafkaConfiguration.toString();
    }

    private String randomUuid() {
        UUID metadataTopicIdInternal = new UUID(0L, 1L);
        UUID zeroIdImpactInternal = new UUID(0L, 0L);
        UUID uuid = UUID.randomUUID();
        while (uuid.equals(metadataTopicIdInternal) || uuid.equals(zeroIdImpactInternal)) {
            uuid = UUID.randomUUID();
        }
        ByteBuffer uuidBytes = ByteBuffer.wrap(new byte[16]);
        uuidBytes.putLong(uuid.getMostSignificantBits());
        uuidBytes.putLong(uuid.getLeastSignificantBits());
        byte[] uuidBytesArray = uuidBytes.array();
        return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytesArray);
    }

    @Override
    public String getInternalZooKeeperConnect() {
        if (this.hasKraftOrExternalZooKeeperConfigured()) {
            throw new IllegalStateException("Connect string is not available when using KRaft or external ZooKeeper");
        }
        return this.getHost() + ":" + this.internalZookeeperExposedPort;
    }

    @Override
    public String getBootstrapServers() {
        if (this.proxyContainer != null) {
            return String.format("PLAINTEXT://%s:%d", this.getProxy().getContainerIpAddress(), this.getProxy().getProxyPort());
        }
        return this.bootstrapServersProvider.apply(this);
    }

    public String getClusterId() {
        return this.clusterId;
    }

    public StrimziKafkaContainer withKafkaConfigurationMap(Map<String, String> kafkaConfigurationMap) {
        this.kafkaConfigurationMap = kafkaConfigurationMap;
        return this;
    }

    public StrimziKafkaContainer withExternalZookeeperConnect(String externalZookeeperConnect) {
        if (this.useKraft) {
            throw new IllegalStateException("Cannot configure an external Zookeeper and use Kraft at the same time");
        }
        this.externalZookeeperConnect = externalZookeeperConnect;
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withBrokerId(int brokerId) {
        this.brokerId = brokerId;
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withKafkaVersion(String kafkaVersion) {
        this.kafkaVersion = kafkaVersion;
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withKraft() {
        this.useKraft = true;
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withPort(int fixedPort) {
        if (fixedPort <= 0) {
            throw new IllegalArgumentException("The fixed Kafka port must be greater than 0");
        }
        this.addFixedExposedPort(fixedPort, 9092);
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withServerProperties(MountableFile serverPropertiesFile) {
        this.withCopyFileToContainer(serverPropertiesFile, this.useKraft ? "/opt/kafka/config/kraft/server.properties" : "/opt/kafka/config/server.properties");
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withBootstrapServers(Function<StrimziKafkaContainer, String> provider) {
        this.bootstrapServersProvider = provider;
        return (StrimziKafkaContainer)this.self();
    }

    public StrimziKafkaContainer withProxyContainer(ToxiproxyContainer proxyContainer) {
        if (proxyContainer != null) {
            this.proxyContainer = proxyContainer;
            proxyContainer.setNetwork(Network.SHARED);
            proxyContainer.setNetworkAliases(Collections.singletonList("toxiproxy"));
        }
        return (StrimziKafkaContainer)this.self();
    }

    public synchronized ToxiproxyContainer.ContainerProxy getProxy() {
        if (this.proxyContainer == null) {
            throw new IllegalStateException("The proxy container has not been configured");
        }
        if (this.proxy == null) {
            this.proxy = this.proxyContainer.getProxy((GenericContainer)this, 9092);
        }
        return this.proxy;
    }
}

