/*
 * Decompiled with CFR 0.152.
 */
package water;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.Date;
import java.util.Random;
import water.AutoBuffer;
import water.ExternalFrameHandlerThread;
import water.FJPacket;
import water.H2O;
import water.H2ONode;
import water.MemoryManager;
import water.RPC;
import water.TimeLine;
import water.UDP;
import water.UDPRebooted;
import water.network.SocketChannelFactory;
import water.util.Log;
import water.util.SB;

/**
 * Accept thread for incoming TCP connections.
 *
 * <p>Each accepted connection starts with a {@value #HANDSHAKE_SIZE}-byte header
 * (channel type, 2-byte timestamp, 2-byte port, end-of-message sentinel). Based on
 * the channel type the connection is handed to a freshly started reader thread:
 * {@link UDP_TCP_ReaderThread} for small messages, {@link TCPReaderThread} for big
 * ones, or {@code ExternalFrameHandlerThread} for external frame handling.
 */
public class TCPReceiverThread extends Thread {
    /** Listening channel; set to {@code null} after an error so the next loop pass re-opens it. */
    private ServerSocketChannel SOCK;
    private final SocketChannelFactory socketChannelFactory;
    static final byte TCP_SMALL = 1;
    static final byte TCP_BIG = 2;
    static final byte TCP_EXTERNAL = 3;
    /** Byte value (0xef) expected at the end of the handshake and of every small message. */
    private static final int EOM_SENTINEL = 0xef;
    /** Handshake size: 1 (type) + 2 (timestamp) + 2 (port) + 1 (sentinel) bytes. */
    private static final int HANDSHAKE_SIZE = 6;
    // Rate-limiting state for the "packets from outside the cloud" warning.
    // NOTE(review): these are plain statics updated without synchronization.
    private static int _unknown_packets_per_sec = 0;
    private static long _unknown_packet_time = 0L;
    static final Random RANDOM_UDP_DROP = new Random();

    /**
     * @param sock listening channel to accept on; may be {@code null}, in which case
     *             {@link #run()} opens and binds one itself
     */
    public TCPReceiverThread(ServerSocketChannel sock) {
        super("TCP-Accept");
        this.SOCK = sock;
        this.socketChannelFactory = H2O.SELF.getSocketFactory();
    }

    /**
     * Accept loop. Runs until the listening channel is closed asynchronously
     * ({@link AsynchronousCloseException}). On any other exception the error is
     * logged, the half-opened connection (if any) is closed, and the listening
     * channel is closed and re-opened on the next pass after a 100 ms pause.
     */
    @Override
    public void run() {
        Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
        ServerSocketChannel errsock = null;
        boolean saw_error = false;
        while (true) {
            // Declared outside the try so a failed handshake can release them.
            SocketChannel sock = null;
            ByteChannel wrappedSocket = null;
            try {
                // Clean up the listening channel left over from a prior failure.
                if (errsock != null) {
                    ServerSocketChannel tmp2 = errsock;
                    errsock = null; // cleared first: close() may throw
                    tmp2.close();
                }
                if (saw_error) {
                    Thread.sleep(100L); // throttle re-open attempts after an error
                }
                saw_error = false;

                if (this.SOCK == null) {
                    this.SOCK = ServerSocketChannel.open();
                    this.SOCK.socket().setReceiveBufferSize(AutoBuffer.BBP_BIG._size);
                    this.SOCK.socket().bind(H2O.SELF._key);
                }

                // Block for a connection, then read its handshake header.
                sock = this.SOCK.accept();
                wrappedSocket = this.socketChannelFactory.serverChannel(sock);
                ByteBuffer bb = readHandshake(wrappedSocket);
                byte chanType = bb.get();
                short timestamp = bb.getShort();
                char port = bb.getChar();
                int sentinel = 0xFF & bb.get();
                if (sentinel != EOM_SENTINEL) {
                    if (H2O.SELF.getSecurityManager().securityEnabled) {
                        throw new IOException("Missing EOM sentinel when opening new SSL tcp channel.");
                    }
                    throw H2O.fail("missing eom sentinel when opening new tcp channel");
                }

                InetAddress inetAddress = sock.socket().getInetAddress();
                switch (chanType) {
                    case TCP_SMALL:
                        new UDP_TCP_ReaderThread(H2ONode.intern(inetAddress, port, timestamp), wrappedSocket).start();
                        break;
                    case TCP_BIG:
                        new TCPReaderThread(wrappedSocket, new AutoBuffer(wrappedSocket, inetAddress, timestamp), inetAddress, timestamp).start();
                        break;
                    case TCP_EXTERNAL:
                        new ExternalFrameHandlerThread(wrappedSocket, new AutoBuffer(wrappedSocket)).start();
                        break;
                    default:
                        throw H2O.fail("unexpected channel type " + chanType + ", only know 1 - Small, 2 - Big and 3 - ExternalFrameHandling");
                }
            }
            catch (AsynchronousCloseException ex) {
                break; // channel closed from another thread: stop accepting
            }
            catch (Exception e) {
                e.printStackTrace();
                Log.err("IO error on TCP port " + H2O.H2O_PORT + ": ", e);
                saw_error = true;
                errsock = this.SOCK;
                this.SOCK = null;
                // An exception here means no reader thread was started for this
                // connection, so nobody else will ever close it.
                closeQuietly(wrappedSocket);
                closeQuietly(sock);
            }
        }
    }

