/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NetworkClientTest {
    private MockTime time = new MockTime();
    private MockSelector selector = new MockSelector(this.time);
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE);
    private int nodeId = 1;
    private Cluster cluster = TestUtils.singletonCluster("test", this.nodeId);
    private Node node = (Node)this.cluster.nodes().get(0);
    private NetworkClient client = new NetworkClient((Selectable)this.selector, this.metadata, "mock", Integer.MAX_VALUE, 0L, 65536, 65536);

    @Before
    public void setup() {
        this.metadata.update(this.cluster, this.time.milliseconds());
    }

    @Test
    public void testReadyAndDisconnect() {
        ArrayList reqs = new ArrayList();
        Assert.assertFalse((String)"Client begins unready as it has no connection.", (boolean)this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertEquals((String)"The connection is established as a side-effect of the readiness check", (long)1L, (long)this.selector.connected().size());
        this.client.poll(reqs, 1L, this.time.milliseconds());
        this.selector.clear();
        Assert.assertTrue((String)"Now the client is ready", (boolean)this.client.ready(this.node, this.time.milliseconds()));
        this.selector.disconnect(this.node.id());
        this.client.poll(reqs, 1L, this.time.milliseconds());
        this.selector.clear();
        Assert.assertFalse((String)"After we forced the disconnection the client is no longer ready.", (boolean)this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertTrue((String)"Metadata should get updated.", (this.metadata.timeToNextUpdate(this.time.milliseconds()) == 0L ? 1 : 0) != 0);
    }

    @Test(expected=IllegalStateException.class)
    public void testSendToUnreadyNode() {
        RequestSend send = new RequestSend(5, this.client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct());
        ClientRequest request = new ClientRequest(this.time.milliseconds(), false, send, null);
        this.client.poll(Arrays.asList(request), 1L, this.time.milliseconds());
    }

    @Test
    public void testSimpleRequestResponse() {
        ProduceRequest produceRequest = new ProduceRequest(1, 1000, Collections.emptyMap());
        RequestHeader reqHeader = this.client.nextRequestHeader(ApiKeys.PRODUCE);
        RequestSend send = new RequestSend(this.node.id(), reqHeader, produceRequest.toStruct());
        ClientRequest request = new ClientRequest(this.time.milliseconds(), true, send, null);
        this.awaitReady(this.client, this.node);
        this.client.poll(Arrays.asList(request), 1L, this.time.milliseconds());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
        Struct resp = new Struct(ProtoUtils.currentResponseSchema((int)ApiKeys.PRODUCE.id));
        resp.set("responses", (Object)new Object[0]);
        int size = respHeader.sizeOf() + resp.sizeOf();
        ByteBuffer buffer = ByteBuffer.allocate(size);
        respHeader.writeTo(buffer);
        resp.writeTo(buffer);
        buffer.flip();
        this.selector.completeReceive(new NetworkReceive(this.node.id(), buffer));
        List responses = this.client.poll(new ArrayList(), 1L, this.time.milliseconds());
        Assert.assertEquals((long)1L, (long)responses.size());
        ClientResponse response = (ClientResponse)responses.get(0);
        Assert.assertTrue((String)"Should have a response body.", (boolean)response.hasResponse());
        Assert.assertEquals((String)"Should be correlated to the original request", (Object)request, (Object)response.request());
    }

    private void awaitReady(NetworkClient client, Node node) {
        while (!client.ready(node, this.time.milliseconds())) {
            client.poll(new ArrayList(), 1L, this.time.milliseconds());
        }
    }
}

