/*
 * Decompiled with CFR 0.152.
 */
package kafka.integration;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.cluster.Cluster;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerFetcherManager;
import kafka.consumer.FetchedDataChunk;
import kafka.consumer.PartitionTopicInfo;
import kafka.consumer.ZookeeperConsumerConnector$;
import kafka.integration.FetcherTest$;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.KafkaServerTestHarness$class;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.serializer.DefaultEncoder;
import kafka.serializer.DefaultEncoder$;
import kafka.serializer.StringEncoder;
import kafka.serializer.StringEncoder$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness$class;
import org.I0Itec.zkclient.ZkClient;
import org.scalatest.junit.JUnit3Suite;
import scala.Array$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0001\u0005Ec\u0001B\u0001\u0003\u0001\u001d\u00111BR3uG\",'\u000fV3ti*\u00111\u0001B\u0001\fS:$Xm\u001a:bi&|gNC\u0001\u0006\u0003\u0015Y\u0017MZ6b\u0007\u0001\u00192\u0001\u0001\u0005\u0013!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!a\u0003&V]&$8gU;ji\u0016\u0004\"a\u0005\u000b\u000e\u0003\tI!!\u0006\u0002\u0003--\u000bgm[1TKJ4XM\u001d+fgRD\u0015M\u001d8fgNDQa\u0006\u0001\u0005\u0002a\ta\u0001P5oSRtD#A\r\u0011\u0005M\u0001\u0001bB\u000e\u0001\u0005\u0004%\t\u0001H\u0001\t]Vlgj\u001c3fgV\tQ\u0004\u0005\u0002\u001fC5\tqDC\u0001!\u0003\u0015\u00198-\u00197b\u0013\t\u0011sDA\u0002J]RDa\u0001\n\u0001!\u0002\u0013i\u0012!\u00038v[:{G-Z:!\u0011\u001d1\u0003A1A\u0005\u0002\u001d\nqaY8oM&<7/F\u0001)!\rIc\u0006M\u0007\u0002U)\u00111\u0006L\u0001\nS6lW\u000f^1cY\u0016T!!L\u0010\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u00020U\t!A*[:u!\t\tD'D\u00013\u0015\t\u0019D!\u0001\u0004tKJ4XM]\u0005\u0003kI\u00121bS1gW\u0006\u001cuN\u001c4jO\"1q\u0007\u0001Q\u0001\n!\n\u0001bY8oM&<7\u000f\t\u0005\bs\u0001\u0011\r\u0011\"\u0001;\u0003!iWm]:bO\u0016\u001cX#A\u001e\u0011\tqzT$Q\u0007\u0002{)\u0011a\bL\u0001\b[V$\u0018M\u00197f\u0013\t\u0001UHA\u0004ICNDW*\u00199\u0011\u0007\t\u001bU)D\u0001-\u0013\t!EFA\u0002TKF\u00042A\b$I\u0013\t9uDA\u0003BeJ\f\u0017\u0010\u0005\u0002\u001f\u0013&\u0011!j\b\u0002\u0005\u0005f$X\r\u0003\u0004M\u0001\u0001\u0006IaO\u0001\n[\u0016\u001c8/Y4fg\u0002BqA\u0014\u0001C\u0002\u0013\u0005q*A\u0003u_BL7-F\u0001Q!\t\tf+D\u0001S\u0015\t\u0019F+\u0001\u0003mC:<'\"A+\u0002\t)\fg/Y\u0005\u0003/J\u0013aa\u0015;sS:<\u0007BB-\u0001A\u0003%\u0001+\u0001\u0004u_BL7\r\t\u0005\b7\u0002\u0011\r\u0011\"\u0001]\u0003\u001d\u0019G.^:uKJ,\u0012!\u0018\t\u0003=\u0002l\u0011a\u0018\u0006\u00037\u0012I!!Y0\u0003\u000f\rcWo\u001d;fe\"11\r\u0001Q\u0001\nu\u000b\u0001b\u00197vgR,'\u000f\t\u0005\bK\u0002\u0011\r\u0011\"\u0001g\u0003!\u0019\b.\u001e;e_^tW#A4\u0011\u0005!\\W\"A5\u000b\u0005)$\u0011\u0001C2p]N,X.\u001a:\n\u00051L'\u0001\u0005$fi\u000eDW\r\u001a#bi\u0006\u001c\u0005.\u001e8l\u0011\u0019q\u0007\u0001)A\u0005O\u0006I1\u000f[;uI><h\u000e\t\u0005\ba\u0002\u0011\r\u0011\"\u0001r\u0003\u0015\tX/Z;f+\u0005\u0011\bcA:yO6\tAO\u0003\u0002vm\u0006Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0005]$\u0016\u0001B;uS2L!!\u001f;\u0003'1Kgn[3e\u00052|7m[5oOF+X-^3\t\rm\u0004\u0001\u0015!\u0003s\u0003\u0019\tX/Z;fA!9Q\u0010\u0001b\u0001\n\u0003q\u0018A\u0003;pa&\u001c\u0017J\u001c4pgV\tq\u0010\u0005\u0003*]\u0005\u0005\u0001c\u00015\u0002\u0004%\u0019\u0011QA5\u0003%A\u000b'\u000f^5uS>tGk\u001c9jG&sgm\u001c\u0005\b\u0003\u0013\u0001\u0001\u0015!\u0003\u0000\u0003-!x\u000e]5d\u0013:4wn\u001d\u0011\t\u0013\u00055\u0001\u00011A\u0005\u0002\u0005=\u0011a\u00024fi\u000eDWM]\u000b\u0003\u0003#\u00012\u0001[A\n\u0013\r\t)\"\u001b\u0002\u0017\u0007>t7/^7fe\u001a+Go\u00195fe6\u000bg.Y4fe\"I\u0011\u0011\u0004\u0001A\u0002\u0013\u0005\u00111D\u0001\fM\u0016$8\r[3s?\u0012*\u0017\u000f\u0006\u0003\u0002\u001e\u0005\r\u0002c\u0001\u0010\u0002 %\u0019\u0011\u0011E\u0010\u0003\tUs\u0017\u000e\u001e\u0005\u000b\u0003K\t9\"!AA\u0002\u0005E\u0011a\u0001=%c!A\u0011\u0011\u0006\u0001!B\u0013\t\t\"\u0001\u0005gKR\u001c\u0007.\u001a:!\u0011\u001d\ti\u0003\u0001C!\u0003_\tQa]3u+B$\"!!\b\t\u000f\u0005M\u0002\u0001\"\u0011\u00020\u0005AA/Z1s\t><h\u000eC\u0004\u00028\u0001!\t!a\f\u0002\u0017Q,7\u000f\u001e$fi\u000eDWM\u001d\u0005\b\u0003w\u0001A\u0011AA\u0018\u0003A\t7o]3siF+X-^3F[B$\u0018\u0010C\u0004\u0002@\u0001!\t!!\u0011\u0002\u0019M,g\u000eZ'fgN\fw-Z:\u0015\u0007u\t\u0019\u0005C\u0004\u0002F\u0005u\u0002\u0019A\u000f\u0002\u001f5,7o]1hKN\u0004VM\u001d(pI\u0016Dq!!\u0013\u0001\t\u0003\tY%A\u0003gKR\u001c\u0007\u000e\u0006\u0003\u0002\u001e\u00055\u0003bBA(\u0003\u000f\u0002\r!H\u0001\tKb\u0004Xm\u0019;fI\u0002")
public class FetcherTest
extends JUnit3Suite
implements KafkaServerTestHarness {
    private final int numNodes;
    private final List<KafkaConfig> configs;
    private final HashMap<Object, Seq<byte[]>> messages;
    private final String topic;
    private final Cluster cluster;
    private final FetchedDataChunk shutdown;
    private final LinkedBlockingQueue<FetchedDataChunk> queue;
    private final List<PartitionTopicInfo> topicInfos;
    private ConsumerFetcherManager fetcher;
    private List<KafkaServer> servers;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override
    public List<KafkaServer> servers() {
        return this.servers;
    }

    @Override
    public void servers_$eq(List<KafkaServer> x$1) {
        this.servers = x$1;
    }

    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    @Override
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    @Override
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override
    public void zkClient_$eq(ZkClient x$1) {
        this.zkClient = x$1;
    }

    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super.setUp();
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super.tearDown();
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String x$1) {
        this.zkConnect = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    public int numNodes() {
        return this.numNodes;
    }

    @Override
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    public HashMap<Object, Seq<byte[]>> messages() {
        return this.messages;
    }

    public String topic() {
        return this.topic;
    }

    public Cluster cluster() {
        return this.cluster;
    }

    public FetchedDataChunk shutdown() {
        return this.shutdown;
    }

    public LinkedBlockingQueue<FetchedDataChunk> queue() {
        return this.queue;
    }

    public List<PartitionTopicInfo> topicInfos() {
        return this.topicInfos;
    }

    public ConsumerFetcherManager fetcher() {
        return this.fetcher;
    }

    public void fetcher_$eq(ConsumerFetcherManager x$1) {
        this.fetcher = x$1;
    }

    @Override
    public void setUp() {
        KafkaServerTestHarness$class.setUp(this);
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(this.zkClient(), this.topic(), (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{((KafkaConfig)this.configs().head()).brokerId()})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), this.topic(), 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        this.fetcher_$eq(new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties("", "", "", TestUtils$.MODULE$.createConsumerProperties$default$4())), this.zkClient()));
        this.fetcher().stopConnections();
        this.fetcher().startConnections(this.topicInfos(), this.cluster());
    }

    @Override
    public void tearDown() {
        this.fetcher().stopConnections();
        KafkaServerTestHarness$class.tearDown(this);
    }

    public void testFetcher() {
        int perNode = 2;
        int count = this.sendMessages(perNode);
        this.fetch(count);
        this.assertQueueEmpty();
        count = this.sendMessages(perNode);
        this.fetch(count);
        this.assertQueueEmpty();
    }

    public void assertQueueEmpty() {
        Assert.assertEquals((int)0, (int)this.queue().size());
    }

    public int sendMessages(int messagesPerNode) {
        IntRef count = new IntRef(0);
        this.configs().foreach((Function1)new Serializable(this, messagesPerNode, count){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ FetcherTest $outer;
            private final int messagesPerNode$1;
            private final IntRef count$1;

            public final void apply(KafkaConfig conf) {
                Producer<K, V> producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.$outer.configs()), new DefaultEncoder(DefaultEncoder$.MODULE$.$lessinit$greater$default$1()), new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()));
                byte[][] ms = (byte[][])((TraversableOnce)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.messagesPerNode$1).map((Function1)new Serializable(this, conf){
                    public static final long serialVersionUID = 0L;
                    private final KafkaConfig conf$1;

                    public final byte[] apply(int x) {
                        return ((Object)BoxesRunTime.boxToInteger((int)(this.conf$1.brokerId() * 5 + x))).toString().getBytes();
                    }
                    {
                        this.conf$1 = conf$1;
                    }
                }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)));
                this.$outer.messages().$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)conf.brokerId())), (Object)Predef$.MODULE$.wrapRefArray((Object[])ms)));
                producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])Predef$.MODULE$.refArrayOps((Object[])ms).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$sendMessages$1 $outer;

                    public final KeyedMessage<String, byte[]> apply(byte[] m) {
                        return new KeyedMessage(this.$outer.kafka$integration$FetcherTest$$anonfun$$$outer().topic(), (Object)this.$outer.kafka$integration$FetcherTest$$anonfun$$$outer().topic(), (Object)m);
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(KeyedMessage.class)))));
                producer.close();
                this.count$1.elem += Predef$.MODULE$.refArrayOps((Object[])ms).size();
            }

            public /* synthetic */ FetcherTest kafka$integration$FetcherTest$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.messagesPerNode$1 = messagesPerNode$1;
                this.count$1 = count$1;
            }
        });
        return count.elem;
    }

    public void fetch(int expected) {
        IntRef count = new IntRef(0);
        do {
            FetchedDataChunk chunk = this.queue().poll(2L, TimeUnit.SECONDS);
            Assert.assertNotNull((String)new StringBuilder().append((Object)"Timed out waiting for data chunk ").append((Object)BoxesRunTime.boxToInteger((int)(count.elem + 1))).toString(), (Object)chunk);
            chunk.messages().foreach((Function1)new Serializable(this, count){
                public static final long serialVersionUID = 0L;
                private final IntRef count$2;

                public final void apply(MessageAndOffset message) {
                    ++this.count$2.elem;
                }
                {
                    this.count$2 = count$2;
                }
            });
        } while (count.elem != expected);
    }

    public FetcherTest() {
        ZooKeeperTestHarness$class.$init$(this);
        KafkaServerTestHarness$class.$init$(this);
        this.numNodes = 1;
        this.configs = (List)TestUtils$.MODULE$.createBrokerConfigs(this.numNodes()).map((Function1)new $anonfun$1(this), List$.MODULE$.canBuildFrom());
        this.messages = new HashMap();
        this.topic = "topic";
        this.cluster = new Cluster((Iterable)this.configs().map((Function1)new $anonfun$2(this), List$.MODULE$.canBuildFrom()));
        this.shutdown = ZookeeperConsumerConnector$.MODULE$.shutdownCommand();
        this.queue = new LinkedBlockingQueue();
        this.topicInfos = (List)this.configs().map((Function1)new $anonfun$3(this), List$.MODULE$.canBuildFrom());
        this.fetcher = null;
    }
}

