/*
 * Decompiled with CFR 0.152.
 */
package integration.kafka.network;

import java.io.Serializable;
import java.util.Properties;
import kafka.server.BaseRequestTest;
import kafka.server.Defaults$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.network.ListenerName;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Function1;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005Ub\u0001B\b\u0011\u0001]AQa\b\u0001\u0005\u0002\u0001BQa\t\u0001\u0005B\u0011Bqa\u000b\u0001C\u0002\u0013\u0005A\u0006\u0003\u00046\u0001\u0001\u0006I!\f\u0005\bm\u0001\u0011\r\u0011\"\u0001-\u0011\u00199\u0004\u0001)A\u0005[!)\u0001\b\u0001C!s!)Q\t\u0001C!\r\")\u0011\f\u0001C\u00015\")q\r\u0001C\u0001Q\")Q\u000e\u0001C\u0005]\")a\u000f\u0001C\u0005o\"9\u0011q\u0001\u0001\u0005\n\u0005%\u0001\"CA\u000f\u0001E\u0005I\u0011BA\u0010\u0005q!\u0015P\\1nS\u000etU/\u001c(fi^|'o\u001b+ie\u0016\fGm\u001d+fgRT!!\u0005\n\u0002\u000f9,Go^8sW*\u00111\u0003F\u0001\u0006W\u000647.\u0019\u0006\u0002+\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0007\u0001\u0019\"\u0001\u0001\r\u0011\u0005eiR\"\u0001\u000e\u000b\u0005ma\u0012AB:feZ,'OC\u0001\u0014\u0013\tq\"DA\bCCN,'+Z9vKN$H+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0005\u0005\u0002#\u00015\t\u0001#A\u0006ce>\\WM]\"pk:$X#A\u0013\u0011\u0005\u0019JS\"A\u0014\u000b\u0003!\nQa]2bY\u0006L!AK\u0014\u0003\u0007%sG/\u0001\u0005j]R,'O\\1m+\u0005i\u0003C\u0001\u00184\u001b\u0005y#B\u0001\u00192\u0003\u0011a\u0017M\\4\u000b\u0003I\nAA[1wC&\u0011Ag\f\u0002\u0007'R\u0014\u0018N\\4\u0002\u0013%tG/\u001a:oC2\u0004\u0013\u0001C3yi\u0016\u0014h.\u00197\u0002\u0013\u0015DH/\u001a:oC2\u0004\u0013a\u00062s_.,'\u000f\u0015:pa\u0016\u0014H/_(wKJ\u0014\u0018\u000eZ3t)\tQT\b\u0005\u0002'w%\u0011Ah\n\u0002\u0005+:LG\u000fC\u0003?\u000f\u0001\u0007q(\u0001\u0006qe>\u0004XM\u001d;jKN\u0004\"\u0001Q\"\u000e\u0003\u0005S!AQ\u0019\u0002\tU$\u0018\u000e\\\u0005\u0003\t\u0006\u0013!\u0002\u0015:pa\u0016\u0014H/[3t\u0003\u0015\u0019X\r^+q)\tQt\tC\u0003I\u0011\u0001\u0007\u0011*\u0001\u0005uKN$\u0018J\u001c4p!\tQ5+D\u0001L\u0015\taU*A\u0002ba&T!AT(\u0002\u000f),\b/\u001b;fe*\u0011\u0001+U\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002%\u0006\u0019qN]4\n\u0005Q[%\u0001\u0003+fgRLeNZ8)\u0005!1\u0006C\u0001&X\u0013\tA6J\u0001\u0006CK\u001a|'/Z#bG\"\fAcZ3u\u001dVlg*\u001a;x_J\\G\u000b\u001b:fC\u0012\u001cHCA\u0013\\\u0011\u0015a\u0016\u00021\u0001^\u0003!a\u0017n\u001d;f]\u0016\u0014\bC\u00010f\u001d\ty6\r\u0005\u0002aO5\t\u0011M\u0003\u0002c-\u00051AH]8pizJ!\u0001Z\u0014\u0002\rA\u0013X\rZ3g\u0013\t!dM\u0003\u0002eO\u0005aB/Z:u\tft\u0017-\\5d\u001dVlg*\u001a;x_J\\G\u000b\u001b:fC\u0012\u001cH#\u0001\u001e)\u0005)Q\u0007C\u0001&l\u0013\ta7J\u0001\u0003UKN$\u0018A\u0005:fG>tg-[4ve\u0016\u001cVM\u001d<feN$2AO8r\u0011\u0015\u00018\u00021\u0001@\u0003!qWm\u001e)s_B\u001c\b\"\u0002:\f\u0001\u0004\u0019\u0018!D1Qe>\u0004Hk\u001c,fe&4\u0017\u0010\u0005\u0003'ivk\u0016BA;(\u0005\u0019!V\u000f\u001d7fe\u0005\t2M]3bi\u0016\fE-\\5o\u00072LWM\u001c;\u0015\u0003a\u00042!_A\u0002\u001b\u0005Q(BA>}\u0003\u0015\tG-\\5o\u0015\tih0A\u0004dY&,g\u000e^:\u000b\u0005My(bAA\u0001#\u00061\u0011\r]1dQ\u0016L1!!\u0002{\u0005\u0015\tE-\\5o\u0003U9\u0018-\u001b;G_J\u001cuN\u001c4jO>s7+\u001a:wKJ$rAOA\u0006\u0003\u001f\t\u0019\u0002\u0003\u0004\u0002\u000e5\u0001\r!X\u0001\taJ|\u0007OT1nK\"1\u0011\u0011C\u0007A\u0002u\u000b\u0011\u0002\u001d:paZ\u000bG.^3\t\u0013\u0005UQ\u0002%AA\u0002\u0005]\u0011!C7bq^\u000b\u0017\u000e^'t!\r1\u0013\u0011D\u0005\u0004\u000379#\u0001\u0002'p]\u001e\fqd^1ji\u001a{'oQ8oM&<wJ\\*feZ,'\u000f\n3fM\u0006,H\u000e\u001e\u00134+\t\t\tC\u000b\u0003\u0002\u0018\u0005\r2FAA\u0013!\u0011\t9#!\r\u000e\u0005\u0005%\"\u0002BA\u0016\u0003[\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005=r%\u0001\u0006b]:|G/\u0019;j_:LA!a\r\u0002*\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public class DynamicNumNetworkThreadsTest
extends BaseRequestTest {
    private final String internal;
    private final String external;

    @Override
    public int brokerCount() {
        return 1;
    }

    public String internal() {
        return this.internal;
    }

    public String external() {
        return this.external;
    }

    @Override
    public void brokerPropertyOverrides(Properties properties) {
        properties.put(KafkaConfig$.MODULE$.ListenersProp(), new StringBuilder(30).append(this.internal()).append("://localhost:0, ").append(this.external()).append("://localhost:0").toString());
        properties.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), new StringBuilder(22).append(this.internal()).append(":PLAINTEXT, ").append(this.external()).append(":PLAINTEXT").toString());
        properties.put(new StringBuilder(15).append("listener.name.").append(this.internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), "2");
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        TestUtils$.MODULE$.createTopic(this.zkClient(), "test", this.brokerCount(), this.brokerCount(), (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        Assertions.assertEquals((int)2, (int)this.getNumNetworkThreads(this.internal()));
        Assertions.assertEquals((int)Defaults$.MODULE$.NumNetworkThreads(), (int)this.getNumNetworkThreads(this.external()));
    }

    public int getNumNetworkThreads(String listener) {
        return ((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.asScalaSetConverter(((KafkaBroker)this.brokers().head()).metrics().metrics().keySet()).asScala()).filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)DynamicNumNetworkThreadsTest.$anonfun$getNumNetworkThreads$1(x$1)))).count((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)DynamicNumNetworkThreadsTest.$anonfun$getNumNetworkThreads$2(listener, x$2)));
    }

    @Test
    public void testDynamicNumNetworkThreads() {
        int newBaseNetworkThreadsCount = Defaults$.MODULE$.NumNetworkThreads() + 1;
        Properties props = new Properties();
        props.put(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), Integer.toString(newBaseNetworkThreadsCount));
        this.reconfigureServers(props, (Tuple2<String, String>)new Tuple2((Object)KafkaConfig$.MODULE$.NumNetworkThreadsProp(), (Object)Integer.toString(newBaseNetworkThreadsCount)));
        Assertions.assertEquals((int)2, (int)this.getNumNetworkThreads(this.internal()));
        Assertions.assertEquals((int)newBaseNetworkThreadsCount, (int)this.getNumNetworkThreads(this.external()));
        int newInternalNetworkThreadsCount = 3;
        props = new Properties();
        props.put(new StringBuilder(15).append("listener.name.").append(this.internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), Integer.toString(newInternalNetworkThreadsCount));
        this.reconfigureServers(props, (Tuple2<String, String>)new Tuple2((Object)new StringBuilder(15).append("listener.name.").append(this.internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), (Object)Integer.toString(newInternalNetworkThreadsCount)));
        Assertions.assertEquals((int)newInternalNetworkThreadsCount, (int)this.getNumNetworkThreads(this.internal()));
        Assertions.assertEquals((int)newBaseNetworkThreadsCount, (int)this.getNumNetworkThreads(this.external()));
    }

    private void reconfigureServers(Properties newProps, Tuple2<String, String> aPropToVerify) {
        Admin adminClient = this.createAdminClient();
        TestUtils$.MODULE$.incrementalAlterConfigs(this.servers(), adminClient, newProps, false, TestUtils$.MODULE$.incrementalAlterConfigs$default$5()).all().get();
        this.waitForConfigOnServer((String)aPropToVerify._1(), (String)aPropToVerify._2(), this.waitForConfigOnServer$default$3());
        adminClient.close();
    }

    private Admin createAdminClient() {
        String bootstrapServers = TestUtils$.MODULE$.bootstrapServers(this.servers(), new ListenerName(this.securityProtocol().name));
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("metadata.max.age.ms", "10");
        return Admin.create((Properties)config);
    }

    private void waitForConfigOnServer(String propName, String propValue, long maxWaitMs) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                DynamicNumNetworkThreadsTest.$anonfun$waitForConfigOnServer$1(this, propValue, propName);
            }
            catch (AssertionError retry_e) {
                if (System.currentTimeMillis() - retry_startTime > maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
    }

    private long waitForConfigOnServer$default$3() {
        return 10000L;
    }

    public static final /* synthetic */ boolean $anonfun$getNumNetworkThreads$1(MetricName x$1) {
        String string = x$1.name();
        String string2 = "request-rate";
        return string != null && string.equals(string2);
    }

    public static final /* synthetic */ boolean $anonfun$getNumNetworkThreads$2(String listener$1, MetricName x$2) {
        String string = listener$1;
        Object v = x$2.tags().get("listener");
        return !(string != null ? !string.equals(v) : v != null);
    }

    public static final /* synthetic */ void $anonfun$waitForConfigOnServer$1(DynamicNumNetworkThreadsTest $this, String propValue$1, String propName$1) {
        Assertions.assertEquals((Object)propValue$1, ((KafkaServer)$this.servers().head()).config().originals().get(propName$1));
    }

    public DynamicNumNetworkThreadsTest() {
        this.internal = "PLAINTEXT";
        this.external = "EXTERNAL";
    }
}

