/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SelectorTest {
    private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
    private static final int BUFFER_SIZE = 4096;
    private EchoServer server;
    private Selectable selector;

    @Before
    public void setup() throws Exception {
        this.server = new EchoServer();
        this.server.start();
        this.selector = new Selector(new Metrics(), (Time)new MockTime(), "MetricGroup", new LinkedHashMap());
    }

    @After
    public void teardown() throws Exception {
        this.selector.close();
        this.server.close();
    }

    @Test
    public void testServerDisconnect() throws Exception {
        int node = 0;
        this.blockingConnect(node);
        Assert.assertEquals((Object)"hello", (Object)this.blockingRequest(node, "hello"));
        this.server.closeConnections();
        while (!this.selector.disconnected().contains(node)) {
            this.selector.poll(1000L, EMPTY);
        }
        this.blockingConnect(node);
        Assert.assertEquals((Object)"hello", (Object)this.blockingRequest(node, "hello"));
    }

    @Test
    public void testClientDisconnect() throws Exception {
        int node = 0;
        this.blockingConnect(node);
        this.selector.disconnect(node);
        this.selector.poll(10L, Arrays.asList(this.createSend(node, "hello1")));
        Assert.assertEquals((String)"Request should not have succeeded", (long)0L, (long)this.selector.completedSends().size());
        Assert.assertEquals((String)"There should be a disconnect", (long)1L, (long)this.selector.disconnected().size());
        Assert.assertTrue((String)"The disconnect should be from our node", (boolean)this.selector.disconnected().contains(node));
        this.blockingConnect(node);
        Assert.assertEquals((Object)"hello2", (Object)this.blockingRequest(node, "hello2"));
    }

    @Test(expected=IllegalStateException.class)
    public void testCantSendWithInProgress() throws Exception {
        int node = 0;
        this.blockingConnect(node);
        this.selector.poll(1000L, Arrays.asList(this.createSend(node, "test1"), this.createSend(node, "test2")));
    }

    @Test(expected=IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.poll(1000L, Arrays.asList(this.createSend(0, "test")));
    }

    @Test(expected=IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", this.server.port), 4096, 4096);
    }

    @Test
    public void testConnectionRefused() throws Exception {
        int node = 0;
        this.selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), 4096, 4096);
        while (this.selector.disconnected().contains(node)) {
            this.selector.poll(1000L, EMPTY);
        }
    }

    @Test
    public void testNormalOperation() throws Exception {
        int conns = 5;
        int reqs = 500;
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < conns; ++i) {
            this.selector.connect(i, addr, 4096, 4096);
        }
        int[] requests = new int[conns];
        int[] responses = new int[conns];
        int responseCount = 0;
        ArrayList<NetworkSend> sends = new ArrayList<NetworkSend>();
        for (int i = 0; i < conns; ++i) {
            sends.add(this.createSend(i, i + "-" + 0));
        }
        while (responseCount < conns * reqs) {
            this.selector.poll(0L, sends);
            Assert.assertEquals((String)"No disconnects should have occurred.", (long)0L, (long)this.selector.disconnected().size());
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String[] pieces = this.asString(receive).split("-");
                Assert.assertEquals((String)"Should be in the form 'conn-counter'", (long)2L, (long)pieces.length);
                Assert.assertEquals((String)"Check the source", (long)receive.source(), (long)Integer.parseInt(pieces[0]));
                Assert.assertEquals((String)"Check that the receive has kindly been rewound", (long)0L, (long)receive.payload().position());
                Assert.assertEquals((String)"Check the request counter", (long)responses[receive.source()], (long)Integer.parseInt(pieces[1]));
                int n = receive.source();
                responses[n] = responses[n] + 1;
                ++responseCount;
            }
            sends.clear();
            for (NetworkSend send : this.selector.completedSends()) {
                int dest;
                int n = dest = send.destination();
                requests[n] = requests[n] + 1;
                if (requests[dest] >= reqs) continue;
                sends.add(this.createSend(dest, dest + "-" + requests[dest]));
            }
        }
    }

    @Test
    public void testSendLargeRequest() throws Exception {
        int node = 0;
        this.blockingConnect(node);
        String big = TestUtils.randomString(40960);
        Assert.assertEquals((Object)big, (Object)this.blockingRequest(node, big));
    }

    @Test
    public void testEmptyRequest() throws Exception {
        int node = 0;
        this.blockingConnect(node);
        Assert.assertEquals((Object)"", (Object)this.blockingRequest(node, ""));
    }

    @Test(expected=IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        this.blockingConnect(0);
        this.blockingConnect(0);
    }

    private String blockingRequest(int node, String s) throws IOException {
        NetworkReceive receive;
        this.selector.poll(1000L, Arrays.asList(this.createSend(node, s)));
        block0: while (true) {
            this.selector.poll(1000L, EMPTY);
            Iterator i$ = this.selector.completedReceives().iterator();
            do {
                if (!i$.hasNext()) continue block0;
            } while ((receive = (NetworkReceive)i$.next()).source() != node);
            break;
        }
        return this.asString(receive);
    }

    private void blockingConnect(int node) throws IOException {
        this.selector.connect(node, new InetSocketAddress("localhost", this.server.port), 4096, 4096);
        while (!this.selector.connected().contains(node)) {
            this.selector.poll(10000L, EMPTY);
        }
    }

    private NetworkSend createSend(int node, String s) {
        return new NetworkSend(node, new ByteBuffer[]{ByteBuffer.wrap(s.getBytes())});
    }

    private String asString(NetworkReceive receive) {
        return new String(Utils.toArray((ByteBuffer)receive.payload()));
    }

    static class EchoServer
    extends Thread {
        public final int port = TestUtils.choosePort();
        private final ServerSocket serverSocket = new ServerSocket(this.port);
        private final List<Thread> threads = Collections.synchronizedList(new ArrayList());
        private final List<Socket> sockets = Collections.synchronizedList(new ArrayList());

        @Override
        public void run() {
            try {
                while (true) {
                    final Socket socket = this.serverSocket.accept();
                    this.sockets.add(socket);
                    Thread thread = new Thread(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            try {
                                DataInputStream input = new DataInputStream(socket.getInputStream());
                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
                                while (socket.isConnected() && !socket.isClosed()) {
                                    int size = input.readInt();
                                    byte[] bytes = new byte[size];
                                    input.readFully(bytes);
                                    output.writeInt(size);
                                    output.write(bytes);
                                    output.flush();
                                }
                            }
                            catch (IOException e) {
                            }
                            finally {
                                try {
                                    socket.close();
                                }
                                catch (IOException e) {}
                            }
                        }
                    };
                    thread.start();
                    this.threads.add(thread);
                }
            }
            catch (IOException iOException) {
                return;
            }
        }

        public void closeConnections() throws IOException {
            for (Socket socket : this.sockets) {
                socket.close();
            }
        }

        public void close() throws IOException, InterruptedException {
            this.serverSocket.close();
            this.closeConnections();
            for (Thread t : this.threads) {
                t.join();
            }
            this.join();
        }
    }
}

