/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.bufferserver.server;

import com.datatorrent.bufferserver.internal.DataList;
import com.datatorrent.bufferserver.internal.FastDataList;
import com.datatorrent.bufferserver.internal.LogicalNode;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.packet.PublishRequestTuple;
import com.datatorrent.bufferserver.packet.PurgeRequestTuple;
import com.datatorrent.bufferserver.packet.ResetRequestTuple;
import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.AbstractClient;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.VarInt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server
implements Listener.ServerListener {
    /** 0x4000000 bytes (64 MiB); the same value the one-argument constructor passes as block size. */
    public static final int DEFAULT_BUFFER_SIZE = 0x4000000;
    /** The same value the one-argument constructor passes as the number of cached blocks. */
    public static final int DEFAULT_NUMBER_OF_CACHED_BLOCKS = 8;
    /** Port handed to the event loop when the server is started through {@code run}. */
    private final int port;
    /** Returned by {@code toString()}; no code in this class assigns it. */
    private String identity;
    /** Spool storage forwarded to each publisher's DataList; null until {@code setSpoolStorage} is called. */
    private Storage storage;
    /** Event loop used to disconnect clients; assigned in {@code run}. */
    private EventLoop eventloop;
    /** Local address of the listening socket; assigned in {@code registered}. */
    private InetSocketAddress address;
    /** Single-threaded executor used for subscriber catch-up work and as the DataList auto-flush executor. */
    private final ExecutorService serverHelperExecutor;
    /** Single-threaded executor with a bounded queue, handed to DataList together with the spool storage. */
    private final ExecutorService storageHelperExecutor;
    /** When non-null, new connections start as AuthClient instead of UnidentifiedClient. */
    private byte[] authToken;
    /** Data buffers keyed by publisher identifier. */
    private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap(1, 0.75f, 1);
    /** Logical subscriber nodes keyed by the stream type of the subscribe request. */
    private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap();
    /** Most recent connection registered per publisher identifier. */
    private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap();
    /** Most recent connection registered per subscriber identifier. */
    private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap();
    /** Block size passed to every DataList this server creates. */
    private final int blockSize;
    /** Number of cache blocks passed to every DataList; also the capacity of the storage helper queue. */
    private final int numberOfCacheBlocks;
    private static final Logger logger = LoggerFactory.getLogger(Server.class);

    /**
     * Creates a server on the given port using the default block size and
     * default number of cached blocks.
     *
     * @param port port to listen on
     */
    public Server(int port) {
        this(port, DEFAULT_BUFFER_SIZE, DEFAULT_NUMBER_OF_CACHED_BLOCKS);
    }

    /**
     * Creates a server and the two helper executors it hands to its data lists.
     *
     * @param port port to listen on
     * @param blocksize block size passed to every DataList created by this server
     * @param numberOfCacheBlocks number of cache blocks per DataList; also the
     *        capacity of the storage helper's work queue
     */
    public Server(int port, int blocksize, int numberOfCacheBlocks) {
        this.port = port;
        this.blockSize = blocksize;
        this.numberOfCacheBlocks = numberOfCacheBlocks;

        ThreadFactory serverThreads = new NameableThreadFactory("ServerHelper");
        this.serverHelperExecutor = Executors.newSingleThreadExecutor(serverThreads);

        // Bounded queue + CallerRunsPolicy: once the queue is full the submitting
        // thread executes the task itself instead of the task being rejected.
        ArrayBlockingQueue<Runnable> pendingStorageTasks = new ArrayBlockingQueue<Runnable>(numberOfCacheBlocks);
        ThreadFactory storageThreads = new NameableThreadFactory("StorageHelper");
        this.storageHelperExecutor = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            pendingStorageTasks,
            storageThreads,
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Sets the storage that is passed to each publisher's DataList as its
     * secondary storage in {@code handlePublisherRequest}.
     *
     * @param storage the spool storage to use
     */
    public void setSpoolStorage(Storage storage) {
        this.storage = storage;
    }

    /**
     * Records the local address of the listening socket and wakes any thread
     * waiting on this server's monitor (see {@code run}).
     *
     * @param key selection key whose channel is the server socket channel
     */
    public synchronized void registered(SelectionKey key) {
        ServerSocketChannel listeningChannel = (ServerSocketChannel)key.channel();
        address = (InetSocketAddress)listeningChannel.socket().getLocalSocketAddress();
        logger.info("Server started listening at {}", (Object)address);
        notifyAll();
    }

    /**
     * Shuts down both helper executors and waits up to five seconds for the
     * server helper to finish its queued work.
     *
     * <p>If the wait is interrupted the interruption is logged and the
     * thread's interrupt status is restored so the caller can still observe it.
     *
     * @param key selection key of the server socket channel (unused)
     */
    public void unregistered(SelectionKey key) {
        this.serverHelperExecutor.shutdown();
        this.storageHelperExecutor.shutdown();
        try {
            this.serverHelperExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            logger.debug("Executor Termination", (Throwable)ex);
            // Catching InterruptedException clears the flag; put it back.
            Thread.currentThread().interrupt();
        }
        logger.info("Server stopped listening at {}", (Object)this.address);
    }

    /**
     * Starts listening on the configured port using the given event loop and
     * blocks until the listening socket has been registered.
     *
     * <p>The event loop reference is stored before listening starts, because
     * request handlers use it (for example to disconnect a previous
     * connection) and may run as soon as the socket accepts clients.
     *
     * @param eventloop event loop that will service this server
     * @return the local address the server is listening on
     * @throws RuntimeException wrapping the InterruptedException if the
     *         calling thread is interrupted while waiting; the interrupt
     *         status is restored before throwing
     */
    public synchronized InetSocketAddress run(EventLoop eventloop) {
        this.eventloop = eventloop;
        eventloop.start(null, this.port, (Listener.ServerListener)this);
        // registered() sets address and calls notifyAll(); the timed wait also
        // re-checks periodically.
        while (this.address == null) {
            try {
                this.wait(20L);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        return this.address;
    }

    /**
     * Sets the authentication token. When the token is non-null,
     * {@code getClientConnection} creates an AuthClient carrying it instead of
     * an UnidentifiedClient.
     *
     * @param authToken the token bytes, or null for no authentication; the
     *        array is stored as-is, not copied
     */
    public void setAuthToken(byte[] authToken) {
        this.authToken = authToken;
    }

    /**
     * Runs a standalone buffer server.
     *
     * <p>The server is started through {@link #run(EventLoop)} so that it
     * keeps a reference to its event loop; request handlers need that
     * reference to disconnect clients.
     *
     * @param args optional first argument is the port to listen on; port 0 is
     *        used when absent
     * @throws Exception if the port argument is not a number or the event loop
     *         cannot be created
     */
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 0;
        DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop((String)"alone");
        // The loop must be running before run() is called: run() blocks until
        // the loop thread has registered the listening socket.
        new Thread((Runnable)eventloop).start();
        new Server(port).run((EventLoop)eventloop);
    }

    /**
     * Returns the server identity when one is set, otherwise a description
     * built from the port and listening address.
     *
     * <p>Nothing in this class assigns {@code identity}, so without the
     * fallback this method would return null.
     *
     * @return a non-null description of this server
     */
    @Override
    public String toString() {
        if (this.identity != null) {
            return this.identity;
        }
        return "Server{port=" + this.port + ", address=" + this.address + '}';
    }

    /**
     * Purges the buffer of the publisher named in the request and sends a
     * textual acknowledgement back on {@code ctx}.
     *
     * @param request purge request naming the publisher and the window to purge up to
     * @param ctx connection the request arrived on; receives the acknowledgement
     * @throws IOException declared for the final flush on {@code ctx}
     * @throws RuntimeException if the acknowledgement cannot be queued on {@code ctx}
     */
    private void handlePurgeRequest(PurgeRequestTuple request, AbstractLengthPrependerClient ctx) throws IOException {
        byte[] message;
        DataList dl = this.publisherBuffers.get(request.getIdentifier());
        if (dl == null) {
            message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes();
        } else {
            // Window ids are packed as baseSeconds in the high 32 bits, windowId in the low 32 bits.
            dl.purge((long)request.getBaseSeconds() << 32 | (long)request.getWindowId());
            message = ("Request sent for processing: " + request).getBytes();
        }
        // The serialized tuple starts with a header; the message text goes at its tail.
        byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
        System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
        if (!ctx.write(tuple)) {
            logger.error("Failed to deliver purge ack message. {} send buffers are full.", (Object)ctx);
            // Fixed: a space was missing between the connection description and the rest of the sentence.
            throw new RuntimeException("Failed to deliver purge ack message. " + ctx + " send buffers are full.");
        }
        ctx.write();
    }

    /**
     * Asks every publisher buffer known to this server to purge up to the
     * given window.
     *
     * @param windowId window id forwarded unchanged to each DataList
     */
    public void purge(long windowId) {
        Iterator<DataList> buffers = this.publisherBuffers.values().iterator();
        while (buffers.hasNext()) {
            buffers.next().purge(windowId);
        }
    }

    /**
     * Removes and resets the buffer of the publisher named in the request,
     * disconnects that publisher's registered connection if there is one, and
     * sends a textual acknowledgement back on {@code ctx}.
     *
     * @param request reset request naming the publisher
     * @param ctx connection the request arrived on; receives the acknowledgement
     * @throws IOException declared for the final flush on {@code ctx}
     * @throws RuntimeException if the acknowledgement cannot be queued on {@code ctx}
     */
    private void handleResetRequest(ResetRequestTuple request, AbstractLengthPrependerClient ctx) throws IOException {
        byte[] message;
        DataList dl = this.publisherBuffers.remove(request.getIdentifier());
        if (dl == null) {
            message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes();
        } else {
            AbstractLengthPrependerClient channel = this.publisherChannels.remove(request.getIdentifier());
            if (channel != null) {
                this.eventloop.disconnect((Listener.ClientListener)channel);
            }
            dl.reset();
            message = ("Request sent for processing: " + request).getBytes();
        }
        // The serialized tuple starts with a header; the message text goes at its tail.
        byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
        System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
        if (!ctx.write(tuple)) {
            logger.error("Failed to deliver reset ack message. {} send buffers are full.", (Object)ctx);
            // Fixed: a space was missing between the connection description and the rest of the sentence.
            throw new RuntimeException("Failed to deliver reset ack message. " + ctx + " send buffers are full.");
        }
        ctx.write();
    }

    /**
     * Attaches a subscriber connection to the logical node for its stream
     * type, creating the node (and, if needed, the upstream publisher's
     * buffer) when this is the first subscriber of that type.
     *
     * <p>The actual attachment and catch-up run asynchronously on the server
     * helper executor.
     *
     * <p>Each map is looked up exactly once, so a concurrent removal between a
     * presence check and the read cannot leave this method holding null.
     *
     * @param request the subscribe request
     * @param connection the subscriber's connection
     * @return the logical node serving the request's stream type
     */
    public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, final AbstractLengthPrependerClient connection) {
        String identifier = request.getIdentifier();
        String type = request.getStreamType();
        String upstreamIdentifier = request.getUpstreamIdentifier();

        final LogicalNode existingNode = this.subscriberGroups.get(type);
        if (existingNode != null) {
            // A newer connection with the same identifier replaces the older one.
            AbstractLengthPrependerClient previous = this.subscriberChannels.put(identifier, connection);
            if (previous != null) {
                this.eventloop.disconnect((Listener.ClientListener)previous);
            }
            this.serverHelperExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    existingNode.boot(Server.this.eventloop);
                    existingNode.addConnection(connection);
                    existingNode.catchUp();
                }
            });
            return existingNode;
        }

        DataList buffer = this.publisherBuffers.get(upstreamIdentifier);
        if (buffer == null) {
            // The subscriber arrived before its publisher: create the buffer now.
            buffer = "1.1".equals(request.getVersion())
                ? new FastDataList(upstreamIdentifier, this.blockSize, this.numberOfCacheBlocks)
                : new DataList(upstreamIdentifier, this.blockSize, this.numberOfCacheBlocks);
            this.publisherBuffers.put(upstreamIdentifier, buffer);
        }
        final DataList dl = buffer;

        // Window ids are packed as baseSeconds in the high 32 bits, windowId in the low 32 bits.
        long skipWindowId = (long)request.getBaseSeconds() << 32 | (long)request.getWindowId();
        final LogicalNode createdNode = new LogicalNode(identifier, upstreamIdentifier, type, dl.newIterator(skipWindowId), skipWindowId);
        int mask = request.getMask();
        if (mask != 0) {
            for (int partition : request.getPartitions()) {
                createdNode.addPartition(partition, mask);
            }
        }
        this.subscriberGroups.put(type, createdNode);
        this.serverHelperExecutor.submit(new Runnable(){

            @Override
            public void run() {
                createdNode.addConnection(connection);
                createdNode.catchUp();
                dl.addDataListener(createdNode);
            }
        });
        return createdNode;
    }

    /**
     * Returns the buffer for the publisher named in the request, creating it
     * when the identifier has not been seen before.
     *
     * <p>For a known identifier the connection replaces any previously
     * registered one (which is disconnected) and the buffer is rewound to the
     * window given in the request. In both cases the buffer is given the
     * current spool storage and the storage helper executor.
     *
     * <p>The buffer map is looked up once, rather than with a presence check
     * followed by a separate read.
     *
     * @param request the publish request
     * @param connection the connection the request arrived on
     * @return the publisher's buffer
     * @throws RuntimeException wrapping an IOException raised while rewinding
     */
    public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLengthPrependerClient connection) {
        String identifier = request.getIdentifier();
        DataList dl = this.publisherBuffers.get(identifier);
        if (dl != null) {
            AbstractLengthPrependerClient previous = this.publisherChannels.put(identifier, connection);
            if (previous != null) {
                this.eventloop.disconnect((Listener.ClientListener)previous);
            }
            try {
                dl.rewind(request.getBaseSeconds(), request.getWindowId());
            }
            catch (IOException ie) {
                throw new RuntimeException(ie);
            }
        } else {
            dl = "1.1".equals(request.getVersion())
                ? new FastDataList(identifier, this.blockSize, this.numberOfCacheBlocks)
                : new DataList(identifier, this.blockSize, this.numberOfCacheBlocks);
            this.publisherBuffers.put(identifier, dl);
        }
        dl.setSecondaryStorage(this.storage, this.storageHelperExecutor);
        return dl;
    }

    /**
     * Creates the listener for a newly accepted connection: an
     * UnidentifiedClient when no auth token is configured, otherwise an
     * AuthClient carrying the token.
     *
     * @param sc the accepted channel (unused)
     * @param ssc the server channel that accepted it (unused)
     * @return the listener for the new connection
     */
    public Listener.ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) {
        if (this.authToken == null) {
            return new UnidentifiedClient();
        }
        AuthClient authenticating = new AuthClient();
        authenticating.setToken(this.authToken);
        return authenticating;
    }

    /**
     * Rethrows the given exception: unchanged when it is already a
     * RuntimeException, otherwise wrapped in one.
     *
     * @param cce the exception to propagate
     * @param el the event loop on which it occurred (unused)
     */
    public void handleException(Exception cce, EventLoop el) {
        RuntimeException propagated = cce instanceof RuntimeException
            ? (RuntimeException)cce
            : new RuntimeException(cce);
        throw propagated;
    }

    /**
     * A client that can be seeded with bytes already read by another client,
     * as happens when a connection is handed over from one listener to the next.
     */
    abstract class SeedDataClient
    extends AbstractLengthPrependerClient {
        public SeedDataClient() {
        }

        public SeedDataClient(int readBufferSize, int sendBufferSize) {
            super(readBufferSize, sendBufferSize);
        }

        public SeedDataClient(byte[] readbuffer, int position, int sendBufferSize) {
            super(readbuffer, position, sendBufferSize);
        }

        /**
         * Copies {@code len} bytes from {@code array}, starting at
         * {@code offset}, into this client's read buffer and feeds them to
         * {@code read(int)} as if they had arrived from the channel.
         *
         * <p>The copy proceeds in chunks no larger than the space left in the
         * current buffer; the loop body always runs at least once.
         *
         * @param array source of the bytes
         * @param offset index in {@code array} of the first byte to copy
         * @param len number of bytes to copy
         */
        public void transferBuffer(byte[] array, int offset, int len) {
            do {
                int chunk = this.buffer.length - this.writeOffset;
                if (len < chunk) {
                    // Everything that is left fits into the current buffer.
                    chunk = len;
                    this.byteBuffer.position(this.writeOffset + chunk);
                } else {
                    // Fill the current buffer to its end.
                    this.byteBuffer.position(this.buffer.length);
                }
                System.arraycopy(array, offset, this.buffer, this.writeOffset, chunk);
                this.read(chunk);
                offset += chunk;
                len -= chunk;
            } while (len > 0);
        }
    }

    class Publisher
    extends SeedDataClient {
        private final DataList datalist;
        boolean dirty;
        private volatile boolean torndown;

        Publisher(DataList dl, long windowId) {
            super(dl.getBuffer(windowId), dl.getPosition(), 1024);
            this.datalist = dl;
        }

        public void onMessage(byte[] buffer, int offset, int size) {
            this.dirty = true;
        }

        public boolean resumeReadIfSuspended() {
            Server.this.eventloop.submit(new Runnable(){

                @Override
                public void run() {
                    int interestOps = Publisher.this.key.interestOps();
                    if ((interestOps & 1) == 0) {
                        if (Publisher.this.readExt(0)) {
                            logger.debug("Resuming read on key {} with attachment {}", (Object)Publisher.this.key, Publisher.this.key.attachment());
                            Publisher.this.key.interestOps(interestOps | 1);
                        } else {
                            logger.debug("Keeping read on key {} with attachment {} suspended. ", new Object[]{Publisher.this.key, Publisher.this.key.attachment(), Publisher.this.datalist});
                            Publisher.this.datalist.notifyListeners();
                        }
                    }
                }
            });
            return true;
        }

        public void read(int len) {
            this.readExt(len);
        }

        private boolean readExt(int len) {
            this.writeOffset += len;
            block4: while (true) {
                if (this.size <= 0) {
                    this.size = this.readSize();
                    switch (this.size) {
                        case -1: {
                            if (this.writeOffset == this.buffer.length) {
                                if (this.readOffset > this.writeOffset - 5) {
                                    this.dirty = false;
                                    this.datalist.flush(this.writeOffset);
                                    if (!this.switchToNewBufferOrSuspendRead(this.buffer, this.readOffset, this.size + VarInt.getSize((int)this.size))) {
                                        return false;
                                    }
                                }
                            } else if (this.dirty) {
                                this.dirty = false;
                                this.datalist.flush(this.writeOffset);
                            }
                            return true;
                        }
                        case 0: {
                            continue block4;
                        }
                    }
                }
                if (this.writeOffset - this.readOffset < this.size) break;
                this.onMessage(this.buffer, this.readOffset, this.size);
                this.readOffset += this.size;
                this.size = 0;
            }
            if (this.writeOffset == this.buffer.length) {
                this.dirty = false;
                this.datalist.flush(this.writeOffset);
                if (!this.switchToNewBufferOrSuspendRead(this.buffer, this.readOffset - VarInt.getSize((int)this.size), this.size + VarInt.getSize((int)this.size))) {
                    this.readOffset -= VarInt.getSize((int)this.size);
                    this.size = 0;
                    return false;
                }
                this.size = 0;
            } else if (this.dirty) {
                this.dirty = false;
                this.datalist.flush(this.writeOffset);
            }
            return true;
        }

        private boolean switchToNewBufferOrSuspendRead(byte[] array, int offset, int size) {
            if (this.switchToNewBuffer(array, offset, size)) {
                return true;
            }
            this.datalist.suspendRead((AbstractClient)this);
            return false;
        }

        private boolean switchToNewBuffer(byte[] array, int offset, int size) {
            if (this.datalist.isMemoryBlockAvailable()) {
                byte[] newBuffer = this.datalist.newBuffer(size);
                this.byteBuffer = ByteBuffer.wrap(newBuffer);
                if (array == null || array.length - offset == 0) {
                    this.writeOffset = 0;
                } else {
                    this.writeOffset = array.length - offset;
                    System.arraycopy(this.buffer, offset, newBuffer, 0, this.writeOffset);
                    this.byteBuffer.position(this.writeOffset);
                }
                this.buffer = newBuffer;
                this.readOffset = 0;
                this.datalist.addBuffer(this.buffer);
                return true;
            }
            return false;
        }

        public void unregistered(SelectionKey key) {
            super.unregistered(key);
            this.teardown();
        }

        public void handleException(Exception cce, EventLoop el) {
            this.teardown();
            if (cce instanceof RejectedExecutionException && Server.this.serverHelperExecutor.isTerminated()) {
                logger.warn("Terminated Executor Exception for {}.", (Object)this, (Object)cce);
                el.disconnect((Listener.ClientListener)this);
            } else {
                super.handleException(cce, el);
            }
        }

        public String toString() {
            return ((Object)((Object)this)).getClass().getName() + '@' + Integer.toHexString(((Object)((Object)this)).hashCode()) + " {datalist=" + this.datalist + '}';
        }

        private void teardown() {
            if (this.torndown) {
                return;
            }
            this.torndown = true;
            if (Server.this.publisherChannels.containsValue((Object)this)) {
                Iterator i = Server.this.publisherChannels.entrySet().iterator();
                while (i.hasNext()) {
                    if (i.next().getValue() != this) continue;
                    i.remove();
                    break;
                }
            }
            ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
            String publisherIdentifier = this.datalist.getIdentifier();
            for (LogicalNode ln : Server.this.subscriberGroups.values()) {
                if (!publisherIdentifier.equals(ln.getUpstream())) continue;
                list.add(ln);
            }
            for (LogicalNode ln : list) {
                ln.boot(Server.this.eventloop);
            }
        }
    }

    class Subscriber
    extends AbstractLengthPrependerClient {
        private final String type;
        private final int mask;
        private final int[] partitions;
        private volatile boolean torndown;

        Subscriber(String type, int mask, int[] partitions, int bufferSize) {
            super(1024, bufferSize);
            this.type = type;
            this.mask = mask;
            this.partitions = partitions;
            this.write = false;
        }

        public void onMessage(byte[] buffer, int offset, int size) {
            logger.warn("Received data when no data is expected: {}", (Object)Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
        }

        public void unregistered(SelectionKey key) {
            super.unregistered(key);
            this.teardown();
        }

        public void handleException(Exception cce, EventLoop el) {
            this.teardown();
            super.handleException(cce, el);
        }

        public String toString() {
            return "Server.Subscriber{type=" + this.type + ", mask=" + this.mask + ", partitions=" + (this.partitions == null ? "null" : Arrays.toString(this.partitions)) + '}';
        }

        private void teardown() {
            if (this.torndown) {
                return;
            }
            this.torndown = true;
            LogicalNode ln = (LogicalNode)Server.this.subscriberGroups.get(this.type);
            if (ln != null) {
                if (Server.this.subscriberChannels.containsValue((Object)this)) {
                    Iterator i = Server.this.subscriberChannels.entrySet().iterator();
                    while (i.hasNext()) {
                        if (i.next().getValue() != this) continue;
                        i.remove();
                        break;
                    }
                }
                ln.removeChannel(this);
                if (ln.getPhysicalNodeCount() == 0) {
                    DataList dl = (DataList)Server.this.publisherBuffers.get(ln.getUpstream());
                    if (dl != null) {
                        dl.removeDataListener(ln);
                    }
                    Server.this.subscriberGroups.remove(ln.getGroup());
                }
                ln.getIterator().close();
            }
        }
    }

    /**
     * First listener on a connection whose role is not yet known. The first
     * request decides what the connection becomes: a publish or subscribe
     * request replaces this listener with a Publisher or Subscriber on the
     * same selection key; purge and reset requests are answered in place.
     */
    class UnidentifiedClient
    extends SeedDataClient {
        /** Set once the connection has been handed over; later messages are dropped. */
        boolean ignore;

        UnidentifiedClient() {
        }

        /**
         * Dispatches one request tuple.
         *
         * @param buffer buffer holding the tuple
         * @param offset index of the tuple's first byte
         * @param size length of the tuple in bytes
         * @throws RuntimeException for an unexpected tuple type, or wrapping
         *         an IOException from a purge or reset acknowledgement
         */
        public void onMessage(byte[] buffer, int offset, int size) {
            if (this.ignore) {
                return;
            }
            Tuple request = Tuple.getTuple(buffer, offset, size);
            switch (request.getType()) {
                case PUBLISHER_REQUEST: {
                    this.unregistered(this.key);
                    logger.info("Received publisher request: {}", (Object)request);
                    PublishRequestTuple publisherRequest = (PublishRequestTuple)request;
                    DataList dl = Server.this.handlePublisherRequest(publisherRequest, this);
                    dl.setAutoFlushExecutor(Server.this.serverHelperExecutor);
                    // Window ids are packed as baseSeconds in the high 32 bits, windowId in the low 32 bits.
                    long windowId = (long)request.getBaseSeconds() << 32 | (long)request.getWindowId();
                    Publisher publisher;
                    // Constant-first comparison: a request without a version is treated as not "1.1".
                    if ("1.1".equals(publisherRequest.getVersion())) {
                        publisher = new Publisher(dl, windowId){

                            /**
                             * Reads a two-byte length, low byte first. Both bytes are
                             * masked so values of 0x80 and above are not sign-extended.
                             */
                            public int readSize() {
                                if (this.writeOffset - this.readOffset < 2) {
                                    return -1;
                                }
                                int low = this.buffer[this.readOffset++] & 0xFF;
                                int high = this.buffer[this.readOffset++] & 0xFF;
                                return low | high << 8;
                            }
                        };
                    } else {
                        publisher = new Publisher(dl, windowId);
                    }
                    this.key.attach((Object)publisher);
                    this.key.interestOps(SelectionKey.OP_READ);
                    publisher.registered(this.key);
                    // Bytes that followed the request in this buffer belong to the publisher.
                    int len = this.writeOffset - this.readOffset - size;
                    if (len > 0) {
                        publisher.transferBuffer(this.buffer, this.readOffset + size, len);
                    }
                    this.ignore = true;
                    break;
                }
                case SUBSCRIBER_REQUEST: {
                    this.unregistered(this.key);
                    this.ignore = true;
                    logger.info("Received subscriber request: {}", (Object)request);
                    SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
                    int bufferSize = subscriberRequest.getBufferSize();
                    Subscriber subscriber;
                    // NOTE(review): here the two-byte readSize() is used when the version is NOT "1.1",
                    // the opposite of the publisher case above — confirm this is intended.
                    if ("1.1".equals(subscriberRequest.getVersion())) {
                        subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize);
                    } else {
                        subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize){

                            /**
                             * Reads a two-byte length, low byte first. Both bytes are
                             * masked so values of 0x80 and above are not sign-extended.
                             */
                            public int readSize() {
                                if (this.writeOffset - this.readOffset < 2) {
                                    return -1;
                                }
                                int low = this.buffer[this.readOffset++] & 0xFF;
                                int high = this.buffer[this.readOffset++] & 0xFF;
                                return low | high << 8;
                            }
                        };
                    }
                    this.key.attach((Object)subscriber);
                    this.key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    subscriber.registered(this.key);
                    Server.this.handleSubscriberRequest(subscriberRequest, subscriber);
                    break;
                }
                case PURGE_REQUEST: {
                    logger.info("Received purge request: {}", (Object)request);
                    try {
                        Server.this.handlePurgeRequest((PurgeRequestTuple)request, this);
                    }
                    catch (IOException io) {
                        throw new RuntimeException(io);
                    }
                    break;
                }
                case RESET_REQUEST: {
                    logger.info("Received reset all request: {}", (Object)request);
                    try {
                        Server.this.handleResetRequest((ResetRequestTuple)request, this);
                    }
                    catch (IOException io) {
                        throw new RuntimeException(io);
                    }
                    break;
                }
                default: {
                    throw new RuntimeException("unexpected message: " + request.toString());
                }
            }
        }
    }

    /**
     * First listener on a connection when the server has an auth token. The
     * first message is passed to {@code authenticateMessage}; after that the
     * connection is handed to a fresh UnidentifiedClient on the same key.
     */
    class AuthClient
    extends com.datatorrent.bufferserver.client.AuthClient {
        /** Set once the connection has been handed over; later messages are dropped. */
        boolean ignore;

        AuthClient() {
        }

        /**
         * Authenticates the first message and replaces this listener with an
         * UnidentifiedClient, forwarding any bytes that followed the message.
         *
         * @param buffer buffer holding the message
         * @param offset index of the message's first byte
         * @param size length of the message in bytes
         */
        public void onMessage(byte[] buffer, int offset, int size) {
            if (!this.ignore) {
                this.authenticateMessage(buffer, offset, size);
                this.unregistered(this.key);

                UnidentifiedClient successor = new UnidentifiedClient();
                this.key.attach((Object)successor);
                this.key.interestOps(SelectionKey.OP_READ);
                successor.registered(this.key);
                successor.connected();

                // Hand over whatever was received after the authentication message.
                int trailing = this.writeOffset - this.readOffset - size;
                if (trailing > 0) {
                    successor.transferBuffer(buffer, this.readOffset + size, trailing);
                }
                this.ignore = true;
            }
        }
    }
}

