/*
 * Decompiled with CFR 0.152.
 */
package info.batey.kafka.unit;

import info.batey.kafka.unit.Zookeeper;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import joptsimple.OptionSpec;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;

/**
 * Starts an embedded Zookeeper and a single Kafka broker on localhost and offers
 * helpers to create, list and delete topics and to send and read string messages.
 *
 * <p>Typical life cycle: construct, {@link #startup()}, use the helpers,
 * {@link #shutdown()}. This class performs no synchronization of its own.
 */
public class KafkaUnit {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUnit.class);

    /** Zookeeper connection limit used when the caller does not supply one. */
    private static final int DEFAULT_ZK_MAX_CONNECTIONS = 16;

    /** Passed as both timeout arguments to {@code ZkUtils.apply} for topic commands. */
    private static final int ZK_TIMEOUT_MS = 30000;

    private final String zookeeperString;
    private final String brokerString;
    private final int zkPort;
    private final int brokerPort;
    private final Properties kafkaBrokerConfig = new Properties();
    private final int zkMaxConnections;
    private KafkaServerStartable broker;
    private Zookeeper zookeeper;
    // Closed and cleared by shutdown(); re-created on demand by sendMessages().
    private KafkaProducer<String, String> producer;
    private File logDir;

    /**
     * Creates an instance using two ports obtained from {@link #getEphemeralPort()}.
     *
     * @throws IOException if a probe socket cannot be opened
     */
    public KafkaUnit() throws IOException {
        this(getEphemeralPort(), getEphemeralPort());
    }

    /**
     * Creates an instance on the given ports with the default Zookeeper connection limit.
     *
     * @param zkPort     port for the embedded Zookeeper
     * @param brokerPort port for the embedded Kafka broker
     */
    public KafkaUnit(int zkPort, int brokerPort) {
        this(zkPort, brokerPort, DEFAULT_ZK_MAX_CONNECTIONS);
    }

    /**
     * Creates an instance from {@code localhost:port} connection strings.
     *
     * @param zkConnectionString    Zookeeper connection string, exactly one {@code localhost:port} pair
     * @param kafkaConnectionString broker connection string, exactly one {@code localhost:port} pair
     * @throws RuntimeException if either string cannot be parsed
     */
    public KafkaUnit(String zkConnectionString, String kafkaConnectionString) {
        this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString));
    }

    /**
     * Creates an instance from {@code localhost:port} connection strings with an explicit
     * Zookeeper connection limit.
     *
     * @param zkConnectionString    Zookeeper connection string, exactly one {@code localhost:port} pair
     * @param kafkaConnectionString broker connection string, exactly one {@code localhost:port} pair
     * @param zkMaxConnections      value handed to the embedded {@code Zookeeper}
     * @throws RuntimeException if either string cannot be parsed
     */
    public KafkaUnit(String zkConnectionString, String kafkaConnectionString, int zkMaxConnections) {
        this(parseConnectionString(zkConnectionString), parseConnectionString(kafkaConnectionString), zkMaxConnections);
    }

    /**
     * Creates an instance on the given ports. Nothing is started here; only the
     * producer used by {@link #sendMessages} is constructed.
     *
     * @param zkPort           port for the embedded Zookeeper
     * @param brokerPort       port for the embedded Kafka broker
     * @param zkMaxConnections value handed to the embedded {@code Zookeeper}
     */
    public KafkaUnit(int zkPort, int brokerPort, int zkMaxConnections) {
        this.zkPort = zkPort;
        this.brokerPort = brokerPort;
        this.zookeeperString = "localhost:" + zkPort;
        this.brokerString = "localhost:" + brokerPort;
        this.zkMaxConnections = zkMaxConnections;
        this.producer = createProducer();
    }

    /** Builds a string/string producer bootstrapped against this instance's broker address. */
    private KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerString);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /** Returns the current producer, creating a new one if a previous shutdown closed it. */
    private KafkaProducer<String, String> activeProducer() {
        if (producer == null) {
            producer = createProducer();
        }
        return producer;
    }

    /**
     * Extracts the port from a connection string that must be a single
     * {@code localhost:port} pair.
     *
     * @param connectionString string to parse
     * @return the parsed port
     * @throws RuntimeException wrapping the underlying failure (bad format, non-localhost
     *                          host, non-numeric port, or {@code null} input)
     */
    private static int parseConnectionString(String connectionString) {
        try {
            String[] hostPorts = connectionString.split(",");
            if (hostPorts.length != 1) {
                throw new IllegalArgumentException("Only one 'host:port' pair is allowed in connection string");
            }
            String[] hostPort = hostPorts[0].split(":");
            if (hostPort.length != 2) {
                throw new IllegalArgumentException("Invalid format of a 'host:port' pair");
            }
            if (!"localhost".equals(hostPort[0])) {
                throw new IllegalArgumentException("Only localhost is allowed for KafkaUnit");
            }
            return Integer.parseInt(hostPort[1]);
        }
        catch (Exception e) {
            // Every failure, including the validation errors above, surfaces uniformly.
            throw new RuntimeException("Cannot parse connectionString " + connectionString, e);
        }
    }

    /**
     * Opens a server socket on port 0, reads the port it was bound to and closes the socket.
     *
     * @return the port the probe socket was bound to
     * @throws IOException if the socket cannot be opened
     */
    private static int getEphemeralPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    /**
     * Starts Zookeeper, creates a temporary log directory and starts the broker.
     *
     * <p>Properties set through {@link #setKafkaBrokerConfig} are kept, except for the
     * keys assigned below, which are overwritten on every call.
     *
     * @throws RuntimeException if the temporary log directory cannot be created
     */
    public void startup() {
        zookeeper = new Zookeeper(zkPort, zkMaxConnections);
        zookeeper.startup();

        File directory = createLogDirectory();
        logDir = directory;
        directory.deleteOnExit();
        // The hook captures this start's directory, so directories from earlier
        // startup() calls are also removed at JVM exit.
        Runtime.getRuntime().addShutdownHook(new Thread(getDeleteLogDirectoryAction(directory)));

        kafkaBrokerConfig.setProperty("zookeeper.connect", zookeeperString);
        kafkaBrokerConfig.setProperty("broker.id", "1");
        kafkaBrokerConfig.setProperty("host.name", "localhost");
        kafkaBrokerConfig.setProperty("port", Integer.toString(brokerPort));
        kafkaBrokerConfig.setProperty("log.dir", directory.getAbsolutePath());
        kafkaBrokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
        kafkaBrokerConfig.setProperty("delete.topic.enable", String.valueOf(true));
        kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", String.valueOf(1));
        kafkaBrokerConfig.setProperty("auto.create.topics.enable", String.valueOf(false));

        broker = new KafkaServerStartable(new KafkaConfig(kafkaBrokerConfig));
        broker.startup();
    }

    /** Creates the temporary directory used as the broker's {@code log.dir}. */
    private static File createLogDirectory() {
        try {
            return Files.createTempDirectory("kafka").toFile();
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to start Kafka", e);
        }
    }

    /**
     * Returns a task that recursively deletes the given directory, logging a warning
     * instead of throwing when deletion fails.
     */
    private Runnable getDeleteLogDirectoryAction(final File directory) {
        return new Runnable() {
            @Override
            public void run() {
                if (directory == null) {
                    return;
                }
                try {
                    FileUtils.deleteDirectory(directory);
                }
                catch (IOException e) {
                    LOGGER.warn("Problems deleting temporary directory " + directory.getAbsolutePath(), e);
                }
            }
        };
    }

    /** @return the broker address as {@code localhost:port} */
    public String getKafkaConnect() {
        return brokerString;
    }

    /** @return the Zookeeper port this instance was configured with */
    public int getZkPort() {
        return zkPort;
    }

    /** @return the broker port this instance was configured with */
    public int getBrokerPort() {
        return brokerPort;
    }

    /**
     * Creates a topic with a single partition.
     *
     * @param topicName name of the topic to create
     */
    public void createTopic(String topicName) {
        createTopic(topicName, 1);
    }

    /**
     * Creates a topic with replication factor 1 through {@code TopicCommand}.
     *
     * @param topicName     name of the topic to create
     * @param numPartitions number of partitions
     */
    public void createTopic(String topicName, int numPartitions) {
        String[] arguments = {
            "--create", "--zookeeper", zookeeperString,
            "--replication-factor", "1",
            "--partitions", String.valueOf(numPartitions),
            "--topic", topicName
        };
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
        try (ZkUtils zkUtils = openZkUtils(opts)) {
            LOGGER.info("Executing: CreateTopic {}", Arrays.toString(arguments));
            TopicCommand.createTopic(zkUtils, opts);
        }
    }

    /**
     * Lists topics by running {@code TopicCommand.listTopics} and capturing what it
     * prints through Scala's {@code Console}. Printed strings ending in
     * "marked for deletion" are left out.
     *
     * @return the captured topic names
     */
    public List<String> listTopics() {
        String[] arguments = {"--zookeeper", zookeeperString, "--list"};
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
        final List<String> topics = new ArrayList<>();
        try (ZkUtils zkUtils = openZkUtils(opts)) {
            LOGGER.info("Executing: ListTopics {}", Arrays.toString(arguments));
            PrintStream oldOut = Console.out();
            try {
                // Tee: still forwards to the previous stream while recording each printed string.
                Console.setOut(new PrintStream(oldOut) {
                    @Override
                    public void print(String s) {
                        super.print(s);
                        if (s != null && !s.endsWith("marked for deletion")) {
                            topics.add(s);
                        }
                    }
                });
                TopicCommand.listTopics(zkUtils, opts);
            }
            finally {
                // Always restore the previous stream, even if the command fails.
                Console.setOut(oldOut);
            }
        }
        return topics;
    }

    /**
     * Deletes every topic returned by {@link #listTopics()}. Best effort: a failure to
     * delete one topic is logged and the remaining topics are still attempted.
     */
    public void deleteAllTopics() {
        for (String topic : listTopics()) {
            try {
                deleteTopic(topic);
            }
            catch (Exception e) {
                LOGGER.warn("Unable to delete topic " + topic, e);
            }
        }
    }

    /**
     * Deletes a topic through {@code TopicCommand}.
     *
     * @param topicName name of the topic to delete
     */
    public void deleteTopic(String topicName) {
        String[] arguments = {"--zookeeper", zookeeperString, "--delete", "--topic", topicName};
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(arguments);
        try (ZkUtils zkUtils = openZkUtils(opts)) {
            LOGGER.info("Executing: DeleteTopic {}", Arrays.toString(arguments));
            TopicCommand.deleteTopic(zkUtils, opts);
        }
    }

    /** Opens a {@code ZkUtils} for the Zookeeper address parsed from the command options. */
    private static ZkUtils openZkUtils(TopicCommand.TopicCommandOptions opts) {
        String zkConnect = opts.options().valueOf(opts.zkConnectOpt());
        return ZkUtils.apply(zkConnect, ZK_TIMEOUT_MS, ZK_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
    }

    /**
     * Closes the producer, then stops the broker and Zookeeper if they were started.
     * The broker and Zookeeper are shut down even if closing the producer fails.
     */
    public void shutdown() {
        KafkaProducer<String, String> toClose = producer;
        producer = null;
        try {
            if (toClose != null) {
                toClose.close();
            }
        }
        finally {
            if (broker != null) {
                broker.shutdown();
                broker.awaitShutdown();
            }
            if (zookeeper != null) {
                zookeeper.shutdown();
            }
        }
    }

    /**
     * Reads records from the beginning of a topic.
     *
     * @param topicName topic to read
     * @param maxPoll   value used for the consumer's {@code max.poll.records}
     * @return the records returned by a single poll
     * @throws IllegalStateException if no partition of the topic was assigned to the consumer
     */
    public List<ConsumerRecord<String, String>> readRecords(String topicName, int maxPoll) {
        return readMessages(topicName, maxPoll, new PasstroughMessageExtractor());
    }

    /**
     * Reads record values from the beginning of a topic.
     *
     * @param topicName topic to read
     * @param maxPoll   value used for the consumer's {@code max.poll.records}
     * @return the values of the records returned by a single poll
     * @throws IllegalStateException if no partition of the topic was assigned to the consumer
     */
    public List<String> readMessages(String topicName, int maxPoll) {
        return readMessages(topicName, maxPoll, new ValueMessageExtractor());
    }

    /**
     * Same as {@link #readMessages(String, int)} with {@code Integer.MAX_VALUE} as the
     * poll limit. Still a single poll with a one second timeout.
     *
     * @param topicName topic to read
     * @return the values of the records returned by that poll
     * @throws IllegalStateException if no partition of the topic was assigned to the consumer
     */
    public List<String> readAllMessages(String topicName) {
        return readMessages(topicName, Integer.MAX_VALUE, new ValueMessageExtractor());
    }

    /**
     * Subscribes a short-lived consumer in group {@code test}, rewinds every assigned
     * partition to its beginning, polls once and converts each record with the extractor.
     */
    private <T> List<T> readMessages(String topicName, int maxPoll, MessageExtractor<T> messageExtractor) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerString);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.records", String.valueOf(maxPoll));
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            // Initial poll so partitions get assigned; anything it returns is re-read after the seek.
            consumer.poll(0L);
            if (consumer.assignment().isEmpty()) {
                // Fail loudly rather than return an empty list when nothing can be read.
                throw new IllegalStateException("No partitions assigned for topic " + topicName);
            }
            // Rewind every assigned partition, not just partition 0, so topics created
            // with several partitions are read in full.
            consumer.seekToBeginning(consumer.assignment());
            ConsumerRecords<String, String> records = consumer.poll(1000L);
            List<T> messages = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                messages.add(messageExtractor.extract(record));
            }
            return messages;
        }
    }

    /**
     * Sends the records one at a time, waiting for each send to complete and flushing
     * the producer after every record.
     *
     * @param records records to send, in order
     * @throws RuntimeException wrapping the {@code ExecutionException} or
     *                          {@code InterruptedException} of a failed send; in the
     *                          latter case the thread's interrupt flag is restored
     */
    @SafeVarargs
    public final void sendMessages(ProducerRecord<String, String> ... records) {
        KafkaProducer<String, String> target = activeProducer();
        for (ProducerRecord<String, String> record : records) {
            boolean interrupted = false;
            try {
                target.send(record).get();
            }
            catch (InterruptedException e) {
                interrupted = true;
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
            finally {
                try {
                    target.flush();
                }
                finally {
                    // Restore the flag only after flushing so the flush itself is not cut short.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Sets a broker property that is used by the next {@link #startup()}. Keys that
     * {@code startup()} assigns itself are overwritten there.
     *
     * @param configKey   broker configuration key
     * @param configValue broker configuration value
     */
    public final void setKafkaBrokerConfig(String configKey, String configValue) {
        kafkaBrokerConfig.setProperty(configKey, configValue);
    }

    /**
     * Extractor that returns the record unchanged.
     * Kept as a non-static inner class so existing callers continue to compile.
     */
    public class PasstroughMessageExtractor
    implements MessageExtractor<ConsumerRecord<String, String>> {
        @Override
        public ConsumerRecord<String, String> extract(ConsumerRecord<String, String> record) {
            return record;
        }
    }

    /**
     * Extractor that returns the record's value.
     * Kept as a non-static inner class so existing callers continue to compile.
     */
    public class ValueMessageExtractor
    implements MessageExtractor<String> {
        @Override
        public String extract(ConsumerRecord<String, String> record) {
            return record.value();
        }
    }

    /** Converts a consumed record into the element type returned to the caller. */
    private static interface MessageExtractor<T> {
        public T extract(ConsumerRecord<String, String> var1);
    }
}

