/*
 * Decompiled with CFR 0.152.
 */
package org.rapidoidx.net.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.rapidoid.annotation.Inject;
import org.rapidoid.cls.Cls;
import org.rapidoid.config.Conf;
import org.rapidoid.log.Log;
import org.rapidoid.u.U;
import org.rapidoid.util.Rnd;
import org.rapidoidx.buffer.BufGroup;
import org.rapidoidx.net.Protocol;
import org.rapidoidx.net.TCPServer;
import org.rapidoidx.net.TCPServerInfo;
import org.rapidoidx.net.impl.AbstractEventLoop;
import org.rapidoidx.net.impl.DefaultExchange;
import org.rapidoidx.net.impl.RapidoidConnection;
import org.rapidoidx.net.impl.RapidoidHelper;
import org.rapidoidx.net.impl.RapidoidWorker;

/**
 * TCP server event loop: opens a non-blocking listening socket, registers it
 * with this loop's selector for accept events and hands every accepted
 * connection to one of a fixed pool of {@link RapidoidWorker}s in round-robin
 * order.
 *
 * <p>The instance also serves as its own {@link TCPServerInfo} (see
 * {@link #info()}).
 *
 * <p>NOTE(review): the {@code @Inject(optional=true)} fields are presumably
 * overridden by the surrounding configuration/injection mechanism; the values
 * assigned here are the fallbacks visible in this class.
 */
