/*
 * Decompiled with CFR 0.152.
 */
package com.github.charithe.kafka;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.ProducerConfig;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaJunitRule
extends ExternalResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJunitRule.class);
    private static final int ALLOCATE_RANDOM_PORT = -1;
    private static final String LOCALHOST = "localhost";
    private TestingServer zookeeper;
    private KafkaServerStartable kafkaServer;
    private int zookeeperPort = -1;
    private String zookeeperConnectionString;
    private int kafkaPort;
    private Path kafkaLogDir;

    public KafkaJunitRule() {
        this(-1);
    }

    public KafkaJunitRule(int kafkaPort, int zookeeperPort) {
        this(kafkaPort);
        this.zookeeperPort = zookeeperPort;
    }

    public KafkaJunitRule(int kafkaPort) {
        this.kafkaPort = kafkaPort == -1 ? InstanceSpec.getRandomPort() : kafkaPort;
    }

    protected void before() throws Throwable {
        if (this.zookeeperPort == -1) {
            this.zookeeper = new TestingServer(true);
            this.zookeeperPort = this.zookeeper.getPort();
        } else {
            this.zookeeper = new TestingServer(this.zookeeperPort, true);
        }
        this.zookeeperConnectionString = this.zookeeper.getConnectString();
        KafkaConfig kafkaConfig = this.buildKafkaConfig(this.zookeeperConnectionString);
        LOGGER.info("Starting Kafka server with config: {}", (Object)kafkaConfig.props().props());
        this.kafkaServer = new KafkaServerStartable(kafkaConfig);
        this.startKafka();
    }

    protected void after() {
        try {
            this.shutdownKafka();
            if (this.zookeeper != null) {
                LOGGER.info("Shutting down Zookeeper");
                this.zookeeper.close();
            }
            if (Files.exists(this.kafkaLogDir, new LinkOption[0])) {
                LOGGER.info("Deleting the log dir:  {}", (Object)this.kafkaLogDir);
                Files.walkFileTree(this.kafkaLogDir, (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        Files.deleteIfExists(file);
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                        Files.deleteIfExists(dir);
                        return FileVisitResult.CONTINUE;
                    }
                });
            }
        }
        catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", (Throwable)e);
        }
    }

    public void shutdownKafka() {
        if (this.kafkaServer != null) {
            LOGGER.info("Shutting down Kafka Server");
            this.kafkaServer.shutdown();
        }
    }

    public void startKafka() {
        if (this.kafkaServer != null) {
            LOGGER.info("Starting Kafka Server");
            this.kafkaServer.startup();
        }
    }

    private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
        this.kafkaLogDir = Files.createTempDirectory("kafka_junit", new FileAttribute[0]);
        Properties props = new Properties();
        props.put("advertised.host.name", LOCALHOST);
        props.put("port", this.kafkaPort + "");
        props.put("broker.id", "1");
        props.put("log.dirs", this.kafkaLogDir.toAbsolutePath().toString());
        props.put("zookeeper.connect", zookeeperQuorum);
        return new KafkaConfig(props);
    }

    public ProducerConfig producerConfigWithDefaultEncoder() {
        return this.producerConfig("kafka.serializer.DefaultEncoder");
    }

    public ProducerConfig producerConfigWithStringEncoder() {
        return this.producerConfig("kafka.serializer.StringEncoder");
    }

    public ProducerConfig producerConfig(String serializerClass) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:" + this.kafkaPort);
        props.put("metadata.broker.list", "localhost:" + this.kafkaPort);
        props.put("serializer.class", serializerClass);
        props.put("producer.type", "sync");
        props.put("request.required.acks", "1");
        return new ProducerConfig(props);
    }

    public ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", this.zookeeperConnectionString);
        props.put("group.id", "kafka-junit-consumer");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }

    public List<String> readStringMessages(String topic, int expectedMessages) throws TimeoutException {
        return this.readMessages(topic, expectedMessages, (Decoder)new StringDecoder(null));
    }

    public <T> List<T> readMessages(String topic, final int expectedMessages, Decoder<T> decoder) throws TimeoutException {
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        ConsumerConnector connector = null;
        try {
            connector = Consumer.createJavaConsumerConnector((ConsumerConfig)this.consumerConfig());
            Map streams = connector.createMessageStreams(Collections.singletonMap(topic, 1), (Decoder)new DefaultDecoder(null), decoder);
            final KafkaStream messageSteam = (KafkaStream)((List)streams.get(topic)).get(0);
            Future future = singleThread.submit(new Callable<List<T>>(){

                @Override
                public List<T> call() throws Exception {
                    ArrayList<Object> messages = new ArrayList<Object>(expectedMessages);
                    ConsumerIterator iterator = messageSteam.iterator();
                    while (messages.size() != expectedMessages && iterator.hasNext()) {
                        Object message = iterator.next().message();
                        LOGGER.debug("Received message: {}", message);
                        messages.add(message);
                    }
                    return messages;
                }
            });
            List list = (List)future.get(5L, TimeUnit.SECONDS);
            return list;
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new TimeoutException("Timed out waiting for messages");
        }
        catch (Exception e) {
            throw new RuntimeException("Unexpected exception while reading messages", e);
        }
        finally {
            singleThread.shutdown();
            if (connector != null) {
                connector.shutdown();
            }
        }
    }

    public Path kafkaLogDir() {
        return this.kafkaLogDir;
    }

    public int kafkaBrokerPort() {
        return this.kafkaPort;
    }

    public int zookeeperPort() {
        return this.zookeeperPort;
    }

    public String zookeeperConnectionString() {
        return this.zookeeperConnectionString;
    }
}

