/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.api.LeaderAndIsr$;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerContext;
import kafka.controller.StateChangeLogger;
import kafka.server.HostedPartition;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.StopReplicaRequestData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.VolatileBooleanRef;

@ScalaSignature(bytes="\u0006\u0005\u0005uc\u0001B\u000b\u0017\u0001mAQ\u0001\t\u0001\u0005\u0002\u0005Bqa\t\u0001C\u0002\u0013\u0005A\u0005\u0003\u0004,\u0001\u0001\u0006I!\n\u0005\bY\u0001\u0011\r\u0011\"\u0001%\u0011\u0019i\u0003\u0001)A\u0005K!9a\u0006\u0001a\u0001\n\u0003y\u0003bB \u0001\u0001\u0004%\t\u0001\u0011\u0005\u0007\r\u0002\u0001\u000b\u0015\u0002\u0019\t\u000b\u001d\u0003A\u0011\t%\t\u000bm\u0003A\u0011\t/\t\u000b\u0005\u0004A\u0011\u0001/\t\u000b\u0019\u0004A\u0011\u0001/\t\u000b!\u0004A\u0011\u0001/\t\u000b)\u0004A\u0011\u0001/\t\u000b1\u0004A\u0011\u0001/\t\u000b9\u0004A\u0011B8\t\u000bU\u0004A\u0011\u0002<\t\u000b]\u0004A\u0011\u0002=\t\u000f\u0005\r\u0001\u0001\"\u0003\u0002\u0006!9\u00111\n\u0001\u0005\n\u00055#A\u0007\"s_.,'/\u00129pG\"Le\u000e^3he\u0006$\u0018n\u001c8UKN$(BA\f\u0019\u0003\u0019\u0019XM\u001d<fe*\t\u0011$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001a\u0002CA\u000f\u001f\u001b\u00051\u0012BA\u0010\u0017\u0005E\tVo\u001c:v[R+7\u000f\u001e%be:,7o]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\t\u0002\"!\b\u0001\u0002\u0013\t\u0014xn[3s\u0013\u0012\fT#A\u0013\u0011\u0005\u0019JS\"A\u0014\u000b\u0003!\nQa]2bY\u0006L!AK\u0014\u0003\u0007%sG/\u0001\u0006ce>\\WM]%ec\u0001\n\u0011B\u0019:pW\u0016\u0014\u0018\n\u001a\u001a\u0002\u0015\t\u0014xn[3s\u0013\u0012\u0014\u0004%A\u0004tKJ4XM]:\u0016\u0003A\u00022!M\u001d=\u001d\t\u0011tG\u0004\u00024m5\tAG\u0003\u000265\u00051AH]8pizJ\u0011\u0001K\u0005\u0003q\u001d\nq\u0001]1dW\u0006<W-\u0003\u0002;w\t\u00191+Z9\u000b\u0005a:\u0003CA\u000f>\u0013\tqdCA\u0006LC\u001a\\\u0017mU3sm\u0016\u0014\u0018aC:feZ,'o]0%KF$\"!\u0011#\u0011\u0005\u0019\u0012\u0015BA\"(\u0005\u0011)f.\u001b;\t\u000f\u0015;\u0011\u0011!a\u0001a\u0005\u0019\u0001\u0010J\u0019\u0002\u0011M,'O^3sg\u0002\nQa]3u+B$\"!Q%\t\u000b)K\u0001\u0019A&\u0002\u0011Q,7\u000f^%oM>\u0004\"\u0001T+\u000e\u00035S!AT(\u0002\u0007\u0005\u0004\u0018N\u0003\u0002Q#\u00069!.\u001e9ji\u0016\u0014(B\u0001*T\u0003\u0015QWO\\5u\u0015\u0005!\u0016aA8sO&\u0011a+\u0014\u0002\t)\u0016\u001cH/\u00138g_\"\u0012\u0011\u0002\u0017\t\u0003\u0019fK!AW'\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<o)\u0005\t\u0005F\u0001\u0006_!\tau,\u0003\u0002a\u001b\nI\u0011I\u001a;fe\u0016\u000b7\r[\u0001+i\u0016\u001cHOU3qY&\u001c\u0017-T1oC\u001e,'O\u0011:pW\u0016\u0014X\t]8dQ6\u000bGo\u00195fg^KG\u000f\u001b.lQ\tY1\r\u0005\u0002MI&\u0011Q-\u0014\u0002\u0005)\u0016\u001cH/A\u0016uKN$8i\u001c8ue>dG.\u001a:Ce>\\WM]#q_\u000eD7)Y2iK6\u000bGo\u00195fg^KG\u000f\u001b.lQ\ta1-\u0001\u0015uKN$8i\u001c8ue>d'+Z9vKN$x+\u001b;i\u0007>\u0014(/Z2u\u0005J|7.\u001a:Fa>\u001c\u0007\u000e\u000b\u0002\u000eG\u00061C/Z:u\u0007>tGO]8m%\u0016\fX/Z:u/&$\bn\u0015;bY\u0016\u0014%o\\6fe\u0016\u0003xn\u00195)\u00059\u0019\u0017A\n;fgR\u001cuN\u001c;s_2\u0014V-];fgR<\u0016\u000e\u001e5OK^,'O\u0011:pW\u0016\u0014X\t]8dQ\"\u0012qbY\u0001\"i\u0016\u001cHoQ8oiJ|GNU3rk\u0016\u001cHoV5uQ\n\u0013xn[3s\u000bB|7\r\u001b\u000b\u0003\u0003BDQ!\u001d\tA\u0002I\f!%\u001a9pG\"LeNU3rk\u0016\u001cH\u000fR5gM\u001a\u0013x.\\\"veJ,g\u000e^#q_\u000eD\u0007C\u0001\u0014t\u0013\t!xE\u0001\u0003M_:<\u0017!D4fi\u000e{g\u000e\u001e:pY2,'/F\u0001=\u00035\u001a\u0007.Z2l\u0007>tGO]8mY\u0016\u0014(I]8lKJ,\u0005o\\2ig\u000e\u000b7\r[3NCR\u001c\u0007.Z:XSRD'l\u001b\u000b\u0003\u0003fDQA\u001f\nA\u0002m\f\u0011cY8oiJ|G\u000e\\3s\u0007>tG/\u001a=u!\tax0D\u0001~\u0015\tq\b$\u0001\u0006d_:$(o\u001c7mKJL1!!\u0001~\u0005E\u0019uN\u001c;s_2dWM]\"p]R,\u0007\u0010^\u0001(g\u0016tG-\u00118e-\u0016\u0014\u0018NZ=Ti\u0006dWM\u0011:pW\u0016\u0014X\t]8dQ&s'+Z:q_:\u001cX\rF\u0003B\u0003\u000f\t\t\u0002C\u0004\u0002\nM\u0001\r!a\u0003\u00021\r|g\u000e\u001e:pY2,'o\u00115b]:,G.T1oC\u001e,'\u000fE\u0002}\u0003\u001bI1!a\u0004~\u0005a\u0019uN\u001c;s_2dWM]\"iC:tW\r\\'b]\u0006<WM\u001d\u0005\b\u0003'\u0019\u0002\u0019AA\u000b\u0003\u001d\u0011W/\u001b7eKJ\u0004D!a\u0006\u0002:A1\u0011\u0011DA\u0018\u0003kqA!a\u0007\u0002,5\u0011\u0011Q\u0004\u0006\u0005\u0003?\t\t#\u0001\u0005sKF,Xm\u001d;t\u0015\u0011\t\u0019#!\n\u0002\r\r|W.\\8o\u0015\rI\u0012q\u0005\u0006\u0004\u0003S\u0019\u0016AB1qC\u000eDW-\u0003\u0003\u0002.\u0005u\u0011AF!cgR\u0014\u0018m\u0019;D_:$(o\u001c7SKF,Xm\u001d;\n\t\u0005E\u00121\u0007\u0002\b\u0005VLG\u000eZ3s\u0015\u0011\ti#!\b\u0011\t\u0005]\u0012\u0011\b\u0007\u0001\t1\tY$!\u0005\u0002\u0002\u0003\u0005)\u0011AA\u001f\u0005\ryF%M\t\u0005\u0003\u007f\t)\u0005E\u0002'\u0003\u0003J1!a\u0011(\u0005\u001dqu\u000e\u001e5j]\u001e\u0004B!a\u0007\u0002H%!\u0011\u0011JA\u000f\u0005Y\t%m\u001d;sC\u000e$8i\u001c8ue>d'+Z9vKN$\u0018aH:f]\u0012\fe\u000e\u001a,fe&4\u0017pU;dG\u0016\u001c8OZ;m%\u0016\u001c\bo\u001c8tKR)\u0011)a\u0014\u0002R!9\u0011\u0011\u0002\u000bA\u0002\u0005-\u0001bBA\n)\u0001\u0007\u00111\u000b\u0019\u0005\u0003+\nI\u0006\u0005\u0004\u0002\u001a\u0005=\u0012q\u000b\t\u0005\u0003o\tI\u0006\u0002\u0007\u0002\\\u0005E\u0013\u0011!A\u0001\u0006\u0003\tiDA\u0002`II\u0002")
public class BrokerEpochIntegrationTest
extends QuorumTestHarness {
    private final int brokerId1;
    private final int brokerId2;
    private scala.collection.immutable.Seq<KafkaServer> servers = (scala.collection.immutable.Seq)package$.MODULE$.Seq().empty();

    public int brokerId1() {
        return this.brokerId1;
    }

    public int brokerId2() {
        return this.brokerId2;
    }

    public scala.collection.immutable.Seq<KafkaServer> servers() {
        return this.servers;
    }

    public void servers_$eq(scala.collection.immutable.Seq<KafkaServer> x$1) {
        this.servers = x$1;
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        Object[] objectArray = new Properties[2];
        objectArray[0] = TestUtils$.MODULE$.createBrokerConfig(this.brokerId1(), this.zkConnect(), true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        objectArray[1] = TestUtils$.MODULE$.createBrokerConfig(this.brokerId2(), this.zkConnect(), true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        scala.collection.immutable.Seq configs = (scala.collection.immutable.Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray(objectArray));
        configs.foreach((Function1 & Serializable)config -> config.setProperty(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), Boolean.toString(false)));
        this.servers_$eq((scala.collection.immutable.Seq<KafkaServer>)((scala.collection.immutable.Seq)configs.map((Function1 & Serializable)config -> {
            void createServer_time;
            Time time;
            Time time2 = time = Time.SYSTEM;
            time = null;
            Time time3 = time2;
            KafkaConfig createServer_config = KafkaConfig$.MODULE$.fromProps(config);
            TestUtils$ createServer_this = TestUtils$.MODULE$;
            None$ createServer_createServer_threadNamePrefix = None$.MODULE$;
            return createServer_this.createServer(createServer_config, (Time)createServer_time, (Option<String>)createServer_createServer_threadNamePrefix, true);
        })));
    }

    @Override
    @AfterEach
    public void tearDown() {
        TestUtils$.MODULE$.shutdownServers(this.servers(), true);
        super.tearDown();
    }

    @Test
    public void testReplicaManagerBrokerEpochMatchesWithZk() {
        Map brokerAndEpochs = this.zkClient().getAllBrokerAndEpochsInCluster();
        Assertions.assertEquals((int)brokerAndEpochs.size(), (int)this.servers().size());
        brokerAndEpochs.foreach((Function1 & Serializable)x0$1 -> {
            BrokerEpochIntegrationTest.$anonfun$testReplicaManagerBrokerEpochMatchesWithZk$1(this, x0$1);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testControllerBrokerEpochCacheMatchesWithZk() {
        KafkaServer controller = this.getController();
        KafkaServer otherBroker = (KafkaServer)this.servers().find((Function1 & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)BrokerEpochIntegrationTest.$anonfun$testControllerBrokerEpochCacheMatchesWithZk$1(controller, e))).get();
        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());
        otherBroker.shutdown();
        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());
        otherBroker.startup();
        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());
    }

    @Test
    public void testControlRequestWithCorrectBrokerEpoch() {
        this.testControlRequestWithBrokerEpoch(0L);
    }

    @Test
    public void testControlRequestWithStaleBrokerEpoch() {
        this.testControlRequestWithBrokerEpoch(-1L);
    }

    @Test
    public void testControlRequestWithNewerBrokerEpoch() {
        this.testControlRequestWithBrokerEpoch(1L);
    }

    private void testControlRequestWithBrokerEpoch(long epochInRequestDiffFromCurrentEpoch) {
        TopicPartition tp = new TopicPartition("new-topic", 0);
        TestUtils$.MODULE$.createTopic(this.zkClient(), tp.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()})))}))), (Seq<KafkaServer>)this.servers());
        java.util.Map topicIds = CollectionConverters$.MODULE$.MapHasAsJava((Map)this.getController().kafkaController().controllerContext().topicIds().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
        int controllerId = 2;
        int controllerEpoch = ((Tuple2)this.zkClient().getControllerEpoch().get())._1$mcI$sp();
        KafkaConfig controllerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(controllerId, this.zkConnect(), true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
        scala.collection.immutable.Map brokerAndEpochs = ((IterableOnceOps)this.servers().map((Function1 & Serializable)s -> new Tuple2((Object)new Broker(s.config().brokerId(), "localhost", TestUtils$.MODULE$.boundPort((KafkaBroker)s, SecurityProtocol.PLAINTEXT), listenerName, securityProtocol), (Object)BoxesRunTime.boxToLong((long)s.kafkaController().brokerEpoch())))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        Iterable nodes = (Iterable)brokerAndEpochs.keys().map((Function1 & Serializable)x$1 -> x$1.node(listenerName));
        ControllerContext controllerContext = new ControllerContext();
        controllerContext.setLiveBrokers((Map)brokerAndEpochs);
        Metrics metrics = new Metrics();
        ControllerChannelManager controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, true, (Option)None$.MODULE$), (Option)None$.MODULE$);
        controllerChannelManager.startup();
        KafkaServer broker2 = (KafkaServer)this.servers().apply(this.brokerId2());
        long epochInRequest = broker2.kafkaController().brokerEpoch() + epochInRequestDiffFromCurrentEpoch;
        try {
            scala.collection.immutable.Seq partitionStates = (scala.collection.immutable.Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new LeaderAndIsrRequestData.LeaderAndIsrPartitionState[]{new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setTopicName(tp.topic()).setPartitionIndex(tp.partition()).setControllerEpoch(controllerEpoch).setLeader(this.brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()})).map((Function1 & Serializable)x$1 -> BoxesRunTime.unboxToInt((Object)x$1))).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})).map((Function1 & Serializable)x$1 -> BoxesRunTime.unboxToInt((Object)x$1))).asJava()).setIsNew(false)}));
            LeaderAndIsrRequest.Builder requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), controllerId, controllerEpoch, epochInRequest, CollectionConverters$.MODULE$.SeqHasAsJava((Seq)partitionStates).asJava(), topicIds, (Collection)CollectionConverters$.MODULE$.SetHasAsJava((Set)nodes.toSet()).asJava());
            if (epochInRequestDiffFromCurrentEpoch < 0L) {
                this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder);
            } else {
                this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder);
                TestUtils$.MODULE$.waitUntilLeaderIsKnown((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{broker2})), tp, 10000L);
            }
            scala.collection.immutable.Seq partitionStates2 = (scala.collection.immutable.Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new UpdateMetadataRequestData.UpdateMetadataPartitionState[]{new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName(tp.topic()).setPartitionIndex(tp.partition()).setControllerEpoch(controllerEpoch).setLeader(this.brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()})).map((Function1 & Serializable)x$1 -> BoxesRunTime.unboxToInt((Object)x$1))).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})).map((Function1 & Serializable)x$1 -> BoxesRunTime.unboxToInt((Object)x$1))).asJava())}));
            Buffer liveBrokers = ((IterableOnceOps)brokerAndEpochs.map((Function1 & Serializable)x0$1 -> {
                if (x0$1 == null) {
                    throw new MatchError(null);
                }
                Broker broker = (Broker)x0$1._1();
                SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
                ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
                Node node = broker.node(listenerName);
                scala.collection.immutable.Seq endpoints = (scala.collection.immutable.Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new UpdateMetadataRequestData.UpdateMetadataEndpoint[]{new UpdateMetadataRequestData.UpdateMetadataEndpoint().setHost(node.host()).setPort(node.port()).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value())}));
                UpdateMetadataRequestData.UpdateMetadataBroker updateMetadataBroker = new UpdateMetadataRequestData.UpdateMetadataBroker().setId(broker.id()).setEndpoints(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)endpoints).asJava()).setRack((String)broker.rack().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
                return updateMetadataBroker;
            })).toBuffer();
            UpdateMetadataRequest.Builder requestBuilder2 = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), controllerId, controllerEpoch, epochInRequest, CollectionConverters$.MODULE$.SeqHasAsJava((Seq)partitionStates2).asJava(), CollectionConverters$.MODULE$.BufferHasAsJava(liveBrokers).asJava(), Collections.emptyMap());
            if (epochInRequestDiffFromCurrentEpoch < 0L) {
                this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder2);
            } else {
                this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder2);
                TestUtils$.MODULE$.waitForPartitionMetadata((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{broker2})), tp.topic(), tp.partition(), 10000L);
                Assertions.assertEquals((int)this.brokerId2(), (int)((UpdateMetadataRequestData.UpdateMetadataPartitionState)broker2.metadataCache().getPartitionInfo(tp.topic(), tp.partition()).get()).leader());
            }
            List topicStates = CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new StopReplicaRequestData.StopReplicaTopicState[]{new StopReplicaRequestData.StopReplicaTopicState().setTopicName(tp.topic()).setPartitionStates(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new StopReplicaRequestData.StopReplicaPartitionState[]{new StopReplicaRequestData.StopReplicaPartitionState().setPartitionIndex(tp.partition()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 2).setDeletePartition(true)}))).asJava())}))).asJava();
            StopReplicaRequest.Builder requestBuilder3 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), controllerId, controllerEpoch, epochInRequest, false, topicStates);
            if (epochInRequestDiffFromCurrentEpoch < 0L) {
                this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder3);
            } else {
                this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder3);
                Assertions.assertEquals((Object)HostedPartition.None$.MODULE$, (Object)broker2.replicaManager().getPartition(tp));
            }
        }
        finally {
            controllerChannelManager.shutdown();
            metrics.close();
        }
    }

    private KafkaServer getController() {
        int controllerId = TestUtils$.MODULE$.waitUntilControllerElected(this.zkClient(), 15000L);
        return (KafkaServer)((IterableOps)this.servers().filter((Function1 & Serializable)s -> BoxesRunTime.boxToBoolean((boolean)BrokerEpochIntegrationTest.$anonfun$getController$1(controllerId, s)))).head();
    }

    private void checkControllerBrokerEpochsCacheMatchesWithZk(ControllerContext controllerContext) {
        Map brokerAndEpochs = this.zkClient().getAllBrokerAndEpochsInCluster();
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!BrokerEpochIntegrationTest.$anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$1(controllerContext, brokerAndEpochs)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)"Broker epoch mismatches");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
    }

    private void sendAndVerifyStaleBrokerEpochInResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        BooleanRef staleBrokerEpochDetected = BooleanRef.create((boolean)false);
        controllerChannelManager.sendRequest(this.brokerId2(), builder, (Function1 & Serializable)response -> {
            staleBrokerEpochDetected.elem = response.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH);
            return BoxedUnit.UNIT;
        });
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!staleBrokerEpochDetected.elem) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)"Broker epoch should be stale");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
        Assertions.assertTrue((boolean)staleBrokerEpochDetected.elem, (String)"Stale broker epoch not detected by the broker");
    }

    private void sendAndVerifySuccessfulResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        VolatileBooleanRef succeed = VolatileBooleanRef.create((boolean)false);
        controllerChannelManager.sendRequest(this.brokerId2(), builder, (Function1 & Serializable)response -> {
            succeed.elem = response.errorCounts().isEmpty() || response.errorCounts().containsKey(Errors.NONE) && response.errorCounts().size() == 1;
            return BoxedUnit.UNIT;
        });
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!succeed.elem) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)"Should receive response with no errors");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testReplicaManagerBrokerEpochMatchesWithZk$2(Broker broker$1, KafkaServer e) {
        return e.config().brokerId() == broker$1.id();
    }

    public static final /* synthetic */ void $anonfun$testReplicaManagerBrokerEpochMatchesWithZk$1(BrokerEpochIntegrationTest $this, Tuple2 x0$1) {
        if (x0$1 != null) {
            Broker broker = (Broker)x0$1._1();
            long epoch = x0$1._2$mcJ$sp();
            Option brokerServer = $this.servers().find((Function1 & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)BrokerEpochIntegrationTest.$anonfun$testReplicaManagerBrokerEpochMatchesWithZk$2(broker, e)));
            Assertions.assertTrue((boolean)brokerServer.isDefined());
            Assertions.assertEquals((long)epoch, (long)((KafkaServer)brokerServer.get()).kafkaController().brokerEpoch());
            return;
        }
        throw new MatchError(null);
    }

    public static final /* synthetic */ boolean $anonfun$testControllerBrokerEpochCacheMatchesWithZk$1(KafkaServer controller$1, KafkaServer e) {
        return e.config().brokerId() != controller$1.config().brokerId();
    }

    public static final /* synthetic */ boolean $anonfun$getController$1(int controllerId$1, KafkaServer s) {
        return s.config().brokerId() == controllerId$1;
    }

    public static final /* synthetic */ boolean $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$2(Map brokerEpochsInControllerContext$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        Broker broker = (Broker)x0$1._1();
        long epoch = x0$1._2$mcJ$sp();
        boolean bl = brokerEpochsInControllerContext$1.get((Object)BoxesRunTime.boxToInteger((int)broker.id())).contains((Object)BoxesRunTime.boxToLong((long)epoch));
        return bl;
    }

    public static final /* synthetic */ boolean $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$1(ControllerContext controllerContext$1, Map brokerAndEpochs$1) {
        Map brokerEpochsInControllerContext = controllerContext$1.liveBrokerIdAndEpochs();
        if (brokerAndEpochs$1.size() != brokerEpochsInControllerContext.size()) {
            return false;
        }
        return brokerAndEpochs$1.forall((Function1 & Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)BrokerEpochIntegrationTest.$anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$2(brokerEpochsInControllerContext, x0$1)));
    }

    public static final /* synthetic */ String $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$3() {
        return "Broker epoch mismatches";
    }

    public static final /* synthetic */ boolean $anonfun$sendAndVerifyStaleBrokerEpochInResponse$2(BooleanRef staleBrokerEpochDetected$1) {
        return staleBrokerEpochDetected$1.elem;
    }

    public static final /* synthetic */ String $anonfun$sendAndVerifyStaleBrokerEpochInResponse$3() {
        return "Broker epoch should be stale";
    }

    public static final /* synthetic */ boolean $anonfun$sendAndVerifySuccessfulResponse$2(VolatileBooleanRef succeed$1) {
        return succeed$1.elem;
    }

    public static final /* synthetic */ String $anonfun$sendAndVerifySuccessfulResponse$3() {
        return "Should receive response with no errors";
    }

    public BrokerEpochIntegrationTest() {
        this.brokerId1 = 0;
        this.brokerId2 = 1;
    }
}

