/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.BrokerToControllerRequestThread;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001!2A!\u0002\u0004\u0001\u0017!)!\u0003\u0001C\u0001'!)a\u0003\u0001C\u0001/!)A\u0005\u0001C\u0001/!)a\u0005\u0001C\u0001/\t\u0019#I]8lKJ$vnQ8oiJ|G\u000e\\3s%\u0016\fX/Z:u)\"\u0014X-\u00193UKN$(BA\u0004\t\u0003\u0019\u0019XM\u001d<fe*\t\u0011\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001a\u0001CA\u0007\u0011\u001b\u0005q!\"A\b\u0002\u000bM\u001c\u0017\r\\1\n\u0005Eq!AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002)A\u0011Q\u0003A\u0007\u0002\r\u0005\u0001B/Z:u%\u0016\fX/Z:ugN+g\u000e\u001e\u000b\u00021A\u0011Q\"G\u0005\u000359\u0011A!\u00168ji\"\u0012!\u0001\b\t\u0003;\tj\u0011A\b\u0006\u0003?\u0001\nQA[;oSRT\u0011!I\u0001\u0004_J<\u0017BA\u0012\u001f\u0005\u0011!Vm\u001d;\u0002+Q,7\u000f^\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4fI\"\u00121\u0001H\u0001\u0012i\u0016\u001cHOT8u\u0007>tGO]8mY\u0016\u0014\bF\u0001\u0003\u001d\u0001")
public class BrokerToControllerRequestThreadTest {
    @Test
    public void testRequestsSent() {
        SystemTime time = new SystemTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<BrokerToControllerQueueItem>();
        MetadataCache metadataCache = (MetadataCache)Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        Broker activeController = new Broker(controllerId, (Seq)new .colon.colon((Object)new EndPoint("host", 1234, listenerName, SecurityProtocol.PLAINTEXT), (List)Nil$.MODULE$), (Option)None$.MODULE$, Features.emptySupportedFeatures());
        Mockito.when((Object)metadataCache.getControllerId()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)controllerId)));
        Mockito.when((Object)metadataCache.getAliveBrokers()).thenReturn((Object)new .colon.colon((Object)activeController, (List)Nil$.MODULE$));
        Mockito.when((Object)metadataCache.getAliveBroker(controllerId)).thenReturn((Object)new Some((Object)activeController));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith((int)2, Collections.singletonMap("a", new Integer(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, (Time)time, "");
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        CountDownLatch responseLatch = new CountDownLatch(1);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem((AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), response -> {
            Assert.assertEquals((Object)expectedResponse, (Object)response.responseBody());
            responseLatch.countDown();
        });
        requestQueue.put(queueItem);
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assert.assertTrue((boolean)responseLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testControllerChanged() {
        SystemTime time = new SystemTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<BrokerToControllerQueueItem>();
        MetadataCache metadataCache = (MetadataCache)Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        Broker oldController = new Broker(oldControllerId, (Seq)new .colon.colon((Object)new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT), (List)Nil$.MODULE$), (Option)None$.MODULE$, Features.emptySupportedFeatures());
        Node oldControllerNode = oldController.node(listenerName);
        Broker newController = new Broker(newControllerId, (Seq)new .colon.colon((Object)new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT), (List)Nil$.MODULE$), (Option)None$.MODULE$, Features.emptySupportedFeatures());
        Mockito.when((Object)metadataCache.getControllerId()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)oldControllerId)), (Object[])new Option[]{new Some((Object)BoxesRunTime.boxToInteger((int)newControllerId))});
        Mockito.when((Object)metadataCache.getAliveBroker(oldControllerId)).thenReturn((Object)new Some((Object)oldController));
        Mockito.when((Object)metadataCache.getAliveBroker(newControllerId)).thenReturn((Object)new Some((Object)newController));
        Mockito.when((Object)metadataCache.getAliveBrokers()).thenReturn((Object)new .colon.colon((Object)oldController, (List)new .colon.colon((Object)newController, (List)Nil$.MODULE$)));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith((int)3, Collections.singletonMap("a", new Integer(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, (Time)time, "");
        CountDownLatch responseLatch = new CountDownLatch(1);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem((AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), response -> {
            Assert.assertEquals((Object)expectedResponse, (Object)response.responseBody());
            responseLatch.countDown();
        });
        requestQueue.put(queueItem);
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Assert.assertFalse((boolean)requestQueue.isEmpty());
        Assert.assertEquals((long)1L, (long)requestQueue.size());
        Assert.assertEquals((Object)queueItem, requestQueue.peek());
        mockClient.setUnreachable(oldControllerNode, time.milliseconds() + 5000L);
        testRequestThread.doWork();
        Assert.assertFalse((boolean)requestQueue.isEmpty());
        Assert.assertEquals((long)1L, (long)requestQueue.size());
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assert.assertTrue((boolean)responseLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testNotController() {
        SystemTime time = new SystemTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<BrokerToControllerQueueItem>();
        MetadataCache metadataCache = (MetadataCache)Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        Broker oldController = new Broker(oldControllerId, (Seq)new .colon.colon((Object)new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT), (List)Nil$.MODULE$), (Option)None$.MODULE$, Features.emptySupportedFeatures());
        Broker newController = new Broker(2, (Seq)new .colon.colon((Object)new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT), (List)Nil$.MODULE$), (Option)None$.MODULE$, Features.emptySupportedFeatures());
        Mockito.when((Object)metadataCache.getControllerId()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)oldControllerId)), (Object[])new Option[]{new Some((Object)BoxesRunTime.boxToInteger((int)newControllerId))});
        Mockito.when((Object)metadataCache.getAliveBrokers()).thenReturn((Object)new .colon.colon((Object)oldController, (List)new .colon.colon((Object)newController, (List)Nil$.MODULE$)));
        Mockito.when((Object)metadataCache.getAliveBroker(oldControllerId)).thenReturn((Object)new Some((Object)oldController));
        Mockito.when((Object)metadataCache.getAliveBroker(newControllerId)).thenReturn((Object)new Some((Object)newController));
        MetadataResponse responseWithNotControllerError = TestUtils.metadataUpdateWith((String)"cluster1", (int)2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", new Integer(2)));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith((int)3, Collections.singletonMap("a", new Integer(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, (Time)time, "");
        CountDownLatch responseLatch = new CountDownLatch(1);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem((AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true)), response -> {
            Assert.assertEquals((Object)expectedResponse, (Object)response.responseBody());
            responseLatch.countDown();
        });
        requestQueue.put(queueItem);
        testRequestThread.doWork();
        mockClient.prepareResponse(body -> body instanceof MetadataRequest && ((MetadataRequest)body).allowAutoTopicCreation(), (AbstractResponse)responseWithNotControllerError);
        testRequestThread.doWork();
        testRequestThread.doWork();
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Assert.assertTrue((boolean)responseLatch.await(10L, TimeUnit.SECONDS));
    }
}

