/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import net.mguenther.kafka.junit.EmbeddedKafkaConfig;
import net.mguenther.kafka.junit.EmbeddedLifecycle;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedKafka
implements EmbeddedLifecycle {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final int UNDEFINED_BOUND_PORT = -1;
    private final int brokerId;
    private final Properties brokerConfig;
    private final Path logDirectory;
    private KafkaServer kafka;
    private int boundPort = -1;

    public EmbeddedKafka(int brokerId, String listener, EmbeddedKafkaConfig config, String zooKeeperConnectUrl, boolean usesConnect) throws IOException {
        this.brokerId = brokerId;
        this.brokerConfig = new Properties();
        this.brokerConfig.putAll((Map<?, ?>)config.getBrokerProperties());
        this.brokerConfig.put(KafkaConfig$.MODULE$.ListenersProp(), listener);
        this.brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zooKeeperConnectUrl);
        this.logDirectory = Files.createTempDirectory("kafka-junit", new FileAttribute[0]);
        this.brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), (Object)brokerId);
        this.brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), this.logDirectory.toFile().getAbsolutePath());
        if (usesConnect) {
            log.info("Enforcing 'log.cleanup.policy=compact', due to the presence of a Kafka Connect deployment.");
            this.brokerConfig.put(KafkaConfig$.MODULE$.LogCleanupPolicyProp(), "compact");
        }
    }

    @Override
    public void start() {
        this.activate();
    }

    public void activate() {
        if (this.kafka != null) {
            log.info("The embedded Kafka broker with ID {} is already running.", (Object)this.brokerId);
            return;
        }
        try {
            log.info("Embedded Kafka broker with ID {} is starting.", (Object)this.brokerId);
            if (this.boundPort != -1) {
                this.brokerConfig.put(KafkaConfig$.MODULE$.ListenersProp(), String.format("PLAINTEXT://localhost:%s", this.boundPort));
            }
            KafkaConfig config = new KafkaConfig((Map)this.brokerConfig, true);
            this.kafka = TestUtils.createServer((KafkaConfig)config, (Time)Time.SYSTEM);
            this.boundPort = this.kafka.boundPort(config.interBrokerListenerName());
            log.info("The embedded Kafka broker with ID {} has been started. Its logs can be found at {}.", (Object)this.brokerId, (Object)this.logDirectory);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Unable to start the embedded Kafka broker with ID %s.", this.brokerId), e);
        }
    }

    @Override
    public void stop() {
        if (this.kafka == null) {
            log.info("The embedded Kafka broker with ID {} is not running or was already shut down.", (Object)this.brokerId);
            return;
        }
        this.deactivate();
        log.info("Removing working directory at {}. This directory contains Kafka logs for Kafka broker with ID {} as well.", (Object)this.logDirectory, (Object)this.brokerId);
        try {
            this.recursivelyDelete(this.logDirectory);
        }
        catch (IOException e) {
            log.warn("Unable to remove working directory at {}.", (Object)this.logDirectory);
        }
        log.info("The embedded Kafka broker with ID {} has been stopped.", (Object)this.brokerId);
    }

    private void recursivelyDelete(Path path) throws IOException {
        Files.walkFileTree(path, (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                file.toFile().delete();
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
                dir.toFile().delete();
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public void deactivate() {
        if (this.kafka == null) {
            return;
        }
        this.boundPort = this.kafka.boundPort(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT));
        log.info("The embedded Kafka broker with ID {} is stopping.", (Object)this.brokerId);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.kafka = null;
    }

    public String getBrokerList() {
        return String.format("localhost:%s", this.kafka.boundPort(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT)));
    }

    public String getClusterId() {
        return this.kafka.clusterId();
    }

    public Integer getBrokerId() {
        return this.brokerId;
    }

    public boolean isActive() {
        return this.kafka != null;
    }
}