    /**
     * Reads the fixed-size handshake header from a freshly accepted connection.
     *
     * @param chan channel to read from
     * @return buffer in native byte order, flipped and ready for reading
     * @throws IOException if the read fails or the peer closes the connection
     *                     before the whole header arrived
     */
    private static ByteBuffer readHandshake(ByteChannel chan) throws IOException {
        ByteBuffer bb = ByteBuffer.allocate(HANDSHAKE_SIZE).order(ByteOrder.nativeOrder());
        while (bb.hasRemaining()) {
            // A negative result means end-of-stream; without this check the loop
            // would spin forever because the buffer can never fill up.
            if (chan.read(bb) < 0) {
                throw new IOException("Connection closed before the " + HANDSHAKE_SIZE + "-byte TCP handshake header was received");
            }
        }
        bb.flip();
        return bb;
    }

    /** Closes {@code chan} if it is non-null and open, ignoring any {@link IOException}. */
    private static void closeQuietly(ByteChannel chan) {
        if (chan == null || !chan.isOpen()) {
            return;
        }
        try {
            chan.close();
        }
        catch (IOException ignored) {
            // best effort: the channel is being discarded anyway
        }
    }

    /**
     * Common handling for a received small packet: records it in the timeline,
     * updates the sender's last-heard-from time, handles timeline and rebooted
     * packets inline, filters packets from nodes outside the cloud, and submits
     * everything else as an {@code FJPacket} task.
     *
     * <p>NOTE(review): the control byte is used to index {@code UDP.udp.UDPS}
     * without a range check; an out-of-range value raises
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param ab buffer holding the received packet
     * @throws IOException declared for callers; thrown by the underlying handlers
     */
    public static void basic_packet_handling(AutoBuffer ab) throws IOException {
        // Optional random packet dropping, enabled by the random_udp_drop argument:
        // one packet in five on average is flagged with a drop value of 2.
        boolean dropThisOne = H2O.ARGS.random_udp_drop && RANDOM_UDP_DROP.nextInt(5) == 0;
        int drop = dropThisOne ? 2 : 0;
        TimeLine.record_recv(ab, false, drop);
        long now = ab._h2o._last_heard_from = System.currentTimeMillis();

        int ctrl = ab.getCtrl();
        ab.getPort(); // result unused; called as in the original sequence

        if (ctrl == UDP.udp.timeline.ordinal()) {
            UDP.udp.timeline._udp.call(ab);
            return;
        }
        if (ctrl == UDP.udp.rebooted.ordinal()) {
            UDPRebooted.checkForSuicide(ctrl, ab);
        }
        if (drop != 0) {
            return;
        }

        // Packets are accepted from cloud members and clients; paxos packets are
        // accepted from anybody.
        H2O cloud = H2O.CLOUD;
        boolean is_member = cloud.contains(ab._h2o);
        boolean is_client = ab._h2o._client;
        if (!(UDP.udp.UDPS[ctrl]._paxos || is_member || is_client)) {
            ++_unknown_packets_per_sec;
            long timediff = ab._h2o._last_heard_from - _unknown_packet_time;
            if (timediff > 1000L) {
                // Warn at most about once per second; a client stays silent
                // during its first second after start.
                if (!H2O.ARGS.client || now - H2O.START_TIME_MILLIS.get() >= 1000L) {
                    Log.warn("UDP packets from outside the cloud: " + _unknown_packets_per_sec + "/sec, last one from " + ab._h2o + " @ " + new Date());
                }
                _unknown_packets_per_sec = 0;
                _unknown_packet_time = ab._h2o._last_heard_from;
            }
            ab.close();
            return;
        }
        H2O.submitTask(new FJPacket(ab, ctrl));
    }

