/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.EchoServer;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.PlaintextChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests {@link Selector} by driving it against a local {@link EchoServer} that is expected to
 * send every request straight back to the caller.
 *
 * <p>The fixture fields and most helpers are {@code protected}, so they remain accessible to
 * subclasses.
 */
public class SelectorTest {
    /**
     * Value passed for both size arguments of {@code Selector.connect}
     * (presumably the socket send and receive buffer sizes - confirm against Selector).
     */
    protected static final int BUFFER_SIZE = 4096;

    /** Charset used to turn request strings into bytes and response bytes back into strings. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    protected EchoServer server;
    protected Time time;
    protected Selector selector;
    protected ChannelBuilder channelBuilder;
    private Metrics metrics;

    /**
     * Starts a plaintext echo server and builds a selector that uses a mock clock.
     *
     * @throws Exception if the server or the selector cannot be created
     */
    @Before
    public void setUp() throws Exception {
        Map<String, Object> configs = new HashMap<String, Object>();
        this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs);
        this.server.start();
        this.time = new MockTime();
        this.channelBuilder = new PlaintextChannelBuilder();
        this.channelBuilder.configure(configs);
        this.metrics = new Metrics();
        // NOTE(review): 5000L is presumably the max idle time in ms; testCloseOldestConnection
        // advances the mock clock by 6000 to trigger expiry. Confirm against Selector's constructor.
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", this.channelBuilder);
    }

    /**
     * Closes the selector, the echo server and the metrics registry.
     *
     * <p>Each resource is closed even if closing an earlier one throws, and resources that were
     * never created (because {@link #setUp()} failed part-way) are skipped.
     *
     * @throws Exception if any of the close calls fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        } finally {
            try {
                if (this.server != null) {
                    this.server.close();
                }
            } finally {
                if (this.metrics != null) {
                    this.metrics.close();
                }
            }
        }
    }

    /**
     * After the server drops its connections the node must show up in {@code disconnected()},
     * and a fresh connection under the same id must work again.
     */
    @Test
    public void testServerDisconnect() throws Exception {
        String node = "0";

        // Connect and do a simple round trip.
        this.blockingConnect(node);
        Assert.assertEquals("hello", this.blockingRequest(node, "hello"));

        // Drop the connection on the server side and wait until the selector notices.
        this.server.closeConnections();
        while (!this.selector.disconnected().containsKey(node)) {
            this.selector.poll(1000L);
        }

        // Reconnect and repeat the round trip.
        this.blockingConnect(node);
        Assert.assertEquals("hello", this.blockingRequest(node, "hello"));
    }

    /** Queuing a second send to a node before the first one was polled must be rejected. */
    @Test(expected=IllegalStateException.class)
    public void testCantSendWithInProgress() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        this.selector.send(this.createSend(node, "test1"));
        this.selector.send(this.createSend(node, "test2"));
        this.selector.poll(1000L);
    }

    /** Sending to a node id that was never connected must be rejected. */
    @Test(expected=IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.send(this.createSend("0", "test"));
        this.selector.poll(1000L);
    }

    /** Connecting to an unresolvable host name must fail with an {@link IOException}. */
    @Test(expected=IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect("0", new InetSocketAddress("some.invalid.hostname.foo.bar.local", this.server.port), BUFFER_SIZE, BUFFER_SIZE);
    }

    /**
     * Connects to a port owned by a local {@link ServerSocket} and checks the state reported for
     * the node while it is listed in {@code disconnected()}.
     */
    @Test
    public void testConnectionRefused() throws Exception {
        String node = "0";
        ServerSocket nonListeningSocket = new ServerSocket(0);
        try {
            int nonListeningPort = nonListeningSocket.getLocalPort();
            this.selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
            // NOTE(review): this loop only runs while the node is ALREADY in disconnected(), so
            // it can exit without asserting anything. Also, ServerSocket(0) is bound to its port,
            // so the connect may well succeed rather than be refused. Simply inverting the
            // condition could therefore spin forever - confirm the intended check before
            // relying on this test.
            while (this.selector.disconnected().containsKey(node)) {
                Assert.assertEquals(ChannelState.NOT_CONNECTED, this.selector.disconnected().get(node));
                this.selector.poll(1000L);
            }
        } finally {
            // Release the port even when connect/poll/assert throws.
            nonListeningSocket.close();
        }
    }

    /**
     * Opens several connections and pushes a fixed number of numbered requests through each,
     * checking that responses come back in order, from the right source, with no disconnects.
     */
    @Test
    public void testNormalOperation() throws Exception {
        int conns = 5;
        int reqs = 500;

        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < conns; ++i) {
            this.connect(Integer.toString(i), addr);
        }

        // Per-node counters: completed sends and received responses so far.
        Map<String, Integer> requests = new HashMap<String, Integer>();
        Map<String, Integer> responses = new HashMap<String, Integer>();
        int responseCount = 0;

        // Kick off the first request ("<node>-0") on every connection.
        for (int i = 0; i < conns; ++i) {
            String node = Integer.toString(i);
            this.selector.send(this.createSend(node, node + "-0"));
        }

        while (responseCount < conns * reqs) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0, this.selector.disconnected().size());

            // Every response must look like "<node>-<counter>" with the next expected counter.
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String source = receive.source();
                String[] pieces = this.asString(receive).split("-");
                Assert.assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
                Assert.assertEquals("Check the source", source, pieces[0]);
                Assert.assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());

                Integer seen = responses.get(source);
                int expectedCounter = seen == null ? 0 : seen.intValue();
                Assert.assertEquals("Check the request counter", expectedCounter, Integer.parseInt(pieces[1]));
                responses.put(source, expectedCounter + 1);
                ++responseCount;
            }

            // For every completed send, queue the next request until the node has sent `reqs`.
            for (Send send : this.selector.completedSends()) {
                String dest = send.destination();
                Integer prior = requests.get(dest);
                int sent = prior == null ? 1 : prior.intValue() + 1;
                requests.put(dest, sent);
                if (sent < reqs) {
                    this.selector.send(this.createSend(dest, dest + "-" + sent));
                }
            }
        }
    }

    /** A request ten times larger than {@link #BUFFER_SIZE} must make the round trip intact. */
    @Test
    public void testSendLargeRequest() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        String big = TestUtils.randomString(10 * BUFFER_SIZE);
        Assert.assertEquals(big, this.blockingRequest(node, big));
    }

    /** Sends a sequence of 512 KiB requests over one connection and checks each response. */
    @Test
    public void testLargeMessageSequence() throws Exception {
        int bufferSize = 512 * 1024;
        String node = "0";
        int reqs = 50;
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        this.connect(node, addr);
        String requestPrefix = TestUtils.randomString(bufferSize);
        this.sendAndReceive(node, requestPrefix, 0, reqs);
    }

    /** An empty request must come back as an empty response. */
    @Test
    public void testEmptyRequest() throws Exception {
        String node = "0";
        this.blockingConnect(node);
        Assert.assertEquals("", this.blockingRequest(node, ""));
    }

    /** Connecting twice under the same connection id must be rejected. */
    @Test(expected=IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        this.blockingConnect("0");
        this.blockingConnect("0");
    }

    /**
     * While node "1" is muted only the response for node "0" may be delivered; after unmuting,
     * the response for node "1" must arrive.
     */
    @Test
    public void testMute() throws Exception {
        this.blockingConnect("0");
        this.blockingConnect("1");

        this.selector.send(this.createSend("0", "hello"));
        this.selector.send(this.createSend("1", "hi"));

        this.selector.mute("1");

        while (this.selector.completedReceives().isEmpty()) {
            this.selector.poll(5L);
        }
        Assert.assertEquals("We should have only one response", 1, this.selector.completedReceives().size());
        Assert.assertEquals("The response should not be from the muted node", "0", this.selector.completedReceives().get(0).source());

        this.selector.unmute("1");
        // Poll at least once before looking, so the receives seen above are not re-examined.
        do {
            this.selector.poll(5L);
        } while (this.selector.completedReceives().isEmpty());
        Assert.assertEquals("We should have only one response", 1, this.selector.completedReceives().size());
        Assert.assertEquals("The response should be from the previously muted node", "1", this.selector.completedReceives().get(0).source());
    }

    /**
     * After the mock clock advances by 6000 the next poll must report the connection as
     * disconnected with state {@code EXPIRED}.
     */
    @Test
    public void testCloseOldestConnection() throws Exception {
        String id = "0";
        this.blockingConnect(id);

        this.time.sleep(6000L);
        this.selector.poll(0L);

        Assert.assertTrue("The idle connection should have been closed", this.selector.disconnected().containsKey(id));
        Assert.assertEquals(ChannelState.EXPIRED, this.selector.disconnected().get(id));
    }

    /**
     * Sends {@code s} to {@code node} and polls until a response from that node is received.
     *
     * <p>Completed receives are inspected after every poll, so a response is not missed
     * regardless of which poll delivers it. The loop has no upper bound: if no response ever
     * arrives this method does not return.
     *
     * @param node connection id to send to
     * @param s request payload
     * @return the response payload decoded as a string
     * @throws IOException if sending or polling fails
     */
    private String blockingRequest(String node, String s) throws IOException {
        this.selector.send(this.createSend(node, s));
        while (true) {
            this.selector.poll(1000L);
            for (NetworkReceive receive : this.selector.completedReceives()) {
                if (receive.source().equals(node)) {
                    return this.asString(receive);
                }
            }
        }
    }

    /**
     * Initiates a connection for {@code node} without waiting for it to complete.
     *
     * @param node connection id
     * @param serverAddr address to connect to
     * @throws IOException if the connection attempt fails immediately
     */
    protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
        this.selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
    }

    /**
     * Connects {@code node} to the local echo server and waits until the channel is ready.
     *
     * @param node connection id
     * @throws IOException if connecting or polling fails
     */
    private void blockingConnect(String node) throws IOException {
        this.blockingConnect(node, new InetSocketAddress("localhost", this.server.port));
    }

    /**
     * Connects {@code node} to {@code serverAddr}, then polls until the selector first lists the
     * node in {@code connected()} and afterwards reports its channel as ready.
     *
     * @param node connection id
     * @param serverAddr address to connect to
     * @throws IOException if connecting or polling fails
     */
    protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
        this.selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
        while (!this.selector.connected().contains(node)) {
            this.selector.poll(10000L);
        }
        while (!this.selector.isChannelReady(node)) {
            this.selector.poll(10000L);
        }
    }

    /**
     * Builds a send for {@code node} whose payload is {@code s} encoded as UTF-8.
     *
     * @param node destination connection id
     * @param s payload text
     * @return the send to hand to the selector
     */
    protected NetworkSend createSend(String node, String s) {
        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes(UTF_8)));
    }

    /**
     * Decodes the payload of {@code receive} as a UTF-8 string.
     *
     * @param receive completed receive to decode
     * @return the payload text
     */
    protected String asString(NetworkReceive receive) {
        return new String(Utils.toArray(receive.payload()), UTF_8);
    }

    /**
     * Sends requests {@code requestPrefix + "-" + i} for {@code i} in
     * {@code [startIndex, endIndex)} over one connection and checks that the responses come back
     * in the same order, with no disconnects along the way.
     *
     * @param node connection id to use
     * @param requestPrefix common prefix of every request payload
     * @param startIndex counter of the first request
     * @param endIndex counter bound (exclusive)
     * @throws Exception if sending, polling or an assertion fails
     */
    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
        int requests = startIndex;
        int responses = startIndex;
        this.selector.send(this.createSend(node, requestPrefix + "-" + startIndex));
        ++requests;
        while (responses < endIndex) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0, this.selector.disconnected().size());

            for (NetworkReceive receive : this.selector.completedReceives()) {
                Assert.assertEquals(requestPrefix + "-" + responses, this.asString(receive));
                ++responses;
            }

            // Queue one follow-up request per completed send, up to endIndex in total.
            for (int i = 0; i < this.selector.completedSends().size() && requests < endIndex; ++i, ++requests) {
                this.selector.send(this.createSend(node, requestPrefix + "-" + requests));
            }
        }
    }
}

