/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.util.function.LongConsumer;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.kaazing.k3po.driver.internal.netty.channel.Channels;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusExtensionKind;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTarget;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusUpdateMode;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NullChannelBuffer;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.NukleusTypeSystem;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.OctetsFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.AbortFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.BeginFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.DataFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.EndFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.FlushFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.FrameFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongLongFunction;

/**
 * Creates per-stream {@link MessageHandler}s that decode inbound stream frames and translate
 * them into Netty channel events on the owning {@link NukleusChannel}. Also sends reset and
 * challenge frames toward the sender resolved for a channel's route and source stream.
 */
public final class NukleusStreamFactory {
    // Reusable frame views; each is re-wrapped over the incoming buffer for every message and
    // is shared by all streams created by this factory.
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final FlushFW flushRO = new FlushFW();

    private final LongLongFunction<NukleusTarget> supplySender;
    private final LongConsumer unregisterStream;

    /**
     * @param supplySender     resolves the {@link NukleusTarget} for a (routeId, streamId) pair
     * @param unregisterStream invoked with a stream id once that stream has ended, aborted or
     *                         been reset
     */
    public NukleusStreamFactory(LongLongFunction<NukleusTarget> supplySender, LongConsumer unregisterStream) {
        this.supplySender = supplySender;
        this.unregisterStream = unregisterStream;
    }

    /**
     * Sends a reset for the channel's source stream and unregisters that stream.
     *
     * @param channel the channel whose source stream is reset
     * @param traceId trace identifier passed through to the sender
     */
    public void doReset(NukleusChannel channel, long traceId) {
        long routeId = channel.routeId();
        long streamId = channel.sourceId();
        NukleusTarget sender = this.supplySender.apply(routeId, streamId);
        sender.doReset(channel, traceId);
        this.unregisterStream.accept(streamId);
    }

    /**
     * Sends a challenge for the channel's source stream, carrying the channel's CHALLENGE
     * write-extension buffer.
     *
     * @param channel the channel whose source stream is challenged
     * @param traceId trace identifier passed through to the sender
     */
    public void doChallenge(NukleusChannel channel, long traceId) {
        // NOTE(review): the meaning of the boolean argument to writeExtBuffer is not visible
        // here - confirm against NukleusChannel.
        ChannelBuffer challengeExt = channel.writeExtBuffer(NukleusExtensionKind.CHALLENGE, true);
        long routeId = channel.routeId();
        long streamId = channel.sourceId();
        long sequence = channel.sourceSeq();
        long acknowledge = channel.sourceAck();
        int maximum = channel.sourceMax();
        NukleusTarget sender = this.supplySender.apply(routeId, streamId);
        sender.doChallenge(routeId, streamId, sequence, acknowledge, traceId, maximum, challengeExt);
    }

    /**
     * Creates the message handler for one inbound stream.
     *
     * <p>The returned handler is bound to a single {@link Stream} instance, so per-stream state
     * (the fragment counter) is retained across successive frames.
     *
     * @param channel     the channel that receives the resulting events
     * @param sender      the target used to send window and reset frames back
     * @param beginFuture completed successfully when the begin frame has been processed
     * @return a handler that dispatches each incoming frame for this stream
     */
    public MessageHandler newStream(NukleusChannel channel, NukleusTarget sender, ChannelFuture beginFuture) {
        // Instantiate once: creating the Stream inside the handler would reset its state on
        // every message.
        final Stream stream = new Stream(channel, sender, beginFuture);
        return stream::handleStream;
    }

    /**
     * State and frame handling for a single inbound stream.
     */
    private final class Stream {
        private final NukleusChannel channel;
        private final NukleusTarget sender;
        private final ChannelFuture beginFuture;

        // Number of data frames accepted since the last one that had flag bit 0x01 or 0x04 set.
        private int fragments;

        private Stream(NukleusChannel channel, NukleusTarget sender, ChannelFuture beginFuture) {
            this.channel = channel;
            this.sender = sender;
            this.beginFuture = beginFuture;
        }

