/*
 * Decompiled with CFR 0.152.
 */
package com.consol.citrus.kafka.embedded;

import com.consol.citrus.exceptions.CitrusRuntimeException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.SocketUtils;
import org.springframework.util.StringUtils;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class EmbeddedKafkaServer
implements InitializingBean,
DisposableBean {
    private static Logger log = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    private ZooKeeperServer zookeeper;
    private ServerCnxnFactory serverFactory;
    private int zookeeperPort = SocketUtils.findAvailableTcpPort();
    private KafkaServer kafkaServer;
    private int kafkaServerPort = 9092;
    private int partitions = 1;
    private String topics = "citrus";
    private String logDirPath;
    private boolean autoDeleteLogs = true;
    private Map<String, String> brokerProperties = Collections.emptyMap();

    public void start() {
        if (this.kafkaServer != null) {
            log.warn("Found instance of Kafka server - avoid duplicate Kafka server startup");
            return;
        }
        File logDir = this.createLogDir();
        this.zookeeper = this.createZookeeperServer(logDir);
        this.serverFactory = this.createServerFactory();
        try {
            this.serverFactory.startup(this.zookeeper);
        }
        catch (IOException | InterruptedException e) {
            throw new CitrusRuntimeException("Failed to start embedded zookeeper server", (Throwable)e);
        }
        Properties brokerConfigProperties = this.createBrokerProperties("localhost:" + this.zookeeperPort, this.kafkaServerPort, logDir);
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
        brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
        brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
        if (this.brokerProperties != null) {
            this.brokerProperties.forEach(brokerConfigProperties::put);
        }
        this.kafkaServer = new KafkaServer(new KafkaConfig((Map)brokerConfigProperties), Time.SYSTEM, Option.apply(null), (Seq)JavaConversions.asScalaBuffer(Collections.emptyList()).toList());
        this.kafkaServer.startup();
        this.kafkaServer.boundPort(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT));
        this.createKafkaTopics(StringUtils.commaDelimitedListToSet((String)this.topics));
    }

    public void stop() {
        if (this.kafkaServer != null) {
            try {
                if (this.kafkaServer.brokerState().currentState() != NotRunning.state()) {
                    this.kafkaServer.shutdown();
                    this.kafkaServer.awaitShutdown();
                }
            }
            catch (Exception e) {
                log.warn("Failed to shutdown Kafka embedded server", (Throwable)e);
            }
            try {
                CoreUtils.delete((Seq)this.kafkaServer.config().logDirs());
            }
            catch (Exception e) {
                log.warn("Failed to remove logs on Kafka embedded server", (Throwable)e);
            }
        }
        if (this.serverFactory != null) {
            try {
                this.serverFactory.shutdown();
            }
            catch (Exception e) {
                log.warn("Failed to shutdown Zookeeper instance", (Throwable)e);
            }
        }
    }

    public void destroy() throws Exception {
        this.stop();
    }

    public void afterPropertiesSet() throws Exception {
        this.start();
    }

    protected ZooKeeperServer createZookeeperServer(File logDir) {
        try {
            return new ZooKeeperServer(logDir, logDir, 2000);
        }
        catch (IOException e) {
            throw new CitrusRuntimeException("Failed to create embedded zookeeper server", (Throwable)e);
        }
    }

    protected File createLogDir() {
        File logDir = Optional.ofNullable(this.logDirPath).map(x$0 -> Paths.get(x$0, new String[0])).map(Path::toFile).orElse(new File(System.getProperty("java.io.tmpdir")));
        if (!logDir.exists() && !logDir.mkdirs()) {
            log.warn("Unable to create log directory: " + logDir.getAbsolutePath());
            logDir = new File(System.getProperty("java.io.tmpdir"));
            log.info("Using default log directory: " + logDir.getAbsolutePath());
        }
        File logs = new File(logDir, "zookeeper" + System.currentTimeMillis()).getAbsoluteFile();
        if (this.autoDeleteLogs) {
            logs.deleteOnExit();
        }
        return logs;
    }

    protected ServerCnxnFactory createServerFactory() {
        try {
            NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory();
            serverFactory.configure(new InetSocketAddress(this.zookeeperPort), 5000);
            return serverFactory;
        }
        catch (IOException e) {
            throw new CitrusRuntimeException("Failed to create default zookeeper server factory", (Throwable)e);
        }
    }

    protected void createKafkaTopics(Set<String> topics) {
        HashMap<String, String> adminConfigs = new HashMap<String, String>();
        adminConfigs.put("bootstrap.servers", "localhost:" + this.kafkaServerPort);
        try (AdminClient admin = AdminClient.create(adminConfigs);){
            List newTopics = topics.stream().map(t -> new NewTopic(t, this.partitions, 1)).collect(Collectors.toList());
            CreateTopicsResult createTopics = admin.createTopics(newTopics);
            try {
                createTopics.all().get();
            }
            catch (Exception e) {
                log.warn("Failed to create Kafka topics", (Throwable)e);
            }
        }
    }

    protected Properties createBrokerProperties(String zooKeeperConnect, int kafkaServerPort, File logDir) {
        Properties props = new Properties();
        props.put(KafkaConfig.BrokerIdProp(), "0");
        props.put(KafkaConfig.ZkConnectProp(), zooKeeperConnect);
        props.put(KafkaConfig.ZkConnectionTimeoutMsProp(), "10000");
        props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1500");
        props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1500");
        props.put(KafkaConfig.ControlledShutdownEnableProp(), "false");
        props.put(KafkaConfig.DeleteTopicEnableProp(), "true");
        props.put(KafkaConfig.LogDeleteDelayMsProp(), "1000");
        props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100");
        props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
        props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), (Object)Long.MAX_VALUE);
        props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
        props.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5");
        props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
        props.put(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());
        props.put(KafkaConfig.ListenersProp(), SecurityProtocol.PLAINTEXT.name + "://localhost:" + kafkaServerPort);
        props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> log.debug(String.format("Using default Kafka broker property %s='%s'", key, value))));
        return props;
    }

    public int getZookeeperPort() {
        return this.zookeeperPort;
    }

    public void setZookeeperPort(int zookeeperPort) {
        this.zookeeperPort = zookeeperPort;
    }

    public int getKafkaServerPort() {
        return this.kafkaServerPort;
    }

    public void setKafkaServerPort(int kafkaServerPort) {
        this.kafkaServerPort = kafkaServerPort;
    }

    public int getPartitions() {
        return this.partitions;
    }

    public void setPartitions(int partitions) {
        this.partitions = partitions;
    }

    public String getTopics() {
        return this.topics;
    }

    public void setTopics(String topics) {
        this.topics = topics;
    }

    public Map<String, String> getBrokerProperties() {
        return this.brokerProperties;
    }

    public void setBrokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties = brokerProperties;
    }

    public String getLogDirPath() {
        return this.logDirPath;
    }

    public void setLogDirPath(String logDirPath) {
        this.logDirPath = logDirPath;
    }

    public boolean isAutoDeleteLogs() {
        return this.autoDeleteLogs;
    }

    public void setAutoDeleteLogs(boolean autoDeleteLogs) {
        this.autoDeleteLogs = autoDeleteLogs;
    }
}

