/*
 * Decompiled with CFR 0.152.
 */
package org.rapidoidx.net.impl;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import org.rapidoid.Insights;
import org.rapidoid.config.Conf;
import org.rapidoid.ctx.Ctxs;
import org.rapidoid.log.Log;
import org.rapidoid.measure.StatsMeasure;
import org.rapidoid.pool.Pool;
import org.rapidoid.pool.Pools;
import org.rapidoid.u.U;
import org.rapidoid.util.SimpleList;
import org.rapidoidx.buffer.BufGroup;
import org.rapidoidx.buffer.IncompleteReadException;
import org.rapidoidx.net.Protocol;
import org.rapidoidx.net.impl.AbstractEventLoop;
import org.rapidoidx.net.impl.ChannelHolderImpl;
import org.rapidoidx.net.impl.ConnState;
import org.rapidoidx.net.impl.ConnectionTarget;
import org.rapidoidx.net.impl.CtxListener;
import org.rapidoidx.net.impl.ProtocolException;
import org.rapidoidx.net.impl.RapidoidChannel;
import org.rapidoidx.net.impl.RapidoidConnection;
import org.rapidoidx.net.impl.RapidoidHelper;

public class RapidoidWorker
extends AbstractEventLoop<RapidoidWorker> {
    /**
     * When {@code true}, message processing is routed through {@code processNextExtraSafe}, which warns
     * about and closes any context that is still open before or after the protocol handler runs.
     */
    public static boolean EXTRA_SAFE = false;
    /** Connections queued by {@code restart}; drained and re-processed in {@code doProcessing}. */
    private final Queue<RapidoidConnection> restarting;
    /** Outgoing connection targets queued by {@code connect}; registered for connect events in {@code doProcessing} once their {@code retryAfter} time has passed. */
    private final Queue<ConnectionTarget> connecting;
    /** Established channels queued by {@code accept} and {@code connectOP}; {@code doProcessing} registers them for reads and attaches a connection. */
    private final Queue<RapidoidChannel> connected;
    /** Connections added by {@code wantToWriteAsync}; every access in this class synchronizes on the list itself. */
    private final SimpleList<RapidoidConnection> waitingToWrite;
    /** Pool of connection objects created through {@code newConnection}; {@code attachConn} takes from it and {@code close} releases back to it. */
    private final Pool<RapidoidConnection> connections;
    /** Upper bound on the number of messages handled by one {@code processMsgs} call; read from the "pipeline-max" option (default 1000000). */
    private final long maxPipelineSize;
    /** Value returned by {@code getSelectorTimeout}; read from the "selector-timeout" option (default 5). NOTE(review): unit is presumably milliseconds - confirm in AbstractEventLoop. */
    private final int selectorTimeout;
    /** Protocol assigned to channels handed over through {@code accept}. */
    final Protocol serverProtocol;
    /** Helper supplied at construction; not used by the code in this class. */
    final RapidoidHelper helper;
    /** Socket send/receive buffer size: the constructor's {@code bufSizeKB} multiplied by 1024. */
    private final int bufSize;
    /** {@code readOP} only reads from the socket while the input buffer size is below this value; read from the "buffer-limit" option (default 0x100000). */
    private final long bufSizeLimit;
    /** Value passed to {@code Socket.setTcpNoDelay} for every configured socket. */
    private final boolean noDelay;
    /** Buffer group handed to every {@code RapidoidConnection} created by {@code newConnection}. */
    private final BufGroup bufs;
    /** Running total of the counts returned by {@code processMsgs}; exposed through {@code getMessagesProcessed}. */
    private volatile long messagesProcessed;
    /** Stats measure named {@code name + ":datain"}; fed from {@code readOP} with the result of each input append. */
    private final StatsMeasure dataIn;
    /** Stats measure named {@code name + ":dataout"}; fed from {@code writeOP} with the result of each output write. */
    private final StatsMeasure dataOut;

    /**
     * Creates a worker event loop with its own queues, statistics and connection pool.
     *
     * @param name      name of the event loop; also used as the prefix of the ":datain"/":dataout" stats
     * @param bufs      buffer group passed on to every connection created by this worker
     * @param protocol  protocol assigned to accepted (server-side) channels
     * @param helper    helper object kept in the {@code helper} field
     * @param bufSizeKB socket send/receive buffer size, in kilobytes
     * @param noNelay   (sic) value used for {@code TCP_NODELAY} on every configured socket
     */
    public RapidoidWorker(String name, BufGroup bufs, Protocol protocol, RapidoidHelper helper, int bufSizeKB, boolean noNelay) {
        super(name);

        this.bufs = bufs;
        this.serverProtocol = protocol;
        this.helper = helper;

        // Tunables, with their built-in defaults.
        this.maxPipelineSize = Conf.option("pipeline-max", 1000000L);
        this.selectorTimeout = Conf.option("selector-timeout", 5);
        this.bufSizeLimit = Conf.option("buffer-limit", 0x100000L);

        // The "micro" configuration uses much smaller queues and a gentler list growth factor.
        final int queueCapacity = Conf.micro() ? 1000 : 1000000;
        final int listGrowFactor = Conf.micro() ? 2 : 10;

        this.restarting = new ArrayBlockingQueue<RapidoidConnection>(queueCapacity);
        this.connecting = new ArrayBlockingQueue<ConnectionTarget>(queueCapacity);
        this.connected = new ArrayBlockingQueue<RapidoidChannel>(queueCapacity);
        this.waitingToWrite = new SimpleList<RapidoidConnection>(queueCapacity / 10, listGrowFactor);

        this.dataIn = Insights.stats(name + ":datain");
        this.dataOut = Insights.stats(name + ":dataout");

        // Pooled connections are produced on demand by newConnection().
        final Callable<RapidoidConnection> connectionFactory = new Callable<RapidoidConnection>() {

            @Override
            public RapidoidConnection call() throws Exception {
                return RapidoidWorker.this.newConnection();
            }
        };
        this.connections = Pools.create("connections", connectionFactory, 100000);

        this.bufSize = bufSizeKB * 1024;
        this.noDelay = noNelay;
    }

    /**
     * Hands an accepted socket channel over to this worker.
     * The channel is configured, queued as a server-side channel using {@code serverProtocol},
     * and the selector is woken up so the event loop can pick it up.
     *
     * @param socketChannel the accepted channel
     * @throws IOException if configuring the socket fails
     */
    public void accept(SocketChannel socketChannel) throws IOException {
        configureSocket(socketChannel);

        RapidoidChannel accepted = new RapidoidChannel(socketChannel, false, serverProtocol);
        connected.add(accepted);

        selector.wakeup();
    }

    /**
     * Starts an outgoing connection for the given target.
     * The target is queued for registration by the event loop before the (non-blocking) connect is
     * initiated; the selector is woken up afterwards.
     *
     * @param target describes the channel to connect and the address to connect to
     * @throws IOException if configuring the socket or initiating the connection fails
     */
    public void connect(ConnectionTarget target) throws IOException {
        configureSocket(target.socketChannel);
        connecting.add(target);

        boolean connectedImmediately = target.socketChannel.connect(target.addr);
        String status = connectedImmediately ? "Opened socket, connected" : "Opened socket, connecting...";
        Log.info(status, "address", target.addr);

        selector.wakeup();
    }

    /**
     * Puts the channel into non-blocking mode and applies this worker's socket options:
     * {@code TCP_NODELAY} from {@code noDelay}, send and receive buffer sizes from {@code bufSize},
     * and address reuse enabled.
     *
     * @param socketChannel the channel to configure
     * @throws IOException if the channel or one of the socket options cannot be configured
     */
    private void configureSocket(SocketChannel socketChannel) throws IOException, SocketException {
        socketChannel.configureBlocking(false);

        final Socket underlying = socketChannel.socket();
        underlying.setTcpNoDelay(noDelay);
        underlying.setReceiveBufferSize(bufSize);
        underlying.setSendBufferSize(bufSize);
        underlying.setReuseAddress(true);
    }

    /**
     * Completes a pending outgoing connection for a connectable key.
     * On success the channel is queued as a client-side channel carrying the target's protocol,
     * holder, auto-reconnect flag and state. A {@link ConnectException} schedules a retry.
     * Nothing happens when the channel has no connection pending.
     *
     * @param key a connectable key whose attachment is the {@code ConnectionTarget}
     * @throws IOException if finishing the connection fails with anything other than a
     *                     {@code ConnectException}, or if scheduling the retry fails
     */
    @Override
    protected void connectOP(SelectionKey key) throws IOException {
        U.must(key.isConnectable());

        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.isConnectionPending()) {
            ConnectionTarget target = (ConnectionTarget) key.attachment();
            try {
                boolean established = channel.finishConnect();
                U.rteIf(!established, "Expected an established connection!");
                Log.info("Connected", "address", target.addr);

                RapidoidChannel ready = new RapidoidChannel(channel, true, target.protocol, target.holder, target.autoreconnecting, target.state);
                connected.add(ready);
            }
            catch (ConnectException e) {
                retryConnecting(target);
            }
        }
    }

    /**
     * Schedules another connection attempt for the target: opens a fresh socket channel for it,
     * sets {@code retryAfter} to one second from now and passes it to {@code connect}.
     *
     * @param target the target to reconnect; its {@code socketChannel} and {@code retryAfter} are overwritten
     * @throws IOException if the new channel cannot be opened or the connect cannot be initiated
     */
    private void retryConnecting(ConnectionTarget target) throws IOException {
        Log.warn("Reconnecting...", "address", target.addr);

        target.socketChannel = SocketChannel.open();

        long now = U.time();
        target.retryAfter = now + 1000L;

        connect(target);
    }

    /**
     * Handles a readable key: appends available data to the connection's input buffer (unless the
     * buffer has already reached {@code bufSizeLimit}), processes buffered messages, and finally closes
     * or reconnects the connection if it has been marked as closing.
     *
     * <p>A read failure is treated like end-of-stream, but the cause is now logged instead of being
     * silently dropped, and the {@code -1} end-of-stream sentinel is no longer recorded in the
     * {@code dataIn} statistics.
     *
     * @param key a readable key whose attachment is the {@code RapidoidConnection}
     * @throws IOException if reconnecting fails
     */
    @Override
    protected void readOP(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        RapidoidConnection conn = (RapidoidConnection) key.attachment();

        // Back-pressure: stop pulling from the socket while the input buffer is at its limit.
        if (conn.input.size() < this.bufSizeLimit) {
            long read;
            try {
                read = conn.input.append((ReadableByteChannel) socketChannel);
            }
            catch (Exception e) {
                // A failed read ends the connection just like EOF does; keep the cause visible.
                Log.debug("Failed to read from the channel", "error", (Object) e);
                read = -1L;
            }

            if (read == -1L) {
                Log.debug("The other end closed the connection!");
                conn.closing = true;
            } else {
                // Only real read results are recorded; -1 is a sentinel, not an amount of data.
                this.dataIn.value(read);
            }
        }

        // Buffered input is still processed, even when the peer has just gone away.
        this.process(conn);

        if (conn.closing) {
            if (conn.shouldReconnect()) {
                this.reconnect(conn);
            } else {
                this.close(key);
            }
        }
    }

    /**
     * Closes the connection and schedules a new connection attempt to the same address, with the same
     * protocol and holder, auto-reconnect enabled and a copy of the connection state.
     *
     * @param conn the connection to replace
     * @throws IOException if the retry cannot be scheduled
     */
    private void reconnect(RapidoidConnection conn) throws IOException {
        // Everything needed for the retry is captured first: close(key) below calls conn.reset().
        final SelectionKey oldKey = conn.key;
        final InetSocketAddress address = conn.getAddress();
        final Protocol proto = conn.getProtocol();
        final ChannelHolderImpl channelHolder = conn.getHolder();
        final ConnState stateSnapshot = conn.state().copy();

        channelHolder.closed();
        close(oldKey);

        // No channel yet (null): retryConnecting opens a fresh one.
        ConnectionTarget retryTarget = new ConnectionTarget(null, address, proto, channelHolder, true, stateSnapshot);
        retryConnecting(retryTarget);
    }

    /**
     * Processes the messages currently buffered for the connection, adds their number to the
     * processed-messages counter and records the input position reached in
     * {@code conn.completedInputPos}.
     *
     * @param conn the connection whose input should be processed
     */
    public void process(RapidoidConnection conn) {
        messagesProcessed = messagesProcessed + processMsgs(conn);
        conn.completedInputPos = conn.input.position();
    }

    /**
     * Processes buffered messages one after another until the input is exhausted, a message cannot
     * be completed, or {@code maxPipelineSize} messages have been handled.
     *
     * @param conn the connection whose input should be processed
     * @return the number of messages for which {@code processNext} reported success
     */
    private long processMsgs(RapidoidConnection conn) {
        long handled = 0L;
        while (handled < maxPipelineSize && conn.input().hasRemaining() && processNext(conn, false, false)) {
            handled++;
        }
        return handled;
    }

    /**
     * Runs the connection's protocol handler once and reports whether a message was completed.
     *
     * <p>The current input position is checkpointed first. On success the consumed input is deleted.
     * On an {@code IncompleteReadException} the input position and limit, the output size and the
     * connection state ({@code n}, {@code obj}) are rolled back to their values from before the call.
     * A {@code ProtocolException} discards the partial output, writes the error message (or
     * "Protocol error!") and closes the connection; any other throwable is logged and closes the
     * connection. The checkpoint is always cleared afterwards.
     *
     * @param conn    the connection to process
     * @param initial whether this is the initial processing step of the connection
     * @param write   whether this call follows a completed write
     * @return {@code true} if the handler completed normally, {@code false} otherwise
     */
    private boolean processNext(RapidoidConnection conn, boolean initial, boolean write) {
        conn.log(initial ? "<< INIT >>" : "<< PROCESS >>");

        // Outside the initial and after-write steps there must be input to work on.
        U.must(initial || write || conn.input().hasRemaining());

        conn.input().checkpoint(conn.input().position());

        // Snapshot of everything that has to be restored if the message turns out to be incomplete.
        final long savedLimit = conn.input().limit();
        final long savedOutputSize = conn.output().size();
        final ConnState state = conn.state();
        final long savedStateN = state.n;
        final Object savedStateObj = state.obj;

        try {
            conn.done = false;

            if (EXTRA_SAFE) {
                processNextExtraSafe(conn);
            } else {
                conn.getProtocol().process(conn);
            }

            if (!(conn.closed || conn.isAsync())) {
                conn.done();
            }

            conn.input().deleteBefore(conn.input().checkpoint());
            Log.debug("Completed message processing");
            return true;
        }
        catch (IncompleteReadException e) {
            Log.debug("Incomplete message");
            conn.log("<< ROLLBACK >>");

            conn.input().position(conn.input().checkpoint());
            conn.input().limit(savedLimit);
            conn.output().deleteAfter(savedOutputSize);
            state.n = savedStateN;
            state.obj = savedStateObj;
        }
        catch (ProtocolException e) {
            conn.log("<< PROTOCOL ERROR >>");
            // The exception is passed as a plain value of the "error" key, hence the Object cast.
            Log.warn("Protocol error", "error", (Object) e);

            conn.output().deleteAfter(savedOutputSize);
            conn.write(U.or(e.getMessage(), "Protocol error!"));
            conn.error();
            conn.close(true);
        }
        catch (Throwable e) {
            conn.log("<< ERROR >>");
            Log.error("Failed to process message!", e);
            conn.close(true);
        }
        finally {
            conn.input().checkpoint(-1L);
        }

        return false;
    }

    /**
     * Runs the protocol handler while guarding against leaked contexts: a context that is still open
     * before or after the handler runs is reported with a warning and closed.
     *
     * @param conn the connection to process
     */
    private void processNextExtraSafe(RapidoidConnection conn) {
        closeStrayContext("Detected unclosed context before processing message!");
        try {
            conn.getProtocol().process(conn);
        }
        finally {
            closeStrayContext("Detected unclosed context after processing message!");
        }
    }

    /** Closes the current context, if there is one, after logging the given warning. */
    private void closeStrayContext(String warning) {
        if (Ctxs.hasContext()) {
            Log.warn(warning);
            Ctxs.close();
        }
    }

    /**
     * Closes the given connection by closing its selection key.
     *
     * @param conn the connection to close
     */
    public void close(RapidoidConnection conn) {
        SelectionKey connKey = conn.key;
        close(connKey);
    }

    /**
     * Closes the channel behind the key and, if a not-yet-closed {@code RapidoidConnection} is
     * attached, resets it and returns it to the connection pool.
     *
     * <p>The connection is released even when closing the channel fails; previously an
     * {@code IOException} from the channel skipped the release and leaked the pooled connection.
     * Failures are reported through {@code Log}, like every other error in this class, instead of
     * {@code printStackTrace()}. A {@code null} key is ignored ({@code doProcessing} already treats
     * {@code conn.key} as possibly {@code null}).
     *
     * @param key the key to close; may be {@code null}
     */
    private void close(SelectionKey key) {
        if (key == null) {
            return;
        }

        // Read the attachment first: clearKey() detaches it from the key.
        Object attachment = key.attachment();

        try {
            this.clearKey(key);
        }
        catch (IOException e) {
            Log.warn("Failed to close the channel cleanly", (Throwable) e);
            // Make sure the key can never fire again with a connection that is about to be recycled.
            key.attach(null);
            key.cancel();
        }

        if (attachment instanceof RapidoidConnection) {
            RapidoidConnection conn = (RapidoidConnection) attachment;
            if (!conn.closed) {
                Log.trace("Closing connection", "connection", conn);
                assert (conn.key == key);
                conn.reset();
                this.connections.release(conn);
            }
        }
    }

    /**
     * Closes the socket channel of a valid key, detaches the key's attachment and cancels the key.
     * Invalid keys are left untouched.
     *
     * <p>Detaching and cancelling happen in a {@code finally} block, so a failure while closing the
     * channel can no longer leave a live key that still references its attachment.
     *
     * @param key the key to clear
     * @throws IOException if closing the channel fails (the key is detached and cancelled regardless)
     */
    private void clearKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }

        SocketChannel socketChannel = (SocketChannel) key.channel();
        try {
            socketChannel.close();
        }
        finally {
            key.attach(null);
            key.cancel();
        }
    }

    /**
     * Handles a writable key: writes pending output to the channel, drops what was written from the
     * output buffer and records the amount in the {@code dataOut} statistics.
     *
     * <p>When the output is fully written and the connection asked to be closed after writing, it
     * is closed. Otherwise the key's interest set is updated - the connection's {@code mode} wins
     * when it is non-zero; else {@code nextOp} after a complete write, or read+write after a partial
     * one - a complete write triggers another processing step, and the connection is told whether
     * the write completed. An {@code IOException} while writing closes the connection.
     *
     * @param key a writable key whose attachment is the {@code RapidoidConnection}
     */
    @Override
    protected void writeOP(SelectionKey key) throws IOException {
        RapidoidConnection conn = (RapidoidConnection) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();

        checkOnSameThread();

        try {
            long wrote = conn.output.writeTo((WritableByteChannel) socketChannel);
            assert (wrote >= 0L);

            conn.output.deleteBefore(wrote);
            dataOut.value(wrote);

            boolean drained = conn.output.size() == 0L;

            if (conn.closeAfterWrite() && drained) {
                close(conn);
                return;
            }

            if (drained) {
                key.interestOps(conn.mode != 0 ? conn.mode : conn.nextOp);
                processNext(conn, false, true);
            } else {
                // Keep writing, but also keep listening for input.
                key.interestOps(conn.mode != 0 ? conn.mode : (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
            }

            conn.wrote(drained);
        }
        catch (IOException e) {
            close(conn);
        }
    }

    /**
     * Requests write readiness notifications for the connection.
     * On the event-loop thread the key's interest set is switched to write immediately; from any
     * other thread the request is queued for the event loop.
     *
     * @param conn the connection that has output to write; its {@code mode} must not be 1
     *             (the value of {@code SelectionKey.OP_READ})
     */
    public void wantToWrite(RapidoidConnection conn) {
        U.must(conn.mode != SelectionKey.OP_READ);

        if (!onSameThread()) {
            wantToWriteAsync(conn);
            return;
        }

        conn.key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Queues the connection in {@code waitingToWrite} (under the list's own monitor) and wakes up
     * the selector so the event loop can enable write interest for it.
     *
     * @param conn the connection that has output to write
     */
    private void wantToWriteAsync(RapidoidConnection conn) {
        synchronized (waitingToWrite) {
            waitingToWrite.add(conn);
        }
        selector.wakeup();
    }

    /**
     * Performs the worker's queued housekeeping, in this order:
     * <ol>
     * <li>registers connection targets that are due for connect events,</li>
     * <li>turns newly connected channels into readable connections and runs their initial step,</li>
     * <li>re-runs the initial step for restarted connections,</li>
     * <li>enables write interest for connections queued from other threads.</li>
     * </ol>
     * Each phase lives in its own helper; the behaviour is the same as the former single method.
     * NOTE(review): presumably invoked by {@code AbstractEventLoop} on the event-loop thread - confirm.
     */
    @Override
    protected void doProcessing() {
        long now = U.time();

        this.registerDueTargets(now);
        this.activateConnectedChannels();
        this.replayRestartedConnections();
        this.enableQueuedWrites();
    }

    /** Registers every queued target whose retry time has passed for connect events; the others are re-queued. */
    private void registerDueTargets(long now) {
        // Only the targets present right now are examined, so re-queued ones are not polled again.
        int pending = this.connecting.size();

        for (int i = 0; i < pending; ++i) {
            ConnectionTarget target = this.connecting.poll();
            assert (target != null);

            if (target.retryAfter >= now) {
                // Not due yet: back to the end of the queue.
                this.connecting.add(target);
                continue;
            }

            Log.debug("connecting", "address", target.addr);
            try {
                SelectionKey newKey = target.socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
                newKey.attach(target);
            }
            catch (ClosedChannelException e) {
                Log.warn("Closed channel", (Throwable) e);
            }
        }
    }

    /** Registers each newly connected channel for reads, attaches a pooled connection and runs its initial processing step. */
    private void activateConnectedChannels() {
        RapidoidChannel channel;

        while ((channel = this.connected.poll()) != null) {
            SocketChannel socketChannel = channel.socketChannel;
            Log.debug("connected", "address", socketChannel.socket().getRemoteSocketAddress());

            try {
                SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
                U.notNull(channel.protocol, "protocol");

                RapidoidConnection conn = this.attachConn(newKey, channel.protocol);
                conn.setClient(channel.isClient);
                conn.setAutoReconnect(channel.autoreconnecting);
                this.bindChannelToHolder(conn, channel.holder);

                if (channel.state != null) {
                    conn.state().copyFrom(channel.state);
                }

                try {
                    this.processNext(conn, true, false);
                }
                finally {
                    conn.setInitial(false);
                }
            }
            catch (ClosedChannelException e) {
                Log.warn("Closed channel", (Throwable) e);
            }
        }
    }

    /** Runs the initial processing step again for every connection queued by restart(). */
    private void replayRestartedConnections() {
        RapidoidConnection restartedConn;

        while ((restartedConn = this.restarting.poll()) != null) {
            Log.debug("restarting", "connection", restartedConn);
            this.processNext(restartedConn, true, false);
        }
    }

    /** Switches every queued connection that still has a valid key to write interest, then empties the queue. */
    private void enableQueuedWrites() {
        synchronized (this.waitingToWrite) {
            for (int i = 0; i < this.waitingToWrite.size(); ++i) {
                RapidoidConnection conn = (RapidoidConnection) this.waitingToWrite.get(i);
                if (conn.key == null || !conn.key.isValid()) {
                    continue;
                }
                conn.key.interestOps(SelectionKey.OP_WRITE);
            }
            this.waitingToWrite.clear();
        }
    }

    /**
     * Links the connection with its channel holder in both directions.
     * The holder is always stored on the connection; the back-reference is only set when a holder exists.
     *
     * @param conn   the connection to bind
     * @param holder the holder to bind it to; may be {@code null}
     */
    private void bindChannelToHolder(RapidoidConnection conn, ChannelHolderImpl holder) {
        conn.setHolder(holder);

        if (holder == null) {
            return;
        }
        holder.setChannel(conn);
    }

    /**
     * Takes a connection from the pool, resets it, binds it to the key and the protocol, and attaches
     * it to the key. If the protocol is also a {@code CtxListener}, it is installed as the
     * connection's listener.
     *
     * <p>The null check on {@code key} now reports "key"; it used to be labelled "protocol", which
     * pointed at the wrong argument when the key was missing.
     *
     * @param key      the selection key of the channel; must not be {@code null}
     * @param protocol the protocol that will handle the connection; must not be {@code null}
     * @return the connection that is now attached to the key
     */
    private RapidoidConnection attachConn(SelectionKey key, Protocol protocol) {
        U.notNull(key, "key");
        U.notNull(protocol, "protocol");

        // A key is either fresh or still carries the target of a finished outgoing connect.
        Object attachment = key.attachment();
        assert (attachment == null || attachment instanceof ConnectionTarget);

        RapidoidConnection conn = (RapidoidConnection) this.connections.get();
        conn.reset();

        // A connection coming out of the pool must be in the closed state after the reset.
        U.must(conn.closed);
        conn.closed = false;

        conn.key = key;
        conn.setProtocol(protocol);
        if (protocol instanceof CtxListener) {
            conn.setListener((CtxListener) protocol);
        }

        key.attach(conn);
        return conn;
    }

    /**
     * Handles a failed key operation: logs the error and closes the key.
     *
     * @param key the key whose operation failed
     * @param e   the failure
     */
    @Override
    protected void failedOP(SelectionKey key, Throwable e) {
        Log.error("Network error", e);
        close(key);
    }

    /**
     * Queues the connection so that the event loop runs its initial processing step again.
     *
     * <p>The selector is woken up after queuing, as {@code accept}, {@code connect} and
     * {@code wantToWriteAsync} already do, so the restart does not have to wait for the current
     * select call to time out.
     *
     * @param conn the connection to restart
     */
    public void restart(RapidoidConnection conn) {
        this.restarting.add(conn);
        this.selector.wakeup();
    }

    /**
     * Creates a new connection object bound to this worker and its buffer group.
     *
     * @return a new connection
     */
    public RapidoidConnection newConnection() {
        RapidoidConnection created = new RapidoidConnection(this, bufs);
        return created;
    }

    /**
     * Returns the total number of messages this worker has counted as processed so far.
     *
     * @return the current value of the processed-messages counter
     */
    public long getMessagesProcessed() {
        return messagesProcessed;
    }

    /**
     * Returns the selector timeout taken from the "selector-timeout" option, widened to {@code long}.
     *
     * @return the configured selector timeout
     */
    @Override
    protected long getSelectorTimeout() {
        return selectorTimeout;
    }
}