        /**
         * Verifies the frame's route id and dispatches the frame by message type id.
         * Unrecognized type ids are ignored.
         *
         * @throws IllegalStateException if the frame's route id differs from the channel's
         */
        private void handleStream(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
            int limit = index + length;
            FrameFW frame = NukleusStreamFactory.this.frameRO.wrap((DirectBuffer)buffer, index, limit);
            this.verifyRouteId(frame.routeId());
            switch (msgTypeId) {
                case 1: {
                    this.onBegin(NukleusStreamFactory.this.beginRO.wrap((DirectBuffer)buffer, index, limit));
                    break;
                }
                case 2: {
                    this.onData(NukleusStreamFactory.this.dataRO.wrap((DirectBuffer)buffer, index, limit));
                    break;
                }
                case 3: {
                    this.onEnd(NukleusStreamFactory.this.endRO.wrap((DirectBuffer)buffer, index, limit));
                    break;
                }
                case 4: {
                    this.onAbort(NukleusStreamFactory.this.abortRO.wrap((DirectBuffer)buffer, index, limit));
                    break;
                }
                case 5: {
                    this.onFlush(NukleusStreamFactory.this.flushRO.wrap((DirectBuffer)buffer, index, limit));
                    break;
                }
                default: {
                    break;
                }
            }
        }

        /**
         * Records the stream's identity and sequence state on the channel, optionally sends an
         * initial window, and completes the begin futures.
         */
        private void onBegin(BeginFW begin) {
            long streamId = begin.streamId();
            long sequence = begin.sequence();
            long acknowledge = begin.acknowledge();
            this.copyExtension(begin.extension(), NukleusExtensionKind.BEGIN);
            this.channel.sourceSeq(sequence);
            this.channel.sourceAck(acknowledge);
            this.channel.sourceId(streamId);
            this.channel.sourceAuth(begin.authorization());
            NukleusChannelConfig config = (NukleusChannelConfig)this.channel.getConfig();
            NukleusUpdateMode update = config.getUpdate();
            if (update == NukleusUpdateMode.HANDSHAKE || update == NukleusUpdateMode.STREAM) {
                this.sender.doWindow(this.channel);
            }
            this.channel.beginInputFuture().setSuccess();
            this.beginFuture.setSuccess();
        }

        /**
         * Handles a data frame: if the reserved budget is consistent with the payload and the
         * channel's readable budget, the payload is delivered as a message event; otherwise the
         * stream is reset and the read side is aborted.
         */
        private void onData(DataFW data) {
            long sequence = data.sequence();
            long traceId = data.traceId();
            int flags = data.flags();
            int reservedBytes = data.reserved();
            OctetsFW payload = data.payload();
            // The payload bytes are copied out (see readBuffer) because the frame view is
            // re-wrapped for the next message.
            ChannelBuffer message = payload == null ? NullChannelBuffer.NULL_BUFFER : payload.get(this::readBuffer);
            int readableBytes = message.readableBytes();
            OctetsFW dataExt = data.extension();
            assert (sequence >= this.channel.sourceSeq());

            boolean withinBudget = this.channel.paddedBytes(readableBytes) <= reservedBytes
                    && reservedBytes <= this.channel.readableBudget();
            if (!withinBudget) {
                this.sender.doReset(this.channel, traceId);
                this.fireReadAborted();
                return;
            }

            this.channel.readBytes(sequence, reservedBytes);
            this.channel.readFlags(flags);
            this.copyExtension(dataExt, NukleusExtensionKind.DATA);

            // NOTE(review): bit 0x02 presumably marks the first fragment of a message; seeing
            // it while earlier fragments are still pending is rejected - confirm flag meaning.
            if ((flags & 2) != 0 && this.fragments != 0) {
                org.jboss.netty.channel.Channels.fireExceptionCaught((Channel)this.channel, new IllegalStateException("invalid message boundary"));
                this.sender.doReset(this.channel, traceId);
                return;
            }

            NukleusChannelConfig config = (NukleusChannelConfig)this.channel.getConfig();
            NukleusUpdateMode update = config.getUpdate();
            if (update == NukleusUpdateMode.MESSAGE || update == NukleusUpdateMode.STREAM || update == NukleusUpdateMode.PROACTIVE) {
                // Acknowledge immediately and re-open the window.
                this.channel.acknowledgeBytes(reservedBytes);
                this.channel.doSharedCredit(traceId, reservedBytes);
                this.sender.doWindow(this.channel);
            } else {
                this.channel.pendingSharedCredit(reservedBytes);
            }

            // NOTE(review): bits 0x01 / 0x04 presumably mark the end of a message (complete or
            // incomplete) - confirm against the DataFW flag definitions.
            if ((flags & 1) != 0 || (flags & 4) != 0) {
                message.markWriterIndex();
                this.fragments = 0;
            } else {
                ++this.fragments;
            }
            org.jboss.netty.channel.Channels.fireMessageReceived((Channel)this.channel, message);
        }

