/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.gridgain.grid.GridException;
import org.gridgain.grid.kernal.processors.hadoop.message.GridHadoopMessage;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.GridHadoopProcessDescriptor;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopCommunicationClient;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopHandshakeTimeoutException;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopIpcToNioAdapter;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopMarshallerFilter;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopMessageListener;
import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.external.communication.GridHadoopTcpNioCommunicationClient;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.marshaller.GridMarshaller;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.util.GridConcurrentFactory;
import org.gridgain.grid.util.GridKeyLock;
import org.gridgain.grid.util.ipc.GridIpcEndpoint;
import org.gridgain.grid.util.ipc.shmem.GridIpcOutOfSystemResourcesException;
import org.gridgain.grid.util.ipc.shmem.GridIpcSharedMemoryClientEndpoint;
import org.gridgain.grid.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint;
import org.gridgain.grid.util.nio.GridBufferedParser;
import org.gridgain.grid.util.nio.GridNioAsyncNotifyFilter;
import org.gridgain.grid.util.nio.GridNioCodecFilter;
import org.gridgain.grid.util.nio.GridNioFilter;
import org.gridgain.grid.util.nio.GridNioFilterAdapter;
import org.gridgain.grid.util.nio.GridNioFuture;
import org.gridgain.grid.util.nio.GridNioMessageTracker;
import org.gridgain.grid.util.nio.GridNioParser;
import org.gridgain.grid.util.nio.GridNioServer;
import org.gridgain.grid.util.nio.GridNioServerListener;
import org.gridgain.grid.util.nio.GridNioServerListenerAdapter;
import org.gridgain.grid.util.nio.GridNioSession;
import org.gridgain.grid.util.nio.GridNioSessionMetaKey;
import org.gridgain.grid.util.typedef.CI1;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.X;
import org.gridgain.grid.util.typedef.internal.LT;
import org.gridgain.grid.util.typedef.internal.S;
import org.gridgain.grid.util.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;
import org.jdk8.backport.ConcurrentLinkedDeque8;
import org.jetbrains.annotations.Nullable;

public class GridHadoopExternalCommunication {
    /** Warning text logged when a shared memory client cannot be created for lack of system resources and TCP is used instead. */
    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment (switching to TCP, may be slower). For troubleshooting see http://bit.ly/GridGain-Troubleshooting";
    /** Default first local TCP port to try binding to. */
    public static final int DFLT_PORT = 27100;
    /** Default connect timeout; the value is passed to the socket connect call, i.e. milliseconds. */
    public static final long DFLT_CONN_TIMEOUT = 1000L;
    /** Default upper bound for the handshake timeout, which is doubled on every retry. */
    public static final long DFLT_MAX_CONN_TIMEOUT = 600000L;
    /** Default maximum number of handshake attempts. */
    public static final int DFLT_RECONNECT_CNT = 10;
    /** Default message queue limit. */
    public static final int DFLT_MSG_QUEUE_LIMIT = 1024;
    /** Default number of NIO selectors. */
    public static final int DFLT_SELECTORS_CNT = 1;
    /** Session meta key under which the remote process descriptor is stored once the handshake message arrives. */
    private static final int PROCESS_META = GridNioSessionMetaKey.nextUniqueKey();
    /** Session meta key under which the handshake-finish object awaited by the connecting side is stored. */
    private static final int HANDSHAKE_FINISH_META = GridNioSessionMetaKey.nextUniqueKey();
    /** Session meta key under which the per-session message tracker is stored. */
    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
    /** Default number of consecutive ports tried when binding. */
    public static final int DFLT_PORT_RANGE = 100;
    /** Default value of the TCP_NODELAY socket option. */
    public static final boolean DFLT_TCP_NODELAY = true;
    /**
     * Listener handed to the NIO server builder. It forwards received messages to the registered
     * {@link GridHadoopMessageListener} and cleans up the client map when a session goes away.
     */
    private final GridNioServerListener<GridHadoopMessage> srvLsnr = new GridNioServerListenerAdapter<GridHadoopMessage>(){

        /** Checks (via assertion only) that the session already carries a process descriptor. */
        public void onConnected(GridNioSession ses) {
            GridHadoopProcessDescriptor procDesc = (GridHadoopProcessDescriptor)ses.meta(PROCESS_META);
            assert (procDesc != null) : "Received connected notification without finished handshake: " + ses;
        }

        /**
         * Drops and force-closes the client registered for the session's process (if any) and
         * reports the lost connection to the listener.
         *
         * @param ses Closed session.
         * @param e Failure that caused the disconnect, or {@code null}.
         */
        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            if (GridHadoopExternalCommunication.this.log.isDebugEnabled()) {
                GridHadoopExternalCommunication.this.log.debug("Closed connection for session: " + ses);
            }
            if (e != null) {
                U.error((GridLogger)GridHadoopExternalCommunication.this.log, (Object)("Session disconnected due to exception: " + ses), (Throwable)e);
            }

            GridHadoopProcessDescriptor procDesc = (GridHadoopProcessDescriptor)ses.meta(PROCESS_META);
            if (procDesc != null) {
                GridHadoopCommunicationClient removed = (GridHadoopCommunicationClient)GridHadoopExternalCommunication.this.clients.remove(procDesc.processId());
                if (removed != null) {
                    removed.forceClose();
                }
            }

            // The descriptor may be null here when the session never completed its handshake.
            GridHadoopMessageListener curLsnr = GridHadoopExternalCommunication.this.lsnr;
            if (curLsnr != null) {
                curLsnr.onConnectionLost(procDesc);
            }
        }

