/*
 * Decompiled with CFR 0.152.
 */
package water;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import water.AutoBuffer;
import water.H2O;
import water.H2ONode;
import water.MemoryManager;
import water.RPC;
import water.TimeLine;
import water.UDP;
import water.UDPReceiverThread;
import water.util.Log;

public class TCPReceiverThread
extends Thread {
    private ServerSocketChannel SOCK;

    /**
     * Creates the accept thread, named {@code "TCP-Accept"}.
     *
     * @param sock server channel to accept connections on; stored as-is without validation.
     *             {@link #run()} opens and binds a fresh channel whenever the stored one is
     *             {@code null}.
     */
    public TCPReceiverThread(ServerSocketChannel sock) {
        super("TCP-Accept");
        SOCK = sock;
    }

    /**
     * Accept loop for incoming TCP connections.
     *
     * <p>Every accepted connection must start with a 4-byte header: one channel-type byte
     * (1 = small messages, 2 = big messages), a two-byte port decoded in native byte order, and
     * the sentinel byte {@code 0xEF}. A valid connection is handed to a dedicated reader thread;
     * a connection that fails the handshake is closed before the error is reported.
     *
     * <p>On any error the failure is logged and the server channel is closed and re-opened on
     * the next pass, after a 100 ms pause. The loop ends when the server channel is closed
     * asynchronously or when this thread is interrupted.
     */
    @Override
    public void run() {
        Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
        ServerSocketChannel errsock = null;
        boolean sawError = false;
        while (true) {
            try {
                // One-shot close of the server channel that failed on the previous pass. The
                // reference is cleared first so a failing close() is not retried forever.
                if (errsock != null) {
                    ServerSocketChannel stale = errsock;
                    errsock = null;
                    stale.close();
                }
                // Pause after an error so a persistent failure cannot spin re-creating sockets.
                if (sawError) {
                    Thread.sleep(100L);
                }
                sawError = false;
                if (this.SOCK == null) {
                    this.SOCK = ServerSocketChannel.open();
                    this.SOCK.socket().setReceiveBufferSize(AutoBuffer.BBP_BIG.size());
                    this.SOCK.socket().bind(H2O.SELF._key);
                }
                SocketChannel sock = this.SOCK.accept();
                boolean handedOff = false;
                try {
                    ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder());
                    while (bb.hasRemaining()) {
                        // read() returns -1 once the peer has closed; without this check the
                        // loop would spin forever on a connection that never sends its header.
                        if (sock.read(bb) < 0) {
                            throw new EOFException("connection from " + sock.socket().getInetAddress()
                                    + " closed before sending the 4-byte channel header");
                        }
                    }
                    bb.flip();
                    byte chanType = bb.get();
                    int port = bb.getChar();
                    int sentinel = 0xFF & bb.get();
                    if (sentinel != 0xEF) {
                        throw H2O.fail("missing eom sentinel when opening new tcp channel");
                    }
                    H2ONode h2o = H2ONode.intern(sock.socket().getInetAddress(), port);
                    switch (chanType) {
                        case 1:
                            Log.info("starting new UDP-TCP receiver thread connected to " + sock.getRemoteAddress());
                            new UDP_TCP_ReaderThread(h2o, sock).start();
                            break;
                        case 2:
                            new TCPReaderThread(sock, new AutoBuffer(sock)).start();
                            break;
                        default:
                            throw H2O.fail("unexpected channel type " + chanType + ", only know 1 - Small and 2 - Big");
                    }
                    // From here on the reader thread owns the connection.
                    handedOff = true;
                } finally {
                    if (!handedOff && sock != null) {
                        try {
                            sock.close();
                        } catch (IOException ignored) {
                            // Best effort: the handshake failure itself is what gets reported.
                        }
                    }
                }
            } catch (AsynchronousCloseException ex) {
                // Server channel closed from another thread (or by interrupt): stop accepting.
                break;
            } catch (InterruptedException ie) {
                // Interrupted during the error back-off: keep the interrupt status and stop,
                // matching what an interrupt during accept() already does.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                e.printStackTrace();
                Log.err("IO error on TCP port " + H2O.H2O_PORT + ": ", e);
                // Schedule the current server channel for close and re-open on the next pass.
                sawError = true;
                errsock = this.SOCK;
                this.SOCK = null;
            }
        }
    }

    /**
     * Reader thread for one accepted "big message" TCP connection.
     *
     * <p>Each request is read through its own {@link AutoBuffer}; after a request has been
     * dispatched, a new buffer is created on the same socket for the next one. When the thread
     * stops, for whatever reason, the socket is closed so the connection is not leaked.
     */
    static class TCPReaderThread
    extends Thread {
        /** Connection this thread reads from. */
        public SocketChannel _sock;
        /** Buffer for the request currently being processed; replaced after every request. */
        public AutoBuffer _ab;

        /**
         * @param sock accepted connection to read from
         * @param ab   buffer already attached to {@code sock}; its node is used for the thread
         *             name and that node's {@code _tcp_readers} counter is incremented
         */
        public TCPReaderThread(SocketChannel sock, AutoBuffer ab) {
            super("TCP-" + ab._h2o + "-" + ab._h2o._tcp_readers++);
            this._sock = sock;
            this._ab = ab;
            this.setPriority(Thread.MAX_PRIORITY - 1);
        }

        /** Serves requests until the connection ends or fails, then closes the socket. */
        @Override
        public void run() {
            try {
                this.serveRequests();
            } finally {
                SocketChannel sock = this._sock;
                if (sock != null && sock.isOpen()) {
                    try {
                        sock.close();
                    } catch (IOException ignored) {
                        // Best effort: the thread is exiting and nothing else can be done.
                    }
                }
            }
        }

        /** Reads and dispatches requests until the socket closes or a handler fails. */
        private void serveRequests() {
            while (true) {
                try {
                    this._ab._h2o._last_heard_from = System.currentTimeMillis();
                    TimeLine.record_recv(this._ab, true, 0);
                    int ctrl = this._ab.getCtrl();
                    // Out-of-range control bytes are mapped to slot 0 for the lookup; the raw
                    // value is still what the error message reports.
                    int x = (ctrl < 0 || ctrl >= UDP.udp.UDPS.length) ? 0 : ctrl;
                    switch (UDP.udp.UDPS[x]) {
                        case exec: {
                            RPC.remote_exec(this._ab);
                            break;
                        }
                        case ack: {
                            RPC.tcp_ack(this._ab);
                            break;
                        }
                        case timeline: {
                            TimeLine.tcp_call(this._ab);
                            break;
                        }
                        default: {
                            throw new RuntimeException("Unknown TCP Type: " + ctrl + " " + this._ab._h2o);
                        }
                    }
                }
                catch (AsynchronousCloseException ex) {
                    // Socket closed from another thread: stop quietly.
                    return;
                }
                catch (Throwable e) {
                    System.err.println("IO error");
                    e.printStackTrace();
                    Log.err("IO error on TCP port " + H2O.H2O_PORT + ": ", e);
                    return;
                }
                // Reuse the open socket for the next request.
                try {
                    if (!this._sock.isOpen()) {
                        return;
                    }
                    this._ab = new AutoBuffer(this._sock);
                }
                catch (Exception e) {
                    // Failure to start the next request ends the thread without logging.
                    return;
                }
            }
        }
    }

    /**
     * Reader thread for one accepted "small message" TCP connection.
     *
     * <p>Messages are framed as a two-byte little-endian payload size, the payload, and a
     * trailing sentinel byte {@code 0xEF}. Each payload is copied into a fresh byte array and
     * passed to {@link UDPReceiverThread#basic_packet_handling}.
     */
    static class UDP_TCP_ReaderThread
    extends Thread {
        private final SocketChannel _chan;
        /** Receive buffer; its position marks the end of the bytes received so far. */
        private final ByteBuffer _bb;
        private final H2ONode _h2o;

        /**
         * @param h2o  node the messages are attributed to
         * @param chan accepted connection to read from
         */
        public UDP_TCP_ReaderThread(H2ONode h2o, SocketChannel chan) {
            super("UDP-TCP-READ-" + h2o);
            this._h2o = h2o;
            this._chan = chan;
            this._bb = ByteBuffer.allocateDirect(AutoBuffer.BBP_BIG.size()).order(ByteOrder.nativeOrder());
        }

        /**
         * Formats the bytes surrounding index {@code start + sz} for diagnostics: up to five
         * bytes before, the byte at the index, and up to five bytes after, each as an unsigned
         * value. Indices outside {@code [0, bb.limit())} are skipped, so the result may be
         * shorter (or empty) near the edges of the buffer.
         *
         * @param bb    buffer to inspect; its position and limit are not modified
         * @param start offset of the message start within {@code bb}
         * @param sz    payload size of the message
         * @return the formatted bytes, or an empty string when {@code bb} is {@code null}
         */
        public String printBytes(ByteBuffer bb, int start, int sz) {
            StringBuilder sb = new StringBuilder();
            if (bb == null) {
                return sb.toString();
            }
            int center = start + sz;
            int limit = bb.limit();
            for (int off = -5; off <= 5; ++off) {
                int at = center + off;
                if (at < 0 || at >= limit) {
                    continue;
                }
                int val = 0xFF & bb.get(at);
                if (off < 0) {
                    sb.append("-" + -off + ":" + val + " ");
                } else if (off == 0) {
                    sb.append("0: " + val + " ");
                } else {
                    sb.append("+" + off + ":" + val + " ");
                }
            }
            return sb.toString();
        }

        /**
         * Blocks until at least {@code n} more bytes have been received into the buffer. Does
         * nothing when {@code n} is zero or negative.
         *
         * @param n minimum number of additional bytes required
         * @return the number of bytes actually read, which may exceed {@code n}
         * @throws IllegalStateException if the buffer has less than {@code n} bytes of free space
         * @throws EOFException          if the channel reaches end-of-stream first
         * @throws IOException           on any other channel failure
         */
        private int read(int n) throws IOException {
            if (this._bb.remaining() < n) {
                throw new IllegalStateException("Reading more bytes than available, reading " + n + " bytes, remaining = " + this._bb.remaining());
            }
            int sizeRead = 0;
            while (sizeRead < n) {
                int res = this._chan.read(this._bb);
                if (res == -1) {
                    throw new EOFException("Reading " + n + " bytes, AB=" + this);
                }
                if (res == 0) {
                    throw new RuntimeException("Reading zero bytes - so no progress?");
                }
                sizeRead += res;
            }
            return sizeRead;
        }

        /**
         * Reads framed messages until the connection ends or an error occurs. The receive
         * buffer is always returned to {@code AutoBuffer.BBP_BIG} and the channel closed on
         * exit. An I/O failure while waiting for the start of the next message is treated as a
         * normal close and is not logged.
         */
        @Override
        public void run() {
            int start = 0;
            boolean idle = true;
            try {
                while (true) {
                    // Between messages: the remote side may close the connection here.
                    idle = true;
                    this._h2o._last_heard_from = System.currentTimeMillis();
                    if (start > this._bb.position() - 2) {
                        this.read(start + 2 - this._bb.position());
                    }
                    idle = false;
                    int sz = (0xFF & this._bb.get(start + 1)) << 8 | 0xFF & this._bb.get(start);
                    assert (sz < AutoBuffer.BBP_SML.size()) : "Incoming message is too big, should've been sent by TCP-BIG, got " + sz + " bytes, start = " + start;
                    this.read(start + 2 + sz + 1 - this._bb.position());
                    if ((0xFF & this._bb.get(start + 2 + sz)) != 0xEF) {
                        // Throw so an out-of-sync stream is never dispatched, even if fail() returns.
                        throw H2O.fail("Missing expected sentinel (0xef==239) at the end of the message from " + this._h2o + ", likely out of sync, start = " + start + ", size = " + sz + ", position = " + this._bb.position() + ", bytes = " + this.printBytes(this._bb, start, sz));
                    }
                    // Copy the payload out, restoring the write position afterwards.
                    byte[] ary = MemoryManager.malloc1(Math.max(16, sz));
                    int pos = this._bb.position();
                    this._bb.position(start + 2);
                    this._bb.get(ary, 0, sz);
                    this._bb.position(pos);
                    UDPReceiverThread.basic_packet_handling(new AutoBuffer(this._h2o, ary));
                    start += sz + 2 + 1;
                    // When free space drops below one maximum-size small message plus framing,
                    // slide the unconsumed bytes to the front of the buffer.
                    if (this._bb.remaining() < AutoBuffer.BBP_SML.size() + 2 + 1) {
                        this._bb.limit(this._bb.position());
                        this._bb.position(start);
                        this._bb.compact();
                        start = 0;
                    }
                }
            }
            catch (IOException ioe) {
                if (!idle) {
                    Log.err("Got IO Error when reading small messages over TCP");
                    Log.err(ioe);
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
                Log.err("unexpected error in UDP-TCP thread.");
                Log.err(t);
            }
            finally {
                AutoBuffer.BBP_BIG.free(this._bb);
                if (this._chan != null && this._chan.isOpen()) {
                    try {
                        this._chan.close();
                    }
                    catch (IOException ignored) {
                        // Best effort: the thread is exiting and nothing else can be done.
                    }
                }
            }
        }
    }
}