public class RapidoidServerLoop
extends AbstractEventLoop<TCPServer>
implements TCPServer,
TCPServerInfo {
    /**
     * Worker pool. Published only once every worker has been created and has
     * reported that it started, so readers never observe a partially filled
     * array. {@code null} until the listening socket has been opened.
     */
    private volatile RapidoidWorker[] workers;
    /** Index of the worker that receives the next accepted connection. */
    private int workerIndex = 0;
    /** Port the server socket is bound to. */
    @Inject(optional=true)
    private int port = 8888;
    /** Number of workers to create; must be at least 1. */
    @Inject(optional=true)
    private int workersN = Conf.cpus();
    /** Passed to every worker as its buffer size argument (in KB, judging by the name). */
    @Inject(optional=true)
    private int bufSizeKB = 16;
    /** Passed to every worker as its "no delay" flag. */
    @Inject(optional=true)
    private boolean noDelay = false;
    protected final Protocol protocol;
    private final Class<? extends RapidoidHelper> helperClass;
    private final Class<? extends DefaultExchange<?>> exchangeClass;
    /**
     * Listening channel; {@code null} until {@link #openSocket()} ran.
     * Volatile because it is assigned in {@link #beforeLoop()} while
     * {@link #shutdown()} may be invoked from a different thread
     * (NOTE(review): assumes the superclass runs {@code beforeLoop()} on the
     * thread started by {@link #start()} - confirm).
     */
    private volatile ServerSocketChannel serverSocketChannel;

    /**
     * Creates a server loop named "server".
     *
     * @param protocol      protocol handed to every worker; must not be {@code null}
     *                      by the time the socket is opened
     * @param exchangeClass exchange class passed as constructor argument when
     *                      instantiating the helper for each worker
     * @param helperClass   helper class instantiated once per worker; falls back to
     *                      {@link RapidoidHelper} via {@code U.or} when not provided
     */
    public RapidoidServerLoop(Protocol protocol, Class<? extends DefaultExchange<?>> exchangeClass, Class<? extends RapidoidHelper> helperClass) {
        super("server");
        this.protocol = protocol;
        this.exchangeClass = exchangeClass;
        this.helperClass = U.or(helperClass, RapidoidHelper.class);
    }

    /**
     * Accepts one pending connection and passes it to the next worker in
     * round-robin order.
     *
     * @param key selection key whose channel is the listening {@link ServerSocketChannel}
     * @throws IOException if accepting the connection or handing it to the worker fails
     */
    @Override
    protected void acceptOP(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverChannel.accept();
        if (socketChannel == null) {
            // A non-blocking accept() returns null when no connection is pending
            // (e.g. the peer gave up between select and accept); nothing to dispatch.
            return;
        }
        // Read the volatile field once so index and array always belong together.
        RapidoidWorker[] pool = this.workers;
        RapidoidWorker worker = pool[this.workerIndex];
        this.workerIndex = (this.workerIndex + 1) % pool.length;
        worker.accept(socketChannel);
    }

    /** No periodic processing is performed by the server loop itself. */
    @Override
    protected void doProcessing() {
    }

    /**
     * Opens the listening socket and starts the workers before the loop begins.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the socket cannot be opened
     */
    @Override
    protected final void beforeLoop() {
        try {
            this.openSocket();
        }
        catch (IOException e) {
            throw U.rte("Cannot open socket!", e);
        }
    }

    /**
     * Opens and binds the non-blocking server socket, registers it for accept
     * events and starts the worker pool. If any step fails, the channel opened
     * here is closed again so the port/file descriptor does not leak.
     *
     * @throws IOException if configuring, binding or registering the channel fails
     */
    private void openSocket() throws IOException {
        U.notNull(this.protocol, "protocol");
        U.notNull(this.helperClass, "helperClass");
        if (this.workersN < 1) {
            // An empty pool would make every accept fail; a negative size cannot even be allocated.
            throw U.rte("The number of workers must be at least 1, but was: " + this.workersN);
        }
        ServerSocketChannel channel = ServerSocketChannel.open();
        this.serverSocketChannel = channel;
        boolean ready = false;
        try {
            if (!channel.isOpen() || !this.selector.isOpen()) {
                throw U.rte("Cannot open socket!");
            }
            channel.configureBlocking(false);
            ServerSocket socket = channel.socket();
            Log.info("Opening port to listen", "port", this.port);
            InetSocketAddress addr = new InetSocketAddress(this.port);
            socket.bind(addr);
            Log.info("Opened socket", "address", addr);
            channel.register(this.selector, SelectionKey.OP_ACCEPT);
            Log.info("Waiting for connections...");
            this.workers = this.startWorkers();
            ready = true;
        } finally {
            if (!ready) {
                this.closeAfterFailedOpen(channel);
            }
        }
    }

    /**
     * Creates one worker (plus its helper and buffer group) per configured slot,
     * starts each on its own thread and waits until all of them report that
     * they started. Workers that were already started are stopped again if a
     * later one cannot be created.
     *
     * @return the fully initialized worker pool
     */
    private RapidoidWorker[] startWorkers() {
        RapidoidWorker[] pool = new RapidoidWorker[this.workersN];
        int started = 0;
        boolean complete = false;
        try {
            for (int i = 0; i < pool.length; ++i) {
                RapidoidHelper helper = Cls.newInstance(this.helperClass, this.exchangeClass);
                String workerName = "server" + (i + 1);
                // NOTE(review): 14 is the value the original code passes to BufGroup; its meaning is not visible here.
                BufGroup bufGroup = new BufGroup(14);
                pool[i] = new RapidoidWorker(workerName, bufGroup, this.protocol, helper, this.bufSizeKB, this.noDelay);
                new Thread(pool[i], workerName).start();
                ++started;
            }
            for (RapidoidWorker worker : pool) {
                worker.waitToStart();
            }
            complete = true;
            return pool;
        } finally {
            if (!complete) {
                // Do not leave orphaned worker threads behind when start-up fails half-way.
                for (int i = 0; i < started; ++i) {
                    pool[i].stopLoop();
                }
            }
        }
    }

    /** Closes the channel after a failed {@link #openSocket()}, logging instead of masking the original failure. */
    private void closeAfterFailedOpen(ServerSocketChannel channel) {
        try {
            channel.close();
        }
        catch (IOException e) {
            Log.warn("Cannot close socket or selector!", e);
        }
    }

    /**
     * Starts the server loop on a new thread named "server" and then delegates
     * to the superclass.
     *
     * @return the value returned by the superclass {@code start()}
     */
    @Override
    public synchronized TCPServer start() {
        new Thread(this, "server").start();
        return super.start();
    }

    /**
     * Stops this loop and all workers, then closes the selector and the
     * listening channel. Safe to call before the socket was opened; the
     * selector and the channel are closed independently so a failure closing
     * one does not prevent closing the other.
     *
     * @return the value returned by the superclass {@code shutdown()}
     */
    @Override
    public synchronized TCPServer shutdown() {
        this.stopLoop();
        RapidoidWorker[] pool = this.workers;
        if (pool != null) {
            for (RapidoidWorker worker : pool) {
                worker.stopLoop();
            }
        }
        try {
            if (this.selector != null && this.selector.isOpen()) {
                this.selector.close();
            }
        }
        catch (IOException e) {
            Log.warn("Cannot close socket or selector!", e);
        }
        ServerSocketChannel channel = this.serverSocketChannel;
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        }
        catch (IOException e) {
            Log.warn("Cannot close socket or selector!", e);
        }
        return super.shutdown();
    }

    /**
     * Creates a new connection on a randomly chosen worker.
     *
     * @return the connection created by the selected worker
     * @throws RuntimeException if the workers have not been started yet
     */
    public synchronized RapidoidConnection newConnection() {
        RapidoidWorker[] pool = this.workers;
        if (pool == null) {
            throw U.rte("The server workers are not started!");
        }
        int rndWorker = Rnd.rnd(pool.length);
        return pool[rndWorker].newConnection();
    }

    /**
     * Lets the connection's own worker process it.
     *
     * @param conn connection to process
     */
    public synchronized void process(RapidoidConnection conn) {
        conn.worker.process(conn);
    }

    /**
     * Feeds the given text through the protocol using a fresh connection and
     * returns whatever was written to that connection's output.
     *
     * @param input text appended to the connection's input
     * @return the connection's output as text
     */
    @Override
    public synchronized String process(String input) {
        RapidoidConnection conn = this.newConnection();
        conn.setInitial(false);
        conn.input.append(input);
        conn.setProtocol(this.protocol);
        this.process(conn);
        return conn.output.asText();
    }

    /** @return the protocol this server was constructed with */
    public Protocol getProtocol() {
        return this.protocol;
    }

    /** @return this instance, which implements {@link TCPServerInfo} itself */
    @Override
    public TCPServerInfo info() {
        return this;
    }

    /**
     * Sums the processed-message counters of all workers.
     *
     * @return the total across all workers, or {@code 0} if the workers have not been started yet
     */
    @Override
    public long messagesProcessed() {
        RapidoidWorker[] pool = this.workers;
        if (pool == null) {
            return 0L;
        }
        long total = 0L;
        for (RapidoidWorker worker : pool) {
            total += worker.getMessagesProcessed();
        }
        return total;
    }
}

