/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.net.SocketAddress;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.agrona.CloseHelper;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetDebitor;
import org.reaktivity.reaktor.test.internal.k3po.ext.NukleusExtConfiguration;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.LabelManager;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelAddress;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusClientChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusScope;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusServerChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTransmission;

public final class NukleusReaktor
implements Runnable,
ExternalResourceReleasable {
    private static final long MAX_PARK_NS = TimeUnit.MILLISECONDS.toNanos(100L);
    private static final long MIN_PARK_NS = TimeUnit.MILLISECONDS.toNanos(1L);
    private static final int MAX_YIELDS = 30;
    private static final int MAX_SPINS = 20;
    private final NukleusExtConfiguration config;
    private final Deque<Runnable> taskQueue;
    private final AtomicLong traceIds;
    private final Int2ObjectHashMap<NukleusScope> scopesByIndex;
    private final Map<Long, Integer> scopeIndexByRouteId;
    private final LabelManager labels;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private NukleusScope[] scopes;
    private final AtomicReference<Thread> thread;

    NukleusReaktor(NukleusExtConfiguration config) {
        this.config = config;
        this.scopesByIndex = new Int2ObjectHashMap();
        this.scopeIndexByRouteId = new ConcurrentHashMap<Long, Integer>();
        this.taskQueue = new ConcurrentLinkedDeque<Runnable>();
        this.traceIds = new AtomicLong(Long.MIN_VALUE);
        this.scopes = new NukleusScope[0];
        this.labels = new LabelManager(config.directory());
        this.thread = new AtomicReference();
    }

    public void bind(NukleusServerChannel serverChannel, NukleusChannelAddress localAddress, ChannelFuture bindFuture) {
        this.submitTask(new BindServerTask(serverChannel, localAddress, bindFuture), true);
    }

    public void unbind(NukleusServerChannel serverChannel, ChannelFuture unbindFuture) {
        this.submitTask(new UnbindServerTask(serverChannel, unbindFuture), true);
    }

    public void close(NukleusServerChannel serverChannel) {
        this.submitTask(new CloseServerTask(serverChannel), true);
    }

    public void connect(NukleusClientChannel channel, NukleusChannelAddress remoteAddress, ChannelFuture connectFuture) {
        this.submitTask(new ConnectClientTask(channel, remoteAddress, connectFuture));
    }

    public void adviseOutput(NukleusChannel channel, ChannelFuture handlerFuture, Object value) {
        this.submitTask(new AdviseOutputTask(channel, handlerFuture, value));
    }

    public void adviseInput(NukleusChannel channel, ChannelFuture handlerFuture, Object value) {
        this.submitTask(new AdviseInputTask(channel, handlerFuture, value));
    }

    public void abortOutput(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new AbortOutputTask(channel, handlerFuture));
    }

    public void abortInput(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new AbortInputTask(channel, handlerFuture));
    }

    public void write(MessageEvent writeRequest) {
        this.submitTask(new WriteTask(writeRequest));
    }

    public void flush(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new FlushTask(channel, handlerFuture));
    }

    public void shutdownOutput(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new ShutdownOutputTask(channel, handlerFuture));
    }

    public void close(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new CloseTask(channel, handlerFuture));
    }

    public void systemFlush(NukleusChannel channel, ChannelFuture handlerFuture) {
        this.submitTask(new SystemFlushTask(channel, handlerFuture));
    }

    @Override
    public void run() {
        BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(20L, 30L, MIN_PARK_NS, MAX_PARK_NS);
        if (!this.thread.compareAndSet(null, Thread.currentThread())) {
            return;
        }
        while (!this.shutdown.get()) {
            int workCount = 0;
            workCount += this.executeTasks();
            idleStrategy.idle(workCount += this.readMessages());
        }
        this.executeTasks();
        this.shutdownLatch.countDown();
    }

    public void shutdown() {
        if (this.shutdown.compareAndSet(false, true)) {
            try {
                this.shutdownLatch.await();
                for (int i = 0; i < this.scopes.length; ++i) {
                    CloseHelper.quietClose((AutoCloseable)this.scopes[i]);
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void releaseExternalResources() {
        this.shutdown();
    }

    DefaultBudgetCreditor supplyCreditor(NukleusChannel channel) {
        int scopeIndex = channel.getLocalScope();
        NukleusScope scope = this.supplyScope(scopeIndex);
        return scope.creditor();
    }

    DefaultBudgetDebitor supplyDebitor(NukleusChannel channel, long debitorId) {
        int scopeIndex = channel.getLocalScope();
        NukleusScope scope = this.supplyScope(scopeIndex);
        return scope.supplyDebitor(debitorId);
    }

    private NukleusScope supplyScope(int scopeIndex) {
        return (NukleusScope)this.scopesByIndex.computeIfAbsent(scopeIndex, this::newScope);
    }

    private int executeTasks() {
        Runnable task;
        int workCount = 0;
        while ((task = this.taskQueue.poll()) != null) {
            task.run();
            ++workCount;
        }
        return workCount;
    }

    private int readMessages() {
        int workCount = 0;
        for (int i = 0; i < this.scopes.length; ++i) {
            workCount += this.scopes[i].process();
        }
        return workCount;
    }

    private NukleusScope newScope(int scopeIndex) {
        NukleusScope scope = new NukleusScope(this.config, this.labels, scopeIndex, this::lookupTargetIndex, System::nanoTime, this.traceIds::incrementAndGet);
        this.scopes = (NukleusScope[])ArrayUtil.add((Object[])this.scopes, (Object)scope);
        return scope;
    }

    private void submitTask(Runnable task) {
        this.submitTask(task, false);
    }

    private void submitTask(Runnable task, boolean immediateIfAligned) {
        if (immediateIfAligned && this.thread.get() == Thread.currentThread()) {
            task.run();
        } else {
            this.taskQueue.offer(task);
        }
    }

    private int lookupTargetIndex(long routeId) {
        return this.scopeIndexByRouteId.getOrDefault(routeId, 0);
    }

    private final class BindServerTask
    implements Runnable {
        private final NukleusServerChannel serverChannel;
        private final NukleusChannelAddress localAddress;
        private final ChannelFuture bindFuture;

        private BindServerTask(NukleusServerChannel serverChannel, NukleusChannelAddress localAddress, ChannelFuture bindFuture) {
            this.serverChannel = serverChannel;
            this.localAddress = localAddress;
            this.bindFuture = bindFuture;
        }

        @Override
        public void run() {
            try {
                NukleusReaktor reaktor = this.serverChannel.reaktor;
                int scopeIndex = this.serverChannel.getLocalScope();
                NukleusScope scope = reaktor.supplyScope(scopeIndex);
                long routeId = scope.routeId(this.localAddress);
                long authorization = this.localAddress.getAuthorization();
                scope.doRoute(routeId, authorization, this.serverChannel);
                NukleusReaktor.this.scopeIndexByRouteId.put(routeId, scopeIndex);
                this.serverChannel.setLocalAddress(this.localAddress);
                this.serverChannel.setBound();
                Channels.fireChannelBound((Channel)this.serverChannel, (SocketAddress)((Object)this.localAddress));
                this.bindFuture.setSuccess();
            }
            catch (Exception ex) {
                this.bindFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class UnbindServerTask
    implements Runnable {
        private final NukleusServerChannel serverChannel;
        private final ChannelFuture unbindFuture;

        private UnbindServerTask(NukleusServerChannel serverChannel, ChannelFuture unbindFuture) {
            this.serverChannel = serverChannel;
            this.unbindFuture = unbindFuture;
        }

        @Override
        public void run() {
            try {
                NukleusReaktor reaktor = this.serverChannel.reaktor;
                NukleusChannelAddress localAddress = this.serverChannel.getLocalAddress();
                int scopeIndex = this.serverChannel.getLocalScope();
                NukleusScope scope = reaktor.supplyScope(scopeIndex);
                long routeId = scope.routeId(localAddress);
                long authorization = localAddress.getAuthorization();
                scope.doUnroute(routeId, authorization, this.serverChannel);
                this.serverChannel.setLocalAddress(null);
                Channels.fireChannelUnbound((Channel)this.serverChannel);
                this.unbindFuture.setSuccess();
            }
            catch (Exception ex) {
                this.unbindFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class CloseServerTask
    implements Runnable {
        private final NukleusServerChannel serverChannel;

        private CloseServerTask(NukleusServerChannel serverChannel) {
            this.serverChannel = serverChannel;
        }

        @Override
        public void run() {
            try {
                NukleusReaktor reaktor = this.serverChannel.reaktor;
                NukleusChannelAddress localAddress = this.serverChannel.getLocalAddress();
                if (localAddress != null) {
                    int scopeIndex = this.serverChannel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    long routeId = scope.routeId(localAddress);
                    long authorization = localAddress.getAuthorization();
                    scope.doUnroute(routeId, authorization, this.serverChannel);
                    this.serverChannel.setLocalAddress(null);
                    Channels.fireChannelUnbound((Channel)this.serverChannel);
                }
                this.serverChannel.setClosed();
            }
            catch (ChannelException ex) {
                Channels.fireExceptionCaught((Channel)this.serverChannel, (Throwable)ex);
            }
        }
    }

    private final class ConnectClientTask
    implements Runnable {
        private final NukleusClientChannel clientChannel;
        private final NukleusChannelAddress remoteAddress;
        private final ChannelFuture connectFuture;

        private ConnectClientTask(NukleusClientChannel clientChannel, NukleusChannelAddress remoteAddress, ChannelFuture connectFuture) {
            this.clientChannel = clientChannel;
            this.remoteAddress = remoteAddress;
            this.connectFuture = connectFuture;
        }

        @Override
        public void run() {
            NukleusChannelAddress localAddress = this.remoteAddress.newEphemeralAddress();
            if (!this.clientChannel.isBound()) {
                this.clientChannel.setLocalAddress(localAddress);
                this.clientChannel.setBound();
                Channels.fireChannelBound((Channel)this.clientChannel, (SocketAddress)((Object)localAddress));
            }
            try {
                NukleusReaktor reaktor = this.clientChannel.reaktor;
                int scopeIndex = this.clientChannel.getLocalScope();
                NukleusChannelConfig clientConfig = (NukleusChannelConfig)this.clientChannel.getConfig();
                if (clientConfig.getTransmission() == NukleusTransmission.SIMPLEX) {
                    this.clientChannel.setReadClosed();
                }
                NukleusScope scope = reaktor.supplyScope(scopeIndex);
                scope.doConnect(this.clientChannel, localAddress, this.remoteAddress, this.connectFuture);
                this.connectFuture.addListener(new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            NukleusReaktor.this.submitTask(new ConnectAbortTask(ConnectClientTask.this.clientChannel, ConnectClientTask.this.remoteAddress));
                        }
                    }
                });
            }
            catch (Exception ex) {
                this.connectFuture.setFailure((Throwable)ex);
            }
        }

        private final class ConnectAbortTask
        implements Runnable {
            private final NukleusClientChannel clientChannel;
            private final NukleusChannelAddress remoteAddress;

            private ConnectAbortTask(NukleusClientChannel clientChannel, NukleusChannelAddress remoteAddress) {
                this.clientChannel = clientChannel;
                this.remoteAddress = remoteAddress;
            }

            @Override
            public void run() {
                NukleusReaktor reaktor = this.clientChannel.reaktor;
                int scopeIndex = this.clientChannel.getLocalScope();
                NukleusScope scope = reaktor.supplyScope(scopeIndex);
                scope.doConnectAbort(this.clientChannel, this.remoteAddress);
            }
        }
    }

    private final class AdviseOutputTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;
        private final Object value;

        private AdviseOutputTask(NukleusChannel channel, ChannelFuture handlerFuture, Object value) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isWriteClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doAdviseOutput(this.channel, this.handlerFuture, this.value);
                }
            }
            catch (Exception ex) {
                this.handlerFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class AdviseInputTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;
        private final Object value;

        private AdviseInputTask(NukleusChannel channel, ChannelFuture handlerFuture, Object value) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isReadClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doAdviseInput(this.channel, this.handlerFuture, this.value);
                }
            }
            catch (Exception ex) {
                this.handlerFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class AbortOutputTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;

        private AbortOutputTask(NukleusChannel channel, ChannelFuture handlerFuture) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isWriteClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doAbortOutput(this.channel, this.handlerFuture);
                }
            }
            catch (Exception ex) {
                this.handlerFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class AbortInputTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;

        private AbortInputTask(NukleusChannel channel, ChannelFuture handlerFuture) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isReadClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doAbortInput(this.channel, this.handlerFuture);
                }
            }
            catch (Exception ex) {
                this.handlerFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class WriteTask
    implements Runnable {
        private final MessageEvent writeRequest;

        private WriteTask(MessageEvent writeRequest) {
            this.writeRequest = writeRequest;
        }

        @Override
        public void run() {
            try {
                NukleusChannel channel = (NukleusChannel)this.writeRequest.getChannel();
                if (!channel.isWriteClosed()) {
                    NukleusReaktor reaktor = channel.reaktor;
                    int scopeIndex = channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doWrite(channel, this.writeRequest);
                }
            }
            catch (Exception ex) {
                ChannelFuture writeFuture = this.writeRequest.getFuture();
                writeFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class FlushTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture flushFuture;

        private FlushTask(NukleusChannel channel, ChannelFuture future) {
            this.channel = channel;
            this.flushFuture = future;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isWriteClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doFlush(this.channel, this.flushFuture);
                }
            }
            catch (Exception ex) {
                this.flushFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class ShutdownOutputTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;

        private ShutdownOutputTask(NukleusChannel channel, ChannelFuture handlerFuture) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isWriteClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doShutdownOutput(this.channel, this.handlerFuture);
                }
            }
            catch (Exception ex) {
                this.handlerFuture.setFailure((Throwable)ex);
            }
        }
    }

    private final class CloseTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture handlerFuture;

        private CloseTask(NukleusChannel channel, ChannelFuture handlerFuture) {
            this.channel = channel;
            this.handlerFuture = handlerFuture;
        }

        @Override
        public void run() {
            try {
                NukleusReaktor reaktor = this.channel.reaktor;
                NukleusChannelAddress remoteAddress = this.channel.getRemoteAddress();
                if (remoteAddress != null) {
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doClose(this.channel, this.handlerFuture);
                }
            }
            catch (ChannelException ex) {
                Channels.fireExceptionCaught((Channel)this.channel, (Throwable)ex);
            }
        }
    }

    private final class SystemFlushTask
    implements Runnable {
        private final NukleusChannel channel;
        private final ChannelFuture flushFuture;

        private SystemFlushTask(NukleusChannel channel, ChannelFuture future) {
            this.channel = channel;
            this.flushFuture = future;
        }

        @Override
        public void run() {
            try {
                if (!this.channel.isWriteClosed()) {
                    NukleusReaktor reaktor = this.channel.reaktor;
                    int scopeIndex = this.channel.getLocalScope();
                    NukleusScope scope = reaktor.supplyScope(scopeIndex);
                    scope.doSystemFlush(this.channel, this.flushFuture);
                }
            }
            catch (Exception ex) {
                this.flushFuture.setFailure((Throwable)ex);
            }
        }
    }
}

