/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.polardbx.rpc.net;

import com.alibaba.polardbx.common.exception.TddlRuntimeException;
import com.alibaba.polardbx.common.utils.GeneralUtil;
import com.alibaba.polardbx.rpc.XConfig;
import com.alibaba.polardbx.rpc.XLog;
import com.alibaba.polardbx.rpc.client.XClient;
import com.alibaba.polardbx.rpc.net.ErrorCode;
import com.alibaba.polardbx.rpc.net.NIOConnection;
import com.alibaba.polardbx.rpc.net.NIOProcessor;
import com.alibaba.polardbx.rpc.packet.XPacket;
import com.alibaba.polardbx.rpc.perf.TcpPerfItem;
import com.alibaba.polardbx.rpc.pool.XConnectionManager;
import com.alibaba.polardbx.rpc.utils.TimerThread;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.GeneratedMessageV3;
import com.mysql.cj.polarx.protobuf.PolarxConnection;
import com.mysql.cj.polarx.protobuf.PolarxNotice;
import com.mysql.cj.polarx.protobuf.PolarxPhysicalBackfill;
import com.mysql.cj.polarx.protobuf.PolarxResultset;
import com.mysql.cj.polarx.protobuf.PolarxSession;
import com.mysql.cj.polarx.protobuf.PolarxSql;
import com.mysql.cj.x.protobuf.Polarx;
import com.mysql.cj.x.protobuf.PolarxExecPlan;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

