/*
 * Decompiled with CFR 0.152.
 */
package org.testcontainers.containers;

import java.util.stream.Stream;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.SocatContainer;
import org.testcontainers.utility.Base58;

public class KafkaContainer
extends GenericContainer<KafkaContainer> {
    public static final int KAFKA_PORT = 9092;
    public static final int ZOOKEEPER_PORT = 2181;
    protected String externalZookeeperConnect = null;
    protected SocatContainer proxy;

    public KafkaContainer() {
        this("4.0.0");
    }

    public KafkaContainer(String confluentPlatformVersion) {
        super("confluentinc/cp-kafka:" + confluentPlatformVersion);
        this.withNetwork(Network.newNetwork());
        String networkAlias = "kafka-" + Base58.randomString((int)6);
        this.withNetworkAliases(new String[]{networkAlias});
        this.withExposedPorts(new Integer[]{9092});
        this.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + networkAlias + ":9093");
        this.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        this.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        this.withEnv("KAFKA_BROKER_ID", "1");
        this.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
        this.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
        this.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807");
    }

    public KafkaContainer withEmbeddedZookeeper() {
        this.externalZookeeperConnect = null;
        return (KafkaContainer)this.self();
    }

    public KafkaContainer withExternalZookeeper(String connectString) {
        this.externalZookeeperConnect = connectString;
        return (KafkaContainer)this.self();
    }

    public String getBootstrapServers() {
        return String.format("PLAINTEXT://%s:%s", this.proxy.getContainerIpAddress(), this.proxy.getFirstMappedPort());
    }

    public void start() {
        String networkAlias = (String)this.getNetworkAliases().get(0);
        this.proxy = ((SocatContainer)new SocatContainer().withNetwork(this.getNetwork())).withTarget(9092, networkAlias).withTarget(2181, networkAlias);
        this.proxy.start();
        this.withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + networkAlias + ":9093,PLAINTEXT://" + this.proxy.getContainerIpAddress() + ":" + this.proxy.getFirstMappedPort());
        if (this.externalZookeeperConnect != null) {
            this.withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
        } else {
            this.addExposedPort(2181);
            this.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
            this.withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY);
            this.withCommand(new String[]{"sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run"});
        }
        super.start();
    }

    public void stop() {
        Runnable[] runnableArray = new Runnable[2];
        runnableArray[0] = () -> super.stop();
        runnableArray[1] = () -> ((SocatContainer)this.proxy).stop();
        ((Stream)Stream.of(runnableArray).parallel()).forEach(Runnable::run);
    }
}

