/*
 * Decompiled with CFR 0.152.
 */
package net.manub.embeddedkafka;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.admin.AdminUtils$;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import net.manub.embeddedkafka.EmbeddedKafkaSupport;
import net.manub.embeddedkafka.EmbeddedKafkaSupport$;
import net.manub.embeddedkafka.KafkaUnavailableException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.concurrent.ExecutionContext$;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

public abstract class EmbeddedKafkaSupport$class {
    /**
     * Runs {@code body} while an embedded ZooKeeper and an embedded Kafka broker are running.
     *
     * <p>ZooKeeper is started first via {@code withRunningZooKeeper} on {@code config.zooKeeperPort()}.
     * The broker is then started inside a temporary "kafka" directory, using a copy of {@code config}
     * whose second {@code copy} argument is the port reported by the ZooKeeper connection factory.
     * The broker is shut down, and the shutdown awaited, whether {@code body} returns or throws.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param body the computation to run while the broker is up
     * @param config the ports and custom properties for the embedded services
     * @return the value produced by {@code body.apply()}
     */
    public static Object withRunningKafka(EmbeddedKafkaSupport $this, Function0 body, EmbeddedKafkaConfig config) {
        return EmbeddedKafkaSupport$class.withRunningZooKeeper($this, config.zooKeeperPort(), (Function1)new Serializable($this, body, config){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ EmbeddedKafkaSupport $outer;
            public final Function0 body$2;
            public final EmbeddedKafkaConfig config$1;

            public final T apply(int zkPort) {
                return (T)EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$withTempDir(this.$outer, "kafka", (Function1)new Serializable(this, zkPort){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ EmbeddedKafkaSupport$.anonfun.withRunningKafka.1 $outer;
                    private final int zkPort$1;

                    public final T apply(Directory kafkaLogsDir) {
                        // Copy the caller's config, replacing only the ZooKeeper port with the bound one.
                        int x$4 = this.zkPort$1;
                        int x$5 = this.$outer.config$1.copy$default$1();
                        scala.collection.immutable.Map<String, String> x$6 = this.$outer.config$1.copy$default$3();
                        scala.collection.immutable.Map<String, String> x$7 = this.$outer.config$1.copy$default$4();
                        scala.collection.immutable.Map<String, String> x$8 = this.$outer.config$1.copy$default$5();
                        KafkaServer broker = this.$outer.net$manub$embeddedkafka$EmbeddedKafkaSupport$$anonfun$$$outer().startKafka(this.$outer.config$1.copy(x$5, x$4, x$6, x$7, x$8), kafkaLogsDir);
                        try {
                            return (T)this.$outer.body$2.apply();
                        }
                        finally {
                            // Stop the broker on both the normal and the exceptional path.
                            broker.shutdown();
                            broker.awaitShutdown();
                        }
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.zkPort$1 = zkPort$1;
                    }
                });
            }

            public /* synthetic */ EmbeddedKafkaSupport net$manub$embeddedkafka$EmbeddedKafkaSupport$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.body$2 = body$2;
                this.config$1 = config$1;
            }
        });
    }

    /**
     * Runs {@code body} against an embedded ZooKeeper/Kafka pair and hands it the ports actually bound.
     *
     * <p>ZooKeeper is started first via {@code withRunningZooKeeper}; the broker is then started in a
     * temporary "kafka" directory. The broker's bound port is read from its first configured listener
     * and, together with the ZooKeeper port in use, copied into the {@code EmbeddedKafkaConfig} that is
     * passed to {@code body}. The broker is shut down, and the shutdown awaited, whether {@code body}
     * returns or throws.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param config the requested ports and custom properties
     * @param body the computation to run; receives the config carrying the actual ports
     * @return the value produced by {@code body}
     */
    public static Object withRunningKafkaOnFoundPort(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config, Function1 body) {
        return EmbeddedKafkaSupport$class.withRunningZooKeeper($this, config.zooKeeperPort(), (Function1)new Serializable($this, config, body){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ EmbeddedKafkaSupport $outer;
            public final EmbeddedKafkaConfig config$2;
            public final Function1 body$3;

            public final T apply(int zkPort) {
                return (T)EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$withTempDir(this.$outer, "kafka", (Function1)new Serializable(this, zkPort){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ EmbeddedKafkaSupport$.anonfun.withRunningKafkaOnFoundPort.1 $outer;
                    private final int zkPort$2;

                    public final T apply(Directory kafkaLogsDir) {
                        // Start the broker with the caller's config, replacing only the ZooKeeper port.
                        int x$9 = this.zkPort$2;
                        int x$10 = this.$outer.config$2.copy$default$1();
                        scala.collection.immutable.Map<String, String> x$11 = this.$outer.config$2.copy$default$3();
                        scala.collection.immutable.Map<String, String> x$12 = this.$outer.config$2.copy$default$4();
                        scala.collection.immutable.Map<String, String> x$13 = this.$outer.config$2.copy$default$5();
                        KafkaServer broker = this.$outer.net$manub$embeddedkafka$EmbeddedKafkaSupport$$anonfun$$$outer().startKafka(this.$outer.config$2.copy(x$10, x$9, x$11, x$12, x$13), kafkaLogsDir);
                        // Report back the port the first listener was really bound to.
                        int kafkaPort = broker.boundPort(((EndPoint)broker.config().listeners().head()).listenerName());
                        EmbeddedKafkaConfig actualConfig = this.$outer.config$2.copy(kafkaPort, this.zkPort$2, this.$outer.config$2.copy$default$3(), this.$outer.config$2.copy$default$4(), this.$outer.config$2.copy$default$5());
                        try {
                            return (T)this.$outer.body$3.apply((Object)actualConfig);
                        }
                        finally {
                            // Stop the broker on both the normal and the exceptional path.
                            broker.shutdown();
                            broker.awaitShutdown();
                        }
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.zkPort$2 = zkPort$2;
                    }
                });
            }

            public /* synthetic */ EmbeddedKafkaSupport net$manub$embeddedkafka$EmbeddedKafkaSupport$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.config$2 = config$2;
                this.body$3 = body$3;
            }
        });
    }

    private static Object withRunningZooKeeper(EmbeddedKafkaSupport $this, int port, Function1 body) {
        return EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$withTempDir($this, "zookeeper-logs", (Function1)new Serializable($this, port, body){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ EmbeddedKafkaSupport $outer;
            private final int port$1;
            private final Function1 body$1;

            /*
             * WARNING - void declaration
             */
            public final Object apply(Directory zkLogsDir) {
                Object object;
                ServerCnxnFactory factory = this.$outer.startZooKeeper(this.port$1, zkLogsDir);
                try {
                    object = this.body$1.apply((Object)BoxesRunTime.boxToInteger((int)factory.getLocalPort()));
                }
                catch (Throwable throwable) {
                    void var2_2;
                    var2_2.shutdown();
                    throw throwable;
                }
                factory.shutdown();
                return object;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.port$1 = port$1;
                this.body$1 = body$1;
            }
        });
    }

    /*
     * WARNING - void declaration
     */
    public static Object net$manub$embeddedkafka$EmbeddedKafkaSupport$$withTempDir(EmbeddedKafkaSupport $this, String prefix, Function1 body) {
        Object object;
        Directory dir = Directory$.MODULE$.makeTemp(prefix, Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        try {
            object = body.apply((Object)dir);
        }
        catch (Throwable throwable) {
            void var3_3;
            var3_3.deleteRecursively();
            throw throwable;
        }
        dir.deleteRecursively();
        return object;
    }

    /**
     * Publishes a {@code String} message to {@code topic} by delegating to {@code publishToKafka}
     * on {@code $this} with a freshly created {@link StringSerializer} for the value.
     *
     * @param $this the trait instance the call is forwarded to
     * @param topic the destination topic
     * @param message the message value
     * @param config the embedded Kafka configuration to publish against
     */
    public static void publishStringMessageToKafka(EmbeddedKafkaSupport $this, String topic, String message, EmbeddedKafkaConfig config) {
        StringSerializer valueSerializer = new StringSerializer();
        $this.publishToKafka(topic, message, config, valueSerializer);
    }

    /**
     * Publishes a key-less record carrying {@code message} to {@code topic}.
     *
     * <p>A new producer is built from the base producer configuration, with a
     * {@link StringSerializer} for keys and {@code serializer} for values, and handed to the private
     * {@code publishToKafka} overload together with the record.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param topic the destination topic
     * @param message the record value
     * @param config the embedded Kafka configuration to publish against
     * @param serializer the serializer for the record value
     * @throws KafkaUnavailableException declared by this method; raised by the private overload it delegates to
     */
    public static void publishToKafka(EmbeddedKafkaSupport $this, String topic, Object message, EmbeddedKafkaConfig config, Serializer serializer) throws KafkaUnavailableException {
        java.util.Map producerSettings = JavaConversions$.MODULE$.mapAsJavaMap((Map)EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig($this, config));
        KafkaProducer producer = new KafkaProducer(producerSettings, (Serializer)new StringSerializer(), serializer);
        ProducerRecord outgoing = new ProducerRecord(topic, message);
        EmbeddedKafkaSupport$class.publishToKafka($this, producer, outgoing);
    }

    /**
     * Publishes a keyed record ({@code key}, {@code message}) to {@code topic}.
     *
     * <p>A new producer is built from the base producer configuration with the supplied key and
     * value serializers and handed to the private {@code publishToKafka} overload together with
     * the record.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param topic the destination topic
     * @param key the record key
     * @param message the record value
     * @param config the embedded Kafka configuration to publish against
     * @param keySerializer the serializer for the record key
     * @param serializer the serializer for the record value
     * @throws KafkaUnavailableException declared by this method; raised by the private overload it delegates to
     */
    public static void publishToKafka(EmbeddedKafkaSupport $this, String topic, Object key, Object message, EmbeddedKafkaConfig config, Serializer keySerializer, Serializer serializer) throws KafkaUnavailableException {
        java.util.Map producerSettings = JavaConversions$.MODULE$.mapAsJavaMap((Map)EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig($this, config));
        KafkaProducer producer = new KafkaProducer(producerSettings, keySerializer, serializer);
        ProducerRecord outgoing = new ProducerRecord(topic, key, message);
        EmbeddedKafkaSupport$class.publishToKafka($this, producer, outgoing);
    }

    /**
     * Sends {@code record} with {@code kafkaProducer}, waits up to 10 seconds for the send to
     * complete, and always closes the producer.
     *
     * <p>The producer is closed in a {@code finally} block so that it is released even when
     * {@code send} itself throws before a future is returned. A failure captured while waiting
     * on the future is rethrown wrapped in a {@code KafkaUnavailableException}.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param kafkaProducer the producer to send with; closed by this method
     * @param record the record to send
     */
    private static void publishToKafka(EmbeddedKafkaSupport $this, KafkaProducer kafkaProducer, ProducerRecord record) {
        Try sendResult;
        try {
            Future sendFuture = kafkaProducer.send(record);
            // NOTE(review): scala.util.Try is expected to capture only non-fatal throwables here;
            // anything else propagates directly — confirm against the Scala version in use.
            sendResult = Try$.MODULE$.apply((Function0)new Serializable($this, sendFuture){
                public static final long serialVersionUID = 0L;
                private final Future sendFuture$1;

                public final RecordMetadata apply() {
                    return (RecordMetadata)this.sendFuture$1.get(10L, TimeUnit.SECONDS);
                }
                {
                    this.sendFuture$1 = sendFuture$1;
                }
            });
        }
        finally {
            // Release the producer on every path, including a synchronous failure of send().
            kafkaProducer.close();
        }
        if (sendResult.isFailure()) {
            throw new KafkaUnavailableException((Throwable)sendResult.failed().get());
        }
    }

    /**
     * Builds the producer settings used by the publish helpers.
     *
     * <p>The defaults are {@code bootstrap.servers=localhost:<kafkaPort>}, {@code max.block.ms=10000}
     * and {@code retry.backoff.ms=1000}; {@code config.customProducerProperties()} is appended with
     * {@code ++} afterwards.
     *
     * @param $this the trait instance this static implementation is invoked for (unused here)
     * @param config supplies the Kafka port and the custom producer properties
     * @return an immutable Scala map of producer settings
     */
    public static scala.collection.immutable.Map net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config) {
        String bootstrapServers = "localhost:" + config.kafkaPort();
        Tuple2[] defaults = new Tuple2[]{
            new Tuple2((Object)"bootstrap.servers", (Object)bootstrapServers),
            new Tuple2((Object)"max.block.ms", (Object)Integer.toString(10000)),
            new Tuple2((Object)"retry.backoff.ms", (Object)Integer.toString(1000))
        };
        MapLike defaultSettings = (MapLike)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])defaults));
        return defaultSettings.$plus$plus(config.customProducerProperties());
    }

    /*
     * WARNING - void declaration
     */
    private static Properties baseConsumerConfig(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config) {
        void var2_2;
        Properties props = new Properties();
        props.put("group.id", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"embedded-kafka-spec"})).s((Seq)Nil$.MODULE$));
        props.put("bootstrap.servers", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.kafkaPort())})));
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.putAll((java.util.Map<?, ?>)JavaConversions$.MODULE$.mapAsJavaMap(config.customConsumerProperties()));
        return var2_2;
    }

    /**
     * Consumes the first message from {@code topic} as a {@code String} by delegating to
     * {@code consumeFirstMessageFrom} on {@code $this} with a fresh {@link StringDeserializer}.
     *
     * @param $this the trait instance the call is forwarded to
     * @param topic the topic to read from
     * @param autoCommit forwarded unchanged to {@code consumeFirstMessageFrom}
     * @param config the embedded Kafka configuration to consume from
     * @return the delegate's result cast to {@code String}
     */
    public static String consumeFirstStringMessageFrom(EmbeddedKafkaSupport $this, String topic, boolean autoCommit, EmbeddedKafkaConfig config) {
        StringDeserializer valueDeserializer = new StringDeserializer();
        Object firstMessage = $this.consumeFirstMessageFrom(topic, autoCommit, config, valueDeserializer);
        return (String)firstMessage;
    }

    /**
     * Default-argument accessor associated with {@code consumeFirstStringMessageFrom}; always
     * returns {@code false}.
     *
     * <p>NOTE(review): going by the {@code $default$2} suffix this is presumably the default for
     * that method's second parameter ({@code autoCommit}) — confirm against the Scala source.
     */
    public static boolean consumeFirstStringMessageFrom$default$2(EmbeddedKafkaSupport $this) {
        return false;
    }

    /**
     * Consumes {@code number} messages from {@code topic} as strings by delegating to
     * {@code consumeNumberMessagesFrom} on {@code $this} with a fresh {@link StringDeserializer}.
     *
     * @param $this the trait instance the call is forwarded to
     * @param topic the topic to read from
     * @param number how many messages to read
     * @param autoCommit forwarded unchanged to {@code consumeNumberMessagesFrom}
     * @param config the embedded Kafka configuration to consume from
     * @return the Scala list returned by the delegate
     */
    public static List consumeNumberStringMessagesFrom(EmbeddedKafkaSupport $this, String topic, int number, boolean autoCommit, EmbeddedKafkaConfig config) {
        StringDeserializer valueDeserializer = new StringDeserializer();
        List consumed = $this.consumeNumberMessagesFrom(topic, number, autoCommit, config, valueDeserializer);
        return consumed;
    }

    /**
     * Default-argument accessor associated with {@code consumeNumberStringMessagesFrom}; always
     * returns {@code false}.
     *
     * <p>NOTE(review): going by the {@code $default$3} suffix this is presumably the default for
     * that method's third parameter ({@code autoCommit}) — confirm against the Scala source.
     */
    public static boolean consumeNumberStringMessagesFrom$default$3(EmbeddedKafkaSupport $this) {
        return false;
    }

    /**
     * Reads a single record from {@code topic} and returns its value.
     *
     * <p>A consumer is created from the base consumer configuration with {@code enable.auto.commit}
     * overridden by {@code autoCommit}; keys use a {@code StringDeserializer}, values use
     * {@code deserializer}. The consumer subscribes to the topic, polls once for 5000 ms, takes the
     * first record of the batch and synchronously commits {@code offset + 1} for that record's
     * partition (the commit is unconditional, independent of {@code autoCommit}). The consumer is
     * closed after the attempt.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param topic the topic to read from
     * @param autoCommit value written to the {@code enable.auto.commit} consumer property
     * @param config the embedded Kafka configuration to consume from
     * @param deserializer the deserializer for the record value
     * @return the value of the first record returned by the poll
     * @throws TimeoutException if the poll returns no records
     * @throws KafkaUnavailableException if the attempt failed with a {@code KafkaException}
     */
    public static Object consumeFirstMessageFrom(EmbeddedKafkaSupport $this, String topic, boolean autoCommit, EmbeddedKafkaConfig config, Deserializer deserializer) throws TimeoutException, KafkaUnavailableException {
        Properties props = EmbeddedKafkaSupport$class.baseConsumerConfig($this, config);
        // Override the base setting with the caller's choice.
        props.put("enable.auto.commit", ((Object)BoxesRunTime.boxToBoolean((boolean)autoCommit)).toString());
        KafkaConsumer consumer = new KafkaConsumer(props, (Deserializer)new StringDeserializer(), deserializer);
        // The whole read is wrapped in a Try so that consumer.close() below is still reached on failure.
        Try message = Try$.MODULE$.apply((Function0)new Serializable($this, consumer, topic){
            public static final long serialVersionUID = 0L;
            private final KafkaConsumer consumer$1;
            private final String topic$1;

            public final T apply() {
                this.consumer$1.subscribe((Collection)JavaConversions$.MODULE$.seqAsJavaList((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$1}))));
                // Result is discarded. NOTE(review): presumably called to force a metadata fetch
                // for the topic before polling — confirm.
                this.consumer$1.partitionsFor(this.topic$1);
                ConsumerRecords records = this.consumer$1.poll(5000L);
                if (records.isEmpty()) {
                    throw new TimeoutException("Unable to retrieve a message from Kafka in 5000ms");
                }
                // Only the first record of the batch is used.
                ConsumerRecord record = (ConsumerRecord)records.iterator().next();
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                // Commit the position just past the consumed record.
                OffsetAndMetadata om = new OffsetAndMetadata(record.offset() + 1L);
                this.consumer$1.commitSync(JavaConversions$.MODULE$.mapAsJavaMap((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)om)}))));
                return (T)record.value();
            }
            {
                this.consumer$1 = consumer$1;
                this.topic$1 = topic$1;
            }
        });
        consumer.close();
        // Translate KafkaException failures into KafkaUnavailableException; the partial function is
        // not defined for any other failure, and get() returns the value or throws the failure.
        // NOTE(review): the exception thrown inside applyOrElse is expected to surface from the
        // trailing get() — confirm against scala.util.Try.recover.
        return message.recover((PartialFunction)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                if (A1 instanceof KafkaException) {
                    KafkaException kafkaException = (KafkaException)A1;
                    throw new KafkaUnavailableException((Throwable)kafkaException);
                }
                Object object = function1.apply(x1);
                return (B1)object;
            }

            public final boolean isDefinedAt(Throwable x1) {
                Throwable throwable = x1;
                boolean bl = throwable instanceof KafkaException;
                return bl;
            }
        }).get();
    }

    /**
     * Default-argument accessor associated with {@code consumeFirstMessageFrom}; always returns
     * {@code false}.
     *
     * <p>NOTE(review): going by the {@code $default$2} suffix this is presumably the default for
     * that method's second parameter ({@code autoCommit}) — confirm against the Scala source.
     */
    public static boolean consumeFirstMessageFrom$default$2(EmbeddedKafkaSupport $this) {
        return false;
    }

    /**
     * Reads {@code number} records from {@code topic} and returns their values in the order read.
     *
     * <p>A consumer is created from the base consumer configuration with {@code enable.auto.commit}
     * overridden by {@code autoCommit}; keys use a {@code StringDeserializer}, values use
     * {@code deserializer}. The consumer subscribes to the topic and polls in 5000 ms slices until
     * {@code number} values have been collected; a poll that returns no records raises a
     * {@code TimeoutException}. After each consumed record, {@code offset + 1} is committed
     * synchronously for that record's partition (unconditionally, independent of
     * {@code autoCommit}). The consumer is closed after the attempt.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param topic the topic to read from
     * @param number how many record values to collect
     * @param autoCommit value written to the {@code enable.auto.commit} consumer property
     * @param config the embedded Kafka configuration to consume from
     * @param deserializer the deserializer for the record values
     * @return an immutable Scala list holding the collected values
     */
    public static List consumeNumberMessagesFrom(EmbeddedKafkaSupport $this, String topic, int number, boolean autoCommit, EmbeddedKafkaConfig config, Deserializer deserializer) {
        Properties props = EmbeddedKafkaSupport$class.baseConsumerConfig($this, config);
        // Override the base setting with the caller's choice.
        props.put("enable.auto.commit", ((Object)BoxesRunTime.boxToBoolean((boolean)autoCommit)).toString());
        KafkaConsumer consumer = new KafkaConsumer(props, (Deserializer)new StringDeserializer(), deserializer);
        // The whole read is wrapped in a Try so that consumer.close() below is still reached on failure.
        Try messages = Try$.MODULE$.apply((Function0)new Serializable($this, consumer, topic, number){
            public static final long serialVersionUID = 0L;
            private final KafkaConsumer consumer$2;
            private final String topic$2;
            private final int number$1;

            public final List<T> apply() {
                ListBuffer messagesBuffer = (ListBuffer)ListBuffer$.MODULE$.empty();
                int messagesRead = 0;
                this.consumer$2.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$2}))).asJava());
                // Result is discarded. NOTE(review): presumably called to force a metadata fetch
                // for the topic before polling — confirm.
                this.consumer$2.partitionsFor(this.topic$2);
                // Keep polling until enough values were collected; an empty poll aborts the read.
                while (messagesRead < this.number$1) {
                    ConsumerRecords records = this.consumer$2.poll(5000L);
                    if (records.isEmpty()) {
                        throw new TimeoutException("Unable to retrieve a message from Kafka in 5000ms");
                    }
                    Iterator recordIter = records.iterator();
                    // Stop mid-batch once the requested count is reached; the remaining records
                    // of the batch are neither returned nor committed.
                    while (recordIter.hasNext() && messagesRead < this.number$1) {
                        ConsumerRecord record = (ConsumerRecord)recordIter.next();
                        messagesBuffer.$plus$eq(record.value());
                        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                        // Commit the position just past the consumed record.
                        OffsetAndMetadata om = new OffsetAndMetadata(record.offset() + 1L);
                        this.consumer$2.commitSync((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)om)}))).asJava());
                        ++messagesRead;
                    }
                }
                return messagesBuffer.toList();
            }
            {
                this.consumer$2 = consumer$2;
                this.topic$2 = topic$2;
                this.number$1 = number$1;
            }
        });
        consumer.close();
        // Translate KafkaException failures into KafkaUnavailableException; the partial function is
        // not defined for any other failure, and get() returns the value or throws the failure.
        // NOTE(review): the exception thrown inside applyOrElse is expected to surface from the
        // trailing get() — confirm against scala.util.Try.recover.
        return (List)messages.recover((PartialFunction)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                A1 A1 = x2;
                if (A1 instanceof KafkaException) {
                    KafkaException kafkaException = (KafkaException)A1;
                    throw new KafkaUnavailableException((Throwable)kafkaException);
                }
                Object object = function1.apply(x2);
                return (B1)object;
            }

            public final boolean isDefinedAt(Throwable x2) {
                Throwable throwable = x2;
                boolean bl = throwable instanceof KafkaException;
                return bl;
            }
        }).get();
    }

    /**
     * Default-argument accessor associated with {@code consumeNumberMessagesFrom}; always returns
     * {@code false}.
     *
     * <p>NOTE(review): going by the {@code $default$3} suffix this is presumably the default for
     * that method's third parameter ({@code autoCommit}) — confirm against the Scala source.
     */
    public static boolean consumeNumberMessagesFrom$default$3(EmbeddedKafkaSupport $this) {
        return false;
    }

    /**
     * Starts a ZooKeeper server that keeps its files in {@code zkLogsDir} and listens on
     * {@code 0.0.0.0:zooKeeperPort}.
     *
     * <p>The same directory is passed for both directory arguments of the {@code ZooKeeperServer}
     * constructor. The caller is responsible for shutting the returned factory down.
     *
     * @param $this the trait instance this static implementation is invoked for (unused here)
     * @param zooKeeperPort the port to bind
     * @param zkLogsDir the directory handed to the server
     * @return the started connection factory
     */
    public static ServerCnxnFactory startZooKeeper(EmbeddedKafkaSupport $this, int zooKeeperPort, Directory zkLogsDir) {
        final int tickTimeValue = 2000;
        // Second argument of ServerCnxnFactory.configure.
        final int configureLimit = 1024;
        ZooKeeperServer server = new ZooKeeperServer(zkLogsDir.toFile().jfile(), zkLogsDir.toFile().jfile(), tickTimeValue);
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        InetSocketAddress listenAddress = new InetSocketAddress("0.0.0.0", zooKeeperPort);
        cnxnFactory.configure(listenAddress, configureLimit);
        cnxnFactory.startup(server);
        return cnxnFactory;
    }

    /**
     * Starts a Kafka broker configured from {@code config}, storing its logs in {@code kafkaLogDir}.
     *
     * <p>A fixed set of broker properties is written first (ZooKeeper at
     * {@code localhost:<zooKeeperPort>}, broker id 0, host {@code localhost}, topic auto-creation
     * enabled, the configured port, the log directory, a flush interval of one message and a
     * cleaner dedupe buffer size of 1048577). Every entry of {@code config.customBrokerProperties()}
     * is then set on top, so custom entries override the fixed ones.
     *
     * @param $this the trait instance this static implementation is invoked for
     * @param config supplies the ports and the custom broker properties
     * @param kafkaLogDir the directory used for {@code log.dir}
     * @return the broker, after {@code startup()} has been called on it
     */
    public static KafkaServer startKafka(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config, Directory kafkaLogDir) {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:" + config.zooKeeperPort());
        properties.setProperty("broker.id", "0");
        properties.setProperty("host.name", "localhost");
        properties.setProperty("advertised.host.name", "localhost");
        properties.setProperty("auto.create.topics.enable", "true");
        properties.setProperty("port", Integer.toString(config.kafkaPort()));
        properties.setProperty("log.dir", kafkaLogDir.toAbsolute().path());
        properties.setProperty("log.flush.interval.messages", Integer.toString(1));
        properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
        // Custom entries are applied last so they win over the fixed settings above.
        config.customBrokerProperties().foreach((Function1)new Serializable($this, properties){
            public static final long serialVersionUID = 0L;
            private final Properties properties$1;

            public final Object apply(Tuple2<String, String> x0$1) {
                if (x0$1 == null) {
                    throw new MatchError(x0$1);
                }
                String name = (String)x0$1._1();
                String setting = (String)x0$1._2();
                return this.properties$1.setProperty(name, setting);
            }
            {
                this.properties$1 = properties$1;
            }
        });
        KafkaConfig brokerConfig = new KafkaConfig((java.util.Map)properties);
        KafkaServer server = new KafkaServer(brokerConfig, KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        return server;
    }

    /*
     * WARNING - void declaration
     */
    public static void createCustomTopic(EmbeddedKafkaSupport $this, String topic, scala.collection.immutable.Map topicConfig, int partitions, int replicationFactor, EmbeddedKafkaConfig config) {
        ZkUtils zkUtils = ZkUtils$.MODULE$.apply(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())})), $this.zkSessionTimeoutMs(), $this.zkConnectionTimeoutMs(), $this.zkSecurityEnabled());
        Properties topicProperties = (Properties)topicConfig.foldLeft((Object)new Properties(), (Function2)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final Properties apply(Properties x0$2, Tuple2<String, String> x1$1) {
                Tuple2 tuple2 = new Tuple2((Object)x0$2, x1$1);
                if (tuple2 != null) {
                    Properties props = (Properties)tuple2._1();
                    Tuple2 tuple22 = (Tuple2)tuple2._2();
                    if (tuple22 != null) {
                        String k = (String)tuple22._1();
                        String v = (String)tuple22._2();
                        props.put(k, v);
                        Properties properties = props;
                        return properties;
                    }
                }
                throw new MatchError((Object)tuple2);
            }
        });
        try {
            AdminUtils$.MODULE$.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties, AdminUtils$.MODULE$.createTopic$default$6());
        }
        catch (Throwable throwable) {
            void var6_6;
            var6_6.close();
            throw throwable;
        }
        zkUtils.close();
    }

    /**
     * Default-argument accessor associated with {@code createCustomTopic}; returns the empty
     * immutable Scala map.
     *
     * <p>NOTE(review): going by the {@code $default$2} suffix this is presumably the default for
     * that method's second parameter ({@code topicConfig}) — confirm against the Scala source.
     */
    public static scala.collection.immutable.Map createCustomTopic$default$2(EmbeddedKafkaSupport $this) {
        return Predef$.MODULE$.Map().empty();
    }

    /**
     * Default-argument accessor associated with {@code createCustomTopic}; always returns {@code 1}.
     *
     * <p>NOTE(review): going by the {@code $default$3} suffix this is presumably the default for
     * that method's third parameter ({@code partitions}) — confirm against the Scala source.
     */
    public static int createCustomTopic$default$3(EmbeddedKafkaSupport $this) {
        return 1;
    }

    /**
     * Default-argument accessor associated with {@code createCustomTopic}; always returns {@code 1}.
     *
     * <p>NOTE(review): going by the {@code $default$4} suffix this is presumably the default for
     * that method's fourth parameter ({@code replicationFactor}) — confirm against the Scala source.
     */
    public static int createCustomTopic$default$4(EmbeddedKafkaSupport $this) {
        return 1;
    }

    /**
     * Initialises the trait fields on {@code $this} through their setters.
     *
     * <p>It installs a fixed thread pool of two threads as the executor service, an execution
     * context built from the executor service read back from {@code $this}, ZooKeeper session and
     * connection timeouts of 10000 each, and the ZooKeeper security flag set to {@code false}.
     *
     * @param $this the trait instance being initialised
     */
    public static void $init$(EmbeddedKafkaSupport $this) {
        final int poolSize = 2;
        final int zkTimeoutMs = 10000;
        final boolean zkSecurity = false;
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService_$eq(Executors.newFixedThreadPool(poolSize));
        // The execution context wraps the executor service that was just stored on $this.
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext_$eq(ExecutionContext$.MODULE$.fromExecutorService($this.net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService()));
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSessionTimeoutMs_$eq(zkTimeoutMs);
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkConnectionTimeoutMs_$eq(zkTimeoutMs);
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSecurityEnabled_$eq(zkSecurity);
    }
}