        /**
         * Handles an end frame: unregisters the stream, captures the END extension and fires
         * input-shutdown (plus the channel close events if the channel became fully closed).
         */
        private void onEnd(EndFW end) {
            long streamId = end.streamId();
            long sequence = end.sequence();
            long traceId = end.traceId();
            this.channel.sourceSeq(sequence);
            // A reset is sent on authorization mismatch, but processing still continues.
            if (end.authorization() != this.channel.sourceAuth()) {
                this.sender.doReset(this.channel, traceId);
            }
            NukleusStreamFactory.this.unregisterStream.accept(streamId);
            this.copyExtension(end.extension(), NukleusExtensionKind.END);

            boolean closed = this.channel.setReadClosed();
            Channels.fireInputShutdown((Channel)this.channel);
            if (closed) {
                this.fireChannelClosedEvents();
            }
        }

        /**
         * Handles an abort frame: unregisters the stream and aborts the channel's read side.
         */
        private void onAbort(AbortFW abort) {
            long streamId = abort.streamId();
            long sequence = abort.sequence();
            long traceId = abort.traceId();
            this.channel.sourceSeq(sequence);
            // A reset is sent on authorization mismatch, but processing still continues.
            if (abort.authorization() != this.channel.sourceAuth()) {
                this.sender.doReset(this.channel, traceId);
            }
            NukleusStreamFactory.this.unregisterStream.accept(streamId);
            this.fireReadAborted();
        }

        /**
         * Handles a flush frame: captures the FLUSH extension and fires a flush advisory on the
         * channel.
         */
        private void onFlush(FlushFW flush) {
            long sequence = flush.sequence();
            long traceId = flush.traceId();
            this.channel.sourceSeq(sequence);
            // A reset is sent on authorization mismatch, but processing still continues.
            if (flush.authorization() != this.channel.sourceAuth()) {
                this.sender.doReset(this.channel, traceId);
            }
            this.copyExtension(flush.extension(), NukleusExtensionKind.FLUSH);
            Channels.fireInputAdvised((Channel)this.channel, (Object)NukleusTypeSystem.ADVISORY_FLUSH);
        }

        /** Appends a copy of a non-empty frame extension to the channel's read-extension buffer of the given kind. */
        private void copyExtension(OctetsFW extension, NukleusExtensionKind kind) {
            int extensionBytes = extension.sizeof();
            if (extensionBytes != 0) {
                DirectBuffer buffer = extension.buffer();
                int offset = extension.offset();
                byte[] extensionCopy = new byte[extensionBytes];
                buffer.getBytes(offset, extensionCopy);
                this.channel.readExtBuffer(kind).writeBytes(extensionCopy);
            }
        }

        /** Marks the read side aborted and, if that changed state, fires input-aborted followed by close events when the channel became closed. */
        private void fireReadAborted() {
            if (this.channel.setReadAborted()) {
                boolean closed = this.channel.setReadClosed();
                Channels.fireInputAborted((Channel)this.channel);
                if (closed) {
                    this.fireChannelClosedEvents();
                }
            }
        }

        /** Fires the disconnected, unbound and closed events, in that order. */
        private void fireChannelClosedEvents() {
            org.jboss.netty.channel.Channels.fireChannelDisconnected((Channel)this.channel);
            org.jboss.netty.channel.Channels.fireChannelUnbound((Channel)this.channel);
            org.jboss.netty.channel.Channels.fireChannelClosed((Channel)this.channel);
        }

        /**
         * @throws IllegalStateException if {@code routeId} differs from the channel's route id
         */
        private void verifyRouteId(long routeId) {
            if (routeId != this.channel.routeId()) {
                throw new IllegalStateException(String.format("routeId: expected %x actual %x", this.channel.routeId(), routeId));
            }
        }

        /** Copies {@code [index, maxLimit)} of {@code buffer} into a new channel buffer from the channel's buffer factory. */
        private ChannelBuffer readBuffer(DirectBuffer buffer, int index, int maxLimit) {
            byte[] array = new byte[maxLimit - index];
            buffer.getBytes(index, array);
            return ((NukleusChannelConfig)this.channel.getConfig()).getBufferFactory().getBuffer(array, 0, array.length);
        }
    }
}

