/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import net.mguenther.kafka.junit.EmbeddedKafkaConfig;
import net.mguenther.kafka.junit.EmbeddedLifecycle;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages a single embedded Kafka broker together with the temporary working
 * directory that holds its log directory.
 *
 * <p>The working directory is created in the constructor and removed by
 * {@link #stop()}. {@link #activate()} and {@link #deactivate()} start and shut
 * down the broker without touching the working directory.
 */
public class EmbeddedKafka
implements EmbeddedLifecycle {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private final int brokerId;
    private final Properties brokerConfig;
    private final TemporaryFolder workingDirectory;
    private final File logDirectory;
    /** The running broker, or {@code null} while the broker is not active. */
    private KafkaServer kafka;

    /**
     * Prepares the broker configuration and creates the working directory. The
     * broker itself is not started here.
     *
     * @param brokerId            ID that is written to the broker configuration
     * @param config              supplies the base broker properties, which are copied
     * @param zooKeeperConnectUrl value for the ZooKeeper connect property
     * @throws IOException if the working directory or the log directory cannot be created
     */
    public EmbeddedKafka(int brokerId, EmbeddedKafkaConfig config, String zooKeeperConnectUrl) throws IOException {
        this.brokerId = brokerId;
        this.brokerConfig = new Properties();
        this.brokerConfig.putAll(config.getBrokerProperties());
        this.brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zooKeeperConnectUrl);
        this.workingDirectory = new TemporaryFolder();
        this.workingDirectory.create();
        File createdLogDirectory;
        try {
            createdLogDirectory = this.workingDirectory.newFolder();
        }
        catch (IOException | RuntimeException e) {
            // Do not leave the freshly created root behind if the log directory cannot be created.
            try {
                this.workingDirectory.delete();
            }
            catch (RuntimeException | AssertionError cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        this.logDirectory = createdLogDirectory;
        this.brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId);
        this.brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), this.logDirectory.getAbsolutePath());
    }

    /**
     * Starts the broker by delegating to {@link #activate()}.
     */
    @Override
    public void start() {
        this.activate();
    }

    /**
     * Starts the broker unless it is already running.
     *
     * @throws RuntimeException if the broker cannot be started; the original failure is the cause
     */
    public void activate() {
        if (this.kafka != null) {
            log.info("The embedded Kafka broker with ID {} is already running.", this.brokerId);
            return;
        }
        try {
            log.info("Embedded Kafka broker with ID {} is starting.", this.brokerId);
            KafkaConfig config = new KafkaConfig(this.brokerConfig, true);
            this.kafka = TestUtils.createServer(config, Time.SYSTEM);
            log.info("The embedded Kafka broker with ID {} has been started. Its logs can be found at {}.", this.brokerId, this.logDirectory);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Unable to start the embedded Kafka broker with ID %s.", this.brokerId), e);
        }
    }

    /**
     * Shuts the broker down if it is running and removes the working directory.
     *
     * <p>The working directory is removed even if the broker was never started or
     * was already shut down via {@link #deactivate()}, so that it does not leak.
     * Calling this method again after the directory has been removed is a no-op
     * with respect to the file system.
     */
    @Override
    public void stop() {
        final boolean wasRunning = this.kafka != null;
        if (!wasRunning) {
            log.info("The embedded Kafka broker with ID {} is not running or was already shut down.", this.brokerId);
        }
        try {
            // No-op when the broker is not running.
            this.deactivate();
        }
        finally {
            // Clean up on disk even if the shutdown itself failed.
            this.removeWorkingDirectory();
        }
        if (wasRunning) {
            log.info("The embedded Kafka broker with ID {} has been stopped.", this.brokerId);
        }
    }

    /**
     * Shuts the broker down and waits for the shutdown to complete. Does nothing
     * if the broker is not running. The working directory is left in place.
     */
    public void deactivate() {
        if (this.kafka == null) {
            return;
        }
        log.info("The embedded Kafka broker with ID {} is stopping.", this.brokerId);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.kafka = null;
    }

    /**
     * Returns the address of the broker's PLAINTEXT listener.
     *
     * @return the address formatted as {@code host:port}
     * @throws IllegalStateException if the broker is not running
     */
    public String getBrokerList() {
        final KafkaServer server = this.requireRunningBroker();
        final int port = server.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
        return String.format("%s:%s", server.config().hostName(), port);
    }

    /**
     * Returns the cluster ID reported by the running broker.
     *
     * @return the cluster ID
     * @throws IllegalStateException if the broker is not running
     */
    public String getClusterId() {
        return this.requireRunningBroker().clusterId();
    }

    /**
     * @return the ID this broker was constructed with
     */
    public Integer getBrokerId() {
        return this.brokerId;
    }

    /**
     * @return {@code true} if the broker is currently running
     */
    public boolean isActive() {
        return this.kafka != null;
    }

    /** Returns the running broker or fails with a descriptive error instead of a bare NPE. */
    private KafkaServer requireRunningBroker() {
        final KafkaServer server = this.kafka;
        if (server == null) {
            throw new IllegalStateException(String.format("The embedded Kafka broker with ID %s is not running.", this.brokerId));
        }
        return server;
    }

    /** Deletes the working directory if it still exists on disk. */
    private void removeWorkingDirectory() {
        final File root = this.workingDirectory.getRoot();
        if (!root.exists()) {
            // Already removed by an earlier stop(); nothing left to delete.
            return;
        }
        log.info("Removing working directory at {}. This directory contains Kafka logs for Kafka broker with ID {} as well.", root, this.brokerId);
        this.workingDirectory.delete();
    }
}

