/*
 * Decompiled with CFR 0.152.
 */
package net.manub.embeddedkafka;

import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import net.manub.embeddedkafka.EmbeddedKafkaSupport;
import net.manub.embeddedkafka.KafkaSpecException;
import net.manub.embeddedkafka.KafkaUnavailableException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.NonFatal$;

public abstract class EmbeddedKafkaSupport$class {
    /*
     * WARNING - void declaration
     */
    public static void withRunningKafka(EmbeddedKafkaSupport $this, Function0 body, EmbeddedKafkaConfig config) {
        ServerCnxnFactory factory = $this.startZooKeeper(config.zooKeeperPort(), $this.startZooKeeper$default$2());
        KafkaServer broker = $this.startKafka(config, $this.startKafka$default$2());
        try {
            body.apply$mcV$sp();
        }
        catch (Throwable throwable) {
            void var3_3;
            void var4_4;
            var4_4.shutdown();
            var3_3.shutdown();
            throw throwable;
        }
        broker.shutdown();
        factory.shutdown();
    }

    /**
     * Publishes a {@code String} message to {@code topic} by delegating to
     * {@code publishToKafka} with a fresh {@link StringSerializer} for the value.
     *
     * @param $this   trait instance that performs the actual publish
     * @param topic   destination topic
     * @param message text to publish
     * @param config  passed through unchanged to {@code publishToKafka}
     */
    public static void publishStringMessageToKafka(EmbeddedKafkaSupport $this, String topic, String message, EmbeddedKafkaConfig config) {
        StringSerializer valueSerializer = new StringSerializer();
        $this.publishToKafka(topic, message, config, valueSerializer);
    }