public class NIOClient
implements NIOConnection {
    private static final int socketRecvBuffer = 65536;
    private static final int socketSendBuffer = 32768;
    private volatile SocketChannel channel = null;
    private NIOProcessor processor = null;
    private SelectionKey processKey = null;
    private final ReentrantLock keyLock = new ReentrantLock();
    private ByteBuffer readBuffer = null;
    private ByteBuffer bigReadBuffer = null;
    private final ReentrantLock readLock = new ReentrantLock();
    private final AtomicBoolean bigBufferMark = new AtomicBoolean(false);
    private final AtomicBoolean bigBufferClean = new AtomicBoolean(false);
    private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<ByteBuffer>();
    private ByteBuffer lastWrite = null;
    private final ReentrantLock writeLock = new ReentrantLock();
    private volatile boolean isRegistered = false;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Consumer<Collection<XPacket>> consumer;
    private final Consumer<Throwable> fatalCallback;
    private final XClient client;
    private final AtomicLong lastSend = new AtomicLong(0L);
    private final AtomicLong lastRecv = new AtomicLong(0L);

    public NIOClient(Consumer<Collection<XPacket>> consumer, Consumer<Throwable> fatalCallback, XClient client) {
        this.consumer = consumer;
        this.fatalCallback = fatalCallback;
        this.client = client;
    }

    public void setProcessor(NIOProcessor processor) {
        this.processor = processor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fillTcpPhysicalInfo(TcpPerfItem item) {
        item.setSocketSendBufferSize(32768L);
        item.setSocketRecvBufferSize(65536L);
        this.readLock.lock();
        try {
            item.setReadDirectBuffers(null == this.readBuffer ? 0L : 1L);
            item.setReadHeapBuffers(null == this.bigReadBuffer ? 0L : 1L);
        }
        finally {
            this.readLock.unlock();
        }
        this.writeLock.lock();
        try {
            long directCount = 0L;
            long heapCount = 0L;
            for (ByteBuffer buf : this.writeQueue) {
                if (buf == null) continue;
                if (buf.isDirect()) {
                    ++directCount;
                    continue;
                }
                ++heapCount;
            }
            item.setWriteDirectBuffers(directCount);
            item.setWriteHeapBuffers(heapCount);
        }
        finally {
            this.writeLock.unlock();
        }
        item.setReactorRegistered(this.isRegistered);
        item.setSocketClosed(this.isClosed.get());
    }

    public synchronized void connect(SocketAddress address, int timeout) throws IOException {
        if (this.isClosed.get()) {
            throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_CLIENT, new String[]{this + " closed."});
        }
        if (this.channel != null) {
            throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_CLIENT, new String[]{this + " already connected."});
        }
        long startNanos = System.nanoTime();
        SocketChannel c = SocketChannel.open();
        try {
            c.setOption((SocketOption)StandardSocketOptions.TCP_NODELAY, (Object)true);
            c.socket().connect(address, timeout);
            if (c.socket().getSendBufferSize() < 32768) {
                c.socket().setSendBufferSize(32768);
            }
            if (c.socket().getReceiveBufferSize() < 65536) {
                c.socket().setReceiveBufferSize(65536);
            }
            c.configureBlocking(false);
            this.channel = c;
            long endNanos = System.nanoTime();
            XLog.XLogLogger.info(this + " connect success, connect time: " + (endNanos - startNanos) / 1000L + "us.");
            this.lastSend.set(endNanos);
            this.lastRecv.set(endNanos);
        }
        catch (Throwable e) {
            long endNanos = System.nanoTime();
            XLog.XLogLogger.error(this + " connect error with timeout " + timeout + "ms. real:" + (endNanos - startNanos) / 1000L + "us.");
            XLog.XLogLogger.error(e);
            this.channel = null;
            try {
                Socket socket = c.socket();
                if (socket != null) {
                    socket.close();
                }
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            try {
                c.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            throw e;
        }
    }

    public long idleTime() {
        long snd = this.lastSend.get();
        long rcv = this.lastRecv.get();
        long nowNanos = TimerThread.fastNanos();
        return Math.max(0L, Math.min(nowNanos - snd, nowNanos - rcv));
    }

    public boolean isPending() {
        return null == this.channel;
    }

    public boolean isValid() {
        SocketChannel c = this.channel;
        return c != null && c.isOpen() && this.isRegistered;
    }

    private void clearSelectionKey() {
        ReentrantLock lock = this.keyLock;
        lock.lock();
        try {
            SelectionKey key = this.processKey;
            if (key != null && key.isValid()) {
                key.attach(null);
                key.cancel();
                this.processor.getReactor().getPerfCollection().getSocketCount().getAndDecrement();
            }
        }
        catch (Throwable throwable) {
        }
        finally {
            lock.unlock();
        }
    }

    private boolean closeSocket() {
        boolean channelClosed;
        XLog.XLogLogger.info(this + " close socket.");
        this.clearSelectionKey();
        SocketChannel c = this.channel;
        boolean socketClosed = true;
        if (c != null) {
            Socket socket = this.channel.socket();
            if (socket != null) {
                try {
                    socket.close();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                socketClosed = socket.isClosed();
            }
            try {
                c.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            channelClosed = !c.isOpen();
        } else {
            channelClosed = true;
        }
        return socketClosed && channelClosed;
    }

    private void cleanup() {
        this.readLock.lock();
        try {
            if (this.readBuffer != null) {
                this.processor.getBufferPool().recycle(this.readBuffer);
                this.readBuffer = null;
            }
        }
        finally {
            this.readLock.unlock();
        }
        this.writeLock.lock();
        try {
            ByteBuffer top;
            while ((top = this.writeQueue.poll()) != null) {
                this.processor.getBufferPool().recycle(top);
            }
            this.lastWrite = null;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public synchronized void close() {
        if (!this.isClosed.get()) {
            this.closeSocket();
            if (this.isClosed.compareAndSet(false, true)) {
                this.cleanup();
            }
        }
    }

    @Override
    public synchronized void register(Selector selector) throws IOException {
        try {
            this.processKey = this.channel.register(selector, 1, this);
            this.processor.getReactor().getPerfCollection().getSocketCount().getAndIncrement();
            this.isRegistered = true;
        }
        finally {
            if (this.isClosed.get()) {
                this.clearSelectionKey();
            }
        }
    }

    public void shrinkBuffer() {
        if (!this.bigBufferMark.compareAndSet(true, false)) {
            this.bigBufferClean.set(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public void read() throws IOException {
        this.lastRecv.set(TimerThread.fastNanos());
        batch = new ArrayList<XPacket>(32);
        this.readLock.lock();
        try {
            if (this.isClosed.get()) {
                throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_CLIENT, new String[]{this + " closed."});
            }
            if (null == this.readBuffer) {
                this.readBuffer = this.processor.getBufferPool().allocate();
            }
            if ((got = this.channel.read(recvBuffer = this.bigReadBuffer != null ? this.bigReadBuffer : this.readBuffer)) < 0) {
                throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_CLIENT, new String[]{this + " EOF."});
            }
            if (this.client != null) {
                this.client.getPerfCollection().getRecvNetCount().getAndIncrement();
                this.client.getPerfCollection().getRecvSize().getAndAdd(got);
                this.client.getPool().getPerfCollection().getRecvNetCount().getAndIncrement();
                this.client.getPool().getPerfCollection().getRecvSize().getAndAdd(got);
            }
            maxPacketSize = XConnectionManager.getInstance().getMaxPacketSize();
            headerSize = XConfig.GALAXY_X_PROTOCOL != false ? 14 : 13;
            lengthOffset = XConfig.GALAXY_X_PROTOCOL != false ? 9 : 8;
            recvBuffer.flip();
lbl25:
            // 22 sources

            block27: while (recvBuffer.remaining() >= headerSize) {
                packetSize = recvBuffer.getInt(recvBuffer.position() + lengthOffset) - 1;
                if (packetSize < 0 || (long)packetSize > maxPacketSize) {
                    throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_BAD_PACKET, new String[]{this + ". Received packet length (" + packetSize + ") exceeds the allowed maximum (" + maxPacketSize + ")."});
                }
                if (headerSize + packetSize > this.readBuffer.capacity()) {
                    this.bigBufferMark.set(true);
                    this.bigBufferClean.set(false);
                }
                if (recvBuffer.remaining() >= headerSize + packetSize) {
                    sid = recvBuffer.getLong();
                    if (XConfig.GALAXY_X_PROTOCOL && (version = (long)recvBuffer.get()) != 0L) {
                        throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_BAD_PACKET, new String[]{this + ". Received packet version (" + version + ") unexpected."});
                    }
                    recvBuffer.position(recvBuffer.position() + 4);
                    type = recvBuffer.get();
                    nextPos = recvBuffer.position() + packetSize;
                    oldLimit = recvBuffer.limit();
                    recvBuffer.limit(nextPos);
                    try {
                        RDS80_REBASE = 81;
                        switch (type) {
                            case 0: {
                                batch.add(new XPacket(sid, type, Polarx.Ok.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 1: {
                                batch.add(new XPacket(sid, type, Polarx.Error.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 2: {
                                batch.add(new XPacket(sid, type, PolarxConnection.Capabilities.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 3: {
                                batch.add(new XPacket(sid, type, PolarxSession.AuthenticateContinue.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 4: {
                                batch.add(new XPacket(sid, type, PolarxSession.AuthenticateOk.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 11: {
                                batch.add(new XPacket(sid, type, PolarxNotice.Frame.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 12: {
                                if (XConfig.GALAXY_X_PROTOCOL) {
                                    batch.add(new XPacket(sid, type, PolarxResultset.ColumnMetaDataCompatible.parseFrom(recvBuffer), packetSize));
                                    ** break;
                                }
                                batch.add(new XPacket(sid, type, PolarxResultset.ColumnMetaData.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 13: {
                                batch.add(new XPacket(sid, type, PolarxResultset.Row.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 14: {
                                batch.add(new XPacket(sid, type, PolarxResultset.FetchDone.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 16: {
                                batch.add(new XPacket(sid, type, PolarxResultset.FetchDoneMoreResultsets.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 17: {
                                batch.add(new XPacket(sid, type, PolarxSql.StmtExecuteOk.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 19: 
                            case 100: {
                                batch.add(new XPacket(sid, 19, PolarxResultset.TokenDone.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 20: 
                            case 101: {
                                batch.add(new XPacket(sid, 20, PolarxExecPlan.ResultTSO.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 21: 
                            case 102: {
                                batch.add(new XPacket(sid, 21, PolarxResultset.Chunk.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 22: {
                                batch.add(new XPacket(sid, type, PolarxPhysicalBackfill.GetFileInfoOperator.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 23: {
                                batch.add(new XPacket(sid, type, PolarxPhysicalBackfill.TransferFileDataOperator.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            case 24: {
                                batch.add(new XPacket(sid, type, PolarxPhysicalBackfill.FileManageOperatorResponse.parseFrom(recvBuffer), packetSize));
                                ** break;
                            }
                            default: {
                                XLog.XLogLogger.error(this + " Unknown ServerMessages.Type: " + type);
                                continue block27;
                            }
                        }
                    }
                    catch (Exception e) {
                        XLog.XLogLogger.error(this + " Bad packet!!! sid: " + sid + " type: " + type + " len: " + packetSize);
                        XLog.XLogLogger.error((Throwable)e);
                        continue;
                    }
                    finally {
                        recvBuffer.position(nextPos);
                        recvBuffer.limit(oldLimit);
                        continue;
                    }
                }
                if (headerSize + packetSize <= recvBuffer.capacity()) break;
                newBuf = ByteBuffer.allocate(packetSize + 4096).order(ByteOrder.LITTLE_ENDIAN);
                newBuf.put(recvBuffer);
                recvBuffer.clear();
                this.bigReadBuffer = newBuf;
                XLog.XLogLogger.info(this + " switch to big buf.");
                break;
            }
            if (recvBuffer.hasRemaining()) {
                recvBuffer.compact();
            } else {
                if (recvBuffer == this.bigReadBuffer) {
                    if (this.bigBufferClean.compareAndSet(true, false)) {
                        this.bigReadBuffer = null;
                        XLog.XLogLogger.info(this + " free big buf.");
                    } else {
                        this.bigReadBuffer.clear();
                    }
                }
                this.readBuffer.clear();
            }
        }
        finally {
            this.readLock.unlock();
        }
        if (!batch.isEmpty()) {
            if (this.client != null) {
                this.client.getPerfCollection().getRecvMsgCount().getAndAdd(batch.size());
                this.client.getPool().getPerfCollection().getRecvMsgCount().getAndAdd(batch.size());
            }
            this.consumer.accept(batch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(XPacket packet, boolean flush) throws IOException {
        if (!(packet.getPacket() instanceof GeneratedMessageV3)) {
            throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_BAD_PACKET, new String[]{this + " send unknown packet."});
        }
        GeneratedMessageV3 msg = (GeneratedMessageV3)packet.getPacket();
        int size = msg.getSerializedSize();
        if ((long)size > XConnectionManager.getInstance().getMaxPacketSize()) {
            throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_BAD_PACKET, new String[]{this + " sent packet length (" + size + ") exceeds the allowed maximum (" + XConnectionManager.getInstance().getMaxPacketSize() + ")."});
        }
        int headerSize = XConfig.GALAXY_X_PROTOCOL ? 14 : 13;
        int fullSize = headerSize + size;
        Throwable throwable = null;
        boolean directWrite = XConnectionManager.getInstance().isEnableDirectWrite();
        boolean needQueuedFlush = false;
        this.writeLock.lock();
        try {
            if (this.isClosed.get()) {
                throw new TddlRuntimeException(com.alibaba.polardbx.common.exception.code.ErrorCode.ERR_X_PROTOCOL_CLIENT, new String[]{this + " closed."});
            }
            boolean done = false;
            if (this.lastWrite != null) {
                int position = this.lastWrite.position();
                int limit = this.lastWrite.limit();
                if (limit + fullSize <= this.lastWrite.capacity()) {
                    this.lastWrite.position(limit);
                    this.lastWrite.limit(this.lastWrite.capacity());
                    this.lastWrite.putLong(packet.getSid());
                    if (XConfig.GALAXY_X_PROTOCOL) {
                        this.lastWrite.put((byte)0);
                    }
                    this.lastWrite.putInt(1 + size);
                    this.lastWrite.put((byte)packet.getType());
                    msg.writeTo(CodedOutputStream.newInstance((ByteBuffer)this.lastWrite));
                    this.lastWrite.position(this.lastWrite.position() + size);
                    this.lastWrite.flip();
                    this.lastWrite.position(position);
                    done = true;
                    if (this.lastWrite.limit() + headerSize > this.lastWrite.capacity()) {
                        this.lastWrite = null;
                    }
                }
            }
            if (!done) {
                ByteBuffer buffer = fullSize > this.processor.getBufferPool().getChunkSize() ? ByteBuffer.allocate(fullSize).order(ByteOrder.LITTLE_ENDIAN) : this.processor.getBufferPool().allocate();
                try {
                    buffer.putLong(packet.getSid());
                    if (XConfig.GALAXY_X_PROTOCOL) {
                        buffer.put((byte)0);
                    }
                    buffer.putInt(1 + size);
                    buffer.put((byte)packet.getType());
                    msg.writeTo(CodedOutputStream.newInstance((ByteBuffer)buffer));
                    buffer.position(buffer.position() + size);
                    this.lastWrite = buffer.remaining() >= headerSize ? buffer : null;
                    buffer.flip();
                    this.writeQueue.offer(buffer);
                }
                catch (Throwable e) {
                    this.processor.getBufferPool().recycle(buffer);
                    throw e;
                }
            }
            if (directWrite) {
                if (flush) {
                    needQueuedFlush = !this.write0();
                }
            } else {
                needQueuedFlush = flush;
            }
        }
        catch (Throwable e) {
            throwable = e;
        }
        finally {
            this.writeLock.unlock();
        }
        if (throwable != null) {
            XLog.XLogLogger.error(throwable);
            this.fatalCallback.accept(throwable);
            throw GeneralUtil.nestedException((Throwable)throwable);
        }
        if (this.client != null) {
            this.client.getPerfCollection().getSendMsgCount().getAndIncrement();
            this.client.getPool().getPerfCollection().getSendMsgCount().getAndIncrement();
        }
        if (needQueuedFlush) {
            this.flush();
        }
    }

    public void flush() {
        this.processor.postWrite(this);
    }

    private boolean write0() throws IOException {
        ByteBuffer top;
        this.lastSend.set(TimerThread.fastNanos());
        while ((top = this.writeQueue.peek()) != null) {
            int written = this.channel.write(top);
            if (written > 0 && this.client != null) {
                this.client.getPerfCollection().getSendFlushCount().getAndIncrement();
                this.client.getPerfCollection().getSendSize().getAndAdd(written);
                this.client.getPool().getPerfCollection().getSendFlushCount().getAndIncrement();
                this.client.getPool().getPerfCollection().getSendSize().getAndAdd(written);
            }
            if (top.hasRemaining()) {
                return false;
            }
            if (this.lastWrite == top) {
                this.lastWrite = null;
            }
            ByteBuffer removed = this.writeQueue.remove();
            assert (removed == top);
            this.processor.getBufferPool().recycle(removed);
        }
        return true;
    }

    private void enableWrite() {
        this.keyLock.lock();
        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() | 4);
        }
        finally {
            this.keyLock.unlock();
        }
        this.processKey.selector().wakeup();
    }

    private void disableWrite() {
        this.keyLock.lock();
        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() & 0xFFFFFFFB);
        }
        finally {
            this.keyLock.unlock();
        }
    }

    @Override
    public void writeByQueue() throws IOException {
        if (this.isClosed.get()) {
            return;
        }
        this.writeLock.lock();
        try {
            if ((this.processKey.interestOps() & 4) == 0 && !this.write0()) {
                this.enableWrite();
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public void writeByEvent() throws IOException {
        if (this.isClosed.get()) {
            return;
        }
        this.writeLock.lock();
        try {
            if (this.write0()) {
                this.disableWrite();
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    @Override
    public void handleError(ErrorCode errCode, Throwable t) {
        XLog.XLogLogger.error(t);
        this.fatalCallback.accept(t);
    }

    public String getTag() {
        SocketChannel c = this.channel;
        if (null == c) {
            return null;
        }
        try {
            return c.getLocalAddress().toString() + " -> " + c.getRemoteAddress().toString();
        }
        catch (Throwable ignore) {
            return null;
        }
    }

    public String toString() {
        SocketChannel c = this.channel;
        if (null == c) {
            return "X-NIO-Client";
        }
        try {
            return "X-NIO-Client " + c.getLocalAddress().toString() + " to " + c.getRemoteAddress().toString();
        }
        catch (Throwable ignore) {
            return "X-NIO-Client";
        }
    }
}

