/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.netlet;

import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.OptimizedEventLoop;
import com.datatorrent.netlet.util.CircularBuffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultEventLoop
implements Runnable,
EventLoop {
    public static final String eventLoopPropertyName = "com.datatorrent.netlet.disableOptimizedEventLoop";
    private static final int SLEEP_MILLIS = 5;
    public final String id;
    protected final Selector selector;
    protected final CircularBuffer<Runnable> tasks = new CircularBuffer(1024, 5);
    protected boolean alive;
    private int refCount;
    private Thread eventThread;
    private static final Logger logger = LoggerFactory.getLogger(DefaultEventLoop.class);

    public static DefaultEventLoop createEventLoop(String id) throws IOException {
        String disableOptimizedEventLoop = System.getProperty(eventLoopPropertyName);
        if (disableOptimizedEventLoop == null || disableOptimizedEventLoop.equalsIgnoreCase("false") || disableOptimizedEventLoop.equalsIgnoreCase("no")) {
            return new OptimizedEventLoop(id);
        }
        DefaultEventLoop eventLoop = new DefaultEventLoop(id);
        return eventLoop;
    }

    @Deprecated
    public DefaultEventLoop(String id) throws IOException {
        this.id = id;
        this.selector = Selector.open();
    }

    public synchronized Thread start() {
        if (++this.refCount == 1) {
            this.eventThread = new Thread((Runnable)this, this.id);
            this.eventThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    logger.error("Exception in thread {}", (Object)t, (Object)e);
                }
            });
            this.eventThread.start();
        }
        return this.eventThread;
    }

    public void stop() {
        this.submit(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                DefaultEventLoop defaultEventLoop = DefaultEventLoop.this;
                synchronized (defaultEventLoop) {
                    if (--DefaultEventLoop.this.refCount == 0) {
                        for (SelectionKey selectionKey : DefaultEventLoop.this.selector.keys()) {
                            SelectableChannel channel;
                            if (!selectionKey.isValid() || (channel = selectionKey.channel()) == null || !channel.isOpen()) continue;
                            Listener l = (Listener)selectionKey.attachment();
                            try {
                                selectionKey.channel().close();
                                if (l == null) continue;
                                if (l instanceof Listener.ClientListener) {
                                    ((Listener.ClientListener)l).disconnected();
                                }
                                l.unregistered(selectionKey);
                            }
                            catch (IOException e) {
                                if (l != null) {
                                    l.handleException(e, DefaultEventLoop.this);
                                    continue;
                                }
                                logger.warn("Exception while closing channel {} on unregistered key {}", new Object[]{channel, selectionKey, e});
                            }
                        }
                        DefaultEventLoop.this.alive = false;
                        DefaultEventLoop.this.selector.wakeup();
                    }
                }
            }

            public String toString() {
                return String.format("stop{%d}", DefaultEventLoop.this.refCount);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        DefaultEventLoop defaultEventLoop = this;
        synchronized (defaultEventLoop) {
            if (this.eventThread == null) {
                ++this.refCount;
                this.eventThread = Thread.currentThread();
            } else if (this.eventThread != Thread.currentThread()) {
                throw new IllegalStateException("DefaultEventLoop can not run in two [" + this.eventThread.getName() + "] and [" + Thread.currentThread().getName() + "] threads.");
            }
        }
        this.alive = true;
        try {
            this.runEventLoop();
        }
        finally {
            if (this.alive) {
                this.alive = false;
                logger.warn("Unexpected termination of {}", (Object)this);
            }
            this.eventThread = null;
        }
    }

    protected void runEventLoop() {
        Iterator<SelectionKey> EMPTY_ITERATOR = new Iterator<SelectionKey>(){

            @Override
            public boolean hasNext() {
                return false;
            }

            @Override
            public SelectionKey next() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Not supported yet.");
            }
        };
        Set<SelectionKey> EMPTY_SET = new Set<SelectionKey>(){

            @Override
            public int size() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean isEmpty() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean contains(Object o) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public Iterator<SelectionKey> iterator() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public Object[] toArray() {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public <T> T[] toArray(T[] a) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean add(SelectionKey e) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean remove(Object o) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean containsAll(Collection<?> c) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean addAll(Collection<? extends SelectionKey> c) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean retainAll(Collection<?> c) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public boolean removeAll(Collection<?> c) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public void clear() {
            }
        };
        SelectionKey sk = null;
        Set<SelectionKey> selectedKeys = EMPTY_SET;
        Iterator<SelectionKey> iterator = EMPTY_ITERATOR;
        do {
            try {
                do {
                    if (!iterator.hasNext()) {
                        int size = this.tasks.size();
                        if (size > 0) {
                            Runnable task;
                            while ((task = this.tasks.poll()) != null) {
                                task.run();
                                if (--size > 0) continue;
                            }
                            size = this.selector.selectNow();
                        } else {
                            size = this.selector.select(100L);
                        }
                        if (size > 0) {
                            selectedKeys = this.selector.selectedKeys();
                            iterator = selectedKeys.iterator();
                        } else {
                            iterator = EMPTY_ITERATOR;
                        }
                    }
                    while (iterator.hasNext()) {
                        sk = (SelectionKey)iterator.next();
                        if (!sk.isValid()) continue;
                        this.handleSelectedKey(sk);
                    }
                    selectedKeys.clear();
                } while (this.alive);
            }
            catch (Exception ex) {
                if (sk == null) {
                    logger.warn("Unexpected exception not related to SelectionKey", (Throwable)ex);
                    continue;
                }
                logger.warn("Exception on unregistered SelectionKey {}", sk, (Object)ex);
                Listener l = (Listener)sk.attachment();
                if (l == null) continue;
                l.handleException(ex, this);
            }
        } while (this.alive);
    }

    protected final void handleSelectedKey(SelectionKey sk) throws IOException {
        switch (sk.readyOps()) {
            case 16: {
                ServerSocketChannel ssc = (ServerSocketChannel)sk.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                Listener.ServerListener sl = (Listener.ServerListener)sk.attachment();
                Listener.ClientListener l = sl.getClientConnection(sc, (ServerSocketChannel)sk.channel());
                this.register(sc, 5, (Listener)l);
                break;
            }
            case 8: {
                if (!((SocketChannel)sk.channel()).finishConnect()) break;
                ((Listener.ClientListener)sk.attachment()).connected();
                break;
            }
            case 1: {
                ((Listener.ClientListener)sk.attachment()).read();
                break;
            }
            case 4: {
                ((Listener.ClientListener)sk.attachment()).write();
                break;
            }
            case 5: {
                ((Listener.ClientListener)sk.attachment()).read();
                ((Listener.ClientListener)sk.attachment()).write();
                break;
            }
            case 9: 
            case 12: 
            case 13: {
                if (!((SocketChannel)sk.channel()).finishConnect()) break;
                ((Listener.ClientListener)sk.attachment()).connected();
                if (sk.isReadable()) {
                    ((Listener.ClientListener)sk.attachment()).read();
                }
                if (!sk.isWritable()) break;
                ((Listener.ClientListener)sk.attachment()).write();
                break;
            }
            default: {
                logger.warn("!!!!!! not sure what interest this is {} !!!!!!", (Object)Integer.toBinaryString(sk.readyOps()));
            }
        }
    }

    private void handleFullTasksCircularBuffer(int sleepMillis) {
        if (this.eventThread == Thread.currentThread()) {
            Runnable task;
            while ((task = this.tasks.poll()) != null) {
                task.run();
            }
        } else {
            this.selector.wakeup();
            try {
                Thread.sleep(sleepMillis);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submit(Runnable r) {
        Thread currentThread = Thread.currentThread();
        if (this.eventThread != currentThread || !this.tasks.isEmpty()) {
            int sleepMillis = 0;
            while (true) {
                CircularBuffer<Runnable> circularBuffer = this.tasks;
                synchronized (circularBuffer) {
                    if (this.tasks.offer(r)) {
                        this.selector.wakeup();
                        return;
                    }
                }
                this.handleFullTasksCircularBuffer(sleepMillis);
                sleepMillis = Math.min(5, sleepMillis + 1);
            }
        }
        r.run();
    }

    private void register(final SelectableChannel c, final int ops, final Listener l) {
        this.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    l.registered(c.register(DefaultEventLoop.this.selector, ops, l));
                }
                catch (ClosedChannelException cce) {
                    l.handleException(cce, DefaultEventLoop.this);
                }
            }

            public String toString() {
                return String.format("register(%s, %d, %s)", c, ops, l);
            }
        });
    }

    public void unregister(final SelectableChannel c) {
        this.submit(new Runnable(){

            @Override
            public void run() {
                for (SelectionKey key : DefaultEventLoop.this.selector.keys()) {
                    if (key.channel() != c) continue;
                    ((Listener)key.attachment()).unregistered(key);
                    key.interestOps(0);
                    key.attach(Listener.NOOP_LISTENER);
                }
            }

            public String toString() {
                return String.format("unregister(%s)", c);
            }
        });
    }

    public void register(ServerSocketChannel channel, Listener l) {
        this.register(channel, 16, l);
    }

    public void register(SocketChannel channel, int ops, Listener l) {
        this.register((SelectableChannel)channel, ops, l);
    }

    @Override
    public final void connect(final InetSocketAddress address, final Listener.ClientListener l) {
        this.submit(new Runnable(){

            @Override
            public void run() {
                SocketChannel channel = null;
                Exception exception = null;
                try {
                    channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    if (channel.connect(address)) {
                        l.connected();
                        DefaultEventLoop.this.register(channel, 1, (Listener)l);
                    } else {
                        DefaultEventLoop.this.register(channel, 9, (Listener)new Listener.ClientListener(){
                            private SelectionKey key;

                            @Override
                            public void read() throws IOException {
                                logger.debug("missing OP_CONNECT");
                                this.connected();
                                l.read();
                            }

                            @Override
                            public void write() throws IOException {
                                logger.debug("missing OP_CONNECT");
                                this.connected();
                                l.write();
                            }

                            @Override
                            public void connected() {
                                logger.debug("{}", (Object)this);
                                this.key.interestOps(5);
                                this.key.attach(l);
                                l.connected();
                            }

                            @Override
                            public void disconnected() {
                                logger.debug("missing OP_CONNECT {}", (Object)this);
                                throw new NotYetConnectedException();
                            }

                            @Override
                            public void handleException(Exception exception, EventLoop eventloop) {
                                this.key.attach(l);
                                l.handleException(exception, eventloop);
                            }

                            @Override
                            public void registered(SelectionKey key) {
                                this.key = key;
                                l.registered(this.key);
                            }

                            @Override
                            public void unregistered(SelectionKey key) {
                                l.unregistered(key);
                            }

                            public String toString() {
                                return "Pre-connect Client listener for " + l.toString();
                            }
                        });
                    }
                }
                catch (UnresolvedAddressException e) {
                    exception = new RuntimeException("Inet Address " + address + " is not resolvable.", e);
                }
                catch (IllegalStateException e) {
                    exception = new RuntimeException("Connect request is not valid for channel " + channel + ".", e);
                }
                catch (Exception e) {
                    exception = e;
                }
                if (exception != null) {
                    if (channel != null && channel.isOpen()) {
                        try {
                            channel.close();
                        }
                        catch (IOException io) {
                            l.handleException(io, DefaultEventLoop.this);
                        }
                    }
                    l.handleException(exception, DefaultEventLoop.this);
                }
            }

            public String toString() {
                return String.format("connect(%s, %s)", address, l);
            }
        });
    }

    @Override
    public final void disconnect(final Listener.ClientListener l) {
        this.submit(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                for (SelectionKey key : DefaultEventLoop.this.selector.keys()) {
                    if (key.attachment() != l) continue;
                    try {
                        l.unregistered(key);
                    }
                    finally {
                        boolean disconnected = true;
                        if (key.isValid() && (key.interestOps() & 4) != 0) {
                            key.attach(new Listener.DisconnectingListener(key));
                            disconnected = false;
                        }
                        if (!disconnected) continue;
                        try {
                            key.attach(Listener.NOOP_CLIENT_LISTENER);
                            key.channel().close();
                        }
                        catch (IOException io) {
                            l.handleException(io, DefaultEventLoop.this);
                        }
                        if (key.channel().isOpen()) continue;
                        l.disconnected();
                    }
                }
            }

            public String toString() {
                return String.format("disconnect(%s)", l);
            }
        });
    }

    @Override
    public final void start(final String host, final int port, final Listener.ServerListener l) {
        this.submit(new Runnable(){

            @Override
            public void run() {
                block4: {
                    ServerSocketChannel channel = null;
                    try {
                        channel = ServerSocketChannel.open();
                        channel.configureBlocking(false);
                        channel.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port), 128);
                        DefaultEventLoop.this.register(channel, 16, l);
                    }
                    catch (IOException io) {
                        l.handleException(io, DefaultEventLoop.this);
                        if (channel == null || !channel.isOpen()) break block4;
                        try {
                            channel.close();
                        }
                        catch (IOException ie) {
                            l.handleException(ie, DefaultEventLoop.this);
                        }
                    }
                }
            }

            public String toString() {
                return String.format("start(%s, %d, %s)", host, port, l);
            }
        });
    }

    @Override
    public final void stop(final Listener.ServerListener l) {
        this.submit(new Runnable(){

            @Override
            public void run() {
                for (SelectionKey key : DefaultEventLoop.this.selector.keys()) {
                    if (key.attachment() != l) continue;
                    if (key.isValid()) {
                        l.unregistered(key);
                        key.cancel();
                    }
                    key.attach(Listener.NOOP_LISTENER);
                    try {
                        key.channel().close();
                    }
                    catch (IOException io) {
                        l.handleException(io, DefaultEventLoop.this);
                    }
                }
            }

            public String toString() {
                return String.format("stop(%s)", l);
            }
        });
    }

    public boolean isActive() {
        return this.eventThread != null && this.eventThread.isAlive();
    }

    public String toString() {
        return "{id=" + this.id + ", " + this.tasks + '}';
    }
}

