/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import kafka.common.NotificationHandler;
import kafka.common.ZkNodeChangeNotificationListener;
import kafka.zk.LiteralAclChangeStore$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;
import scala.collection.SeqOps;
import scala.collection.immutable.Range;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0005\u0005mb\u0001\u0002\r\u001a\u0001yAQ!\n\u0001\u0005\u0002\u0019Bq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\ne\u0001\u0001\r\u00111A\u0005\nMB\u0011b\u000e\u0001A\u0002\u0003\u0007I\u0011\u0002\u001d\t\u0013y\u0002\u0001\u0019!A!B\u0013!\u0004\"C \u0001\u0001\u0004\u0005\r\u0011\"\u0003A\u0011-\tI\u0001\u0001a\u0001\u0002\u0004%I!a\u0003\t\u0015\u0005=\u0001\u00011A\u0001B\u0003&\u0011\tC\u0004\u0002\u0012\u0001!\t%a\u0005\t\u000f\u0005\r\u0002\u0001\"\u0011\u0002\u0014!9\u0011Q\u0006\u0001\u0005\u0002\u0005M\u0001bBA\u001c\u0001\u0011\u0005\u00111\u0003\u0004\u0005\u0007\u0002!A\tC\u0003&\u001d\u0011\u00051\nC\u0004M\u001d\t\u0007I\u0011B'\t\r\tt\u0001\u0015!\u0003O\u0011\u001d\u0019g\u00021A\u0005\n\u0011Dq\u0001\u001b\bA\u0002\u0013%\u0011\u000e\u0003\u0004l\u001d\u0001\u0006K!\u001a\u0005\u0006a:!\t%\u001d\u0005\u0006u:!\ta\u001f\u0005\b\u0003\u0003qA\u0011AA\u0002\u0005\u0011R6NT8eK\u000eC\u0017M\\4f\u001d>$\u0018NZ5dCRLwN\u001c'jgR,g.\u001a:UKN$(B\u0001\u000e\u001c\u0003\u0019\u0019w.\\7p]*\tA$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001y\u0002C\u0001\u0011$\u001b\u0005\t#B\u0001\u0012\u001c\u0003\tQ8.\u0003\u0002%C\t!\"l\\8LK\u0016\u0004XM\u001d+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#A\u0014\u0011\u0005!\u0002Q\"A\r\u0002%\rD\u0017M\\4f\u000bb\u0004\u0018N]1uS>tWj]\u000b\u0002WA\u0011AfL\u0007\u0002[)\ta&A\u0003tG\u0006d\u0017-\u0003\u00021[\t\u0019\u0011J\u001c;\u0002'\rD\u0017M\\4f\u000bb\u0004\u0018N]1uS>tWj\u001d\u0011\u0002)9|G/\u001b4jG\u0006$\u0018n\u001c8MSN$XM\\3s+\u0005!\u0004C\u0001\u00156\u0013\t1\u0014D\u0001\u0011[W:{G-Z\"iC:<WMT8uS\u001aL7-\u0019;j_:d\u0015n\u001d;f]\u0016\u0014\u0018\u0001\u00078pi&4\u0017nY1uS>tG*[:uK:,'o\u0018\u0013fcR\u0011\u0011\b\u0010\t\u0003YiJ!aO\u0017\u0003\tUs\u0017\u000e\u001e\u0005\b{\u0015\t\t\u00111\u00015\u0003\rAH%M\u0001\u0016]>$\u0018NZ5dCRLwN\u001c'jgR,g.\u001a:!\u0003Mqw\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s+\u0005\t\u0005C\u0001\"\u000f\u001b\u0005\u0001!a\u0006+fgRtu\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s'\rqQ\t\u0013\t\u0003Y\u0019K!aR\u0017\u0003\r\u0005s\u0017PU3g!\tA\u0013*\u0003\u0002K3\t\u0019bj\u001c;jM&\u001c\u0017\r^5p]\"\u000bg\u000e\u001a7feR\t\u0011)\u0001\u0005nKN\u001c\u0018mZ3t+\u0005q\u0005cA(U-6\t\u0001K\u0003\u0002R%\u00069Q.\u001e;bE2,'BA*.\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003+B\u00131\"\u0011:sCf\u0014UO\u001a4feB\u0011q\u000bY\u0007\u00021*\u0011\u0011LW\u0001\te\u0016\u001cx.\u001e:dK*\u0011!d\u0017\u0006\u00039qS!!\u00180\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0016aA8sO&\u0011\u0011\r\u0017\u0002\u0010%\u0016\u001cx.\u001e:dKB\u000bG\u000f^3s]\u0006IQ.Z:tC\u001e,7\u000fI\u0001\ni\"\u0014xn^*ju\u0016,\u0012!\u001a\t\u0004Y\u0019\\\u0013BA4.\u0005\u0019y\u0005\u000f^5p]\u0006iA\u000f\u001b:poNK'0Z0%KF$\"!\u000f6\t\u000fu\u001a\u0012\u0011!a\u0001K\u0006QA\u000f\u001b:poNK'0\u001a\u0011)\u0005Qi\u0007C\u0001\u0017o\u0013\tyWF\u0001\u0005w_2\fG/\u001b7f\u0003M\u0001(o\\2fgNtu\u000e^5gS\u000e\fG/[8o)\tI$\u000fC\u0003t+\u0001\u0007A/A\no_RLg-[2bi&|g.T3tg\u0006<W\rE\u0002-k^L!A^\u0017\u0003\u000b\u0005\u0013(/Y=\u0011\u00051B\u0018BA=.\u0005\u0011\u0011\u0015\u0010^3\u0002\u0011I,7-Z5wK\u0012$\u0012\u0001 \t\u0004{z4V\"\u0001*\n\u0005}\u0014&aA*fc\u0006a1/\u001a;UQJ|woU5{KR\u0019\u0011(!\u0002\t\r\u0005\u001dq\u00031\u0001,\u0003\u0015Ig\u000eZ3y\u0003]qw\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s?\u0012*\u0017\u000fF\u0002:\u0003\u001bAq!\u0010\u0005\u0002\u0002\u0003\u0007\u0011)\u0001\u000bo_RLg-[2bi&|g\u000eS1oI2,'\u000fI\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002s!\u001a!\"a\u0006\u0011\t\u0005e\u0011qD\u0007\u0003\u00037Q1!!\b_\u0003\u0015QWO\\5u\u0013\u0011\t\t#a\u0007\u0003\r\t+gm\u001c:f\u0003!!X-\u0019:E_^t\u0007fA\u0006\u0002(A!\u0011\u0011DA\u0015\u0013\u0011\tY#a\u0007\u0003\u000b\u00053G/\u001a:\u0002/Q,7\u000f\u001e)s_\u000e,7o\u001d(pi&4\u0017nY1uS>t\u0007f\u0001\u0007\u00022A!\u0011\u0011DA\u001a\u0013\u0011\t)$a\u0007\u0003\tQ+7\u000f^\u0001\u001fi\u0016\u001cHoU<bY2|wo\u001d)s_\u000e,7o]8s\u000bb\u001cW\r\u001d;j_:D3!DA\u0019\u0001")
public class ZkNodeChangeNotificationListenerTest
extends ZooKeeperTestHarness {
    private final int changeExpirationMs;
    private ZkNodeChangeNotificationListener notificationListener;
    private TestNotificationHandler notificationHandler;

    private int changeExpirationMs() {
        return this.changeExpirationMs;
    }

    private ZkNodeChangeNotificationListener notificationListener() {
        return this.notificationListener;
    }

    private void notificationListener_$eq(ZkNodeChangeNotificationListener x$1) {
        this.notificationListener = x$1;
    }

    private TestNotificationHandler notificationHandler() {
        return this.notificationHandler;
    }

    private void notificationHandler_$eq(TestNotificationHandler x$1) {
        this.notificationHandler = x$1;
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
        this.zkClient().createAclPaths();
        this.notificationHandler_$eq(new TestNotificationHandler());
    }

    @Override
    @After
    public void tearDown() {
        if (this.notificationListener() != null) {
            this.notificationListener().close();
        }
        super.tearDown();
    }

    @Test
    public void testProcessNotification() {
        ResourcePattern notificationMessage1 = new ResourcePattern(ResourceType.GROUP, "messageA", PatternType.LITERAL);
        ResourcePattern notificationMessage2 = new ResourcePattern(ResourceType.GROUP, "messageB", PatternType.LITERAL);
        this.notificationListener_$eq(new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), "acl_changes_", (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), Time.SYSTEM));
        this.notificationListener().init();
        this.zkClient().createAclChangeNotification(notificationMessage1);
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$1(this, notificationMessage1)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                throw Assertions$.MODULE$.fail("Failed to send/process notification message in the timeout period.", new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 845));
            }
            Thread.sleep(Math.min(l, l2));
        }
        this.zkClient().createAclChangeNotification(notificationMessage2);
        long l3 = 15000L;
        long l4 = 100L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$3(this, notificationMessage2)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + l3) {
                throw Assertions$.MODULE$.fail("Failed to send/process notification message in the timeout period.", new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 845));
            }
            Thread.sleep(Math.min(l3, l4));
        }
        int n = 3;
        int to$extension_end = 10;
        Range.Inclusive foreach$mVc$sp_this = new Range.Inclusive(n, to$extension_end, 1);
        if (!foreach$mVc$sp_this.isEmpty()) {
            int foreach$mVc$sp_i = foreach$mVc$sp_this.start();
            while (true) {
                ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$5(this, foreach$mVc$sp_i);
                if (foreach$mVc$sp_i == foreach$mVc$sp_this.scala$collection$immutable$Range$$lastElement) break;
                foreach$mVc$sp_i += foreach$mVc$sp_this.step();
            }
        }
        Object var8_11 = null;
        long l5 = 15000L;
        long l6 = 100L;
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$6(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + l5) {
                throw Assertions$.MODULE$.fail(ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$7(this), new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 845));
            }
            Thread.sleep(Math.min(l5, l6));
        }
    }

    @Test
    public void testSwallowsProcessorException() {
        this.notificationHandler().setThrowSize(2);
        this.notificationListener_$eq(new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), "acl_changes_", (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), Time.SYSTEM));
        this.notificationListener().init();
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageA", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageB", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageC", PatternType.LITERAL));
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testSwallowsProcessorException$1(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                throw Assertions$.MODULE$.fail(ZkNodeChangeNotificationListenerTest.$anonfun$testSwallowsProcessorException$2(this), new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 845));
            }
            Thread.sleep(Math.min(l, l2));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$1(ZkNodeChangeNotificationListenerTest $this, ResourcePattern notificationMessage1$1) {
        Seq<ResourcePattern> seq = $this.notificationHandler().received();
        if (seq == null) {
            throw null;
        }
        if (seq.length() == 1) {
            Object object = $this.notificationHandler().received().last();
            if (!(object != null ? !object.equals(notificationMessage1$1) : notificationMessage1$1 != null)) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$2() {
        return "Failed to send/process notification message in the timeout period.";
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$3(ZkNodeChangeNotificationListenerTest $this, ResourcePattern notificationMessage2$1) {
        Seq<ResourcePattern> seq = $this.notificationHandler().received();
        if (seq == null) {
            throw null;
        }
        if (seq.length() == 2) {
            Object object = $this.notificationHandler().received().last();
            if (!(object != null ? !object.equals(notificationMessage2$1) : notificationMessage2$1 != null)) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$4() {
        return "Failed to send/process notification message in the timeout period.";
    }

    public static final /* synthetic */ void $anonfun$testProcessNotification$5(ZkNodeChangeNotificationListenerTest $this, int i) {
        $this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, new StringBuilder(7).append("message").append(i).toString(), PatternType.LITERAL));
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$6(ZkNodeChangeNotificationListenerTest $this) {
        Seq<ResourcePattern> seq = $this.notificationHandler().received();
        if (seq == null) {
            throw null;
        }
        return seq.length() == 10;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$7(ZkNodeChangeNotificationListenerTest $this) {
        return new StringBuilder(64).append("Expected 10 invocations of processNotifications, but there were ").append($this.notificationHandler().received()).toString();
    }

    public static final /* synthetic */ boolean $anonfun$testSwallowsProcessorException$1(ZkNodeChangeNotificationListenerTest $this) {
        Seq<ResourcePattern> seq = $this.notificationHandler().received();
        if (seq == null) {
            throw null;
        }
        return seq.length() == 3;
    }

    public static final /* synthetic */ String $anonfun$testSwallowsProcessorException$2(ZkNodeChangeNotificationListenerTest $this) {
        return new StringBuilder(63).append("Expected 2 invocations of processNotifications, but there were ").append($this.notificationHandler().received()).toString();
    }

    public ZkNodeChangeNotificationListenerTest() {
        this.changeExpirationMs = 1000;
    }

    public class TestNotificationHandler
    implements NotificationHandler {
        private final ArrayBuffer<ResourcePattern> messages;
        private volatile Option<Object> throwSize;

        private ArrayBuffer<ResourcePattern> messages() {
            return this.messages;
        }

        private Option<Object> throwSize() {
            return this.throwSize;
        }

        private void throwSize_$eq(Option<Object> x$1) {
            this.throwSize = x$1;
        }

        /*
         * WARNING - void declaration
         */
        public void processNotification(byte[] notificationMessage) {
            void $plus$eq_elem;
            ArrayBuffer<ResourcePattern> arrayBuffer = this.messages();
            ResourcePattern resourcePattern = LiteralAclChangeStore$.MODULE$.decode(notificationMessage);
            if (arrayBuffer == null) {
                throw null;
            }
            arrayBuffer.addOne((Object)$plus$eq_elem);
            resourcePattern = null;
            ArrayBuffer<ResourcePattern> arrayBuffer2 = this.messages();
            if (arrayBuffer2 == null) {
                throw null;
            }
            if (this.throwSize().contains((Object)SeqOps.size$(arrayBuffer2))) {
                throw new RuntimeException("Oh no, my processing failed!");
            }
        }

        public Seq<ResourcePattern> received() {
            return this.messages();
        }

        public void setThrowSize(int index) {
            this.throwSize_$eq((Option<Object>)Option$.MODULE$.apply((Object)index));
        }

        public /* synthetic */ ZkNodeChangeNotificationListenerTest kafka$common$ZkNodeChangeNotificationListenerTest$TestNotificationHandler$$$outer() {
            return ZkNodeChangeNotificationListenerTest.this;
        }

        public TestNotificationHandler() {
            if (ZkNodeChangeNotificationListenerTest.this == null) {
                throw null;
            }
            this.messages = new ArrayBuffer();
            this.throwSize = None$.MODULE$;
        }
    }
}

