/*
 * Decompiled with CFR 0.152.
 */
package org.zeromq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

public class ZDispatcher {
    private ConcurrentMap<ZMQ.Socket, SocketDispatcher> dispatchers = new ConcurrentHashMap<ZMQ.Socket, SocketDispatcher>();
    private final ExecutorService dispatcherExecutor;

    public ZDispatcher() {
        this.dispatcherExecutor = Executors.newCachedThreadPool();
    }

    public ZDispatcher(ExecutorService dispatcherExecutor) {
        this.dispatcherExecutor = dispatcherExecutor;
    }

    public void registerHandler(ZMQ.Socket socket, ZMessageHandler messageHandler, ZSender sender) {
        this.registerHandler(socket, messageHandler, sender, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    public void registerHandler(ZMQ.Socket socket, ZMessageHandler messageHandler, ZSender sender, ExecutorService threadpool) {
        SocketDispatcher socketDispatcher = new SocketDispatcher(socket, messageHandler, sender, threadpool);
        if (this.dispatchers.putIfAbsent(socket, socketDispatcher) != null) {
            throw new IllegalArgumentException("This socket already have a message handler");
        }
        socketDispatcher.start();
        this.dispatcherExecutor.execute(socketDispatcher);
    }

    public void unregisterHandler(ZMQ.Socket socket) {
        SocketDispatcher removedDispatcher = (SocketDispatcher)this.dispatchers.remove(socket);
        if (removedDispatcher == null) {
            throw new IllegalArgumentException("This socket doesn't have a message handler");
        }
        removedDispatcher.shutdown();
    }

    public void shutdown() {
        this.dispatcherExecutor.shutdown();
        for (SocketDispatcher socketDispatcher : this.dispatchers.values()) {
            socketDispatcher.shutdown();
        }
        this.dispatchers.clear();
    }

    private static final class SocketDispatcher
    implements Runnable {
        private volatile boolean active = false;
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private final ZMQ.Socket socket;
        private final ZMessageHandler handler;
        private final ZSender sender;
        private final ExecutorService threadpool;
        private final BlockingQueue<ZMsg> in = new LinkedBlockingQueue<ZMsg>();
        private static final int BUFFER_SIZE = 1024;
        private static final ThreadLocal<ZMessageBuffer> messages = new ThreadLocal<ZMessageBuffer>(){

            @Override
            protected ZMessageBuffer initialValue() {
                return new ZMessageBuffer();
            }
        };
        private final AtomicBoolean busy = new AtomicBoolean(false);

        public SocketDispatcher(ZMQ.Socket socket, ZMessageHandler handler, ZSender sender, ExecutorService handleThreadpool) {
            this.socket = socket;
            this.handler = handler;
            this.sender = sender;
            this.threadpool = handleThreadpool;
        }

        @Override
        public void run() {
            while (this.active) {
                this.doReceive();
                this.doHandle();
                this.doSend();
            }
            this.threadpool.shutdown();
            this.shutdownLatch.countDown();
        }

        public void start() {
            this.active = true;
        }

        public void shutdown() {
            try {
                this.active = false;
                this.shutdownLatch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        private void doReceive() {
            ZMsg msg;
            int remainingBuffer = 1024;
            while (this.active && remainingBuffer-- > 0 && (msg = ZMsg.recvMsg(this.socket, 1)) != null && msg.size() > 0 && msg.getFirst().hasData()) {
                this.in.add(msg);
            }
        }

        private void doHandle() {
            if (!this.in.isEmpty() && this.busy.compareAndSet(false, true)) {
                this.threadpool.submit(new Runnable(){

                    @Override
                    public void run() {
                        2 v0 = this;
                        ZMessageBuffer messages = (ZMessageBuffer)messages.get();
                        messages.drainFrom(SocketDispatcher.this.in);
                        SocketDispatcher.this.busy.set(false);
                        for (int i = 0; i <= messages.lastValidIndex; ++i) {
                            if (!SocketDispatcher.this.active) continue;
                            SocketDispatcher.this.handler.handleMessage(SocketDispatcher.this.sender, messages.buffer[i]);
                        }
                    }
                });
            }
        }

        private void doSend() {
            ZMsg msg;
            int remainingBuffer = 1024;
            while (this.active && remainingBuffer-- > 0 && (msg = (ZMsg)this.sender.out.poll()) != null) {
                msg.send(this.socket);
            }
        }

        private static class ZMessageBuffer {
            private final ZMsg[] buffer = new ZMsg[1024];
            private int lastValidIndex;

            private ZMessageBuffer() {
            }

            private void drainFrom(BlockingQueue<ZMsg> in) {
                ZMsg msg;
                this.lastValidIndex = -1;
                int lastIndex = -1;
                while (++lastIndex < this.buffer.length && (msg = (ZMsg)in.poll()) != null) {
                    this.buffer[lastIndex] = msg;
                    this.lastValidIndex = lastIndex;
                }
            }
        }
    }

    public static final class ZSender {
        private final BlockingQueue<ZMsg> out = new LinkedBlockingQueue<ZMsg>();

        public final boolean send(ZMsg msg) {
            return this.out.add(msg);
        }
    }

    public static interface ZMessageHandler {
        public void handleMessage(ZSender var1, ZMsg var2);
    }
}

