/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StandardSocketFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestDataNodeTcpNoDelay {
    // Logger shared by the tests and by the nested socket/factory wrappers below.
    private static final Log LOG = LogFactory.getLog(TestDataNodeTcpNoDelay.class);
    // Template configuration created once in setUpBeforeClass(); each test copies it
    // before applying its own overrides.
    private static Configuration baseConf;

    /**
     * Builds the HDFS configuration that every test in this class copies
     * before adding its own settings.
     *
     * @throws Exception declared for the JUnit lifecycle signature
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final Configuration template = new HdfsConfiguration();
        TestDataNodeTcpNoDelay.baseConf = template;
    }

    /**
     * Class-level teardown hook. Intentionally empty: each test shuts down its
     * own cluster and resets the socket factory in its {@code finally} block.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
    }

    /**
     * Verifies that sockets are configured with TCP_NODELAY when no
     * tcpnodelay setting is overridden (the test expects the defaults to
     * enable it).
     *
     * <p>A three-datanode {@link MiniDFSCluster} is started with
     * {@link SocketFactoryWrapper} installed as the default RPC socket
     * factory, some files are written and a re-replication is requested, and
     * then every socket recorded by the factory is checked.
     *
     * @throws Exception if the cluster cannot be started or any HDFS
     *         operation fails
     */
    @Test
    public void testTcpNoDelayEnabled() throws Exception {
        Configuration testConf = new Configuration(baseConf);
        testConf.set("hadoop.rpc.socket.factory.class.default", SocketFactoryWrapper.class.getName());
        SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
        LOG.info("Socket factory is " + defaultFactory.getClass().getName());

        // Start from a clean slate so sockets recorded by an earlier test
        // cannot influence this test's assertion.
        SocketFactoryWrapper.reset();
        MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
        try {
            // Everything after build() is inside the try so that the cluster
            // is shut down even when startup or file-system lookup fails.
            dfsCluster.waitActive();
            DistributedFileSystem dfs = dfsCluster.getFileSystem();
            this.createData(dfs);
            this.transferBlock(dfs);
            Assert.assertTrue(SocketFactoryWrapper.wasTcpNoDelayActive());
        }
        finally {
            try {
                dfsCluster.shutdown();
            }
            finally {
                // Reset only after shutdown so that sockets created while the
                // cluster is stopping are discarded as well.
                SocketFactoryWrapper.reset();
            }
        }
    }

    /**
     * Verifies that TCP_NODELAY is not uniformly enabled once all four
     * tcpnodelay settings have been switched off via
     * {@link #setTcpNoDelay(Configuration, boolean)}.
     *
     * <p>The assertion passes as soon as at least one recorded socket was not
     * left with TCP_NODELAY enabled; it fails when every recorded socket had
     * it enabled, or when no socket was recorded at all.
     *
     * @throws Exception if the cluster cannot be started or any HDFS
     *         operation fails
     */
    @Test
    public void testTcpNoDelayDisabled() throws Exception {
        Configuration testConf = new Configuration(baseConf);
        this.setTcpNoDelay(testConf, false);
        testConf.set("hadoop.rpc.socket.factory.class.default", SocketFactoryWrapper.class.getName());
        SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
        LOG.info("Socket factory is " + defaultFactory.getClass().getName());

        // Start from a clean slate so sockets recorded by an earlier test
        // cannot influence this test's assertion.
        SocketFactoryWrapper.reset();
        MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
        try {
            // Everything after build() is inside the try so that the cluster
            // is shut down even when startup or file-system lookup fails.
            dfsCluster.waitActive();
            DistributedFileSystem dfs = dfsCluster.getFileSystem();
            this.createData(dfs);
            this.transferBlock(dfs);
            Assert.assertFalse(SocketFactoryWrapper.wasTcpNoDelayActive());
        }
        finally {
            try {
                dfsCluster.shutdown();
            }
            finally {
                // Reset only after shutdown so that sockets created while the
                // cluster is stopping are discarded as well.
                SocketFactoryWrapper.reset();
            }
        }
    }

    /**
     * Writes the files {@code file0}, {@code file1} and {@code file2} under
     * the relative directory {@code test-dir} through
     * {@code DFSTestUtil.createFile}.
     *
     * @param dfs file system of the running mini cluster
     * @throws Exception if a file cannot be created
     */
    private void createData(DistributedFileSystem dfs) throws Exception {
        final Path parent = new Path("test-dir");
        int index = 0;
        while (index < 3) {
            final Path target = new Path(parent, "file" + index);
            // NOTE(review): 10240 / 3 / 0 are presumably length, replication
            // factor and seed - confirm against DFSTestUtil.createFile.
            DFSTestUtil.createFile((FileSystem)dfs, target, 10240L, (short)3, 0L);
            index++;
        }
    }

    /**
     * Creates {@code test-block-transfer/testfile}, then raises its
     * replication to 2 and waits for that replication to be reached. Judging
     * by the method name this is meant to provoke a block transfer between
     * datanodes.
     *
     * @param dfs file system of the running mini cluster
     * @throws Exception if the file cannot be created or the wait fails
     */
    private void transferBlock(DistributedFileSystem dfs) throws Exception {
        final short raisedReplication = (short)2;
        final Path target = new Path(new Path("test-block-transfer"), "testfile");
        DFSTestUtil.createFile((FileSystem)dfs, target, 10240L, (short)1, 0L);
        dfs.setReplication(target, raisedReplication);
        // NOTE(review): 20000 is presumably a timeout in milliseconds - confirm.
        DFSTestUtil.waitForReplication(dfs, target, raisedReplication, 20000);
    }

    /**
     * Applies the same boolean to the four tcpnodelay configuration keys
     * (data-transfer client/server and IPC client/server).
     *
     * @param conf configuration to modify in place
     * @param value value stored under each key
     */
    private void setTcpNoDelay(Configuration conf, boolean value) {
        final String[] noDelayKeys = {
            "dfs.data.transfer.client.tcpnodelay",
            "dfs.data.transfer.server.tcpnodelay",
            "ipc.client.tcpnodelay",
            "ipc.server.tcpnodelay",
        };
        for (String key : noDelayKeys) {
            conf.setBoolean(key, value);
        }
    }

    /**
     * A {@link Socket} that forwards every overridden operation to another
     * socket and additionally remembers the most recent value passed to
     * {@link #setTcpNoDelay(boolean)}, so a test can later ask what was
     * requested without querying the underlying socket.
     */
    public static class SocketWrapper
    extends Socket {
        /** The socket that actually performs all operations. */
        private final Socket wrapped;
        /**
         * Last value successfully passed to {@link #setTcpNoDelay(boolean)};
         * {@code false} until the first such call. Declared {@code volatile}
         * because the thread that configures the socket is not necessarily
         * the thread that later calls {@link #getLastTcpNoDelay()}.
         */
        private volatile boolean tcpNoDelay;

        /**
         * @param socket the socket to delegate to
         */
        public SocketWrapper(Socket socket) {
            this.wrapped = socket;
        }

        @Override
        public void connect(SocketAddress endpoint) throws IOException {
            this.wrapped.connect(endpoint);
        }

        @Override
        public void connect(SocketAddress endpoint, int timeout) throws IOException {
            this.wrapped.connect(endpoint, timeout);
        }

        @Override
        public void bind(SocketAddress bindpoint) throws IOException {
            this.wrapped.bind(bindpoint);
        }

        @Override
        public InetAddress getInetAddress() {
            return this.wrapped.getInetAddress();
        }

        @Override
        public InetAddress getLocalAddress() {
            return this.wrapped.getLocalAddress();
        }

        @Override
        public int getPort() {
            return this.wrapped.getPort();
        }

        @Override
        public int getLocalPort() {
            return this.wrapped.getLocalPort();
        }

        @Override
        public SocketAddress getRemoteSocketAddress() {
            return this.wrapped.getRemoteSocketAddress();
        }

        @Override
        public SocketAddress getLocalSocketAddress() {
            return this.wrapped.getLocalSocketAddress();
        }

        @Override
        public SocketChannel getChannel() {
            return this.wrapped.getChannel();
        }

        @Override
        public InputStream getInputStream() throws IOException {
            return this.wrapped.getInputStream();
        }

        @Override
        public OutputStream getOutputStream() throws IOException {
            return this.wrapped.getOutputStream();
        }

        /**
         * Forwards the option to the wrapped socket and, only if that call
         * did not throw, records the requested value.
         */
        @Override
        public void setTcpNoDelay(boolean on) throws SocketException {
            this.wrapped.setTcpNoDelay(on);
            this.tcpNoDelay = on;
        }

        /** Returns the wrapped socket's own view of TCP_NODELAY. */
        @Override
        public boolean getTcpNoDelay() throws SocketException {
            return this.wrapped.getTcpNoDelay();
        }

        @Override
        public void setSoLinger(boolean on, int linger) throws SocketException {
            this.wrapped.setSoLinger(on, linger);
        }

        @Override
        public int getSoLinger() throws SocketException {
            return this.wrapped.getSoLinger();
        }

        @Override
        public void sendUrgentData(int data) throws IOException {
            this.wrapped.sendUrgentData(data);
        }

        @Override
        public void setOOBInline(boolean on) throws SocketException {
            this.wrapped.setOOBInline(on);
        }

        @Override
        public boolean getOOBInline() throws SocketException {
            return this.wrapped.getOOBInline();
        }

        @Override
        public synchronized void setSoTimeout(int timeout) throws SocketException {
            this.wrapped.setSoTimeout(timeout);
        }

        @Override
        public synchronized int getSoTimeout() throws SocketException {
            return this.wrapped.getSoTimeout();
        }

        @Override
        public synchronized void setSendBufferSize(int size) throws SocketException {
            this.wrapped.setSendBufferSize(size);
        }

        @Override
        public synchronized int getSendBufferSize() throws SocketException {
            return this.wrapped.getSendBufferSize();
        }

        @Override
        public synchronized void setReceiveBufferSize(int size) throws SocketException {
            this.wrapped.setReceiveBufferSize(size);
        }

        @Override
        public synchronized int getReceiveBufferSize() throws SocketException {
            return this.wrapped.getReceiveBufferSize();
        }

        @Override
        public void setKeepAlive(boolean on) throws SocketException {
            this.wrapped.setKeepAlive(on);
        }

        @Override
        public boolean getKeepAlive() throws SocketException {
            return this.wrapped.getKeepAlive();
        }

        @Override
        public void setTrafficClass(int tc) throws SocketException {
            this.wrapped.setTrafficClass(tc);
        }

        @Override
        public int getTrafficClass() throws SocketException {
            return this.wrapped.getTrafficClass();
        }

        @Override
        public void setReuseAddress(boolean on) throws SocketException {
            this.wrapped.setReuseAddress(on);
        }

        @Override
        public boolean getReuseAddress() throws SocketException {
            return this.wrapped.getReuseAddress();
        }

        @Override
        public synchronized void close() throws IOException {
            this.wrapped.close();
        }

        @Override
        public void shutdownInput() throws IOException {
            this.wrapped.shutdownInput();
        }

        @Override
        public void shutdownOutput() throws IOException {
            this.wrapped.shutdownOutput();
        }

        @Override
        public String toString() {
            return this.wrapped.toString();
        }

        @Override
        public boolean isConnected() {
            return this.wrapped.isConnected();
        }

        @Override
        public boolean isBound() {
            return this.wrapped.isBound();
        }

        @Override
        public boolean isClosed() {
            return this.wrapped.isClosed();
        }

        @Override
        public boolean isInputShutdown() {
            return this.wrapped.isInputShutdown();
        }

        @Override
        public boolean isOutputShutdown() {
            return this.wrapped.isOutputShutdown();
        }

        @Override
        public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
            this.wrapped.setPerformancePreferences(connectionTime, latency, bandwidth);
        }

        /**
         * @return the value most recently recorded by
         *         {@link #setTcpNoDelay(boolean)}, or {@code false} if it was
         *         never called successfully
         */
        public boolean getLastTcpNoDelay() {
            return this.tcpNoDelay;
        }
    }

    public static class SocketFactoryWrapper
    extends StandardSocketFactory {
        private static List<SocketWrapper> sockets = new ArrayList<SocketWrapper>();

        public static boolean wasTcpNoDelayActive() {
            LOG.info((Object)("Checking " + sockets.size() + " sockets for TCP_NODELAY"));
            for (SocketWrapper sw : sockets) {
                if (sw.getLastTcpNoDelay()) continue;
                return false;
            }
            return true;
        }

        public static void reset() {
            sockets = new ArrayList<SocketWrapper>();
        }

        public Socket createSocket() throws IOException {
            LOG.info((Object)"Creating new socket");
            SocketWrapper wrapper = new SocketWrapper(super.createSocket());
            sockets.add(wrapper);
            return wrapper;
        }

        public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
            LOG.info((Object)("Creating socket for " + host));
            SocketWrapper wrapper = new SocketWrapper(super.createSocket(host, port));
            sockets.add(wrapper);
            return wrapper;
        }

        public Socket createSocket(String host, int port, InetAddress localHostAddr, int localPort) throws IOException, UnknownHostException {
            LOG.info((Object)("Creating socket for " + host));
            SocketWrapper wrapper = new SocketWrapper(super.createSocket(host, port, localHostAddr, localPort));
            sockets.add(wrapper);
            return wrapper;
        }

        public Socket createSocket(InetAddress addr, int port) throws IOException {
            LOG.info((Object)("Creating socket for " + addr));
            SocketWrapper wrapper = new SocketWrapper(super.createSocket(addr, port));
            sockets.add(wrapper);
            return wrapper;
        }

        public Socket createSocket(InetAddress addr, int port, InetAddress localHostAddr, int localPort) throws IOException {
            LOG.info((Object)("Creating socket for " + addr));
            SocketWrapper wrapper = new SocketWrapper(super.createSocket(addr, port, localHostAddr, localPort));
            sockets.add(wrapper);
            return wrapper;
        }
    }
}

