/*
 * Decompiled with CFR 0.152.
 */
package com.thinkaurelius.thrift;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.sun.jna.Pointer;
import com.thinkaurelius.thrift.Message;
import com.thinkaurelius.thrift.TDisruptorServerMBean;
import com.thinkaurelius.thrift.util.TBinaryProtocol;
import com.thinkaurelius.thrift.util.ThriftFactories;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TDisruptorServer
extends TNonblockingServer
implements TDisruptorServerMBean {
    /** Class-wide SLF4J logger, also used by the nested thread classes. */
    private static final Logger logger = LoggerFactory.getLogger(TDisruptorServer.class);
    /** Set once by the static initializer: {@code true} when a {@code com.sun.jna.Pointer} could be instantiated. */
    private static final boolean isJNAPresent;
    /** JMX object name under which the constructor registers this server. */
    public static final String MBEAN_NAME = "com.thinkaurelius.thrift.server:type=TDisruptorServer";
    /** Threads that accept connections on the server transport and hand them to a selector thread. */
    private final AcceptorThread[] acceptorThreads;
    /** Threads that perform read/write selection for accepted connections; each owns its own ring buffer. */
    private final SelectorThread[] selectorThreads;
    /** Strategy the acceptors use to pick the selector thread for a new connection. */
    private final SelectorLoadBalancer selectorLoadBalancer;
    /** Transport/protocol/processor factories plus the maximum frame size, passed to every new {@link Message}. */
    private final ThriftFactories thriftFactories;
    /** Allocation mode passed to newly created {@link Message}s; can be changed at runtime through the MBean setter. */
    private volatile boolean useHeapBasedAllocation;
    /** Shared stop flag polled by every acceptor and selector loop; also set when any of those loops exits. */
    private volatile boolean isStopped;

    /**
     * Builds the server: validates the configuration, creates (but does not start) the acceptor
     * and selector threads and registers this instance as a JMX MBean under {@link #MBEAN_NAME}.
     *
     * <p>Defaults applied for unset options: 2 acceptors, one selector per available processor,
     * 2 workers per selector and a ring size of 2048 (rounded up to a power of two).
     *
     * @param args server configuration; it is read but never modified
     * @throws IllegalArgumentException if either protocol factory is not a
     *         {@link TBinaryProtocol.Factory}, or if the number of acceptors, selectors or
     *         workers per selector is smaller than 1
     * @throws RuntimeException if the threads cannot be created or the MBean cannot be registered
     */
    public TDisruptorServer(Args args) {
        super(args);
        int numCores = Runtime.getRuntime().availableProcessors();
        int numAcceptors = args.numAcceptors == null ? 2 : args.numAcceptors;
        int numSelectors = args.numSelectors == null ? numCores : args.numSelectors;
        int numWorkersPerSelector = args.numWorkersPerSelector == null ? 2 : args.numWorkersPerSelector;
        int ringSize = args.ringSize == null ? 2048 : args.ringSize;
        // Fail fast: a non-positive count would otherwise surface later as a NegativeArraySizeException
        // or as a server that silently never accepts or dispatches anything.
        if (numAcceptors < 1) {
            throw new IllegalArgumentException("Number of acceptors must be at least 1, got " + numAcceptors);
        }
        if (numSelectors < 1) {
            throw new IllegalArgumentException("Number of selectors must be at least 1, got " + numSelectors);
        }
        if (numWorkersPerSelector < 1) {
            throw new IllegalArgumentException("Number of workers per selector must be at least 1, got " + numWorkersPerSelector);
        }
        if (!(this.inputProtocolFactory_ instanceof TBinaryProtocol.Factory) || !(this.outputProtocolFactory_ instanceof TBinaryProtocol.Factory)) {
            throw new IllegalArgumentException("Please use " + TBinaryProtocol.Factory.class.getCanonicalName() + " or it's subclass as protocol factories.");
        }
        // Decide the allocation mode locally instead of writing the fallback back into the caller's Args.
        boolean heapBasedAllocation = args.useHeapBasedAllocation;
        if (!heapBasedAllocation && !isJNAPresent) {
            logger.warn("Off-heap allocation couldn't be used as JNA is not present in classpath or broken, using on-heap instead.");
            heapBasedAllocation = true;
        }
        this.useHeapBasedAllocation = heapBasedAllocation;
        this.thriftFactories = new ThriftFactories(this.inputTransportFactory_, this.outputTransportFactory_, this.inputProtocolFactory_, this.outputProtocolFactory_, this.processorFactory_, args.maxFrameSizeInBytes);
        AcceptorThread[] acceptors = new AcceptorThread[numAcceptors];
        try {
            for (int i = 0; i < numAcceptors; ++i) {
                acceptors[i] = new AcceptorThread("Thrift-Acceptor_" + i, (TNonblockingServerTransport)this.serverTransport_);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Could not create acceptor threads", e);
        }
        this.acceptorThreads = acceptors;
        SelectorThread[] selectors = new SelectorThread[numSelectors];
        try {
            int effectiveRingSize = TDisruptorServer.nextPowerOfTwo(ringSize);
            for (int i = 0; i < numSelectors; ++i) {
                selectors[i] = new SelectorThread("Thrift-Selector_" + i, effectiveRingSize, numWorkersPerSelector);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Could not create selector threads", e);
        }
        this.selectorThreads = selectors;
        this.selectorLoadBalancer = new RandomSelectorLoadBalancer(this.selectorThreads);
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Clears the stop flag, then starts all selector threads followed by all acceptor threads,
     * logging each start at debug level.
     *
     * @return always {@code true}
     */
    protected boolean startThreads() {
        this.isStopped = false;
        int selectorIndex = 0;
        for (SelectorThread selectorThread : this.selectorThreads) {
            selectorThread.start();
            logger.debug("Thrift Selector thread {} is started.", (Object)Integer.valueOf(selectorIndex));
            selectorIndex++;
        }
        int acceptorIndex = 0;
        for (AcceptorThread acceptorThread : this.acceptorThreads) {
            acceptorThread.start();
            logger.debug("Thrift Acceptor thread {} is started.", (Object)Integer.valueOf(acceptorIndex));
            acceptorIndex++;
        }
        return true;
    }

    /**
     * Waits for the selector threads to finish via {@link #joinSelector()} and then shuts down
     * their invocation workers via {@link #gracefullyShutdownInvokerPool()}.
     */
    protected void waitForShutdown() {
        this.joinSelector();
        this.gracefullyShutdownInvokerPool();
    }

    /**
     * Blocks until every selector thread has terminated.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining joins are skipped,
     * the interruption is logged together with its stack trace and the thread's interrupt flag
     * is restored so that callers can still observe it.
     */
    protected void joinSelector() {
        try {
            for (SelectorThread selector : this.selectorThreads) {
                selector.join();
            }
        }
        catch (InterruptedException e) {
            // Log through SLF4J with the cause attached instead of printing to stderr.
            logger.error("Interruption: " + e.getMessage(), e);
            // join() cleared the interrupt status when it threw; put it back.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Calls {@code shutdown()} on every selector thread, in array order.
     */
    protected void gracefullyShutdownInvokerPool() {
        final SelectorThread[] selectors = this.selectorThreads;
        for (int i = 0; i < selectors.length; i++) {
            selectors[i].shutdown();
        }
    }

    /**
     * Raises the stop flag, wakes every acceptor and selector out of a blocking
     * {@code select()} so they can notice it, and finally unregisters the JMX MBean.
     */
    public void stop() {
        this.isStopped = true;
        final AcceptorThread[] acceptors = this.acceptorThreads;
        for (int i = 0; i < acceptors.length; i++) {
            acceptors[i].wakeupSelector();
        }
        final SelectorThread[] selectors = this.selectorThreads;
        for (int i = 0; i < selectors.length; i++) {
            selectors[i].wakeupSelector();
        }
        this.unregisterMBean();
    }

    /**
     * Removes the MBean registered under {@link #MBEAN_NAME} from the platform MBean server,
     * doing nothing when no such MBean is registered.
     *
     * @throws RuntimeException wrapping any exception raised while looking up or unregistering
     */
    void unregisterMBean() {
        try {
            final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
            final ObjectName beanName = new ObjectName(MBEAN_NAME);
            if (!platformServer.isRegistered(beanName)) {
                return;
            }
            platformServer.unregisterMBean(beanName);
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Reports whether the server is stopped.
     *
     * @return {@code true} only if every selector thread reports itself as stopped
     *         (trivially {@code true} when there are no selector threads)
     */
    public boolean isStopped() {
        final SelectorThread[] selectors = this.selectorThreads;
        for (int i = 0; i < selectors.length; i++) {
            if (!selectors[i].isStopped()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Not supported by this server: always throws.
     *
     * <p>NOTE(review): presumably unused because invocations are published to the selector
     * threads' ring buffers instead of going through frame buffers; confirm against the superclass.
     *
     * @param frameBuffer ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    protected boolean requestInvoke(AbstractNonblockingServer.FrameBuffer frameBuffer) {
        throw new UnsupportedOperationException();
    }

    /**
     * Hook called by {@link InvocationHandler} on a worker thread immediately before an
     * invocation is executed.
     *
     * @param var1 the message whose invocation is about to run
     */
    protected abstract void beforeInvoke(Message var1);

    /**
     * Hook called by a selector thread right before it cancels a message (after a failed read
     * or write, or when the ring buffer is full). The default implementation does nothing.
     *
     * @param buffer the message that is about to be cancelled
     */
    protected void beforeClose(Message buffer) {
    }

    /**
     * Rounds {@code v} up to the nearest power of two.
     *
     * @param v requested size; any value of 1 or less yields 1
     * @return the smallest power of two that is greater than or equal to {@code v}
     * @throws IllegalArgumentException if {@code v} is greater than 2^30, because the next power
     *         of two (2^31) does not fit in an {@code int} and would wrap to a negative number
     */
    private static int nextPowerOfTwo(int v) {
        if (v <= 1) {
            // Spelled out explicitly rather than relying on shift-count wrapping for zero/negative input.
            return 1;
        }
        if (v > (1 << 30)) {
            throw new IllegalArgumentException("Cannot round " + v + " up to a power of two: the result would exceed 2^30");
        }
        // For v >= 2 the highest set bit of (v - 1), doubled, is the smallest power of two >= v.
        return Integer.highestOneBit(v - 1) << 1;
    }

    /**
     * Returns the ring buffer size reported by the first selector thread.
     * Every selector is created with the same ring size in the constructor.
     *
     * @return ring buffer size of {@code selectorThreads[0]}
     */
    @Override
    public int getRingBufferSize() {
        return this.selectorThreads[0].getRingBufferSize();
    }

    /**
     * @return the number of selector threads owned by this server
     */
    @Override
    public int getNumberOfSelectors() {
        return this.selectorThreads.length;
    }

    /**
     * @return the current value of the heap-based allocation flag that is passed to newly
     *         created {@link Message}s
     */
    @Override
    public boolean isHeapBasedAllocationUsed() {
        return this.useHeapBasedAllocation;
    }

    /**
     * Switches the allocation mode handed to messages created from now on.
     *
     * @param flag {@code true} for heap-based allocation, {@code false} for off-heap
     * @throws IllegalArgumentException if off-heap allocation is requested while JNA is unavailable
     */
    @Override
    public void useHeapBasedAllocation(boolean flag) {
        // Heap-based mode is always allowed; off-heap mode additionally needs JNA.
        if (flag || isJNAPresent) {
            this.useHeapBasedAllocation = flag;
            return;
        }
        throw new IllegalArgumentException("Off-Heap allocation method could not be used because JNA is missing.");
    }

    // Probes for a usable JNA installation by instantiating a Pointer once at class-load time.
    static {
        boolean jna = false;
        try {
            new Pointer(0L);
            jna = true;
        }
        catch (LinkageError ignored) {
            // Deliberately ignored: JNA is absent (NoClassDefFoundError) or present but unusable
            // (e.g. UnsatisfiedLinkError / ExceptionInInitializerError, which are LinkageErrors too).
            // Letting any of these escape would make this class itself fail to initialise;
            // instead the server falls back to heap-based allocation.
        }
        isJNAPresent = jna;
    }

    /**
     * {@link SelectorLoadBalancer} that picks a uniformly random selector thread for every request.
     */
    public static class RandomSelectorLoadBalancer
    implements SelectorLoadBalancer {
        private final SelectorThread[] selectors;

        /**
         * @param selectorThreads candidates to choose from; the array is used as-is, not copied
         */
        public RandomSelectorLoadBalancer(SelectorThread[] selectorThreads) {
            this.selectors = selectorThreads;
        }

        /**
         * @return a randomly chosen element of the selector array
         */
        @Override
        public SelectorThread nextSelector() {
            final int candidateCount = this.selectors.length;
            final int chosen = ThreadLocalRandom.current().nextInt(candidateCount);
            return this.selectors[chosen];
        }
    }

    public static interface SelectorLoadBalancer {
        public SelectorThread nextSelector();
    }

    public class InvocationHandler
    implements WorkHandler<Message.Invocation> {
        public void onEvent(Message.Invocation invocation) throws Exception {
            TDisruptorServer.this.beforeInvoke(invocation.getMessage());
            invocation.execute();
        }
    }

    protected class SelectorThread
    extends AbstractSelectorThread {
        private final RingBuffer<Message.Invocation> ringBuffer;
        private final WorkerPool<Message.Invocation> workerPool;
        private final ConcurrentLinkedQueue<TNonblockingTransport> newConnections;

        public SelectorThread(String name, int ringSize, int numWorkers) throws IOException {
            super(name);
            this.newConnections = new ConcurrentLinkedQueue();
            WorkHandler[] handlers = new InvocationHandler[numWorkers];
            for (int i = 0; i < handlers.length; ++i) {
                handlers[i] = new InvocationHandler();
            }
            this.ringBuffer = RingBuffer.createSingleProducer(Message.Invocation.FACTORY, (int)ringSize, (WaitStrategy)new BlockingWaitStrategy());
            this.workerPool = new WorkerPool(this.ringBuffer, this.ringBuffer.newBarrier(new Sequence[0]), (ExceptionHandler)new FatalExceptionHandler(), handlers);
            this.workerPool.start((Executor)(numWorkers == 1 ? Executors.newSingleThreadExecutor() : Executors.newFixedThreadPool(numWorkers)));
        }

        @Override
        protected void processKey(SelectionKey key) {
            Message message = (Message)key.attachment();
            if (message.isReadyToRead()) {
                this.handleRead(message);
            } else if (message.isReadyToWrite()) {
                this.handleWrite(message);
            }
        }

        @Override
        protected void selectorIterationComplete() throws IOException {
            TNonblockingTransport newClient;
            while ((newClient = this.newConnections.poll()) != null) {
                SelectionKey clientKey = newClient.registerSelector(this.selector, 1);
                clientKey.attach(new Message(newClient, clientKey, TDisruptorServer.this.thriftFactories, TDisruptorServer.this.useHeapBasedAllocation));
            }
        }

        protected void handleRead(Message message) {
            message.changeSelectInterests();
            if (!message.read()) {
                this.cancelMessage(message);
            } else if (message.isFrameFullyRead()) {
                this.dispatchInvoke(message);
            }
        }

        protected void handleWrite(Message message) {
            message.changeSelectInterests();
            if (!message.write()) {
                this.cancelMessage(message);
            }
        }

        protected void dispatchInvoke(final Message message) {
            boolean success = this.ringBuffer.tryPublishEvent((EventTranslator)new EventTranslator<Message.Invocation>(){

                public void translateTo(Message.Invocation invocation, long sequence) {
                    invocation.setMessage(message);
                }
            });
            if (!success) {
                logger.warn(this + " ring buffer is full, dropping client message.");
                this.cancelMessage(message);
            }
        }

        private void cancelMessage(Message message) {
            TDisruptorServer.this.beforeClose(message);
            message.cancel();
        }

        public void subscribe(TNonblockingTransport newClient) {
            this.newConnections.add(newClient);
        }

        public void shutdown() {
            this.workerPool.drainAndHalt();
        }

        public int getRingBufferSize() {
            return this.ringBuffer.getBufferSize();
        }
    }

    /**
     * Selector thread registered on the server transport: it accepts incoming connections and
     * hands each one to a selector thread chosen by the server's load balancer.
     */
    protected class AcceptorThread
    extends AbstractSelectorThread {
        private final TNonblockingServerTransport serverTransport;

        /**
         * @param name thread name
         * @param serverTransport listening transport; it is registered with this thread's selector
         * @throws IOException if the selector cannot be opened
         */
        public AcceptorThread(String name, TNonblockingServerTransport serverTransport) throws IOException {
            super(name);
            this.serverTransport = serverTransport;
            this.serverTransport.registerSelector(this.selector);
        }

        /**
         * For an acceptable key: picks a target selector, accepts the connection, queues it on
         * the target and wakes the target up. A {@link TTransportException} raised on the way
         * is logged at debug level and otherwise ignored. Non-acceptable keys are skipped.
         */
        @Override
        protected void processKey(SelectionKey key) throws IOException {
            if (key.isAcceptable()) {
                try {
                    SelectorThread target = TDisruptorServer.this.selectorLoadBalancer.nextSelector();
                    TNonblockingTransport client = (TNonblockingTransport)this.serverTransport.accept();
                    target.subscribe(client);
                    target.wakeupSelector();
                }
                catch (TTransportException tte) {
                    logger.debug("Non-fatal exception trying to accept!", (Throwable)tte);
                }
            }
        }

        /** Nothing to do after an iteration for acceptors. */
        @Override
        protected void selectorIterationComplete() {
        }
    }

    /**
     * Base class of the acceptor and selector threads: owns an NIO {@link Selector} and runs the
     * select loop until the enclosing server's stop flag is raised.
     */
    protected abstract class AbstractSelectorThread
    extends Thread {
        protected final Selector selector;

        /**
         * @param name thread name
         * @throws IOException if the selector cannot be opened
         */
        public AbstractSelectorThread(String name) throws IOException {
            super(name);
            this.selector = SelectorProvider.provider().openSelector();
        }

        /**
         * @return the enclosing server's stop flag, which is shared by all of its threads
         */
        public boolean isStopped() {
            return TDisruptorServer.this.isStopped;
        }

        /**
         * Runs the select loop until the server is stopped. On exit, whether normal or caused
         * by an uncaught error, the server's stop flag is raised and the selector is closed.
         */
        @Override
        public void run() {
            try {
                while (!this.isStopped()) {
                    this.select();
                }
            }
            catch (Throwable t) {
                logger.error("run() exiting due to uncaught error", t);
            }
            finally {
                // Raise the flag first so it is set even if closing the selector fails unexpectedly.
                TDisruptorServer.this.isStopped = true;
                try {
                    // Closed here rather than inside the try so the selector is released on the error path too.
                    this.selector.close();
                }
                catch (IOException e) {
                    logger.warn("Failed to close selector of thread " + this.getName(), e);
                }
            }
        }

        /**
         * One loop iteration: blocks in {@code select()}, processes every selected key (cleaning
         * up invalid ones) and then runs the per-iteration hook. {@link IOException} and
         * {@link CancelledKeyException} are logged and do not end the loop.
         */
        private void select() {
            try {
                this.selector.select();
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (!this.isStopped() && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) {
                        this.cleanupSelectionKey(key);
                        continue;
                    }
                    this.processKey(key);
                }
                this.selectorIterationComplete();
            }
            catch (IOException e) {
                // No "{}" placeholder: with a Throwable as the only argument SLF4J never fills it.
                logger.warn("Got an IOException while selecting!", (Throwable)e);
            }
            catch (CancelledKeyException e) {
                logger.debug("Non-fatal exception in select loop (probably somebody closed the channel)...", (Throwable)e);
            }
        }

        /**
         * Handles one valid selected key.
         *
         * @param var1 the selected key
         * @throws IOException if processing the key fails
         */
        protected abstract void processKey(SelectionKey var1) throws IOException;

        /**
         * Hook invoked once at the end of every select iteration.
         *
         * @throws IOException if the hook fails
         */
        protected abstract void selectorIterationComplete() throws IOException;

        /**
         * Closes the message attached to the key, if there is one, and cancels the key.
         */
        protected void cleanupSelectionKey(SelectionKey key) {
            Message message = (Message)key.attachment();
            if (message != null) {
                message.close();
            }
            key.cancel();
        }

        /** Wakes this thread's selector out of a blocking {@code select()}. */
        public void wakeupSelector() {
            this.selector.wakeup();
        }
    }

    /**
     * Fluent configuration for {@link TDisruptorServer}. Options left unset stay {@code null}
     * and are replaced by the server constructor's defaults.
     */
    public static class Args
    extends AbstractNonblockingServer.AbstractNonblockingServerArgs<Args> {
        /** Number of acceptor threads; {@code null} means "use the default". */
        private Integer numAcceptors;
        /** Number of selector threads; {@code null} means "use the default". */
        private Integer numSelectors;
        /** Invocation workers per selector thread; {@code null} means "use the default". */
        private Integer numWorkersPerSelector;
        /** Requested ring buffer size per selector; {@code null} means "use the default". */
        private Integer ringSize;
        /** Maximum frame size in bytes; defaults to 16,384,000 (0xFA0000). */
        private Integer maxFrameSizeInBytes = 16384000;
        /** Whether messages should use heap-based allocation; defaults to {@code false}. */
        private boolean useHeapBasedAllocation;

        /**
         * @param transport the non-blocking server transport to listen on
         */
        public Args(TNonblockingServerTransport transport) {
            super(transport);
        }

        /** Sets the number of acceptor threads and returns this instance for chaining. */
        public Args numAcceptors(int numAcceptors) {
            this.numAcceptors = Integer.valueOf(numAcceptors);
            return this;
        }

        /** Sets the number of selector threads and returns this instance for chaining. */
        public Args numSelectors(int numSelectors) {
            this.numSelectors = Integer.valueOf(numSelectors);
            return this;
        }

        /** Sets the number of workers per selector and returns this instance for chaining. */
        public Args numWorkersPerSelector(int numWorkers) {
            this.numWorkersPerSelector = Integer.valueOf(numWorkers);
            return this;
        }

        /** Sets the requested ring size per selector and returns this instance for chaining. */
        public Args ringSizePerSelector(int ringSize) {
            this.ringSize = Integer.valueOf(ringSize);
            return this;
        }

        /** Chooses heap-based ({@code true}) or off-heap ({@code false}) allocation; returns this instance. */
        public Args useHeapBasedAllocation(boolean flag) {
            this.useHeapBasedAllocation = flag;
            return this;
        }

        /** Sets the maximum frame size in bytes and returns this instance for chaining. */
        public Args maxFrameSizeInBytes(int maxFrameSizeInBytes) {
            this.maxFrameSizeInBytes = Integer.valueOf(maxFrameSizeInBytes);
            return this;
        }
    }
}