        /** Delivers the message to the listener, then runs the session's message tracker when a queue limit is set. */
        public void onMessage(GridNioSession ses, GridHadoopMessage msg) {
            GridHadoopExternalCommunication.this.notifyListener((GridHadoopProcessDescriptor)ses.meta(PROCESS_META), msg);
            if (GridHadoopExternalCommunication.this.msgQueueLimit <= 0) {
                return;
            }
            GridNioMessageTracker msgTracker = (GridNioMessageTracker)ses.meta(TRACKER_META);
            assert (msgTracker != null) : "Missing tracker for limited message queue: " + ses;
            msgTracker.run();
        }
    };
    /** Logger scoped to this class; assigned in the constructor. */
    private GridLogger log;
    /** Descriptor of the local process; its address and ports are filled in by start(). */
    private GridHadoopProcessDescriptor locProcDesc;
    /** Marshaller handed to the marshaller filter. */
    private GridMarshaller marsh;
    /** Executor handed to the async-notify filter. */
    private ExecutorService execSvc;
    /** Grid name passed to the NIO server and shared memory endpoint. */
    private String gridName;
    /** Local host resolved in start(). */
    private volatile InetAddress locHost;
    /** First local TCP port to try. */
    private int locPort = DFLT_PORT;
    /** Number of consecutive ports to try (used for both TCP and shared memory). */
    private int locPortRange = DFLT_PORT_RANGE;
    /** First shared memory port to try; -1 disables the shared memory server. */
    private int shmemPort = -1;
    /** Whether direct buffers are requested from the parser and NIO server. */
    private boolean directBuf = true;
    /** Connect timeout and initial handshake timeout. */
    private long connTimeout = DFLT_CONN_TIMEOUT;
    /** Handshake retries stop once the doubled timeout exceeds this value. */
    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
    /** Maximum number of handshake attempts. */
    private int reconCnt = DFLT_RECONNECT_CNT;
    /** Socket send buffer size; applied to client sockets only when positive. */
    private int sockSndBuf;
    /** Socket receive buffer size; applied to client sockets only when positive. */
    private int sockRcvBuf;
    /** Send queue limit for the NIO server; a positive value also enables message tracking. */
    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
    /** TCP NIO server; created in start(), cleared in stop(). */
    private GridNioServer<GridHadoopMessage> nioSrvr;
    /** Shared memory server endpoint, or null when shared memory is not used. */
    private GridIpcSharedMemoryServerEndpoint shmemSrv;
    /** TCP_NODELAY option value. */
    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
    /** Worker accepting shared memory connections; null when no shared memory server was started. */
    private ShmemAcceptWorker shmemAcceptWorker;
    /** Shared memory workers created so far. */
    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8();
    /** Communication clients keyed by remote process ID. */
    private final ConcurrentMap<UUID, GridHadoopCommunicationClient> clients = GridConcurrentFactory.newMap();
    /** Listener notified about received messages and lost connections. */
    private volatile GridHadoopMessageListener lsnr;
    /** TCP port actually bound, or -1 when not bound. */
    private int boundTcpPort = -1;
    /** Shared memory port actually bound, or -1 when not bound. */
    private int boundTcpShmemPort = -1;
    /** Number of NIO selectors. */
    private int selectorsCnt = DFLT_SELECTORS_CNT;
    /** Handshake message describing the local process; created in start(). */
    private ProcessHandshakeMessage locIdMsg;
    /** Per-process-ID locks serialising client creation. */
    private final GridKeyLock locks = new GridKeyLock();

    /**
     * Creates the communication component; nothing is bound or started until {@link #start()}.
     *
     * @param parentNodeId Parent node ID stored in the local process descriptor.
     * @param procId Local process ID stored in the local process descriptor.
     * @param marsh Marshaller used by the marshaller filter.
     * @param log Parent logger; a logger for this class is derived from it.
     * @param execSvc Executor used by the async-notify filter.
     * @param gridName Grid name passed to the servers.
     */
    public GridHadoopExternalCommunication(UUID parentNodeId, UUID procId, GridMarshaller marsh, GridLogger log, ExecutorService execSvc, String gridName) {
        this.locProcDesc = new GridHadoopProcessDescriptor(parentNodeId, procId);
        this.marsh = marsh;
        this.log = log.getLogger(GridHadoopExternalCommunication.class);
        this.execSvc = execSvc;
        this.gridName = gridName;
    }

    /**
     * Sets the first local TCP port the NIO server tries to bind to.
     *
     * @param locPort First port of the scanned range.
     */
    public void setLocalPort(int locPort) {
        this.locPort = locPort;
    }

    /**
     * @return Configured first local TCP port (not necessarily the port actually bound).
     */
    public int getLocalPort() {
        return this.locPort;
    }

    /**
     * Sets how many consecutive ports are tried when binding; the same range is used for the
     * TCP server and the shared memory server.
     *
     * @param locPortRange Number of ports to try.
     */
    public void setLocalPortRange(int locPortRange) {
        this.locPortRange = locPortRange;
    }

    /**
     * @return Number of consecutive ports tried when binding.
     */
    public int getLocalPortRange() {
        return this.locPortRange;
    }

    /**
     * Sets the first port tried for the shared memory server.
     *
     * @param shmemPort First shared memory port, or -1 to disable the shared memory server.
     */
    public void setSharedMemoryPort(int shmemPort) {
        this.shmemPort = shmemPort;
    }

    /**
     * @return Configured first shared memory port, or -1 when shared memory is disabled.
     */
    public int getSharedMemoryPort() {
        return this.shmemPort;
    }

    /**
     * Sets the connect timeout, which is also the initial handshake timeout.
     *
     * @param connTimeout Timeout; passed to the socket connect call, i.e. milliseconds.
     */
    public void setConnectTimeout(long connTimeout) {
        this.connTimeout = connTimeout;
    }

    /**
     * @return Connect timeout (also the initial handshake timeout).
     */
    public long getConnectTimeout() {
        return this.connTimeout;
    }

    /**
     * Sets the bound for the handshake timeout: retries stop once the doubled timeout exceeds it.
     *
     * @param maxConnTimeout Maximum handshake timeout.
     */
    public void setMaxConnectTimeout(long maxConnTimeout) {
        this.maxConnTimeout = maxConnTimeout;
    }

    /**
     * @return Maximum handshake timeout.
     */
    public long getMaxConnectTimeout() {
        return this.maxConnTimeout;
    }

    /**
     * Sets the maximum number of handshake attempts made when creating a client.
     *
     * @param reconCnt Maximum number of attempts.
     */
    public void setReconnectCount(int reconCnt) {
        this.reconCnt = reconCnt;
    }

    /**
     * @return Maximum number of handshake attempts.
     */
    public int getReconnectCount() {
        return this.reconCnt;
    }

    /**
     * Sets the direct-buffer flag passed to the message parser and the NIO server builder.
     *
     * @param directBuf {@code true} to request direct buffers.
     */
    public void setDirectBuffer(boolean directBuf) {
        this.directBuf = directBuf;
    }

    /**
     * @return Direct-buffer flag.
     */
    public boolean isDirectBuffer() {
        return this.directBuf;
    }

    /**
     * Sets the selector count passed to the NIO server builder.
     *
     * @param selectorsCnt Number of selectors.
     */
    public void setSelectorsCount(int selectorsCnt) {
        this.selectorsCnt = selectorsCnt;
    }

    /**
     * @return Number of selectors.
     */
    public int getSelectorsCount() {
        return this.selectorsCnt;
    }

    /**
     * Sets the TCP_NODELAY value applied to the NIO server and to outgoing client sockets.
     *
     * @param tcpNoDelay TCP_NODELAY option value.
     */
    public void setTcpNoDelay(boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @return TCP_NODELAY option value.
     */
    public boolean isTcpNoDelay() {
        return this.tcpNoDelay;
    }

    /**
     * Sets the socket receive buffer size; outgoing client sockets apply it only when positive.
     *
     * @param sockRcvBuf Receive buffer size.
     */
    public void setSocketReceiveBuffer(int sockRcvBuf) {
        this.sockRcvBuf = sockRcvBuf;
    }

    /**
     * @return Socket receive buffer size.
     */
    public int getSocketReceiveBuffer() {
        return this.sockRcvBuf;
    }

    /**
     * Sets the socket send buffer size; outgoing client sockets apply it only when positive.
     *
     * @param sockSndBuf Send buffer size.
     */
    public void setSocketSendBuffer(int sockSndBuf) {
        this.sockSndBuf = sockSndBuf;
    }

    /**
     * @return Socket send buffer size.
     */
    public int getSocketSendBuffer() {
        return this.sockSndBuf;
    }

    /**
     * Sets the send queue limit passed to the NIO server builder; a positive value also makes
     * the server listener run the per-session message tracker after each received message.
     *
     * @param msgQueueLimit Message queue limit.
     */
    public void setMessageQueueLimit(int msgQueueLimit) {
        this.msgQueueLimit = msgQueueLimit;
    }

    /**
     * @return Message queue limit.
     */
    public int getMessageQueueLimit() {
        return this.msgQueueLimit;
    }

    /**
     * Registers the listener notified about received messages and lost connections.
     *
     * @param lsnr Listener; while it is {@code null}, received messages are ignored.
     */
    public void setListener(GridHadoopMessageListener lsnr) {
        this.lsnr = lsnr;
    }

    /**
     * Returns the outbound message queue size reported by the NIO server.
     *
     * @return Queue size, or {@code 0} when the server does not exist (before {@link #start()}
     *      or after {@link #stop()}, which clears the server reference).
     */
    public int getOutboundMessagesQueueSize() {
        // Read the field once: stop() sets it to null.
        GridNioServer<GridHadoopMessage> srv = this.nioSrvr;
        return srv == null ? 0 : srv.outboundMessagesQueueSize();
    }

    /**
     * Resolves the local host, creates the shared memory server (best effort) and the TCP NIO
     * server, publishes the bound address and ports in the local process descriptor, and starts
     * the accept worker and the NIO server.
     *
     * @throws GridException If the local address cannot be resolved or no TCP port can be bound.
     */
    public void start() throws GridException {
        try {
            this.locHost = U.getLocalHost();
        }
        catch (IOException e) {
            throw new GridException("Failed to initialize local address.", (Throwable)e);
        }
        // A shared memory failure is only logged; the component keeps working over TCP.
        try {
            this.shmemSrv = this.resetShmemServer();
        }
        catch (GridException e) {
            U.warn((GridLogger)this.log, (Object)"Failed to start shared memory communication server.", (Object)((Object)e));
        }
        // The TCP server is mandatory.
        try {
            this.nioSrvr = this.resetNioServer();
        }
        catch (GridException e) {
            throw new GridException("Failed to initialize TCP server: " + this.locHost, (Throwable)e);
        }
        // Publish the actually bound endpoints; the shared memory port stays -1 when no server was created.
        this.locProcDesc.address(this.locHost.getHostAddress());
        this.locProcDesc.sharedMemoryPort(this.boundTcpShmemPort);
        this.locProcDesc.tcpPort(this.boundTcpPort);
        this.locIdMsg = new ProcessHandshakeMessage(this.locProcDesc);
        if (this.shmemSrv != null) {
            this.shmemAcceptWorker = new ShmemAcceptWorker(this.shmemSrv);
            new GridThread((GridWorker)this.shmemAcceptWorker).start();
        }
        this.nioSrvr.start();
    }

    /**
     * @return Descriptor of the local process; its address and ports are set by {@link #start()}.
     */
    public GridHadoopProcessDescriptor localProcessDescriptor() {
        return this.locProcDesc;
    }

    /**
     * Builds the filter chain handed to the NIO server builder: async notification, handshake
     * and back-pressure, marshalling, and the buffered codec, in that order.
     *
     * @return Newly created filters.
     */
    private GridNioFilter[] filters() {
        GridNioFilter asyncNotify = new GridNioAsyncNotifyFilter(this.gridName, (Executor)this.execSvc, this.log);
        GridNioFilter handshake = new HandshakeAndBackpressureFilter();
        GridNioFilter marshaller = new GridHadoopMarshallerFilter(this.marsh);
        GridNioFilter codec = new GridNioCodecFilter((GridNioParser)new GridBufferedParser(this.directBuf, ByteOrder.nativeOrder()), this.log, false);

        return new GridNioFilter[]{asyncNotify, handshake, marshaller, codec};
    }

    /**
     * Builds the TCP NIO server on the first port of {@code [locPort, locPort + locPortRange)}
     * for which the builder succeeds, and records that port in {@code boundTcpPort}.
     *
     * @return Built (not yet started) server.
     * @throws GridException If a server was already created or no port in the range could be used.
     */
    private GridNioServer<GridHadoopMessage> resetNioServer() throws GridException {
        if (this.boundTcpPort >= 0) {
            throw new GridException("Tcp NIO server was already created on port " + this.boundTcpPort);
        }

        GridException lastEx = null;

        for (int port = this.locPort; port < this.locPort + this.locPortRange; ++port) {
            try {
                GridNioServer srvr = GridNioServer.builder()
                    .address(this.locHost)
                    .port(port)
                    .listener(this.srvLsnr)
                    .logger(this.log.getLogger(GridNioServer.class))
                    .selectorCount(this.selectorsCnt)
                    .gridName(this.gridName)
                    .tcpNoDelay(this.tcpNoDelay)
                    .directBuffer(this.directBuf)
                    .byteOrder(ByteOrder.nativeOrder())
                    .socketSendBufferSize(this.sockSndBuf)
                    .socketReceiveBufferSize(this.sockRcvBuf)
                    .sendQueueLimit(this.msgQueueLimit)
                    .directMode(false)
                    .filters(this.filters())
                    .build();

                this.boundTcpPort = port;

                if (this.log.isInfoEnabled()) {
                    this.log.info("Successfully bound to TCP port [port=" + this.boundTcpPort + ", locHost=" + this.locHost + ']');
                }

                return srvr;
            }
            catch (GridException e) {
                // Remember the failure and move on to the next port.
                lastEx = e;

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + this.locHost + ']');
                }
            }
        }

        throw new GridException("Failed to bind to any port within range [startPort=" + this.locPort + ", portRange=" + this.locPortRange + ", locHost=" + this.locHost + ']', (Throwable)lastEx);
    }

    /**
     * Starts the shared memory server endpoint on the first port of
     * {@code [shmemPort, shmemPort + locPortRange)} that can be used and records that port in
     * {@code boundTcpShmemPort}.
     *
     * @return Started endpoint, or {@code null} when shared memory is disabled
     *      ({@code shmemPort == -1}) or the platform is Windows.
     * @throws GridException If an endpoint was already created or no port in the range could be used.
     */
    @Nullable
    private GridIpcSharedMemoryServerEndpoint resetShmemServer() throws GridException {
        if (this.boundTcpShmemPort >= 0) {
            throw new GridException("Shared memory server was already created on port " + this.boundTcpShmemPort);
        }
        if (this.shmemPort == -1 || U.isWindows()) {
            return null;
        }

        GridException lastEx = null;

        for (int port = this.shmemPort; port < this.shmemPort + this.locPortRange; ++port) {
            try {
                GridIpcSharedMemoryServerEndpoint srv = new GridIpcSharedMemoryServerEndpoint(this.log.getLogger(GridIpcSharedMemoryServerEndpoint.class), this.locProcDesc.processId(), this.gridName);
                srv.setPort(port);
                srv.omitOutOfResourcesWarning(true);
                srv.start();

                this.boundTcpShmemPort = port;

                if (this.log.isInfoEnabled()) {
                    this.log.info("Successfully bound shared memory communication to TCP port [port=" + this.boundTcpShmemPort + ", locHost=" + this.locHost + ']');
                }

                return srv;
            }
            catch (GridException e) {
                // Remember the failure and move on to the next port.
                lastEx = e;

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to bind to local port (will try next port within range) [port=" + port + ", locHost=" + this.locHost + ']');
                }
            }
        }

        // Report the start of the range that was actually scanned (shmemPort, not the TCP locPort).
        throw new GridException("Failed to bind shared memory communication to any port within range [startPort=" + this.shmemPort + ", portRange=" + this.locPortRange + ", locHost=" + this.locHost + ']', (Throwable)lastEx);
    }

    /**
     * Stops the TCP server, cancels and joins the shared memory workers, force-closes all known
     * clients and resets the bound-port state so that the component can be started again.
     *
     * @throws GridException Declared for callers; no statement visible in this method raises it explicitly.
     */
    public void stop() throws GridException {
        if (this.nioSrvr != null) {
            this.nioSrvr.stop();
        }

        U.cancel((GridWorker)this.shmemAcceptWorker);
        U.join((GridWorker)this.shmemAcceptWorker, (GridLogger)this.log);

        U.cancel(this.shmemWorkers);
        U.join(this.shmemWorkers, (GridLogger)this.log);
        this.shmemWorkers.clear();

        // Force closing on stop (safety).
        for (GridHadoopCommunicationClient client : this.clients.values()) {
            client.forceClose();
        }

        this.nioSrvr = null;
        this.boundTcpPort = -1;

        // Reset shared memory state as well: resetShmemServer() refuses to run while
        // boundTcpShmemPort >= 0, so leaving it set would make a later start() keep the stale endpoint.
        // NOTE(review): assumes cancelling the accept worker releases the endpoint - confirm.
        this.shmemAcceptWorker = null;
        this.shmemSrv = null;
        this.boundTcpShmemPort = -1;
    }

    /**
     * Sends a message to the given process through a reserved client. The client is released
     * after a successful send; if the send fails it is force-closed and removed from the client
     * map, and the failure propagates to the caller.
     *
     * @param desc Destination process descriptor.
     * @param msg Message to send.
     * @throws GridException If a client could not be reserved or the send failed.
     */
    public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws GridException {
        assert (desc != null);
        assert (msg != null);

        if (this.log.isTraceEnabled()) {
            this.log.trace("Sending message to Hadoop process [desc=" + desc + ", msg=" + msg + ']');
        }

        GridHadoopCommunicationClient reserved = null;
        boolean sent = false;

        try {
            reserved = this.reserveClient(desc);
            reserved.sendMessage(desc, msg);
            sent = true;
        }
        finally {
            if (reserved != null && sent) {
                reserved.release();
            }
            else if (reserved != null) {
                reserved.forceClose();
                this.clients.remove(desc.processId(), reserved);
            }
        }
    }

    /**
     * Returns a reserved client for the given process, creating one under the per-process lock
     * when none is registered. A client that refuses reservation is removed from the map and the
     * lookup is repeated.
     *
     * @param desc Remote process descriptor.
     * @return Reserved client.
     * @throws GridException If a new client could not be created.
     */
    private GridHadoopCommunicationClient reserveClient(GridHadoopProcessDescriptor desc) throws GridException {
        assert (desc != null);

        UUID procId = desc.processId();

        for (;;) {
            GridHadoopCommunicationClient candidate = (GridHadoopCommunicationClient)this.clients.get(procId);

            if (candidate == null) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Did not find client for remote process [locProcDesc=" + this.locProcDesc + ", desc=" + desc + ']');
                }

                Object sync = this.locks.lock((Object)procId);
                try {
                    // Re-check under the lock: another thread may have registered a client meanwhile.
                    candidate = (GridHadoopCommunicationClient)this.clients.get(procId);

                    if (candidate == null) {
                        candidate = this.createNioClient(desc);

                        GridHadoopCommunicationClient prev = this.clients.put(procId, candidate);
                        assert (prev == null);
                    }
                }
                finally {
                    this.locks.unlock((Object)procId, sync);
                }

                assert (candidate != null);
            }

            if (!candidate.reserve()) {
                // Client cannot be reserved; drop it and try again.
                this.clients.remove(procId, candidate);
                continue;
            }

            return candidate;
        }
    }

    /**
     * Creates a client for the given process. Shared memory is tried first when the remote
     * process advertises a shared memory port and has the same parent node as the local process;
     * if that attempt fails with a {@link GridException}, a TCP client is created instead.
     *
     * @param desc Remote process descriptor.
     * @return Created client.
     * @throws GridException If the TCP client could not be created.
     */
    @Nullable
    protected GridHadoopCommunicationClient createNioClient(GridHadoopProcessDescriptor desc) throws GridException {
        assert (desc != null);

        int rmtShmemPort = desc.sharedMemoryPort();
        boolean tryShmem = rmtShmemPort != -1 && this.locProcDesc.parentNodeId().equals(desc.parentNodeId());

        if (tryShmem) {
            try {
                return this.createShmemClient(desc, rmtShmemPort);
            }
            catch (GridException e) {
                if (e.hasCause(new Class[]{GridIpcOutOfSystemResourcesException.class})) {
                    LT.warn((GridLogger)this.log, null, (String)OUT_OF_RESOURCES_TCP_MSG);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to establish shared memory connection with local hadoop process: " + desc);
                }
                // Fall through to TCP.
            }
        }

        return this.createTcpClient(desc);
    }

    /**
     * Creates a shared memory client: opens a client endpoint on the given port, starts a worker
     * for it and waits for the handshake to finish. A handshake timeout is retried with a doubled
     * timeout until {@code reconCnt} attempts were made or the timeout exceeds {@code maxConnTimeout}.
     *
     * @param desc Remote process descriptor (not read by the code visible in this method).
     * @param port Shared memory port to connect to.
     * @return Created client.
     * @throws GridException If the endpoint could not be created or the handshake did not complete.
     */
    @Nullable
    protected GridHadoopCommunicationClient createShmemClient(GridHadoopProcessDescriptor desc, int port) throws GridException {
        GridHadoopCommunicationClient client;
        int attempt = 1;
        int connectAttempts = 1;
        long connTimeout0 = this.connTimeout;
        while (true) {
            GridIpcSharedMemoryClientEndpoint clientEndpoint;
            try {
                // NOTE(review): the endpoint always gets the base connTimeout, not the growing connTimeout0 - confirm intended.
                clientEndpoint = new GridIpcSharedMemoryClientEndpoint(port, (int)this.connTimeout, this.log);
            }
            catch (GridException e) {
                // A failure caused by ConnectException is retried once; anything else is rethrown.
                if (connectAttempts < 2 && X.hasCause((Throwable)e, (Class[])new Class[]{ConnectException.class})) {
                    ++connectAttempts;
                    continue;
                }
                throw e;
            }
            client = null;
            try {
                ShmemWorker worker = new ShmemWorker((GridIpcEndpoint)clientEndpoint, false);
                // NOTE(review): the worker stays in shmemWorkers even if the handshake below times out - verify cleanup.
                this.shmemWorkers.add(worker);
                GridNioSession ses = worker.session();
                // The finish object is attached to the session before the worker thread is started.
                HandshakeFinish fin = new HandshakeFinish();
                ses.addMeta(HANDSHAKE_FINISH_META, (Object)fin);
                client = new GridHadoopTcpNioCommunicationClient(ses);
                new GridThread((GridWorker)worker).start();
                fin.await(connTimeout0);
            }
            catch (GridHadoopHandshakeTimeoutException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 + ", err=" + e.getMessage() + ", client=" + client + ']');
                }
                if (client != null) {
                    client.forceClose();
                }
                // Give up once the attempt budget is used or the timeout has grown past the maximum.
                if (attempt == this.reconCnt || connTimeout0 > this.maxConnTimeout) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timedout (will stop attempts to perform the handshake) [timeout=" + connTimeout0 + ", maxConnTimeout=" + this.maxConnTimeout + ", attempt=" + attempt + ", reconCnt=" + this.reconCnt + ", err=" + e.getMessage() + ", client=" + client + ']');
                    }
                    throw e;
                }
                ++attempt;
                connTimeout0 *= 2L;
                continue;
            }
            catch (Error | RuntimeException e) {
                // Unchecked failures close the half-created client and propagate unchanged.
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
                }
                if (client != null) {
                    client.forceClose();
                }
                throw e;
            }
            break;
        }
        return client;
    }

    /**
     * Creates a TCP client for the given process: opens a blocking socket channel, connects,
     * registers the channel with the NIO server and waits for the handshake to finish.
     * <p>
     * A handshake timeout is retried with a doubled timeout until {@code reconCnt} attempts were
     * made or the timeout exceeds {@code maxConnTimeout}. A failure caused by
     * {@link ConnectException} is retried once; any other failure stops the attempts. All
     * failures are attached to the thrown exception as suppressed exceptions.
     * <p>
     * If an attempt fails before a client wraps the channel, the channel is closed here so the
     * socket does not leak.
     *
     * @param desc Remote process descriptor supplying address and TCP port.
     * @return Connected client with a finished handshake.
     * @throws GridException If no connection could be established.
     */
    protected GridHadoopCommunicationClient createTcpClient(GridHadoopProcessDescriptor desc) throws GridException {
        String addr = desc.address();
        int port = desc.tcpPort();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Trying to connect to remote process [locProcDesc=" + this.locProcDesc + ", desc=" + desc + ']');
        }
        boolean conn = false;
        GridHadoopTcpNioCommunicationClient client = null;
        GridException errs = null;
        int connectAttempts = 1;
        long connTimeout0 = this.connTimeout;
        int attempt = 1;
        while (!conn) {
            // Declared outside the try so the failure paths can close it.
            SocketChannel ch = null;
            try {
                ch = SocketChannel.open();
                ch.configureBlocking(true);
                ch.socket().setTcpNoDelay(this.tcpNoDelay);
                ch.socket().setKeepAlive(true);
                if (this.sockRcvBuf > 0) {
                    ch.socket().setReceiveBufferSize(this.sockRcvBuf);
                }
                if (this.sockSndBuf > 0) {
                    ch.socket().setSendBufferSize(this.sockSndBuf);
                }
                ch.socket().connect(new InetSocketAddress(addr, port), (int)this.connTimeout);
                HandshakeFinish fin = new HandshakeFinish();
                GridNioSession ses = (GridNioSession)this.nioSrvr.createSession(ch, F.asMap((Object)HANDSHAKE_FINISH_META, (Object)fin)).get();
                client = new GridHadoopTcpNioCommunicationClient(ses);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Waiting for handshake finish for client: " + client);
                }
                fin.await(connTimeout0);
                conn = true;
            }
            catch (GridHadoopHandshakeTimeoutException e) {
                if (client != null) {
                    client.forceClose();
                    client = null;
                }
                else {
                    // No client took ownership of the channel.
                    closeChannelQuietly(ch);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + ", desc=" + desc + ", port=" + port + ", err=" + (Object)((Object)e) + ']');
                }
                if (attempt == this.reconCnt || connTimeout0 > this.maxConnTimeout) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Handshake timed out (will stop attempts to perform the handshake) [timeout=" + connTimeout0 + ", maxConnTimeout=" + this.maxConnTimeout + ", attempt=" + attempt + ", reconCnt=" + this.reconCnt + ", err=" + e.getMessage() + ", addr=" + addr + ']');
                    }
                    if (errs == null) {
                        errs = new GridException("Failed to connect to remote Hadoop process (is process still running?) [desc=" + desc + ", addrs=" + addr + ']');
                    }
                    errs.addSuppressed((Throwable)((Object)e));
                    break;
                }
                ++attempt;
                connTimeout0 *= 2L;
            }
            catch (Exception e) {
                if (client != null) {
                    client.forceClose();
                    client = null;
                }
                else {
                    // Connect or session creation failed before a client existed: release the socket.
                    closeChannelQuietly(ch);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Client creation failed [addr=" + addr + ", port=" + port + ", err=" + e + ']');
                }
                if (X.hasCause((Throwable)e, (Class[])new Class[]{SocketTimeoutException.class})) {
                    LT.warn((GridLogger)this.log, null, (String)("Connect timed out (consider increasing 'connTimeout' configuration property) [addr=" + addr + ", port=" + port + ']'));
                }
                if (errs == null) {
                    errs = new GridException("Failed to connect to remote Hadoop process (is process still running?) [desc=" + desc + ", addrs=" + addr + ']');
                }
                errs.addSuppressed((Throwable)e);
                // Only a connection-refused style failure is retried, and only once.
                if (connectAttempts >= 2 || !(e instanceof ConnectException) && !X.hasCause((Throwable)e, (Class[])new Class[]{ConnectException.class})) break;
                ++connectAttempts;
            }
        }
        if (client == null) {
            assert (errs != null);
            if (X.hasCause(errs, (Class[])new Class[]{ConnectException.class})) {
                LT.warn((GridLogger)this.log, null, (String)("Failed to connect to a remote Hadoop process (is process still running?). Make sure operating system firewall is disabled on local and remote host) [addrs=" + addr + ", port=" + port + ']'));
            }
            throw errs;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Created client: " + client);
        }
        return client;
    }

    /**
     * Closes the channel, ignoring any I/O error; used for best-effort cleanup on failure paths.
     *
     * @param ch Channel to close, may be {@code null}.
     */
    private static void closeChannelQuietly(@Nullable SocketChannel ch) {
        if (ch == null) {
            return;
        }
        try {
            ch.close();
        }
        catch (IOException ignored) {
            // Nothing useful can be done if closing a failed channel fails.
        }
    }

    /**
     * Passes a received message to the registered listener; when no listener is registered the
     * message is dropped with a debug log entry.
     *
     * @param desc Sender process descriptor.
     * @param msg Received message.
     */
    protected void notifyListener(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
        // Read the volatile field once.
        GridHadoopMessageListener target = this.lsnr;

        if (target == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received communication message without any registered listeners (will ignore) [senderProcDesc=" + desc + ", msg=" + msg + ']');
            }
            return;
        }

        target.onMessageReceived(desc, msg);
    }

    /**
     * @return String representation produced by the {@code S.toString} helper for this class.
     */
    public String toString() {
        return S.toString(GridHadoopExternalCommunication.class, (Object)this);
    }

    /** Synthetic accessor recovered by the decompiler: exposes the private {@code locks} field to nested classes. */
    static /* synthetic */ GridKeyLock access$1400(GridHadoopExternalCommunication x0) {
        return x0.locks;
    }

    /** Synthetic accessor recovered by the decompiler: exposes the private {@code locProcDesc} field to nested classes. */
    static /* synthetic */ GridHadoopProcessDescriptor access$1500(GridHadoopExternalCommunication x0) {
        return x0.locProcDesc;
    }

    /** Synthetic accessor recovered by the decompiler: exposes the private {@code HANDSHAKE_FINISH_META} key to nested classes. */
    static /* synthetic */ int access$1600() {
        return HANDSHAKE_FINISH_META;
    }

    /**
     * Handshake message carrying the descriptor of the sending process. It is externalizable,
     * so the public no-arg constructor is required for deserialization.
     */
    public static class ProcessHandshakeMessage implements GridHadoopMessage, Externalizable {
        private static final long serialVersionUID = 0L;

        /** Descriptor of the process that sent the message. */
        private GridHadoopProcessDescriptor procDesc;

        /** Required by {@link Externalizable}. */
        public ProcessHandshakeMessage() {
        }

        /**
         * @param procDesc Descriptor of the sending process.
         */
        private ProcessHandshakeMessage(GridHadoopProcessDescriptor procDesc) {
            this.procDesc = procDesc;
        }

        /**
         * @return Descriptor of the sending process.
         */
        public GridHadoopProcessDescriptor processDescriptor() {
            return procDesc;
        }

        /** Writes the process descriptor as a single object. */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(procDesc);
        }

        /** Reads back the process descriptor written by {@link #writeExternal(ObjectOutput)}. */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            procDesc = (GridHadoopProcessDescriptor)in.readObject();
        }
    }

    private class HandshakeAndBackpressureFilter
    extends GridNioFilterAdapter {
        /** Creates the filter, passing {@code "HadoopHandshakeFilter"} to the adapter constructor. */
        protected HandshakeAndBackpressureFilter() {
            super("HadoopHandshakeFilter");
        }

        /**
         * For accepted sessions, sends the local handshake message and closes the session if
         * that send fails. Sessions that were not accepted are left untouched here.
         *
         * @param ses Opened session.
         * @throws GridException Declared by the filter contract.
         */
        public void onSessionOpened(final GridNioSession ses) throws GridException {
            if (!ses.accepted()) {
                return;
            }

            if (GridHadoopExternalCommunication.this.log.isDebugEnabled()) {
                GridHadoopExternalCommunication.this.log.debug("Accepted connection, initiating handshake: " + ses);
            }

            ses.send((Object)GridHadoopExternalCommunication.this.locIdMsg).listenAsync((GridInClosure)new CI1<GridNioFuture<?>>(){

                public void apply(GridNioFuture<?> sndFut) {
                    try {
                        sndFut.get();
                    }
                    catch (IOException | GridException err) {
                        GridHadoopExternalCommunication.this.log.warning("Failed to send handshake message, will close session: " + ses, err);
                        ses.close();
                    }
                }
            });
        }

        /** Passes the session-closed event on unchanged via {@code proceedSessionClosed}. */
        public void onSessionClosed(GridNioSession ses) throws GridException {
            this.proceedSessionClosed(ses);
        }

        /** Passes the exception event on unchanged via {@code proceedExceptionCaught}. */
        public void onExceptionCaught(GridNioSession ses, GridException ex) throws GridException {
            this.proceedExceptionCaught(ses, ex);
        }

        /**
         * Passes the write on via {@code proceedSessionWrite}. A non-handshake message written
         * before the session has a process descriptor only triggers a warning; the write is not blocked.
         *
         * @param ses Session being written to.
         * @param msg Message being written.
         * @return Future returned by {@code proceedSessionWrite}.
         */
        public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws GridException {
            if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage)) {
                GridHadoopExternalCommunication.this.log.warning("Writing message before handshake has finished [ses=" + ses + ", msg=" + msg + ']');
            }
            return this.proceedSessionWrite(ses, msg);
        }

        /*
         * Handles an inbound message on a session.
         *
         * Two paths, selected by whether the session already carries a
         * GridHadoopProcessDescriptor in its meta (key access$000()):
         *   - no descriptor yet: the message must be a ProcessHandshakeMessage; its
         *     descriptor is stored on the session, the handshake bookkeeping below
         *     runs, and proceedSessionOpened(ses) is called;
         *   - descriptor present: the message is optionally counted by a
         *     GridNioMessageTracker and handed to proceedMessageReceived(ses, msg).
         *
         * NOTE(review): this is raw decompiler output and is not valid Java as it
         * stands - the locals (desc, rmtProcId, v0, nId, sync, to, tracker, old) have
         * no declarations and "** GOTO lbl37" is a decompiler marker, not a statement.
         * The access$NNN(...) calls are compiler-generated accessors for private
         * members of the enclosing class; what each one returns is only inferred
         * from how its result is used here - confirm against the original source.
         *
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        public void onMessageReceived(GridNioSession ses, Object msg) throws GridException {
            // Descriptor stored on the session by an earlier handshake message, if any.
            desc = (GridHadoopProcessDescriptor)ses.meta(GridHadoopExternalCommunication.access$000());
            // v0 is a decompiler temporary; only rmtProcId is read afterwards.
            v0 = rmtProcId = desc == null ? null : desc.processId();
            if (rmtProcId == null) {
                // No process id known for this session: only a handshake message is acceptable.
                if (!(msg instanceof ProcessHandshakeMessage)) {
                    GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).warning("Invalid handshake message received, will close connection [ses=" + ses + ", msg=" + msg + ']');
                    ses.close();
                    return;
                }
                nId = (ProcessHandshakeMessage)msg;
                if (GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) {
                    GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Received handshake message [ses=" + ses + ", msg=" + msg + ']');
                }
                // Store the remote descriptor under the same meta key that is read at the top of
                // this method, so later messages on this session take the else-branch.
                ses.addMeta(GridHadoopExternalCommunication.access$000(), (Object)nId.processDescriptor());
                if (!ses.accepted()) {
                    // Session was not accepted by this side: send the value of access$1300 to the peer
                    // (presumably the local handshake message - confirm).
                    ses.send((Object)GridHadoopExternalCommunication.access$1300(GridHadoopExternalCommunication.this));
                } else {
                    rmtProcId = nId.processDescriptor().processId();
                    if (GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) {
                        GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Finished handshake with remote client: " + ses);
                    }
                    // tryLock yields a non-null token when the per-process-id lock was taken; the token
                    // is handed back to unlock in the finally block.
                    if ((sync = GridHadoopExternalCommunication.access$1400(GridHadoopExternalCommunication.this).tryLock((Object)rmtProcId)) != null) {
                        try {
                            // Register a client wrapping this session only if none is mapped for the id yet.
                            if (GridHadoopExternalCommunication.access$200(GridHadoopExternalCommunication.this).get(rmtProcId) == null) {
                                if (GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) {
                                    GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Will reuse session for descriptor: " + rmtProcId);
                                }
                                GridHadoopExternalCommunication.access$200(GridHadoopExternalCommunication.this).put(rmtProcId, new GridHadoopTcpNioCommunicationClient(ses));
                            }
                            // NOTE(review): as decompiled, the debug message below is reached after the put
                            // above as well. Its text suggests it was the else-branch of the "== null" check
                            // in the original source - confirm before restructuring this method.
                            if (!GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) ** GOTO lbl37
                            GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Will not reuse client as another already exists [locProcDesc=" + GridHadoopExternalCommunication.access$1500(GridHadoopExternalCommunication.this) + ", desc=" + desc + ']');
                        }
                        finally {
                            GridHadoopExternalCommunication.access$1400(GridHadoopExternalCommunication.this).unlock((Object)rmtProcId, sync);
                        }
                    } else if (GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) {
                        // Lock was not obtained; the session is not registered as a client.
                        GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Concurrent connection is being established, will not reuse client session [locProcDesc=" + GridHadoopExternalCommunication.access$1500(GridHadoopExternalCommunication.this) + ", desc=" + desc + ']');
                    }
                }
lbl37:
                // 7 sources

                // Common tail of the handshake path (decompiler join point for the branches above).
                if (GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).isDebugEnabled()) {
                    GridHadoopExternalCommunication.access$100(GridHadoopExternalCommunication.this).debug("Handshake is finished for session [ses=" + ses + ", locProcDesc=" + GridHadoopExternalCommunication.access$1500(GridHadoopExternalCommunication.this) + ']');
                }
                // If a HandshakeFinish is attached to the session, count its latch down so that a
                // caller blocked in HandshakeFinish.await(long) is released.
                if ((to = (HandshakeFinish)ses.meta(GridHadoopExternalCommunication.access$1600())) != null) {
                    to.finish();
                }
                this.proceedSessionOpened(ses);
            } else {
                // Handshake already done for this session: regular message path.
                // Tracking is active only when access$400 is positive (presumably a message queue
                // limit, since the same value is passed to the tracker constructor - confirm).
                if (GridHadoopExternalCommunication.access$400(GridHadoopExternalCommunication.this) > 0) {
                    tracker = (GridNioMessageTracker)ses.meta(GridHadoopExternalCommunication.access$500());
                    if (tracker == null) {
                        // First tracked message on this session: create the tracker and attach it.
                        tracker = new GridNioMessageTracker(ses, GridHadoopExternalCommunication.access$400(GridHadoopExternalCommunication.this));
                        old = (GridNioMessageTracker)ses.addMeta(GridHadoopExternalCommunication.access$500(), (Object)tracker);
                        // Decompiled form of "assert old == null": no tracker may have been attached meanwhile.
                        if (!HandshakeAndBackpressureFilter.$assertionsDisabled && old != null) {
                            throw new AssertionError();
                        }
                    }
                    tracker.onMessageReceived();
                }
                this.proceedMessageReceived(ses, msg);
            }
        }

        /**
         * Delegates the close request to {@code proceedSessionClose} without any additional handling.
         *
         * @param ses Session being closed.
         * @return Whatever {@code proceedSessionClose} returns for this session.
         * @throws GridException If {@code proceedSessionClose} fails.
         */
        public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws GridException {
            return proceedSessionClose(ses);
        }

        /**
         * Delegates the idle-timeout event to {@code proceedSessionIdleTimeout} without any additional handling.
         *
         * @param ses Session the event was raised for.
         * @throws GridException If {@code proceedSessionIdleTimeout} fails.
         */
        public void onSessionIdleTimeout(GridNioSession ses) throws GridException {
            proceedSessionIdleTimeout(ses);
        }

        /**
         * Delegates the write-timeout event to {@code proceedSessionWriteTimeout} without any additional handling.
         *
         * @param ses Session the event was raised for.
         * @throws GridException If {@code proceedSessionWriteTimeout} fails.
         */
        public void onSessionWriteTimeout(GridNioSession ses) throws GridException {
            proceedSessionWriteTimeout(ses);
        }
    }

    /**
     * One-shot signal that a handshake has completed: one party blocks in
     * {@link #await(long)} until another calls {@link #finish()} or the timeout elapses.
     */
    private static class HandshakeFinish {
        /**
         * Latch with a single count, released by {@link #finish()}. Declared {@code final}
         * because it is never reassigned and the instance is handed between threads, so
         * every thread that obtains the instance is guaranteed to see the initialized latch.
         */
        private final CountDownLatch latch = new CountDownLatch(1);

        private HandshakeFinish() {
        }

        /**
         * Marks the handshake as finished and releases any waiter. Calling it more than
         * once has no further effect, since the latch count is already zero.
         */
        public void finish() {
            latch.countDown();
        }

        /**
         * Blocks until {@link #finish()} has been called or the timeout elapses.
         *
         * @param time Maximum time to wait, in milliseconds.
         * @throws GridHadoopHandshakeTimeoutException If the timeout elapsed first, or if the
         *      waiting thread was interrupted (the interrupt flag is restored before throwing
         *      and the {@link InterruptedException} is attached as the cause).
         */
        public void await(long time) throws GridHadoopHandshakeTimeoutException {
            try {
                if (!latch.await(time, TimeUnit.MILLISECONDS)) {
                    throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish [timeout=" + time + ']');
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish (thread was interrupted) [timeout=" + time + ']', e);
            }
        }
    }

    /**
     * Worker that serves one IPC endpoint through a {@link GridHadoopIpcToNioAdapter}
     * and closes that endpoint when it finishes, is cancelled or is cleaned up.
     */
    private class ShmemWorker
    extends GridWorker {
        /** Endpoint served by this worker. */
        private final GridIpcEndpoint endpoint;
        /** Adapter created over {@link #endpoint} in the constructor. */
        private GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter;

        /**
         * @param endpoint Endpoint to serve.
         * @param accepted Flag forwarded unchanged to the adapter constructor.
         */
        private ShmemWorker(GridIpcEndpoint endpoint, boolean accepted) {
            super(GridHadoopExternalCommunication.this.gridName, "shmem-worker", log);
            this.endpoint = endpoint;
            this.adapter = new GridHadoopIpcToNioAdapter(
                GridHadoopExternalCommunication.this.log,
                endpoint,
                accepted,
                GridHadoopExternalCommunication.this.srvLsnr,
                GridHadoopExternalCommunication.this.filters());
        }

        /**
         * Runs the adapter's serve loop. Whatever way that loop ends, the worker is removed
         * from the enclosing instance's {@code shmemWorkers} and the endpoint is closed.
         *
         * Decompiler note: a self-catching try was removed here, so behaviour may differ
         * from the original bytecode.
         */
        protected void body() throws InterruptedException {
            try {
                adapter.serve();
            }
            finally {
                GridHadoopExternalCommunication.this.shmemWorkers.remove((Object)this);
                endpoint.close();
            }
        }

        /** Cancels the worker, then closes the endpoint. */
        public void cancel() {
            super.cancel();
            endpoint.close();
        }

        /** Runs the inherited cleanup, then closes the endpoint. */
        protected void cleanup() {
            super.cleanup();
            endpoint.close();
        }

        /** @return String form produced by {@code S.toString} for this worker. */
        public String toString() {
            return S.toString(ShmemWorker.class, (Object)this);
        }

        /** @return Session exposed by the adapter. */
        public GridNioSession session() {
            return adapter.session();
        }
    }

    private class ShmemAcceptWorker
    extends GridWorker {
        private final GridIpcSharedMemoryServerEndpoint srv;

        ShmemAcceptWorker(GridIpcSharedMemoryServerEndpoint srv) {
            super(GridHadoopExternalCommunication.this.gridName, "shmem-communication-acceptor", log);
            this.srv = srv;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void body() throws InterruptedException {
            try {
                while (!Thread.interrupted()) {
                    ShmemWorker e = new ShmemWorker(this.srv.accept(), true);
                    GridHadoopExternalCommunication.this.shmemWorkers.add(e);
                    new GridThread((GridWorker)e).start();
                }
            }
            catch (GridException e) {
                if (!this.isCancelled()) {
                    U.error((GridLogger)log, (Object)"Shmem server failed.", (Throwable)e);
                }
            }
            finally {
                this.srv.close();
            }
        }

        public void cancel() {
            super.cancel();
            this.srv.close();
        }
    }
}

