/*
 * Decompiled with CFR 0.152.
 */
package kafka.consumer;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import kafka.common.MessageStreamsExistException;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerThreadId;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.PartitionTopicInfo;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.consumer.ZookeeperConsumerConnectorTest$;
import kafka.consumer.ZookeeperConsumerConnectorTest$$anonfun$testConsumerDecoder$1$;
import kafka.integration.KafkaServerTestHarness;
import kafka.javaapi.consumer.ConsumerRebalanceListener;
import kafka.message.CompressionCodec;
import kafka.message.DefaultCompressionCodec$;
import kafka.message.GZIPCompressionCodec$;
import kafka.message.NoCompressionCodec$;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.Pool;
import kafka.utils.TestUtils$;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\tUb\u0001B\u0001\u0003\u0001\u001d\u0011aDW8pW\u0016,\u0007/\u001a:D_:\u001cX/\\3s\u0007>tg.Z2u_J$Vm\u001d;\u000b\u0005\r!\u0011\u0001C2p]N,X.\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001aE\u0002\u0001\u00119\u0001\"!\u0003\u0007\u000e\u0003)Q!a\u0003\u0003\u0002\u0017%tG/Z4sCRLwN\\\u0005\u0003\u001b)\u0011acS1gW\u0006\u001cVM\u001d<feR+7\u000f\u001e%be:,7o\u001d\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0003#\u0011\tQ!\u001e;jYNL!a\u0005\t\u0003\u000f1{wmZ5oO\")Q\u0003\u0001C\u0001-\u00051A(\u001b8jiz\"\u0012a\u0006\t\u00031\u0001i\u0011A\u0001\u0005\b5\u0001\u0011\r\u0011\"\u0001\u001c\u0003I\u0011VMY1mC:\u001cWMQ1dW>4g-T:\u0016\u0003q\u0001\"!\b\u0011\u000e\u0003yQ\u0011aH\u0001\u0006g\u000e\fG.Y\u0005\u0003Cy\u00111!\u00138u\u0011\u0019\u0019\u0003\u0001)A\u00059\u0005\u0019\"+\u001a2bY\u0006t7-\u001a\"bG.|gMZ'tA!9Q\u0005\u0001a\u0001\n\u00031\u0013\u0001\u00023jeN,\u0012a\n\t\u0003\u001f!J!!\u000b\t\u0003!i[uI]8vaR{\u0007/[2ESJ\u001c\bbB\u0016\u0001\u0001\u0004%\t\u0001L\u0001\tI&\u00148o\u0018\u0013fcR\u0011Q\u0006\r\t\u0003;9J!a\f\u0010\u0003\tUs\u0017\u000e\u001e\u0005\bc)\n\t\u00111\u0001(\u0003\rAH%\r\u0005\u0007g\u0001\u0001\u000b\u0015B\u0014\u0002\u000b\u0011L'o\u001d\u0011\t\u000fU\u0002!\u0019!C\u00017\u0005Aa.^7O_\u0012,7\u000f\u0003\u00048\u0001\u0001\u0006I\u0001H\u0001\n]Vlgj\u001c3fg\u0002Bq!\u000f\u0001C\u0002\u0013\u00051$\u0001\u0005ok6\u0004\u0016M\u001d;t\u0011\u0019Y\u0004\u0001)A\u00059\u0005Ia.^7QCJ$8\u000f\t\u0005\b{\u0001\u0011\r\u0011\"\u0001?\u0003\u0015!x\u000e]5d+\u0005y\u0004C\u0001!F\u001b\u0005\t%B\u0001\"D\u0003\u0011a\u0017M\\4\u000b\u0003\u0011\u000bAA[1wC&\u0011a)\u0011\u0002\u0007'R\u0014\u0018N\\4\t\r!\u0003\u0001\u0015!\u0003@\u0003\u0019!x\u000e]5dA!9!\n\u0001b\u0001\n\u0003Y\u0015aD8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:\u0016\u00031\u0003\"!\u0014)\u000e\u00039S!aT\"\u0002\tU$\u0018\u000e\\\u0005\u0003#:\u0013!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019\u0019\u0006\u0001)A\u0
005\u0019\u0006\u0001rN^3se&$\u0017N\\4Qe>\u00048\u000f\t\u0005\u0006+\u0002!\tEV\u0001\u0010O\u0016tWM]1uK\u000e{gNZ5hgR\tq\u000bE\u0002YA\u000et!!\u00170\u000f\u0005ikV\"A.\u000b\u0005q3\u0011A\u0002\u001fs_>$h(C\u0001 \u0013\tyf$A\u0004qC\u000e\\\u0017mZ3\n\u0005\u0005\u0014'aA*fc*\u0011qL\b\t\u0003I\u001el\u0011!\u001a\u0006\u0003M\u0012\taa]3sm\u0016\u0014\u0018B\u00015f\u0005-Y\u0015MZ6b\u0007>tg-[4\t\u000f)\u0004!\u0019!C\u0001}\u0005)qM]8va\"1A\u000e\u0001Q\u0001\n}\naa\u001a:pkB\u0004\u0003b\u00028\u0001\u0005\u0004%\tAP\u0001\nG>t7/^7feBBa\u0001\u001d\u0001!\u0002\u0013y\u0014AC2p]N,X.\u001a:1A!9!\u000f\u0001b\u0001\n\u0003q\u0014!C2p]N,X.\u001a:2\u0011\u0019!\b\u0001)A\u0005\u007f\u0005Q1m\u001c8tk6,'/\r\u0011\t\u000fY\u0004!\u0019!C\u0001}\u0005I1m\u001c8tk6,'O\r\u0005\u0007q\u0002\u0001\u000b\u0011B \u0002\u0015\r|gn];nKJ\u0014\u0004\u0005C\u0004{\u0001\t\u0007I\u0011\u0001 \u0002\u0013\r|gn];nKJ\u001c\u0004B\u0002?\u0001A\u0003%q(\u0001\u0006d_:\u001cX/\\3sg\u0001BqA \u0001C\u0002\u0013\u00051$A\u0005o\u001b\u0016\u001c8/Y4fg\"9\u0011\u0011\u0001\u0001!\u0002\u0013a\u0012A\u00038NKN\u001c\u0018mZ3tA!9\u0011Q\u0001\u0001\u0005B\u0005\u001d\u0011!B:fiV\u0003H#A\u0017)\t\u0005\r\u00111\u0002\t\u0005\u0003\u001b\t9\"\u0004\u0002\u0002\u0010)!\u0011\u0011CA\n\u0003\u0015QWO\\5u\u0015\t\t)\"A\u0002pe\u001eLA!!\u0007\u0002\u0010\t1!)\u001a4pe\u0016Dq!!\b\u0001\t\u0003\n9!\u0001\u0005uK\u0006\u0014Hi\\<oQ\u0011\tY\"!\t\u0011\t\u00055\u00111E\u0005\u0005\u0003K\tyAA\u0003BMR,'\u000fC\u0004\u0002*\u0001!\t!a\u0002\u0002\u0013Q,7\u000f\u001e\"bg&\u001c\u0007\u0006BA\u0014\u0003[\u0001B!!\u0004\u00020%!\u0011\u0011GA\b\u0005\u0011!Vm\u001d;\t\u000f\u0005U\u0002\u0001\"\u0001\u0002\b\u0005yA/Z:u\u0007>l\u0007O]3tg&|g\u000e\u000b\u0003\u00024\u00055\u0002bBA\u001e\u0001\u0011\u0005\u0011qA\u0001\u001ei\u0016\u001cHoQ8naJ,7o]5p]N+GoQ8ogVl\u0007\u000f^5p]\"\"\u0011\u0011HA\u0017\u0011\u001d\t\t\u0005\u0001C\u0001\u0003\u000f\t1\u0003^3ti\u000e{gn];nKJ$UmY8eKJDC!a\u001
0\u0002.!9\u0011q\t\u0001\u0005\u0002\u0005\u001d\u0011a\b;fgRdU-\u00193feN+G.Z2uS>tgi\u001c:QCJ$\u0018\u000e^5p]\"\"\u0011QIA\u0017\u0011\u001d\ti\u0005\u0001C\u0001\u0003\u000f\tQ\u0004^3ti\u000e{gn];nKJ\u0014VMY1mC:\u001cW\rT5ti\u0016tWM\u001d\u0015\u0005\u0003\u0017\ni\u0003C\u0004\u0002T\u0001!\t!!\u0016\u0002'\u001d,GOW&DQ&dGM]3o-\u0006dW/Z:\u0015\t\u0005]\u00131\u000f\t\u0007\u00033\ny&!\u0019\u000e\u0005\u0005m#bAA/=\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0007\u0005\fY\u0006E\u0004\u001e\u0003G\n9'a\u001a\n\u0007\u0005\u0015dD\u0001\u0004UkBdWM\r\t\u0005\u0003S\nyGD\u0002\u001e\u0003WJ1!!\u001c\u001f\u0003\u0019\u0001&/\u001a3fM&\u0019a)!\u001d\u000b\u0007\u00055d\u0004\u0003\u0005\u0002v\u0005E\u0003\u0019AA4\u0003\u0011\u0001\u0018\r\u001e5\u0007\r\u0005e\u0004\u0001BA>\u0005u!Vm\u001d;D_:\u001cX/\\3s%\u0016\u0014\u0017\r\\1oG\u0016d\u0015n\u001d;f]\u0016\u00148CBA<\u0003{\n\u0019\tE\u0002A\u0003\u007fJ1!!!B\u0005\u0019y%M[3diB!\u0011QQAG\u001b\t\t9IC\u0002\u0004\u0003\u0013S1!a#\u0005\u0003\u001dQ\u0017M^1ba&LA!a$\u0002\b\nI2i\u001c8tk6,'OU3cC2\fgnY3MSN$XM\\3s\u0011\u001d)\u0012q\u000fC\u0001\u0003'#\"!!&\u0011\t\u0005]\u0015qO\u0007\u0002\u0001!Q\u00111TA<\u0001\u0004%\t!!(\u0002?\t,gm\u001c:f%\u0016dW-Y:j]\u001e\u0004\u0016M\u001d;ji&|gn]\"bY2,G-\u0006\u0002\u0002 B\u0019Q$!)\n\u0007\u0005\rfDA\u0004C_>dW-\u00198\t\u0015\u0005\u001d\u0016q\u000fa\u0001\n\u0003\tI+A\u0012cK\u001a|'/\u001a*fY\u0016\f7/\u001b8h!\u0006\u0014H/\u001b;j_:\u001c8)\u00197mK\u0012|F%Z9\u0015\u00075\nY\u000bC\u00052\u0003K\u000b\t\u00111\u0001\u0002 
\"I\u0011qVA<A\u0003&\u0011qT\u0001!E\u00164wN]3SK2,\u0017m]5oOB\u000b'\u000f^5uS>t7oQ1mY\u0016$\u0007\u0005\u0003\u0006\u00024\u0006]\u0004\u0019!C\u0001\u0003;\u000bADY3g_J,7\u000b^1si&twMR3uG\",'o]\"bY2,G\r\u0003\u0006\u00028\u0006]\u0004\u0019!C\u0001\u0003s\u000b\u0001EY3g_J,7\u000b^1si&twMR3uG\",'o]\"bY2,Gm\u0018\u0013fcR\u0019Q&a/\t\u0013E\n),!AA\u0002\u0005}\u0005\"CA`\u0003o\u0002\u000b\u0015BAP\u0003u\u0011WMZ8sKN#\u0018M\u001d;j]\u001e4U\r^2iKJ\u001c8)\u00197mK\u0012\u0004\u0003BCAb\u0003o\u0002\r\u0011\"\u0001\u0002F\u0006Q1m\u001c8tk6,'/\u00133\u0016\u0005\u0005\u001d\u0004BCAe\u0003o\u0002\r\u0011\"\u0001\u0002L\u0006q1m\u001c8tk6,'/\u00133`I\u0015\fHcA\u0017\u0002N\"I\u0011'a2\u0002\u0002\u0003\u0007\u0011q\r\u0005\n\u0003#\f9\b)Q\u0005\u0003O\n1bY8ogVlWM]%eA!Q\u0011Q[A<\u0001\u0004%\t!a6\u0002%A\f'\u000f^5uS>twj\u001e8feND\u0017\u000e]\u000b\u0003\u00033\u0004r!TAn\u0003O\ny.C\u0002\u0002^:\u00131!T1q!\u0015i\u0015\u0011]As\u0013\r\t\u0019O\u0014\u0002\u0004'\u0016$\bc\u0001!\u0002h&\u0019\u0011\u0011^!\u0003\u000f%sG/Z4fe\"Q\u0011Q^A<\u0001\u0004%\t!a<\u0002-A\f'\u000f^5uS>twj\u001e8feND\u0017\u000e]0%KF$2!LAy\u0011%\t\u00141^A\u0001\u0002\u0004\tI\u000eC\u0005\u0002v\u0006]\u0004\u0015)\u0003\u0002Z\u0006\u0019\u0002/\u0019:uSRLwN\\(x]\u0016\u00148\u000f[5qA!Q\u0011\u0011`A<\u0001\u0004%\t!a?\u00021\u001ddwNY1m!\u0006\u0014H/\u001b;j_:|uO\\3sg\"L\u0007/\u0006\u0002\u0002~B9Q*a7\u0002h\u0005}\bcB'\u0002\\\u0006\u0015(\u0011\u0001\t\u00041\t\r\u0011b\u0001B\u0003\u0005\t\u00012i\u001c8tk6,'\u000f\u00165sK\u0006$\u0017\n\u001a\u0005\u000b\u0005\u0013\t9\b1A\u0005\u0002\t-\u0011\u0001H4m_\n\fG\u000eU1si&$\u0018n\u001c8Po:,'o\u001d5ja~#S-\u001d\u000b\u0004[\t5\u0001\"C\u0019\u0003\b\u0005\u0005\t\u0019AA\u007f\u0011%\u0011\t\"a\u001e!B\u0013\ti0A\rhY>\u0014\u0017\r\u001c)beRLG/[8o\u001f^tWM]:iSB\u0004\u0003\u0002\u0003B\u000b\u0003o\"\tEa\u0006\u00023\t,gm\u001c:f%\u0016dW-Y:j]\u001e\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0004[\te\u0001\u0002CAk\u0005'\
u0001\r!!7\t\u0011\tu\u0011q\u000fC!\u0005?\taCY3g_J,7\u000b^1si&twMR3uG\",'o\u001d\u000b\u0006[\t\u0005\"1\u0005\u0005\t\u0003\u0007\u0014Y\u00021\u0001\u0002h!A\u0011\u0011 B\u000e\u0001\u0004\ti\u0010K\u0004\u0001\u0005O\u0011iC!\r\u0011\u0007u\u0011I#C\u0002\u0003,y\u0011!\u0002Z3qe\u0016\u001c\u0017\r^3eC\t\u0011y#\u0001%UQ&\u001c\b\u0005^3ti\u0002B\u0017m\u001d\u0011cK\u0016t\u0007\u0005Z3qe\u0016\u001c\u0017\r^3eA\u0005tG\rI5uA]LG\u000e\u001c\u0011cK\u0002\u0012X-\\8wK\u0012\u0004\u0013N\u001c\u0011bA\u0019,H/\u001e:fAI,G.Z1tK\u0006\u0012!1G\u0001\ta9\n\u0004G\f\u0019/a\u0001")
public class ZookeeperConsumerConnectorTest
extends KafkaServerTestHarness {
    // Value handed to the anonymous ConsumerConfig subclasses as their rebalanceBackoffMs.
    // Assigned outside this view (presumably in the constructor — TODO confirm the value).
    private final int RebalanceBackoffMs;
    // ZooKeeper group/topic directory helper; starts as null and is replaced in setUp().
    private ZKGroupTopicDirs dirs = null;
    // Passed to TestUtils.createBrokerConfigs(...) as its first argument in generateConfigs().
    private final int numNodes;
    // Exposed through numParts(); not read by any test in the visible part of this file.
    private final int numParts;
    // Topic name used by the tests for sending and consuming messages.
    private final String topic;
    // Extra properties handed to KafkaConfig.fromProps(...) for every broker config.
    private final Properties overridingProps = new Properties();
    // Consumer group id; the expected ZooKeeper owner strings in the tests suggest "group1" — TODO confirm.
    private final String group;
    // Consumer ids passed to TestUtils.createConsumerProperties(...); values are assigned outside this view.
    private final String consumer0;
    private final String consumer1;
    private final String consumer2;
    private final String consumer3;
    // Message count passed to TestUtils.sendMessages(...) for each send in the tests.
    private final int nMessages;

    /**
     * Accessor for the rebalance back-off setting that the tests copy into
     * their anonymous {@code ConsumerConfig} subclasses.
     *
     * @return the value of the {@code RebalanceBackoffMs} field
     */
    public int RebalanceBackoffMs() {
        return RebalanceBackoffMs;
    }

    /**
     * Accessor for the ZooKeeper group/topic directory helper.
     *
     * @return the current {@code dirs} value; {@code null} until
     *         {@link #setUp()} has assigned it
     */
    public ZKGroupTopicDirs dirs() {
        return dirs;
    }

    /**
     * Scala-style setter ({@code dirs_=}) for the ZooKeeper group/topic
     * directory helper.
     *
     * @param x$1 the new value stored in the {@code dirs} field
     */
    public void dirs_$eq(ZKGroupTopicDirs x$1) {
        dirs = x$1;
    }

    /**
     * Accessor for the node count passed to
     * {@code TestUtils.createBrokerConfigs} by {@link #generateConfigs()}.
     *
     * @return the value of the {@code numNodes} field
     */
    public int numNodes() {
        return numNodes;
    }

    /**
     * Accessor for the {@code numParts} field.
     *
     * @return the value of the {@code numParts} field
     */
    public int numParts() {
        return numParts;
    }

    /**
     * Accessor for the topic name the tests produce to and consume from.
     *
     * @return the value of the {@code topic} field
     */
    public String topic() {
        return topic;
    }

    /**
     * Accessor for the properties that {@link #generateConfigs()} passes to
     * {@code KafkaConfig.fromProps} alongside each broker's own properties.
     *
     * @return the same {@link Properties} instance held by this test (not a copy)
     */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds the broker configurations for the test harness.
     *
     * Calls {@code TestUtils.createBrokerConfigs} with {@link #numNodes()} and the
     * harness's ZooKeeper connect string (all remaining arguments are the Scala
     * default-argument accessors), then maps every resulting {@link Properties}
     * through {@code KafkaConfig.fromProps(props, overridingProps())}.
     *
     * @return one {@code KafkaConfig} per generated broker property set
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        // The anonymous Serializable below is the decompiled form of a Scala closure;
        // it is handed to Seq.map as a Function1.
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(this.numNodes(), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ZookeeperConsumerConnectorTest $outer;

            // Converts one broker property set into a KafkaConfig, applying the test's overriding properties.
            public final KafkaConfig apply(Properties x$1) {
                return KafkaConfig$.MODULE$.fromProps(x$1, this.$outer.overridingProps());
            }
            {
                // Compiler-generated guard: the closure must capture a non-null enclosing test instance.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Accessor for the consumer group id used when building consumer
     * properties and the ZooKeeper group/topic directories.
     *
     * @return the value of the {@code group} field
     */
    public String group() {
        return group;
    }

    /**
     * Accessor for the first consumer id.
     *
     * @return the value of the {@code consumer0} field
     */
    public String consumer0() {
        return consumer0;
    }

    /**
     * Accessor for the consumer id used by the first real consumer in
     * {@code testBasic} and {@code testCompression}.
     *
     * @return the value of the {@code consumer1} field
     */
    public String consumer1() {
        return consumer1;
    }

    /**
     * Accessor for the {@code consumer2} id.
     *
     * @return the value of the {@code consumer2} field
     */
    public String consumer2() {
        return consumer2;
    }

    /**
     * Accessor for the consumer id used by the third consumer, which
     * subscribes with an empty topic map in the tests.
     *
     * @return the value of the {@code consumer3} field
     */
    public String consumer3() {
        return consumer3;
    }

    /**
     * Accessor for the message count the tests pass to
     * {@code TestUtils.sendMessages} for each send.
     *
     * @return the value of the {@code nMessages} field
     */
    public int nMessages() {
        return nMessages;
    }

    /**
     * Runs the harness set-up first, then creates a fresh
     * {@code ZKGroupTopicDirs} for this test's group and topic and stores it
     * via {@link #dirs_$eq(ZKGroupTopicDirs)}.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        final String groupId = group();
        final String topicName = topic();
        final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);
        dirs_$eq(groupTopicDirs);
    }

    /**
     * Delegates straight to the harness tear-down; this class adds no
     * clean-up of its own here.
     */
    @Override
    @After
    public void tearDown() {
        super.tearDown();
    }

    /**
     * End-to-end check of the ZooKeeper-based consumer connector without compression.
     *
     * Steps, in order:
     * 1. A consumer with a 200 ms timeout must hit ConsumerTimeoutException while no data exists.
     * 2. A first consumer reads everything that was sent and is recorded in ZooKeeper as owner of "0" and "1".
     * 3. A second consumer joins; messages are read through both stream sets and ownership is split.
     * 4. A third consumer with an empty topic map joins; consumption and ownership stay unchanged.
     * 5. A second createMessageStreams call on the third connector must throw MessageStreamsExistException.
     *
     * NOTE(review): in this decompiled form the connector shutdowns and the log-level reset live
     * inside the final catch block, so they are skipped whenever an earlier assertion throws.
     */
    @Test
    public void testBasic() {
        // Raise the request-handler logger threshold to FATAL for the duration of the test.
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        // Anonymous ConsumerConfig that overrides consumerTimeoutMs with 200.
        // NOTE(review): the decompiler shows `this` as the constructor argument; the actual
        // properties passed to ConsumerConfig are not recoverable from this view.
        ConsumerConfig consumerConfig0 = new ConsumerConfig(this){
            private final int consumerTimeoutMs;

            public int consumerTimeoutMs() {
                return this.consumerTimeoutMs;
            }
            {
                this.consumerTimeoutMs = 200;
            }
        };
        ZookeeperConsumerConnector zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true);
        // Subscribe with the map {topic -> 1} and String decoders for key and value.
        Map topicMessageStreams0 = zkConsumerConnector0.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Two attempts (0 until 2): nothing has been sent yet, so each read must end in ConsumerTimeoutException.
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)new Serializable(this, topicMessageStreams0){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ZookeeperConsumerConnectorTest $outer;
            private final Map topicMessageStreams0$1;

            public final void apply(int _) {
                this.apply$mcVI$sp(_);
            }

            public void apply$mcVI$sp(int _) {
                try {
                    TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)this.topicMessageStreams0$1, this.$outer.nMessages() * 2);
                    // Reaching this line means the read did not time out, which is a test failure.
                    throw this.$outer.fail("should get an exception", new Position("ZookeeperConsumerConnectorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 87));
                }
                catch (ConsumerTimeoutException consumerTimeoutException) {
                    // Expected outcome: the consumer timed out.
                    return;
                }
            }
            {
                // Compiler-generated guard for the captured enclosing instance.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.topicMessageStreams0$1 = topicMessageStreams0$1;
            }
        });
        zkConsumerConnector0.shutdown();
        // Round 1: two sends of nMessages each (fourth argument 0, then 1 — presumably the partition id; TODO confirm),
        // then wait for leaders and metadata for both ids before consuming.
        List sentMessages1 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        // First consumer (consumer1) reads both batches through a single connector.
        ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
        Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        List<String> receivedMessages1 = TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages() * 2);
        // Compare as sorted lists so arrival order does not matter.
        Assert.assertEquals((Object)sentMessages1.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages1.sorted((Ordering)Ordering.String$.MODULE$));
        // With one consumer, the owner directory must list "group1_consumer1-0" for both "0" and "1".
        Seq<Tuple2<String, String>> actual_1 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        List expected_1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer1-0")}));
        Assert.assertEquals((Object)expected_1, actual_1);
        zkConsumerConnector1.commitOffsets(true);
        // Second consumer: anonymous ConsumerConfig overriding rebalanceBackoffMs with RebalanceBackoffMs().
        // NOTE(review): as above, the `this` constructor argument is a decompiler artifact.
        ConsumerConfig consumerConfig2 = new ConsumerConfig(this){
            private final int rebalanceBackoffMs;

            public int rebalanceBackoffMs() {
                return this.rebalanceBackoffMs;
            }
            {
                this.rebalanceBackoffMs = $outer.RebalanceBackoffMs();
            }
        };
        ZookeeperConsumerConnector zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true);
        Map topicMessageStreams2 = zkConsumerConnector2.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Round 2: send again, then read nMessages from each connector's streams.
        List sentMessages2 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        List receivedMessages2 = (List)TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams2, this.nMessages()), List$.MODULE$.canBuildFrom());
        Assert.assertEquals((Object)sentMessages2.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages2.sorted((Ordering)Ordering.String$.MODULE$));
        // Ownership is now expected to be split: "0" -> consumer1, "1" -> consumer2.
        Seq<Tuple2<String, String>> actual_2 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        List expected_2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer2-0")}));
        Assert.assertEquals((Object)expected_2, actual_2);
        // Third consumer subscribes with an empty topic map, so it should not take over anything.
        ConsumerConfig consumerConfig3 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true);
        zkConsumerConnector3.createMessageStreams((Map)new HashMap());
        // Round 3: send again; consumers 1 and 2 must still receive everything.
        List sentMessages3 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        List receivedMessages3 = (List)TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams2, this.nMessages()), List$.MODULE$.canBuildFrom());
        Assert.assertEquals((Object)sentMessages3.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages3.sorted((Ordering)Ordering.String$.MODULE$));
        // Ownership must be unchanged from round 2 (compared against expected_2 again).
        Seq<Tuple2<String, String>> actual_3 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        Assert.assertEquals((Object)expected_2, actual_3);
        try {
            // A second createMessageStreams call on the same connector must be rejected.
            zkConsumerConnector3.createMessageStreams((Map)new HashMap());
            throw this.fail("Should fail with MessageStreamsExistException", new Position("ZookeeperConsumerConnectorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 167));
        }
        catch (MessageStreamsExistException messageStreamsExistException) {
            // Expected path. The decompiler placed the remaining clean-up here because the try block always throws.
            zkConsumerConnector1.shutdown();
            zkConsumerConnector2.shutdown();
            zkConsumerConnector3.shutdown();
            this.info((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "all consumer connectors stopped";
                }
            });
            // NOTE(review): sets the level to ERROR rather than restoring whatever level was active before the test.
            requestHandlerLogger.setLevel(Level.ERROR);
            return;
        }
    }

    /**
     * Same scenario as the uncompressed basic test from the first real consumer onwards, but
     * every send uses the GZIP compression codec.
     *
     * Steps, in order:
     * 1. One consumer reads everything that was sent and is recorded as owner of "0" and "1".
     * 2. A second consumer joins; messages are read through both stream sets and ownership is split.
     * 3. A third consumer with an empty topic map joins; consumption and ownership stay unchanged.
     *
     * NOTE(review): the connector shutdowns and the log-level reset at the end are plain trailing
     * statements, so they are skipped whenever an earlier assertion throws.
     */
    @Test
    public void testCompression() {
        // Raise the request-handler logger threshold to FATAL for the duration of the test.
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        // Round 1: two GZIP-compressed sends of nMessages each (fourth argument 0, then 1 — presumably
        // the partition id; TODO confirm), then wait for leaders and metadata for both ids.
        List sentMessages1 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, (CompressionCodec)GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, (CompressionCodec)GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        // First consumer (consumer1) reads both batches through a single connector.
        ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
        Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        List<String> receivedMessages1 = TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages() * 2);
        // Compare as sorted lists so arrival order does not matter.
        Assert.assertEquals((Object)sentMessages1.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages1.sorted((Ordering)Ordering.String$.MODULE$));
        // With one consumer, the owner directory must list "group1_consumer1-0" for both "0" and "1".
        Seq<Tuple2<String, String>> actual_1 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        List expected_1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer1-0")}));
        Assert.assertEquals((Object)expected_1, actual_1);
        zkConsumerConnector1.commitOffsets(true);
        // Second consumer: anonymous ConsumerConfig overriding rebalanceBackoffMs with RebalanceBackoffMs().
        // NOTE(review): the decompiler shows `this` as the constructor argument; the actual
        // properties passed to ConsumerConfig are not recoverable from this view.
        ConsumerConfig consumerConfig2 = new ConsumerConfig(this){
            private final int rebalanceBackoffMs;

            public int rebalanceBackoffMs() {
                return this.rebalanceBackoffMs;
            }
            {
                this.rebalanceBackoffMs = $outer.RebalanceBackoffMs();
            }
        };
        ZookeeperConsumerConnector zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true);
        Map topicMessageStreams2 = zkConsumerConnector2.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Round 2: send compressed messages again, then read nMessages from each connector's streams.
        List sentMessages2 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, (CompressionCodec)GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, (CompressionCodec)GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        List receivedMessages2 = (List)TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams2, this.nMessages()), List$.MODULE$.canBuildFrom());
        Assert.assertEquals((Object)sentMessages2.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages2.sorted((Ordering)Ordering.String$.MODULE$));
        // Ownership is now expected to be split: "0" -> consumer1, "1" -> consumer2.
        Seq<Tuple2<String, String>> actual_2 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        List expected_2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer2-0")}));
        Assert.assertEquals((Object)expected_2, actual_2);
        // Third consumer subscribes with an empty topic map (and String decoders), so it should not take over anything.
        ConsumerConfig consumerConfig3 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true);
        zkConsumerConnector3.createMessageStreams((Map)new HashMap(), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Round 3: send again; consumers 1 and 2 must still receive everything.
        List sentMessages3 = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, (CompressionCodec)GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, (CompressionCodec)GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        List receivedMessages3 = (List)TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams2, this.nMessages()), List$.MODULE$.canBuildFrom());
        Assert.assertEquals((Object)sentMessages3.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages3.sorted((Ordering)Ordering.String$.MODULE$));
        // Ownership must be unchanged from round 2 (compared against expected_2 again).
        Seq<Tuple2<String, String>> actual_3 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
        Assert.assertEquals((Object)expected_2, actual_3);
        // Clean-up: stop all three connectors and log that they are down.
        zkConsumerConnector1.shutdown();
        zkConsumerConnector2.shutdown();
        zkConsumerConnector3.shutdown();
        this.info((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "all consumer connectors stopped";
            }
        });
        // NOTE(review): sets the level to ERROR rather than restoring whatever level was active before the test.
        requestHandlerLogger.setLevel(Level.ERROR);
    }

    /**
     * Sends 200 messages to each of partitions 0 and 1 with the default compression codec and
     * checks that a single connector ({@code consumer0}, one stream) receives all 400 of them.
     *
     * <p>Also checks that the entries read from the consumer owner directory are
     * {@code ("0", "group1_consumer0-0")} and {@code ("1", "group1_consumer0-0")}.
     *
     * <p>The connector is shut down in a {@code finally} block so that a failing assertion does
     * not leave it running after the test method returns.
     */
    @Test
    public void testCompressionSetConsumption() {
        List sentMessages = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), 200, 0, (CompressionCodec)DefaultCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), 200, 1, (CompressionCodec)DefaultCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer0(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
        try {
            Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
            // 400 = 200 messages sent to each of the two partitions above.
            List<String> receivedMessages = TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, 400);
            // Compare sorted copies: the two lists are only required to hold the same messages.
            Assert.assertEquals((Object)sentMessages.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages.sorted((Ordering)Ordering.String$.MODULE$));
            Seq<Tuple2<String, String>> actual_2 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
            List expected_2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer0-0"), new Tuple2((Object)"1", (Object)"group1_consumer0-0")}));
            Assert.assertEquals((Object)expected_2, actual_2);
        } finally {
            // Release the connector even when an assertion above fails.
            zkConsumerConnector1.shutdown();
        }
    }

    /**
     * Sends {@code nMessages()} uncompressed messages to each of partitions 0 and 1 and checks
     * that a single connector ({@code consumer1}, one stream, {@link StringDecoder} for key and
     * value) receives the same set of messages.
     *
     * <p>Each stream is read through its iterator exactly {@code nMessages() * 2} times; every
     * read first asserts {@code hasNext()} and then prepends the decoded message to the
     * accumulated list.
     *
     * <p>The {@link KafkaRequestHandler} logger is set to {@code FATAL} for the duration of the
     * test and to {@code ERROR} afterwards (not to whatever level it had before). Both that
     * reset and the connector shutdown run in {@code finally} blocks so a failing assertion
     * neither leaks the connector nor leaves the logger at {@code FATAL}.
     */
    @Test
    public void testConsumerDecoder() {
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        try {
            List sentMessages = (List)TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, (CompressionCodec)NoCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, (CompressionCodec)NoCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), this.topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            ConsumerConfig consumerConfig = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), this.topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            ZookeeperConsumerConnector zkConsumerConnector = new ZookeeperConsumerConnector(consumerConfig, true);
            try {
                Map topicMessageStreams = zkConsumerConnector.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                // Mutable holder shared with the closures below; starts as the empty list.
                ObjectRef receivedMessages = ObjectRef.create((Object)Nil$.MODULE$);
                // Decompiler rendering of nested closures: for every stream list, for every
                // stream, read nMessages() * 2 messages from the stream's iterator.
                topicMessageStreams.values().foreach((Function1)new Serializable(this, receivedMessages){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ ZookeeperConsumerConnectorTest $outer;
                    public final ObjectRef receivedMessages$1;

                    public final void apply(List<KafkaStream<String, String>> messageStreams) {
                        messageStreams.foreach((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$testConsumerDecoder$1 $outer;

                            public final void apply(KafkaStream<String, String> messageStream) {
                                ConsumerIterator iterator = messageStream.iterator();
                                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.$outer.kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$$outer().nMessages() * 2).foreach$mVc$sp((Function1)new Serializable(this, iterator){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ $anonfun$testConsumerDecoder$1$$anonfun$apply$2 $outer;
                                    private final ConsumerIterator iterator$1;

                                    public final void apply(int _) {
                                        this.apply$mcVI$sp(_);
                                    }

                                    public void apply$mcVI$sp(int _) {
                                        Assert.assertTrue((boolean)this.iterator$1.hasNext());
                                        String message = (String)this.iterator$1.next().message();
                                        // Prepend the decoded message to the shared list.
                                        this.$outer.kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$anonfun$$$outer().receivedMessages$1.elem = ((List)this.$outer.kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$anonfun$$$outer().receivedMessages$1.elem).$colon$colon((Object)message);
                                        this.$outer.kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$anonfun$$$outer().kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$$outer().debug((Function0<String>)new Serializable(this, message){
                                            public static final long serialVersionUID = 0L;
                                            private final String message$1;

                                            public final String apply() {
                                                return new StringBuilder().append((Object)"received message: ").append((Object)this.message$1).toString();
                                            }
                                            {
                                                this.message$1 = message$1;
                                            }
                                        });
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                        this.iterator$1 = iterator$1;
                                    }
                                });
                            }

                            public /* synthetic */ $anonfun$testConsumerDecoder$1 kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$anonfun$$$outer() {
                                return this.$outer;
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        });
                    }

                    public /* synthetic */ ZookeeperConsumerConnectorTest kafka$consumer$ZookeeperConsumerConnectorTest$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.receivedMessages$1 = receivedMessages$1;
                    }
                });
                // Compare sorted copies: only the set of messages is checked, not their order.
                Assert.assertEquals((Object)sentMessages.sorted((Ordering)Ordering.String$.MODULE$), (Object)((List)receivedMessages.elem).sorted((Ordering)Ordering.String$.MODULE$));
            } finally {
                // Release the connector even when an assertion above fails.
                zkConsumerConnector.shutdown();
            }
        } finally {
            // Always leave the logger at the same level the success path used to set.
            requestHandlerLogger.setLevel(Level.ERROR);
        }
    }

    /**
     * Creates the topic through a dedicated {@link ZkUtils} instance, sends {@code nMessages()}
     * messages, and checks what a single connector ({@code consumer1}, one stream) sees.
     *
     * <p>The assertions cover:
     * <ul>
     *   <li>the connector's topic registry holds exactly one entry, keyed by {@code topic()};</li>
     *   <li>the first partition info in that entry has partition id 0;</li>
     *   <li>the consumer owner directory holds the single entry
     *       {@code ("0", "group1_consumer1-0")};</li>
     *   <li>the received messages equal the sent messages, compared without sorting.</li>
     * </ul>
     *
     * <p>The connector and the {@code ZkUtils} instance are released in {@code finally} blocks,
     * in the same order as before (connector first), so a failing assertion does not leak them.
     */
    @Test
    public void testLeaderSelectionForPartition() {
        ZkUtils zkUtils = ZkUtils$.MODULE$.apply(this.zkConnect(), 6000, 30000, false);
        try {
            // NOTE(review): the two literal 1s are presumably partition count and replication
            // factor; confirm against TestUtils.createTopic.
            TestUtils$.MODULE$.createTopic(zkUtils, this.topic(), 1, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
            List<String> sentMessages1 = TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), TestUtils$.MODULE$.sendMessages$default$4(), TestUtils$.MODULE$.sendMessages$default$5());
            ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
            ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
            try {
                Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                Pool topicRegistry = zkConsumerConnector1.getTopicRegistry();
                // The registry must contain exactly one topic ...
                Assert.assertEquals((long)1L, (long)((TraversableOnce)topicRegistry.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(Tuple2<String, Pool<Object, PartitionTopicInfo>> r) {
                        return (String)r._1();
                    }
                }, Iterable$.MODULE$.canBuildFrom())).size());
                // ... and that topic must be the one under test.
                Assert.assertEquals((Object)this.topic(), (Object)((IterableLike)topicRegistry.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(Tuple2<String, Pool<Object, PartitionTopicInfo>> r) {
                        return (String)r._1();
                    }
                }, Iterable$.MODULE$.canBuildFrom())).head());
                // Re-shape the registry into (topic, partition infos) pairs.
                Iterable topicsAndPartitionsInRegistry = (Iterable)topicRegistry.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<String, Iterable<PartitionTopicInfo>> apply(Tuple2<String, Pool<Object, PartitionTopicInfo>> r) {
                        return new Tuple2(r._1(), ((TraversableLike)r._2()).map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final PartitionTopicInfo apply(Tuple2<Object, PartitionTopicInfo> p) {
                                return (PartitionTopicInfo)p._2();
                            }
                        }, Iterable$.MODULE$.canBuildFrom()));
                    }
                }, Iterable$.MODULE$.canBuildFrom());
                PartitionTopicInfo brokerPartition = (PartitionTopicInfo)((IterableLike)((Tuple2)topicsAndPartitionsInRegistry.head())._2()).head();
                Assert.assertEquals((long)0L, (long)brokerPartition.partitionId());
                Seq<Tuple2<String, String>> actual_1 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
                List expected_1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0")}));
                Assert.assertEquals((Object)expected_1, actual_1);
                List<String> receivedMessages1 = TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages());
                // Compared as-is (no sorting), unlike the multi-partition tests in this class.
                Assert.assertEquals(sentMessages1, receivedMessages1);
            } finally {
                // Release the connector even when an assertion above fails.
                zkConsumerConnector1.shutdown();
            }
        } finally {
            // This ZkUtils instance is created by this test, so it is closed here on every path.
            zkUtils.close();
        }
    }

    /**
     * Checks the callbacks delivered to {@link TestConsumerRebalanceListener} instances when one
     * and then two connectors of the same group create a stream for the topic.
     *
     * <p>Phase 1 (only {@code consumer1}): both callbacks must have fired, the recorded partition
     * ownership has no entry for the topic, and the recorded global ownership assigns partitions
     * 0 and 1 to {@code group1_consumer1}, thread id 0.
     *
     * <p>Phase 2 (after {@code consumer2} joins and {@code nMessages()} messages have been read
     * from the first connector's streams): the first listener's flags, which were cleared after
     * phase 1, must be set again; its recorded partition ownership is compared against
     * {@code Set(0, 1)}; the global ownership assigns partition 0 to {@code group1_consumer1}
     * and partition 1 to {@code group1_consumer2}; and both listeners recorded the same global
     * ownership.
     *
     * <p>Both connectors are shut down in a {@code finally} block, first connector first as
     * before, so a failing assertion does not leave either of them running.
     */
    @Test
    public void testConsumerRebalanceListener() {
        TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5());
        TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), this.nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5());
        ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
        // Assigned inside the try block; stays null if the test fails before phase 2.
        ZookeeperConsumerConnector zkConsumerConnector2 = null;
        try {
            TestConsumerRebalanceListener rebalanceListener1 = new TestConsumerRebalanceListener();
            zkConsumerConnector1.setConsumerRebalanceListener((ConsumerRebalanceListener)rebalanceListener1);
            Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));

            // Phase 1: a single consumer in the group.
            Assert.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)true), (Object)BoxesRunTime.boxToBoolean((boolean)rebalanceListener1.beforeReleasingPartitionsCalled()));
            Assert.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)true), (Object)BoxesRunTime.boxToBoolean((boolean)rebalanceListener1.beforeStartingFetchersCalled()));
            Assert.assertEquals(null, rebalanceListener1.partitionOwnership().get(this.topic()));
            Assert.assertEquals((Object)"group1_consumer1", (Object)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)0)).consumer());
            Assert.assertEquals((Object)"group1_consumer1", (Object)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)1)).consumer());
            Assert.assertEquals((long)0L, (long)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)0)).threadId());
            Assert.assertEquals((long)0L, (long)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)1)).threadId());
            Assert.assertEquals((Object)"group1_consumer1", (Object)rebalanceListener1.consumerId());
            // Clear the flags so phase 2 can tell whether the callbacks fire again.
            rebalanceListener1.beforeReleasingPartitionsCalled_$eq(false);
            rebalanceListener1.beforeStartingFetchersCalled_$eq(false);
            Seq<Tuple2<String, String>> actual_1 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
            List expected_1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer1-0")}));
            Assert.assertEquals((Object)expected_1, actual_1);

            // Phase 2: a second consumer joins the same group.
            ConsumerConfig consumerConfig2 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
            zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true);
            TestConsumerRebalanceListener rebalanceListener2 = new TestConsumerRebalanceListener();
            zkConsumerConnector2.setConsumerRebalanceListener((ConsumerRebalanceListener)rebalanceListener2);
            zkConsumerConnector2.createMessageStreams((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))})), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
            // Result intentionally discarded; NOTE(review): presumably this read is here to let
            // the first connector finish rebalancing before the checks below - confirm.
            TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams1, this.nMessages());
            Seq<Tuple2<String, String>> actual_2 = this.getZKChildrenValues(this.dirs().consumerOwnerDir());
            List expected_2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"0", (Object)"group1_consumer1-0"), new Tuple2((Object)"1", (Object)"group1_consumer2-0")}));
            Assert.assertEquals((Object)expected_2, actual_2);
            Assert.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)true), (Object)BoxesRunTime.boxToBoolean((boolean)rebalanceListener1.beforeReleasingPartitionsCalled()));
            Assert.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)true), (Object)BoxesRunTime.boxToBoolean((boolean)rebalanceListener1.beforeStartingFetchersCalled()));
            Assert.assertEquals((Object)Set$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1})), rebalanceListener1.partitionOwnership().get(this.topic()));
            Assert.assertEquals((Object)"group1_consumer1", (Object)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)0)).consumer());
            Assert.assertEquals((Object)"group1_consumer2", (Object)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)1)).consumer());
            Assert.assertEquals((long)0L, (long)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)0)).threadId());
            Assert.assertEquals((long)0L, (long)rebalanceListener1.globalPartitionOwnership().get(this.topic()).get(BoxesRunTime.boxToInteger((int)1)).threadId());
            Assert.assertEquals((Object)"group1_consumer1", (Object)rebalanceListener1.consumerId());
            Assert.assertEquals((Object)"group1_consumer2", (Object)rebalanceListener2.consumerId());
            Assert.assertEquals(rebalanceListener1.globalPartitionOwnership(), rebalanceListener2.globalPartitionOwnership());
        } finally {
            // Same order as the original success path: connector 1, then connector 2. The inner
            // finally makes sure connector 2 is still released if shutting down 1 throws.
            try {
                zkConsumerConnector1.shutdown();
            } finally {
                if (zkConsumerConnector2 != null) {
                    zkConsumerConnector2.shutdown();
                }
            }
        }
    }

    /**
     * Lists the children of a ZooKeeper node and reads the data stored in each of them.
     *
     * @param path ZooKeeper path whose children are listed; each child is read at
     *             {@code path + "/" + childName}
     * @return one {@code (childName, childData)} pair per child, in the order the child names
     *         have after {@link Collections#sort}
     */
    public Seq<Tuple2<String, String>> getZKChildrenValues(String path) {
        java.util.List sortedChildNames = this.zkUtils().zkClient().getChildren(path);
        // Sorted in place; the pairs below are produced in this order.
        Collections.sort(sortedChildNames);
        // Decompiler rendering of the mapping closure: child name -> (child name, child data).
        Function1 readChildData = (Function1)new Serializable(this, path){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ZookeeperConsumerConnectorTest $outer;
            private final String path$1;

            public final Tuple2<String, String> apply(String childName) {
                String childPath = this.path$1 + "/" + childName;
                String childData = (String)this.$outer.zkUtils().zkClient().readData(childPath);
                return new Tuple2((Object)childName, (Object)childData);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.path$1 = path$1;
            }
        };
        return (Seq)JavaConversions$.MODULE$.asScalaBuffer(sortedChildNames).toSeq().map(readChildData, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Sets the fixed parameters used by the tests in this class: two nodes, two partitions,
     * topic {@code "topic1"}, group {@code "group1"}, four consumer ids
     * ({@code "consumer0"} .. {@code "consumer3"}) and two messages per send.
     *
     * <p>The statement order matters: the partition count is pushed into
     * {@code overridingProps()} through {@code numParts()}, so {@code numParts} has to be
     * assigned first.
     */
    public ZookeeperConsumerConnectorTest() {
        // NOTE(review): presumably milliseconds, going by the field name - confirm.
        this.RebalanceBackoffMs = 5000;
        this.numNodes = 2;
        this.numParts = 2;
        this.topic = "topic1";
        // Stores the decimal string form of numParts() under the KafkaConfig NumPartitionsProp key.
        this.overridingProps().put(KafkaConfig$.MODULE$.NumPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)this.numParts())).toString());
        this.group = "group1";
        this.consumer0 = "consumer0";
        this.consumer1 = "consumer1";
        this.consumer2 = "consumer2";
        this.consumer3 = "consumer3";
        // Used by the tests as the number of messages per sendMessages call.
        this.nMessages = 2;
    }

    /**
     * {@link ConsumerRebalanceListener} that only records what it was called with, so a test
     * can inspect the values afterwards.
     *
     * <p>Each callback sets its "called" flag and stores its arguments. A fresh instance has
     * both flags {@code false}, an empty consumer id and {@code null} ownership maps. The
     * accessor pairs keep the Scala naming scheme ({@code name()} / {@code name_$eq(value)}).
     */
    public class TestConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        /** Set by {@link #beforeReleasingPartitions}; tests may clear it again via the setter. */
        private boolean beforeReleasingPartitionsCalled = false;
        /** Set by {@link #beforeStartingFetchers}; tests may clear it again via the setter. */
        private boolean beforeStartingFetchersCalled = false;
        /** Consumer id passed to the last {@link #beforeStartingFetchers} call. */
        private String consumerId = "";
        /** Argument of the last {@link #beforeReleasingPartitions} call. */
        private java.util.Map<String, Set<Integer>> partitionOwnership = null;
        /** Ownership map passed to the last {@link #beforeStartingFetchers} call. */
        private java.util.Map<String, java.util.Map<Integer, ConsumerThreadId>> globalPartitionOwnership = null;

        public TestConsumerRebalanceListener() {
            // Outer-instance null check kept exactly as the compiler emitted it.
            if (ZookeeperConsumerConnectorTest.this == null) {
                throw null;
            }
        }

        // ---- ConsumerRebalanceListener callbacks -------------------------------------------

        /** Marks the callback as called and remembers the ownership map it was given. */
        public void beforeReleasingPartitions(java.util.Map<String, Set<Integer>> partitionOwnership) {
            this.beforeReleasingPartitionsCalled_$eq(true);
            this.partitionOwnership_$eq(partitionOwnership);
        }

        /** Marks the callback as called and remembers the consumer id and global ownership map. */
        public void beforeStartingFetchers(String consumerId, java.util.Map<String, java.util.Map<Integer, ConsumerThreadId>> globalPartitionOwnership) {
            this.beforeStartingFetchersCalled_$eq(true);
            this.consumerId_$eq(consumerId);
            this.globalPartitionOwnership_$eq(globalPartitionOwnership);
        }

        // ---- Recorded state: getters ---------------------------------------------------------

        public boolean beforeReleasingPartitionsCalled() {
            return this.beforeReleasingPartitionsCalled;
        }

        public boolean beforeStartingFetchersCalled() {
            return this.beforeStartingFetchersCalled;
        }

        public String consumerId() {
            return this.consumerId;
        }

        public java.util.Map<String, Set<Integer>> partitionOwnership() {
            return this.partitionOwnership;
        }

        public java.util.Map<String, java.util.Map<Integer, ConsumerThreadId>> globalPartitionOwnership() {
            return this.globalPartitionOwnership;
        }

        // ---- Recorded state: setters ---------------------------------------------------------

        public void beforeReleasingPartitionsCalled_$eq(boolean value) {
            this.beforeReleasingPartitionsCalled = value;
        }

        public void beforeStartingFetchersCalled_$eq(boolean value) {
            this.beforeStartingFetchersCalled = value;
        }

        public void consumerId_$eq(String value) {
            this.consumerId = value;
        }

        public void partitionOwnership_$eq(java.util.Map<String, Set<Integer>> value) {
            this.partitionOwnership = value;
        }

        public void globalPartitionOwnership_$eq(java.util.Map<String, java.util.Map<Integer, ConsumerThreadId>> value) {
            this.globalPartitionOwnership = value;
        }

        /** Synthetic accessor for the enclosing test instance. */
        public /* synthetic */ ZookeeperConsumerConnectorTest kafka$consumer$ZookeeperConsumerConnectorTest$TestConsumerRebalanceListener$$$outer() {
            return ZookeeperConsumerConnectorTest.this;
        }
    }
}

