/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.net.SocketAddress;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.kaazing.k3po.driver.internal.netty.channel.Channels;
import org.kaazing.k3po.driver.internal.netty.channel.CompositeChannelFuture;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetDebitor;
import org.reaktivity.reaktor.internal.router.BudgetId;
import org.reaktivity.reaktor.internal.types.stream.FlushFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelAddress;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusClientChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusCorrelation;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusExtensionKind;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTransmission;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusUpdateMode;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NullChannelBuffer;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.Layout;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.StreamsLayout;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.NukleusTypeSystem;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.OctetsFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.AbortFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.BeginFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.ChallengeFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.DataFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.EndFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.ResetFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.WindowFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongObjectBiConsumer;

/**
 * Writes stream frames for k3po Nukleus channels into the streams ring buffer of a
 * {@link Layout}, and handles the RESET, WINDOW and CHALLENGE messages delivered to the
 * throttles it registers.
 */
final class NukleusTarget
implements AutoCloseable {
    // Frame builders for BEGIN, DATA, END and ABORT; each is wrapped over writeBuffer right before use.
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    // Read-only flyweights wrapped over incoming throttle messages in Throttle.handleThrottle.
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final ChallengeFW challengeRO = new ChallengeFW();
    // Wrapped over the copied DATA payload in flushData.
    private final OctetsFW octetsRO = new OctetsFW();
    // 40-byte scratch buffer for the RESET that Throttle.onHandshakeCompleted builds locally on failure.
    private final MutableDirectBuffer resetBuffer = new UnsafeBuffer(new byte[40]);
    // Frame builders for RESET, WINDOW, CHALLENGE and FLUSH (resetRW is used with both writeBuffer and resetBuffer).
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final ChallengeFW.Builder challengeRW = new ChallengeFW.Builder();
    private final FlushFW.Builder flushRW = new FlushFW.Builder();
    // Passed to BudgetId.budgetMask in doConnect when deriving the creditor id.
    private final int scopeIndex;
    // Only used by toString.
    private final Path streamsPath;
    // Closed by close(); streamsBuffer below is obtained from it in the constructor.
    private final Layout layout;
    // Destination of every frame written by this class.
    private final RingBuffer streamsBuffer;
    // Registers / unregisters a throttle message handler keyed by stream id.
    private final LongObjectBiConsumer<MessageHandler> registerThrottle;
    private final LongConsumer unregisterThrottle;
    // Scratch buffer that every outgoing frame is built in before being written to streamsBuffer.
    private final MutableDirectBuffer writeBuffer;
    // Called from doConnect with the reply stream id and the correlation for that reply.
    private final LongObjectBiConsumer<NukleusCorrelation> correlateNew;
    // Suppliers for the timestamp and trace id stamped on outgoing frames.
    private final LongSupplier supplyTimestamp;
    private final LongSupplier supplyTraceId;

    /**
     * Creates a target that writes frames into the streams buffer of the given layout.
     *
     * @param scopeIndex         index passed to {@code BudgetId.budgetMask} when deriving creditor ids
     * @param streamsPath        path reported by {@link #toString()}
     * @param layout             layout providing the streams ring buffer; closed by {@link #close()}
     * @param writeBuffer        scratch buffer in which outgoing frames are built
     * @param registerThrottle   registers a throttle message handler for a stream id
     * @param unregisterThrottle removes the throttle message handler of a stream id
     * @param correlateNew       records the correlation for a reply stream id
     * @param supplyTimestamp    supplies the timestamp stamped on outgoing frames
     * @param supplyTraceId      supplies the trace id stamped on outgoing frames
     */
    NukleusTarget(
        int scopeIndex,
        Path streamsPath,
        StreamsLayout layout,
        MutableDirectBuffer writeBuffer,
        LongObjectBiConsumer<MessageHandler> registerThrottle,
        LongConsumer unregisterThrottle,
        LongObjectBiConsumer<NukleusCorrelation> correlateNew,
        LongSupplier supplyTimestamp,
        LongSupplier supplyTraceId) {
        // identity of this target
        this.scopeIndex = scopeIndex;
        this.streamsPath = streamsPath;

        // buffers
        this.layout = layout;
        this.streamsBuffer = layout.streamsBuffer();
        this.writeBuffer = writeBuffer;

        // collaborators supplied by the caller
        this.correlateNew = correlateNew;
        this.registerThrottle = registerThrottle;
        this.unregisterThrottle = unregisterThrottle;
        this.supplyTraceId = supplyTraceId;
        this.supplyTimestamp = supplyTimestamp;
    }

    /** Closes the layout this target was created with. */
    @Override
    public void close() {
        layout.close();
    }

    /**
     * Describes this target as its simple class name followed by the streams path in brackets.
     *
     * @return a string of the form {@code "NukleusTarget [<streamsPath>]"}
     */
    @Override
    public String toString() {
        return String.format("%s [%s]", this.getClass().getSimpleName(), this.streamsPath);
    }

    /**
     * Exposes the ring buffer that frames are written to.
     *
     * @return the streams ring buffer obtained from the layout at construction
     */
    public RingBuffer streamsBuffer() {
        return streamsBuffer;
    }

    /**
     * Writes a WINDOW frame with route id 0, stream id 0 and zero padding.
     *
     * @param traceId  trace id stamped on the frame
     * @param budgetId budget id carried by the frame
     * @param credit   credit carried by the frame
     */
    public void doSystemWindow(long traceId, long budgetId, int credit) {
        final WindowFW frame = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(0L)
            .streamId(0L)
            .traceId(traceId)
            .budgetId(budgetId)
            .credit(credit)
            .padding(0)
            .build();

        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Writes a FLUSH frame with route id 0 and stream id 0.
     *
     * @param traceId  trace id stamped on the frame
     * @param budgetId budget id carried by the frame
     */
    public void doSystemFlush(long traceId, long budgetId) {
        final FlushFW frame = flushRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(0L)
            .streamId(0L)
            .traceId(traceId)
            .budgetId(budgetId)
            .build();

        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Starts a client stream by writing a BEGIN frame for the channel's initial stream.
     *
     * <p>In order, this method: derives the reply stream id and stores it as the channel's source id;
     * registers a correlation for the reply stream when the transmission is DUPLEX or HALF_DUPLEX;
     * sets up a budget creditor when the configured budget id is non-zero; registers a throttle for
     * the initial stream; writes BEGIN (with any pending BEGIN extension bytes, which are then
     * consumed); and, when the update mode is PROACTIVE, immediately writes a WINDOW for the reply
     * stream. Finally the channel's begin-output future is completed.
     *
     * <p>The connect future itself is completed by the handshake future created here, once the first
     * window has arrived and (for DUPLEX) the reply has been correlated.
     *
     * @param clientChannel channel being connected
     * @param localAddress  not used by this method
     * @param remoteAddress address supplying the authorization and stored as the channel's remote address
     * @param connectFuture failed with the thrown exception if any step of this method throws
     */
    public void doConnect(NukleusClientChannel clientChannel, NukleusChannelAddress localAddress, NukleusChannelAddress remoteAddress, ChannelFuture connectFuture) {
        try {
            final long routeId = clientChannel.routeId();
            final long initialId = clientChannel.targetId();
            // The reply stream id is the initial stream id with its lowest bit cleared.
            final long replyId = initialId & 0xFFFFFFFFFFFFFFFEL;
            clientChannel.sourceId(replyId);

            final ChannelFuture windowFuture = org.jboss.netty.channel.Channels.future(clientChannel);
            // Stays "already succeeded" unless the transmission is DUPLEX (see below).
            ChannelFuture replyFuture = org.jboss.netty.channel.Channels.succeededFuture(clientChannel);

            final NukleusChannelConfig clientConfig = (NukleusChannelConfig) clientChannel.getConfig();
            switch (clientConfig.getTransmission()) {
                case DUPLEX: {
                    // The handshake waits for the correlated reply.
                    final ChannelFuture correlatedFuture = clientChannel.beginInputFuture();
                    correlateNew.accept(replyId, new NukleusCorrelation(clientChannel, correlatedFuture));
                    replyFuture = correlatedFuture;
                    break;
                }
                case HALF_DUPLEX: {
                    // The handshake does not wait for the reply; its arrival only fires an interest change.
                    final ChannelFuture correlatedFuture = clientChannel.beginInputFuture();
                    correlateNew.accept(replyId, new NukleusCorrelation(clientChannel, correlatedFuture));
                    correlatedFuture.addListener(f -> org.jboss.netty.channel.Channels.fireChannelInterestChanged(f.getChannel()));
                    break;
                }
                default: {
                    // Other transmissions register no correlation.
                    break;
                }
            }

            final long budgetId = clientConfig.getBudgetId();
            if (budgetId != 0L) {
                final long creditorId = budgetId | BudgetId.budgetMask(scopeIndex);
                final DefaultBudgetCreditor creditor = clientChannel.reaktor.supplyCreditor(clientChannel);
                clientChannel.setCreditor(creditor, creditorId);

                final int sharedWindow = clientConfig.getSharedWindow();
                if (sharedWindow != 0) {
                    final long creditorIndex = creditor.acquire(creditorId);
                    if (creditorIndex == -1L) {
                        // Only the close future is failed here; the BEGIN below is still written.
                        clientChannel.getCloseFuture().setFailure(new ChannelException("Unable to acquire creditor"));
                    } else {
                        clientChannel.setCreditorIndex(creditorIndex);
                        creditor.credit(0L, creditorIndex, (long) sharedWindow);
                    }
                }
            }

            final long authorization = remoteAddress.getAuthorization();
            clientChannel.targetAuth(authorization);
            final long affinity = clientConfig.getAffinity();

            // Copy the extension now: the frame builder reads it lazily through the lambda.
            final ChannelBuffer beginExt = clientChannel.writeExtBuffer(NukleusExtensionKind.BEGIN, true);
            final int writableExtBytes = beginExt.readableBytes();
            final byte[] beginExtCopy = writeExtCopy(beginExt);

            final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity())
                .routeId(routeId)
                .streamId(initialId)
                .timestamp(supplyTimestamp.getAsLong())
                .traceId(supplyTraceId.getAsLong())
                .authorization(authorization)
                .affinity(affinity)
                .extension(p -> p.set(beginExtCopy))
                .build();

            clientChannel.setRemoteAddress(remoteAddress);

            // The throttle is registered before BEGIN is written.
            final ChannelFuture handshakeFuture = newHandshakeFuture(clientChannel, connectFuture, windowFuture, replyFuture);
            final Throttle throttle = new Throttle(clientChannel, windowFuture, handshakeFuture);
            registerThrottle.accept(begin.streamId(), throttle::handleThrottle);

            streamsBuffer.write(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());

            // Consume the extension bytes that were just sent.
            beginExt.skipBytes(writableExtBytes);
            beginExt.discardReadBytes();

            if (clientConfig.getUpdate() == NukleusUpdateMode.PROACTIVE) {
                final int initialWindow = clientConfig.getWindow();
                final int padding = clientConfig.getPadding();
                final long creditorId = clientChannel.creditorId();
                doWindow(clientChannel, creditorId, initialWindow, padding);
            }

            clientChannel.beginOutputFuture().setSuccess();
        }
        catch (Exception ex) {
            connectFuture.setFailure(ex);
        }
    }

    /**
     * Builds the future that completes when both the window future and the reply future complete,
     * and ties the outcome of the connect future to it.
     *
     * <p>On success the channel is marked connected and flushable, the connect future succeeds and a
     * channel-connected event is fired; on failure the connect future fails with the same cause.
     *
     * @param clientChannel channel being connected
     * @param connectFuture future completed according to the handshake outcome
     * @param windowFuture  first component of the handshake
     * @param replyFuture   second component of the handshake
     * @return the composite handshake future
     */
    private ChannelFuture newHandshakeFuture(final NukleusClientChannel clientChannel, final ChannelFuture connectFuture, ChannelFuture windowFuture, ChannelFuture replyFuture) {
        final ChannelFuture handshake = new CompositeChannelFuture((Channel) clientChannel, Arrays.asList(windowFuture, replyFuture));

        handshake.addListener(completed -> {
            if (!completed.isSuccess()) {
                connectFuture.setFailure(completed.getCause());
                return;
            }

            clientChannel.setConnected();
            clientChannel.setFlushable();
            connectFuture.setSuccess();
            org.jboss.netty.channel.Channels.fireChannelConnected((Channel) clientChannel, (SocketAddress) ((Object) clientChannel.getRemoteAddress()));
        });

        return handshake;
    }

    /**
     * Writes an ABORT frame for the channel's initial stream and unregisters that stream's throttle.
     *
     * @param clientChannel channel whose connect is being aborted
     */
    public void doConnectAbort(NukleusClientChannel clientChannel) {
        final long routeId = clientChannel.routeId();
        final long initialId = clientChannel.targetId();

        final AbortFW frame = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(initialId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());

        unregisterThrottle.accept(initialId);
    }

    /**
     * Registers a throttle for the channel's target stream ahead of writing the reply.
     *
     * @param channel         channel whose target stream id keys the throttle
     * @param windowFuture    future the throttle completes on the first WINDOW
     * @param handshakeFuture future the throttle listens to, and fails on RESET before the handshake
     */
    public void doPrepareReply(NukleusChannel channel, ChannelFuture windowFuture, ChannelFuture handshakeFuture) {
        final Throttle replyThrottle = new Throttle(channel, windowFuture, handshakeFuture);
        registerThrottle.accept(channel.targetId(), replyThrottle::handleThrottle);
    }

    /**
     * Writes a BEGIN frame for the channel's target stream, consumes the BEGIN extension bytes that
     * were sent with it, and completes the channel's begin-output future.
     *
     * @param channel channel whose output stream is being started
     */
    public void doBeginReply(NukleusChannel channel) {
        // Snapshot the extension first; the builder reads the copy through the lambda.
        final ChannelBuffer extension = channel.writeExtBuffer(NukleusExtensionKind.BEGIN, true);
        final int extensionBytes = extension.readableBytes();
        final byte[] extensionCopy = writeExtCopy(extension);

        final long routeId = channel.routeId();
        final long replyId = channel.targetId();
        final NukleusChannelConfig config = (NukleusChannelConfig) channel.getConfig();
        final long affinity = config.getAffinity();

        final BeginFW frame = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(replyId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .affinity(affinity)
            .extension(p -> p.set(extensionCopy))
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());

        extension.skipBytes(extensionBytes);
        extension.discardReadBytes();

        channel.beginOutputFuture().setSuccess();
    }

    /**
     * Queues a write request on the channel and flushes as much of the queue as currently possible.
     * BEGIN is written first if the channel's output has not begun yet.
     *
     * @param channel         channel being written to
     * @param newWriteRequest write request appended to the channel's queue
     */
    public void doWrite(NukleusChannel channel, MessageEvent newWriteRequest) {
        doFlushBegin(channel);

        final Deque<MessageEvent> queue = channel.writeRequests;
        queue.addLast(newWriteRequest);

        flushThrottledWrites(channel);
    }

    /**
     * Flushes the channel's queued writes, writing BEGIN first if output has not begun yet.
     *
     * <p>When DATA extension bytes are pending and no write is queued, a write request carrying
     * {@code NullChannelBuffer.NULL_BUFFER} and the flush future is queued so the extension is sent.
     * When no extension bytes are pending, the flush future succeeds and a flushed event is fired
     * after the queue has been flushed.
     *
     * @param channel     channel being flushed
     * @param flushFuture future tied to this flush
     */
    public void doFlush(NukleusChannel channel, ChannelFuture flushFuture) {
        doFlushBegin(channel);

        final boolean extensionPending = channel.writeExtBuffer(NukleusExtensionKind.DATA, true).readable();

        if (extensionPending && channel.writeRequests.isEmpty()) {
            // Nothing queued to carry the extension: enqueue a payload-less request for it.
            final MessageEvent extensionOnly =
                new DownstreamMessageEvent(channel, flushFuture, NullChannelBuffer.NULL_BUFFER, null);
            channel.writeRequests.addLast(extensionOnly);
        }

        flushThrottledWrites(channel);

        if (!extensionPending) {
            flushFuture.setSuccess();
            Channels.fireFlushed(channel);
        }
    }

    /**
     * Flushes the channel's queued writes if its output has already begun, then completes the
     * flush future unconditionally.
     *
     * @param channel     channel whose queued writes may be flushed
     * @param flushFuture future that is always set to success
     */
    public void doSystemFlush(NukleusChannel channel, ChannelFuture flushFuture) {
        final boolean outputBegun = channel.beginOutputFuture().isDone();
        if (outputBegun) {
            flushThrottledWrites(channel);
        }

        flushFuture.setSuccess();
    }

    /**
     * Handles an output advisory. Only {@code NukleusTypeSystem.ADVISORY_FLUSH} is supported; any
     * other value fails the future with a {@link ChannelException}.
     *
     * @param channel      channel the advisory applies to
     * @param adviseFuture future completed with the outcome
     * @param value        advisory kind, compared by identity
     */
    public void doAdviseOutput(NukleusChannel channel, ChannelFuture adviseFuture, Object value) {
        if (value != NukleusTypeSystem.ADVISORY_FLUSH) {
            adviseFuture.setFailure(new ChannelException("unexpected: " + value));
            return;
        }

        doAdviseOutputFlush(channel, adviseFuture);
    }

    /**
     * Writes a FLUSH frame for the channel's target stream carrying any pending FLUSH extension
     * bytes, then completes the advise future.
     *
     * <p>The extension bytes are consumed once the frame has been written, in the same way BEGIN,
     * DATA and END consume theirs, so that a later flush does not resend them.
     *
     * @param channel      channel whose target stream is flushed
     * @param adviseFuture future set to success after the frame is written
     */
    private void doAdviseOutputFlush(NukleusChannel channel, ChannelFuture adviseFuture) {
        final long routeId = channel.routeId();
        final long streamId = channel.targetId();
        final long authorization = channel.targetAuth();
        final long budgetId = channel.debitorId();

        // Snapshot the extension; the builder reads the copy through the lambda.
        final ChannelBuffer writeExt = channel.writeExtBuffer(NukleusExtensionKind.FLUSH, true);
        final int writableExtBytes = writeExt.readableBytes();
        final byte[] writeExtCopy = this.writeExtCopy(writeExt);

        final FlushFW flush = this.flushRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(this.supplyTimestamp.getAsLong())
            .traceId(this.supplyTraceId.getAsLong())
            .authorization(authorization)
            .budgetId(budgetId)
            .extension(ex -> ex.set(writeExtCopy))
            .build();
        this.streamsBuffer.write(flush.typeId(), flush.buffer(), flush.offset(), flush.sizeof());

        // Consume the extension bytes that were just sent.
        writeExt.skipBytes(writableExtBytes);
        writeExt.discardReadBytes();

        adviseFuture.setSuccess();
    }

    /**
     * Writes an ABORT frame for the channel's target stream (after BEGIN, if output had not begun),
     * unregisters the stream's throttle and completes the abort future. If the channel reports both
     * write-aborted and write-closed transitions, disconnected, unbound and closed events are fired.
     *
     * @param channel     channel whose output is aborted
     * @param abortFuture future set to success after the frame is written
     */
    public void doAbortOutput(NukleusChannel channel, ChannelFuture abortFuture) {
        doFlushBegin(channel);

        final long routeId = channel.routeId();
        final long streamId = channel.targetId();
        final long authorization = channel.targetAuth();

        final AbortFW frame = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .authorization(authorization)
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());

        unregisterThrottle.accept(streamId);
        abortFuture.setSuccess();

        // setWriteClosed() is only attempted when setWriteAborted() returned true.
        if (!channel.setWriteAborted() || !channel.setWriteClosed()) {
            return;
        }

        org.jboss.netty.channel.Channels.fireChannelDisconnected(channel);
        org.jboss.netty.channel.Channels.fireChannelUnbound(channel);
        org.jboss.netty.channel.Channels.fireChannelClosed(channel);
    }

    /**
     * Writes an END frame for the channel's target stream (after BEGIN, if output had not begun),
     * consumes the END extension bytes sent with it, unregisters the stream's throttle, fires an
     * output-shutdown event and completes the handler future. If the channel reports a write-closed
     * transition, disconnected, unbound and closed events are fired as well.
     *
     * @param channel       channel whose output is shut down
     * @param handlerFuture future set to success after the frame is written
     */
    public void doShutdownOutput(NukleusChannel channel, ChannelFuture handlerFuture) {
        doFlushBegin(channel);

        final long routeId = channel.routeId();
        final long streamId = channel.targetId();

        // Snapshot the extension; the builder reads the copy through the lambda.
        final ChannelBuffer extension = channel.writeExtBuffer(NukleusExtensionKind.END, true);
        final int extensionBytes = extension.readableBytes();
        final byte[] extensionCopy = writeExtCopy(extension);
        final long authorization = channel.targetAuth();

        final EndFW frame = endRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .authorization(authorization)
            .extension(p -> p.set(extensionCopy))
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());

        extension.skipBytes(extensionBytes);
        extension.discardReadBytes();

        unregisterThrottle.accept(streamId);
        Channels.fireOutputShutdown(channel);
        handlerFuture.setSuccess();

        if (!channel.setWriteClosed()) {
            return;
        }

        org.jboss.netty.channel.Channels.fireChannelDisconnected(channel);
        org.jboss.netty.channel.Channels.fireChannelUnbound(channel);
        org.jboss.netty.channel.Channels.fireChannelClosed(channel);
    }

    /**
     * Writes an END frame for the channel's target stream (after BEGIN, if output had not begun),
     * consumes the END extension bytes sent with it, unregisters the stream's throttle and completes
     * the handler future. If the channel reports a closed transition, disconnected, unbound and
     * closed events are fired.
     *
     * @param channel       channel being closed
     * @param handlerFuture future set to success after the frame is written
     */
    public void doClose(NukleusChannel channel, ChannelFuture handlerFuture) {
        doFlushBegin(channel);

        final long routeId = channel.routeId();
        final long streamId = channel.targetId();

        // Snapshot the extension; the builder reads the copy through the lambda.
        final ChannelBuffer extension = channel.writeExtBuffer(NukleusExtensionKind.END, true);
        final int extensionBytes = extension.readableBytes();
        final byte[] extensionCopy = writeExtCopy(extension);

        final EndFW frame = endRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .authorization(channel.targetAuth())
            .extension(p -> p.set(extensionCopy))
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());

        extension.skipBytes(extensionBytes);
        extension.discardReadBytes();

        unregisterThrottle.accept(streamId);
        handlerFuture.setSuccess();

        if (!channel.setClosed()) {
            return;
        }

        org.jboss.netty.channel.Channels.fireChannelDisconnected(channel);
        org.jboss.netty.channel.Channels.fireChannelUnbound(channel);
        org.jboss.netty.channel.Channels.fireChannelClosed(channel);
    }

    /**
     * Writes BEGIN for the channel if its begin-output future is not done yet.
     *
     * @param channel channel whose output may need to be started
     * @return {@code true} if BEGIN was written by this call, {@code false} if output had already begun
     */
    private boolean doFlushBegin(NukleusChannel channel) {
        final boolean beginPending = !channel.beginOutputFuture().isDone();
        if (beginPending) {
            this.doBeginReply(channel);
        }
        return beginPending;
    }

    /**
     * Flushes queued write requests from the head of the channel's queue for as long as the channel
     * is flushable and the queue is non-empty.
     *
     * <p>The loop stops early when the head request has payload or DATA extension bytes to send but
     * {@code flushData} could not write them, or when the head request has nothing left to send and
     * the channel reports a target write request still in progress.
     *
     * @param channel channel whose queued writes are flushed
     */
    private void flushThrottledWrites(NukleusChannel channel) {
        final Deque<MessageEvent> writeRequests = channel.writeRequests;

        while (channel.isFlushable() && !writeRequests.isEmpty()) {
            // The head is only peeked here; this method never removes it from the queue itself.
            final MessageEvent writeRequest = writeRequests.peekFirst();
            final ChannelBuffer writeBuf = (ChannelBuffer) writeRequest.getMessage();
            final ChannelBuffer writeExt = channel.writeExtBuffer(NukleusExtensionKind.DATA, true);

            if (writeBuf.readable() || writeExt.readable()) {
                if (!this.flushData(channel, writeBuf, writeExt)) {
                    break;
                }
            } else if (channel.isTargetWriteRequestInProgress()) {
                break;
            }
        }
    }

    /**
     * Writes at most one DATA frame carrying as much of {@code writeBuf} as the channel's reserved
     * budget allows, together with any pending DATA extension bytes.
     *
     * <p>The amount reserved is capped at half of the write buffer's capacity; the payload size is
     * the reservation minus the channel's writable padding, bounded by the readable bytes and by
     * zero. A frame is attempted when there is payload to write or when {@code writeBuf} has zero
     * capacity. On a successful ring buffer write the channel's counters are updated and the written
     * payload and extension bytes are consumed.
     *
     * <p>After the attempt, a flushed event is fired when {@code writeBuf} is
     * {@code NullChannelBuffer.NULL_BUFFER}; otherwise a write-complete event is fired if the frame
     * was written. The channel is then asked to advance its current write request.
     *
     * @param channel  channel being flushed
     * @param writeBuf payload of the head write request
     * @param writeExt pending DATA extension bytes
     * @return {@code true} if a DATA frame was written to the streams buffer
     */
    private boolean flushData(NukleusChannel channel, ChannelBuffer writeBuf, ChannelBuffer writeExt) {
        final long authorization = channel.targetAuth();
        final boolean flushing = writeBuf == NullChannelBuffer.NULL_BUFFER;
        final int reservedBytes = channel.reservedBytes(Math.min(writeBuf.readableBytes(), this.writeBuffer.capacity() >> 1));
        final int writableBytes = Math.max(Math.min(reservedBytes - channel.writablePadding, writeBuf.readableBytes()), 0);

        boolean flushable = writableBytes > 0 || writeBuf.capacity() == 0;
        if (flushable) {
            final int writeReaderIndex = writeBuf.readerIndex();
            if (writeReaderIndex == 0) {
                // Nothing of this request has been consumed yet.
                channel.targetWriteRequestProgressing();
            }

            final int writableExtBytes = writeExt.readableBytes();
            final byte[] writeExtCopy = this.writeExtCopy(writeExt);

            // The payload stays null for the NULL_BUFFER placeholder request.
            OctetsFW writeCopy = null;
            if (writeBuf != NullChannelBuffer.NULL_BUFFER) {
                final byte[] writeCopyBytes = new byte[writableBytes];
                writeBuf.getBytes(writeReaderIndex, writeCopyBytes);
                writeCopy = this.octetsRO.wrap((DirectBuffer) new UnsafeBuffer(writeCopyBytes), 0, writableBytes);
            }

            int flags = 0;
            if (writableBytes == writeBuf.readableBytes()) {
                // This frame carries all remaining readable bytes of the request.
                flags |= 1;
            }
            if (writeReaderIndex == 0) {
                // This frame starts at the beginning of the request.
                flags |= 2;
            }

            final long streamId = channel.targetId();
            final long routeId = channel.routeId();
            final long budgetId = channel.debitorId();

            final DataFW data = this.dataRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity())
                .routeId(routeId)
                .streamId(streamId)
                .timestamp(this.supplyTimestamp.getAsLong())
                .traceId(this.supplyTraceId.getAsLong())
                .authorization(authorization)
                .flags(flags)
                .budgetId(budgetId)
                .reserved(reservedBytes)
                .payload(writeCopy)
                .extension(p -> p.set(writeExtCopy))
                .build();

            // From here on "flushable" reports whether the ring buffer accepted the frame.
            flushable = this.streamsBuffer.write(data.typeId(), data.buffer(), data.offset(), data.sizeof());
            if (flushable) {
                channel.writtenBytes(writableBytes, reservedBytes);
                writeBuf.skipBytes(writableBytes);
                writeExt.skipBytes(writableExtBytes);
                writeExt.discardReadBytes();
            }
        }

        if (flushing) {
            Channels.fireFlushed((Channel) channel);
        } else if (flushable) {
            org.jboss.netty.channel.Channels.fireWriteComplete((Channel) channel, (long) writableBytes);
        }

        channel.targetWriteRequestProgress();
        return flushable;
    }

    /**
     * Copies the readable bytes of an extension buffer into a new array without consuming them.
     *
     * <p>An absolute {@code getBytes} is used instead of reading the backing array, so the copy
     * also works for buffers that do not expose a backing array.
     *
     * @param writeExt buffer whose readable bytes are copied; its reader index is left unchanged
     * @return a new array holding exactly the readable bytes (empty when nothing is readable)
     */
    private byte[] writeExtCopy(ChannelBuffer writeExt) {
        final byte[] writeExtCopy = new byte[writeExt.readableBytes()];
        writeExt.getBytes(writeExt.readerIndex(), writeExtCopy);
        return writeExtCopy;
    }

    /**
     * Records the credit on the channel as readable bytes and writes a WINDOW frame for the
     * channel's source stream, including the capabilities from the channel's configuration.
     *
     * @param channel  channel whose source stream is granted credit
     * @param budgetId budget id carried by the frame
     * @param credit   credit carried by the frame
     * @param padding  padding carried by the frame
     */
    void doWindow(NukleusChannel channel, long budgetId, int credit, int padding) {
        final long routeId = channel.routeId();
        final long streamId = channel.sourceId();
        final NukleusChannelConfig config = (NukleusChannelConfig) channel.getConfig();
        final byte capabilities = config.getCapabilities();

        channel.readableBytes(credit);

        final WindowFW frame = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(supplyTraceId.getAsLong())
            .budgetId(budgetId)
            .credit(credit)
            .padding(padding)
            .capabilities(capabilities)
            .build();
        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Writes a RESET frame for the channel's source stream.
     *
     * @param channel channel supplying the route id and source stream id
     * @param traceId trace id stamped on the frame
     */
    void doReset(NukleusChannel channel, long traceId) {
        final long routeId = channel.routeId();
        final long sourceId = channel.sourceId();

        doReset(routeId, sourceId, traceId);
    }

    /**
     * Writes a RESET frame for the given route and stream.
     *
     * @param routeId  route id carried by the frame
     * @param streamId stream id carried by the frame
     * @param traceId  trace id stamped on the frame
     */
    void doReset(long routeId, long streamId, long traceId) {
        final ResetFW frame = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(traceId)
            .build();

        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Writes a CHALLENGE frame for the given route and stream carrying a copy of the readable bytes
     * of {@code extension}. The extension buffer itself is not consumed by this method.
     *
     * @param routeId   route id carried by the frame
     * @param streamId  stream id carried by the frame
     * @param traceId   trace id stamped on the frame
     * @param extension buffer whose readable bytes become the frame's extension
     */
    void doChallenge(long routeId, long streamId, long traceId, ChannelBuffer extension) {
        final byte[] payload = writeExtCopy(extension);

        final ChallengeFW frame = challengeRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .timestamp(supplyTimestamp.getAsLong())
            .traceId(traceId)
            .extension(p -> p.set(payload))
            .build();

        streamsBuffer.write(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Handles the throttle messages (RESET, WINDOW, CHALLENGE) delivered for one channel's
     * output stream.
     *
     * <p>The WINDOW and RESET handlers are swapped over time: the first WINDOW marks the channel
     * flushable and completes the window future before normal window handling takes over, and a
     * RESET fails the handshake until the handshake future has completed (except for half-duplex
     * child channels, which use the normal RESET handling from the start).
     */
    private final class Throttle {
        private final NukleusChannel channel;
        private final ChannelFuture windowFuture;
        private final ChannelFuture handshakeFuture;
        private Consumer<WindowFW> windowHandler;
        private Consumer<ResetFW> resetHandler;

        private Throttle(NukleusChannel channel, ChannelFuture windowFuture, ChannelFuture handshakeFuture) {
            this.channel = channel;
            this.windowHandler = this::onWindowBeforeWritable;
            this.windowFuture = windowFuture;
            this.handshakeFuture = handshakeFuture;

            final boolean childChannel = channel.getParent() != null;
            final NukleusChannelConfig config = (NukleusChannelConfig) channel.getConfig();
            final boolean halfDuplex = config.getTransmission() == NukleusTransmission.HALF_DUPLEX;
            if (childChannel && halfDuplex) {
                this.resetHandler = this::onReset;
            } else {
                this.resetHandler = this::onResetBeforeHandshake;
            }

            // Registered last so both handlers are set before the listener can run.
            handshakeFuture.addListener(this::onHandshakeCompleted);
        }

        /**
         * Dispatches one throttle message by type id.
         *
         * @throws IllegalArgumentException if the type id is not one of the three handled here
         */
        private void handleThrottle(int msgTypeId, DirectBuffer buffer, int index, int length) {
            final int limit = index + length;
            switch (msgTypeId) {
                case 0x40000001: {
                    // decoded as RESET
                    final ResetFW resetFrame = NukleusTarget.this.resetRO.wrap(buffer, index, limit);
                    resetHandler.accept(resetFrame);
                    break;
                }
                case 0x40000002: {
                    // decoded as WINDOW
                    final WindowFW windowFrame = NukleusTarget.this.windowRO.wrap(buffer, index, limit);
                    windowHandler.accept(windowFrame);
                    break;
                }
                case 0x40000004: {
                    // decoded as CHALLENGE
                    final ChallengeFW challengeFrame = NukleusTarget.this.challengeRO.wrap(buffer, index, limit);
                    onChallenge(challengeFrame);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unexpected message type: " + msgTypeId);
                }
            }
        }

        /** Copies any CHALLENGE extension into the channel's read extension buffer and fires the advisory. */
        private void onChallenge(ChallengeFW challenge) {
            final OctetsFW extension = challenge.extension();
            final int extensionBytes = extension.sizeof();

            if (extensionBytes != 0) {
                final byte[] extensionCopy = new byte[extensionBytes];
                extension.buffer().getBytes(extension.offset(), extensionCopy);
                channel.readExtBuffer(NukleusExtensionKind.CHALLENGE).writeBytes(extensionCopy);
            }

            Channels.fireOutputAdvised(channel, NukleusTypeSystem.ADVISORY_CHALLENGE);
        }

        /** Applies a WINDOW to the channel and flushes whatever queued writes it now allows. */
        private void onWindow(WindowFW window) {
            final long traceId = window.traceId();
            final long budgetId = window.budgetId();
            final int credit = window.credit();
            final int padding = window.padding();
            final int minimum = window.minimum();
            final int capabilities = window.capabilities();

            // A debitor is only set for a non-zero budget, when the channel has none yet and is still open for writes.
            final boolean needsDebitor = budgetId != 0L
                && !channel.hasDebitor()
                && !channel.isWriteClosed()
                && !channel.getCloseFuture().isSuccess();
            if (needsDebitor) {
                final DefaultBudgetDebitor debitor = channel.reaktor.supplyDebitor(channel, budgetId);
                channel.setDebitor(debitor, budgetId);
            }

            channel.writableWindow(credit, padding, minimum, traceId);
            channel.capabilities(capabilities);
            NukleusTarget.this.flushThrottledWrites(channel);
        }

        /** Unregisters the throttle of the reset stream and propagates the abort to the channel. */
        private void onReset(ResetFW reset) {
            NukleusTarget.this.unregisterThrottle.accept(reset.streamId());

            if (!channel.setWriteAborted()) {
                return;
            }

            // The write-closed transition is attempted before the output-aborted event is fired.
            final boolean writeClosed = channel.setWriteClosed();
            Channels.fireOutputAborted(channel);
            if (writeClosed) {
                org.jboss.netty.channel.Channels.fireChannelDisconnected(channel);
                org.jboss.netty.channel.Channels.fireChannelUnbound(channel);
                org.jboss.netty.channel.Channels.fireChannelClosed(channel);
            }
        }

        /** Handles the first WINDOW: switches to normal window handling and marks the channel writable. */
        private void onWindowBeforeWritable(WindowFW window) {
            windowHandler = this::onWindow;

            channel.setFlushable();
            windowFuture.setSuccess();

            windowHandler.accept(window);
        }

        /** A RESET before the handshake has completed fails the handshake. */
        private void onResetBeforeHandshake(ResetFW reset) {
            handshakeFuture.setFailure(new ChannelException("handshake failed"));
        }

        /**
         * Switches to normal RESET handling once the handshake is over. On failure, a RESET for the
         * channel's source stream is built in the dedicated reset buffer and passed to that handler
         * directly; it is not written to the streams buffer here.
         */
        private void onHandshakeCompleted(ChannelFuture future) {
            resetHandler = this::onReset;

            if (future.isSuccess()) {
                return;
            }

            final long streamId = channel.sourceId();
            final long routeId = channel.routeId();
            final MutableDirectBuffer scratch = NukleusTarget.this.resetBuffer;
            final ResetFW localReset = NukleusTarget.this.resetRW.wrap(scratch, 0, scratch.capacity())
                .routeId(routeId)
                .streamId(streamId)
                .timestamp(NukleusTarget.this.supplyTimestamp.getAsLong())
                .traceId(NukleusTarget.this.supplyTraceId.getAsLong())
                .build();
            resetHandler.accept(localReset);
        }
    }
}

