/*
 * Decompiled with CFR 0.152.
 */
package net.manub.embeddedkafka;

import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import net.manub.embeddedkafka.EmbeddedKafka;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import net.manub.embeddedkafka.KafkaSpecException;
import net.manub.embeddedkafka.KafkaUnavailableException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.scalatest.Suite;
import scala.Function0;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Try;
import scala.util.Try$;

public abstract class EmbeddedKafka$class {
    /*
     * WARNING - void declaration
     */
    public static void withRunningKafka(EmbeddedKafka $this, Function0 body, EmbeddedKafkaConfig config) {
        ServerCnxnFactory factory = EmbeddedKafka$class.startZooKeeper($this, config.zooKeeperPort());
        KafkaServer broker = EmbeddedKafka$class.startKafka($this, config);
        try {
            body.apply$mcV$sp();
        }
        catch (Throwable throwable) {
            void var3_3;
            void var4_4;
            var4_4.shutdown();
            var3_3.shutdown();
            throw throwable;
        }
        broker.shutdown();
        factory.shutdown();
    }

    public static void publishToKafka(EmbeddedKafka $this, String topic, String message, EmbeddedKafkaConfig config) throws KafkaUnavailableException {
        KafkaProducer kafkaProducer = new KafkaProducer(JavaConversions$.MODULE$.mapAsJavaMap((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.kafkaPort())}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"key.serializer"), (Object)StringSerializer.class.getName()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"value.serializer"), (Object)StringSerializer.class.getName()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.fetch.timeout.ms"), (Object)((Object)BoxesRunTime.boxToInteger((int)3000)).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"retry.backoff.ms"), (Object)((Object)BoxesRunTime.boxToInteger((int)1000)).toString())}))));
        Future sendFuture = kafkaProducer.send(new ProducerRecord(topic, (Object)message));
        Try sendResult = Try$.MODULE$.apply((Function0)new Serializable($this, sendFuture){
            public static final long serialVersionUID = 0L;
            private final Future sendFuture$1;

            public final RecordMetadata apply() {
                return (RecordMetadata)this.sendFuture$1.get(3L, TimeUnit.SECONDS);
            }
            {
                this.sendFuture$1 = sendFuture$1;
            }
        });
        kafkaProducer.close();
        if (sendResult.isFailure()) {
            throw new KafkaUnavailableException();
        }
    }

    /*
     * WARNING - void declaration
     */
    public static String consumeFirstMessageFrom(EmbeddedKafka $this, String topic, EmbeddedKafkaConfig config) throws TimeoutException, KafkaUnavailableException {
        String string;
        Whitelist filter;
        Properties props = new Properties();
        props.put("group.id", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"embedded-kafka-spec-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{((Suite)$this).suiteId()})));
        props.put("zookeeper.connect", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())})));
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.connection.timeout.ms", "6000");
        ConsumerConnector consumer = (ConsumerConnector)Try$.MODULE$.apply((Function0)new Serializable($this, props){
            public static final long serialVersionUID = 0L;
            private final Properties props$1;

            public final ConsumerConnector apply() {
                return Consumer$.MODULE$.create(new ConsumerConfig(this.props$1));
            }
            {
                this.props$1 = props$1;
            }
        }).getOrElse((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new KafkaUnavailableException();
            }
        });
        Whitelist x$1 = filter = new Whitelist(topic);
        StringDecoder x$2 = new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1());
        StringDecoder x$3 = new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1());
        int x$4 = consumer.createMessageStreamsByFilter$default$2();
        Seq messageStreams = consumer.createMessageStreamsByFilter((TopicFilter)x$1, x$4, (Decoder)x$2, (Decoder)x$3);
        scala.concurrent.Future messageFuture = Future$.MODULE$.apply((Function0)new Serializable($this, messageStreams){
            public static final long serialVersionUID = 0L;
            private final Seq messageStreams$1;

            public final String apply() {
                return (String)((KafkaStream)this.messageStreams$1.headOption().getOrElse((Function0)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Nothing$ apply() {
                        throw new KafkaSpecException("Unable to find a message stream");
                    }
                })).iterator().next().message();
            }
            {
                this.messageStreams$1 = messageStreams$1;
            }
        }, (ExecutionContext)$this.executionContext());
        try {
            string = (String)Await$.MODULE$.result((Awaitable)messageFuture, (Duration)new package.DurationInt(package$.MODULE$.DurationInt(3)).seconds());
        }
        catch (Throwable throwable) {
            void var4_4;
            var4_4.shutdown();
            throw throwable;
        }
        consumer.shutdown();
        return string;
    }

    private static ServerCnxnFactory startZooKeeper(EmbeddedKafka $this, int zooKeeperPort) {
        Directory zkLogsDir = Directory$.MODULE$.makeTemp("zookeeper-logs", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        int tickTime = 2000;
        ZooKeeperServer zkServer = new ZooKeeperServer(zkLogsDir.toFile().jfile(), zkLogsDir.toFile().jfile(), tickTime);
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        factory.configure(new InetSocketAddress("localhost", zooKeeperPort), 1024);
        factory.startup(zkServer);
        return factory;
    }

    private static KafkaServer startKafka(EmbeddedKafka $this, EmbeddedKafkaConfig config) {
        Directory kafkaLogDir = Directory$.MODULE$.makeTemp("kafka", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        String zkAddress = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())}));
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", zkAddress);
        properties.setProperty("broker.id", "0");
        properties.setProperty("host.name", "localhost");
        properties.setProperty("auto.create.topics.enable", "true");
        properties.setProperty("port", ((Object)BoxesRunTime.boxToInteger((int)config.kafkaPort())).toString());
        properties.setProperty("log.dir", kafkaLogDir.toAbsolute().path());
        properties.setProperty("log.flush.interval.messages", ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        KafkaServer broker = new KafkaServer(new KafkaConfig(properties), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        broker.startup();
        return broker;
    }

    public static void $init$(EmbeddedKafka $this) {
        $this.net$manub$embeddedkafka$EmbeddedKafka$_setter_$executorService_$eq(Executors.newFixedThreadPool(2));
        $this.net$manub$embeddedkafka$EmbeddedKafka$_setter_$executionContext_$eq(ExecutionContext$.MODULE$.fromExecutorService($this.executorService()));
    }
}