    /**
     * Reader thread for a "small message" TCP channel. Messages are framed as a
     * 2-byte length, the payload, and a trailing sentinel byte; each payload is
     * passed to {@link TCPReceiverThread#basic_packet_handling(AutoBuffer)}.
     */
    static class UDP_TCP_ReaderThread extends Thread {
        private final ByteChannel _chan;
        private final ByteBuffer _bb;
        private final H2ONode _h2o;

        /**
         * @param h2o  node the messages come from
         * @param chan channel to read the messages from
         */
        public UDP_TCP_ReaderThread(H2ONode h2o, ByteChannel chan) {
            super("UDP-TCP-READ-" + h2o);
            this._h2o = h2o;
            this._chan = chan;
            this._bb = ByteBuffer.allocateDirect(AutoBuffer.BBP_BIG._size).order(ByteOrder.nativeOrder());
            this._bb.flip(); // start in read mode with zero bytes available
        }

        /**
         * Renders the bytes around index {@code start + sz} of {@code bb} (five
         * before, the byte itself, five after) for diagnostic messages. Stops
         * silently at the first byte that cannot be read, e.g. past the buffer edge.
         *
         * @param bb    buffer to dump from (absolute reads; position is not changed)
         * @param start base index
         * @param sz    offset added to {@code start}
         * @return the rendered bytes, possibly truncated
         */
        public String printBytes(ByteBuffer bb, int start, int sz) {
            SB sb = new SB();
            int idx = start + sz;
            try {
                for (int i = 5; i > 0; --i) {
                    sb.p("-").p(i).p(":").p(0xFF & bb.get(idx - i)).p(" ");
                }
                sb.p("0: ").p(0xFF & bb.get(idx)).p(" ");
                for (int i = 1; i <= 5; ++i) {
                    sb.p("+").p(i).p(":").p(0xFF & bb.get(idx + i)).p(" ");
                }
            }
            catch (Throwable ignored) {
                // Deliberately best-effort: this only builds a debug string and
                // must never mask the failure being reported.
            }
            return sb.toString();
        }

        /**
         * Ensures at least {@code n} bytes are available for reading in the
         * internal buffer, pulling more from the channel if needed.
         *
         * @param n number of bytes required
         * @return the internal buffer, in read mode
         * @throws IOException if the channel read fails or returns no data
         */
        private ByteBuffer read(int n) throws IOException {
            if (this._bb.remaining() < n) {
                this._bb.compact(); // keep unread bytes, switch to write mode
                while (this._bb.position() < n) {
                    int res = this._chan.read(this._bb);
                    if (res <= 0) {
                        throw new IOException("Didn't read any data: res=" + res);
                    }
                    this._h2o._last_heard_from = System.currentTimeMillis();
                }
                this._bb.flip();
            }
            return this._bb;
        }