    /**
     * Publishes a single message to {@code topic} on the broker at
     * {@code localhost:<config.kafkaPort()>} and waits up to five seconds for the send
     * to complete.
     *
     * <p>Keys are serialized with a {@link StringSerializer}; the value uses the supplied
     * {@code serializer}. The producer is always closed before this method returns or
     * throws, including when {@code send} itself fails.
     *
     * @param $this      trait instance (only captured by the closure below)
     * @param topic      destination topic
     * @param message    value to publish
     * @param config     supplies the Kafka port
     * @param serializer serializer for {@code message}
     * @throws KafkaUnavailableException wrapping the failure captured by {@code Try} while
     *         waiting for the send result
     */
    public static void publishToKafka(EmbeddedKafkaSupport $this, String topic, Object message, EmbeddedKafkaConfig config, Serializer serializer) throws KafkaUnavailableException {
        String bootstrapServers = "localhost:" + config.kafkaPort();
        Tuple2[] settings = new Tuple2[]{
            new Tuple2("bootstrap.servers", bootstrapServers),
            new Tuple2("metadata.fetch.timeout.ms", "5000"),
            new Tuple2("retry.backoff.ms", "1000")
        };
        Map producerSettings = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])settings));
        KafkaProducer kafkaProducer = new KafkaProducer(JavaConversions$.MODULE$.mapAsJavaMap(producerSettings), (Serializer)new StringSerializer(), serializer);
        Try sendResult;
        try {
            Future sendFuture = kafkaProducer.send(new ProducerRecord(topic, message));
            // Closure handed to Try.apply: blocks for at most five seconds on the send result.
            sendResult = Try$.MODULE$.apply((Function0)new Serializable($this, sendFuture){
                public static final long serialVersionUID = 0L;
                private final Future sendFuture$1;

                public final RecordMetadata apply() {
                    return (RecordMetadata)this.sendFuture$1.get(5L, TimeUnit.SECONDS);
                }
                {
                    this.sendFuture$1 = sendFuture$1;
                }
            });
        }
        finally {
            // Release the producer even if send() throws or a fatal error escapes Try.
            kafkaProducer.close();
        }
        if (sendResult.isFailure()) {
            throw new KafkaUnavailableException((Throwable)sendResult.failed().get());
        }
    }

    /**
     * Consumes the first message from {@code topic} and returns it as a {@code String}.
     * Delegates to {@code consumeFirstMessageFrom} with a {@link StringDecoder} built
     * from that decoder's default constructor argument.
     *
     * @param $this  trait instance that performs the actual consume
     * @param topic  topic to read from
     * @param config passed through unchanged to {@code consumeFirstMessageFrom}
     * @return the first message, cast to {@code String}
     */
    public static String consumeFirstStringMessageFrom(EmbeddedKafkaSupport $this, String topic, EmbeddedKafkaConfig config) {
        StringDecoder valueDecoder = new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1());
        Object firstMessage = $this.consumeFirstMessageFrom(topic, config, valueDecoder);
        return (String)firstMessage;
    }

    /**
     * Consumes and returns the first message available on {@code topic}.
     *
     * <p>A consumer in group {@code embedded-kafka-spec} is connected through ZooKeeper at
     * {@code localhost:<config.zooKeeperPort()>}, reading from the smallest offset. The
     * first message of the first stream is fetched on {@code $this.executionContext()} and
     * awaited for up to five seconds. The consumer is always shut down once it has been
     * created, including when stream creation or the wait fails.
     *
     * @param $this   trait instance supplying the execution context
     * @param topic   topic to read from (used as a whitelist filter)
     * @param config  supplies the ZooKeeper port
     * @param decoder decoder for the message value; keys use a {@link StringDecoder}
     * @return the decoded value of the first message
     * @throws TimeoutException          declared for the five-second wait on the message
     * @throws KafkaUnavailableException if creating the consumer fails with a non-fatal error
     */
    public static Object consumeFirstMessageFrom(EmbeddedKafkaSupport $this, String topic, EmbeddedKafkaConfig config, Decoder decoder) throws TimeoutException, KafkaUnavailableException {
        Properties props = new Properties();
        props.put("group.id", "embedded-kafka-spec");
        props.put("zookeeper.connect", "localhost:" + config.zooKeeperPort());
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.connection.timeout.ms", "6000");

        ConsumerConnector consumer;
        try {
            consumer = Consumer$.MODULE$.create(new ConsumerConfig(props));
        }
        catch (Throwable throwable) {
            // Fatal throwables propagate untouched; anything else is reported as Kafka being unavailable.
            if (NonFatal$.MODULE$.unapply(throwable).isEmpty()) {
                throw throwable;
            }
            throw new KafkaUnavailableException(throwable);
        }

        try {
            Whitelist topicFilter = new Whitelist(topic);
            StringDecoder keyDecoder = new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1());
            int numStreams = consumer.createMessageStreamsByFilter$default$2();
            Seq messageStreams = consumer.createMessageStreamsByFilter((TopicFilter)topicFilter, numStreams, (Decoder)keyDecoder, decoder);
            // Closure run on the execution context: takes the first stream (or fails with
            // KafkaSpecException when there is none) and reads the next message from it.
            scala.concurrent.Future messageFuture = Future$.MODULE$.apply((Function0)new Serializable($this, messageStreams){
                public static final long serialVersionUID = 0L;
                private final Seq messageStreams$1;

                public final T apply() {
                    return (T)((KafkaStream)this.messageStreams$1.headOption().getOrElse((Function0)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final Nothing$ apply() {
                            throw new KafkaSpecException("Unable to find a message stream");
                        }
                    })).iterator().next().message();
                }
                {
                    this.messageStreams$1 = messageStreams$1;
                }
            }, (ExecutionContext)$this.executionContext());
            return Await$.MODULE$.result((Awaitable)messageFuture, (Duration)new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds());
        }
        finally {
            consumer.shutdown();
        }
    }

    /**
     * Starts a ZooKeeper server listening on {@code localhost:zooKeeperPort}.
     *
     * <p>The same directory ({@code zkLogsDir}) is passed for both directory arguments of
     * the {@link ZooKeeperServer} constructor, with a tick time of 2000.
     *
     * @param $this         trait instance (unused)
     * @param zooKeeperPort port to bind on localhost
     * @param zkLogsDir     directory handed to the ZooKeeper server
     * @return the connection factory serving the started server; callers shut it down
     */
    public static ServerCnxnFactory startZooKeeper(EmbeddedKafkaSupport $this, int zooKeeperPort, Directory zkLogsDir) {
        final int zkTickTime = 2000;
        final int maxClientConnections = 1024;
        ZooKeeperServer server = new ZooKeeperServer(zkLogsDir.toFile().jfile(), zkLogsDir.toFile().jfile(), zkTickTime);
        ServerCnxnFactory connectionFactory = ServerCnxnFactory.createFactory();
        InetSocketAddress listenAddress = new InetSocketAddress("localhost", zooKeeperPort);
        connectionFactory.configure(listenAddress, maxClientConnections);
        connectionFactory.startup(server);
        return connectionFactory;
    }

    /**
     * Default value for the second parameter of {@code startZooKeeper}: a temporary
     * directory created with the prefix {@code "zookeeper-logs"} and {@code makeTemp}'s
     * own defaults for the remaining arguments.
     *
     * @param $this trait instance (unused)
     * @return the directory returned by {@code makeTemp}
     */
    public static Directory startZooKeeper$default$2(EmbeddedKafkaSupport $this) {
        Directory$ directories = Directory$.MODULE$;
        return directories.makeTemp("zookeeper-logs", directories.makeTemp$default$2(), directories.makeTemp$default$3());
    }

    /**
     * Builds the broker configuration, creates a {@link KafkaServer} and starts it.
     *
     * <p>Base settings: broker id 0 on {@code localhost:<config.kafkaPort()>}, connected to
     * ZooKeeper at {@code localhost:<config.zooKeeperPort()>}, automatic topic creation
     * enabled, logs under {@code kafkaLogDir}, flush interval of one message. Entries from
     * {@code config.customBrokerProperties()} are applied afterwards, so they override
     * the base settings on key collisions.
     *
     * @param $this       trait instance (only captured by the closure below)
     * @param config      supplies ports and custom broker properties
     * @param kafkaLogDir directory whose absolute path becomes {@code log.dir}
     * @return the started broker; callers shut it down
     */
    public static KafkaServer startKafka(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config, Directory kafkaLogDir) {
        String zooKeeperAddress = "localhost:" + config.zooKeeperPort();

        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zooKeeperAddress);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("host.name", "localhost");
        brokerProps.setProperty("auto.create.topics.enable", "true");
        brokerProps.setProperty("port", Integer.toString(config.kafkaPort()));
        brokerProps.setProperty("log.dir", kafkaLogDir.toAbsolute().path());
        brokerProps.setProperty("log.flush.interval.messages", "1");

        // Closure applied to every custom (key, value) pair: copies it into brokerProps.
        config.customBrokerProperties().foreach((Function1)new Serializable($this, brokerProps){
            public static final long serialVersionUID = 0L;
            private final Properties brokerProps$1;

            public final Object apply(Tuple2<String, String> entry) {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                String name = (String)entry._1();
                String setting = (String)entry._2();
                return this.brokerProps$1.setProperty(name, setting);
            }
            {
                this.brokerProps$1 = brokerProps$1;
            }
        });

        KafkaServer server = new KafkaServer(new KafkaConfig(brokerProps), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        server.startup();
        return server;
    }

    /**
     * Default value for the second parameter of {@code startKafka}: a temporary
     * directory created with the prefix {@code "kafka"} and {@code makeTemp}'s own
     * defaults for the remaining arguments.
     *
     * @param $this trait instance (unused)
     * @return the directory returned by {@code makeTemp}
     */
    public static Directory startKafka$default$2(EmbeddedKafkaSupport $this) {
        Directory$ directories = Directory$.MODULE$;
        return directories.makeTemp("kafka", directories.makeTemp$default$2(), directories.makeTemp$default$3());
    }

    /**
     * Trait initializer. Installs a fixed thread pool of two threads as the instance's
     * executor service, then installs an execution context built from whatever
     * {@code $this.executorService()} returns at that point.
     *
     * @param $this the instance being initialized
     */
    public static void $init$(EmbeddedKafkaSupport $this) {
        final int poolSize = 2;
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$executorService_$eq(Executors.newFixedThreadPool(poolSize));

        // Must come second: the context is derived from the executor installed above.
        ExecutionContext$ contexts = ExecutionContext$.MODULE$;
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$executionContext_$eq(contexts.fromExecutorService($this.executorService()));
    }
}