        /**
         * Reads and dispatches messages until any failure occurs, then releases
         * the buffer and closes the channel. An {@link IOException} raised while
         * waiting for the next message's length prefix is not logged; every other
         * failure is.
         */
        @Override
        public void run() {
            assert (!this._bb.hasArray());
            boolean idle = false;
            try {
                while (true) {
                    idle = true; // waiting for the next message to begin
                    char sz = this.read(2).getChar();
                    idle = false;
                    assert (sz < AutoBuffer.BBP_SML._size) : "Incoming message is too big, should've been sent by TCP-BIG, got " + sz + " bytes";
                    byte[] ary = MemoryManager.malloc1(Math.max(16, sz));
                    // Payload plus one trailing sentinel byte.
                    byte sentinel = this.read(sz + 1).get(ary, 0, sz).get();
                    assert ((0xFF & sentinel) == EOM_SENTINEL) : "Missing expected sentinel (0xef) at the end of the message from " + this._h2o + ", likely out of sync, size = " + sz + ", position = " + this._bb.position() + ", bytes = " + this.printBytes(this._bb, this._bb.position(), sz);
                    TCPReceiverThread.basic_packet_handling(new AutoBuffer(this._h2o, ary, 0, sz));
                }
            }
            catch (Throwable t) {
                if (!idle || !(t instanceof IOException)) {
                    t.printStackTrace();
                    Log.err(t);
                }
            }
            finally {
                // Single cleanup point, run exactly once however the loop ends.
                AutoBuffer.BBP_BIG.free(this._bb);
                closeQuietly(this._chan);
            }
        }
    }

    /**
     * Reader thread for a "big message" TCP channel. Dispatches each incoming
     * request by its control byte and re-uses the channel for the next request
     * while it stays open.
     */
    static class TCPReaderThread extends Thread {
        public ByteChannel _sock;
        public AutoBuffer _ab;
        private final InetAddress _address;
        private final short _timestamp;

        /**
         * @param sock      channel the requests arrive on
         * @param ab        buffer wrapping the first request
         * @param address   remote address, used to build buffers for later requests
         * @param timestamp remote timestamp, used to build buffers for later requests
         */
        public TCPReaderThread(ByteChannel sock, AutoBuffer ab, InetAddress address, short timestamp) {
            super("TCP-" + ab._h2o + "-" + ab._h2o._tcp_readers++);
            this._sock = sock;
            this._ab = ab;
            this._address = address;
            this._timestamp = timestamp;
            this.setPriority(Thread.MAX_PRIORITY - 1);
        }

        /**
         * Handles requests until the channel is closed or any handler fails.
         * Failures other than {@link AsynchronousCloseException} are logged.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    this._ab._h2o._last_heard_from = System.currentTimeMillis();
                    TimeLine.record_recv(this._ab, true, 0);
                    int ctrl = this._ab.getCtrl();
                    // Out-of-range control values are mapped to index 0 so the
                    // array lookup itself cannot fail.
                    int x = (ctrl < 0 || ctrl >= UDP.udp.UDPS.length) ? 0 : ctrl;
                    switch (UDP.udp.UDPS[x]) {
                        case exec: {
                            RPC.remote_exec(this._ab);
                            break;
                        }
                        case ack: {
                            RPC.tcp_ack(this._ab);
                            break;
                        }
                        case timeline: {
                            TimeLine.tcp_call(this._ab);
                            break;
                        }
                        default: {
                            throw new RuntimeException("Unknown TCP Type: " + ctrl + " " + this._ab._h2o);
                        }
                    }
                }
                catch (AsynchronousCloseException ex) {
                    break; // channel closed from another thread
                }
                catch (Throwable e) {
                    System.err.println("IO error");
                    e.printStackTrace();
                    Log.err("IO error on TCP port " + H2O.H2O_PORT + ": ", e);
                    break;
                }
                // Re-use the still-open channel for the next request.
                try {
                    if (!this._sock.isOpen()) {
                        break;
                    }
                    this._ab = new AutoBuffer(this._sock, this._address, this._timestamp);
                }
                catch (Exception e) {
                    // Not logged: failing to set up the next request simply ends this reader.
                    break;
                }
            }
        }
    }
}

