/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.net.SocketAddress;
import java.nio.file.Path;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.Channels;
import org.kaazing.k3po.driver.internal.behavior.handler.RejectedHandler;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.stream.BudgetId;
import org.reaktivity.reaktor.internal.types.stream.DataFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelAddress;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChildChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChildChannelSink;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusClientChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusCorrelation;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusExtensionKind;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusServerChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusServerChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusStreamFactory;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTarget;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTransmission;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.StreamsLayout;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.OctetsFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.BeginFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.FrameFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongLongFunction;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongObjectBiConsumer;

final class NukleusPartition
implements AutoCloseable {
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final Path streamsPath;
    private final int scopeIndex;
    private final StreamsLayout layout;
    private final RingBuffer streamsBuffer;
    private final LongLongFunction<NukleusServerChannel> lookupRoute;
    private final LongFunction<MessageHandler> lookupStream;
    private final LongFunction<MessageHandler> lookupThrottle;
    private final MessageHandler streamHandler;
    private final LongObjectBiConsumer<MessageHandler> registerStream;
    private final NukleusStreamFactory streamFactory;
    private final LongFunction<NukleusCorrelation> correlateEstablished;
    private final LongLongFunction<NukleusTarget> supplySender;
    private final IntFunction<NukleusTarget> supplyTarget;

    NukleusPartition(Path streamsPath, int scopeIndex, StreamsLayout layout, LongLongFunction<NukleusServerChannel> lookupRoute, LongFunction<MessageHandler> lookupStream, LongObjectBiConsumer<MessageHandler> registerStream, LongFunction<MessageHandler> lookupThrottle, NukleusStreamFactory streamFactory, LongFunction<NukleusCorrelation> correlateEstablished, LongLongFunction<NukleusTarget> supplySender, IntFunction<NukleusTarget> supplyTarget) {
        this.streamsPath = streamsPath;
        this.scopeIndex = scopeIndex;
        this.layout = layout;
        this.streamsBuffer = layout.streamsBuffer();
        this.lookupRoute = lookupRoute;
        this.lookupStream = lookupStream;
        this.lookupThrottle = lookupThrottle;
        this.registerStream = registerStream;
        this.streamHandler = this::handleStream;
        this.streamFactory = streamFactory;
        this.correlateEstablished = correlateEstablished;
        this.supplySender = supplySender;
        this.supplyTarget = supplyTarget;
    }

    public int process() {
        return this.streamsBuffer.read(this.streamHandler);
    }

    @Override
    public void close() {
        this.layout.close();
    }

    public String toString() {
        return String.format("%s [%s]", this.getClass().getSimpleName(), this.streamsPath);
    }

    void doSystemWindow(NukleusChannel channel, long traceId) {
        int pendingSharedBudget = channel.pendingSharedBudget();
        if (pendingSharedBudget != 0) {
            long budgetId = channel.creditorId();
            assert (budgetId != 0L);
            int ownerIndex = BudgetId.ownerIndex((long)budgetId);
            NukleusTarget target = this.supplyTarget.apply(ownerIndex);
            target.doSystemWindow(traceId, budgetId, pendingSharedBudget);
        }
    }

    int scopeIndex() {
        return this.scopeIndex;
    }

    private void handleStream(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        FrameFW frame = this.frameRO.wrap((DirectBuffer)buffer, index, index + length);
        long streamId = frame.streamId();
        if ((msgTypeId & 0x40000000) != 0) {
            MessageHandler handler = this.lookupThrottle.apply(streamId);
            if (handler != null) {
                handler.onMessage(msgTypeId, buffer, index, length);
            }
        } else {
            MessageHandler handler = this.lookupStream.apply(streamId);
            if (handler != null) {
                handler.onMessage(msgTypeId, buffer, index, length);
            } else {
                this.handleUnrecognized(msgTypeId, buffer, index, length);
            }
        }
    }

    private void handleUnrecognized(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 1: {
                BeginFW begin = this.beginRO.wrap((DirectBuffer)buffer, index, index + length);
                this.handleBegin(begin);
                break;
            }
            case 2: {
                DataFW data = this.dataRO.wrap((DirectBuffer)buffer, index, index + length);
                long traceId = data.traceId();
                long budgetId = data.budgetId();
                if (budgetId == 0L) break;
                int reserved = data.reserved();
                int ownerIndex = BudgetId.ownerIndex((long)budgetId);
                NukleusTarget target = this.supplyTarget.apply(ownerIndex);
                target.doSystemWindow(traceId, budgetId, reserved);
            }
        }
    }

    private void handleBegin(BeginFW begin) {
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        long sequence = begin.sequence();
        long acknowledge = begin.acknowledge();
        long traceId = begin.traceId();
        long authorization = begin.authorization();
        int maximum = begin.maximum();
        if ((streamId & 1L) != 0L) {
            NukleusServerChannel serverChannel = this.lookupRoute.apply(routeId, authorization);
            if (serverChannel != null) {
                this.handleBeginInitial(begin, serverChannel);
            } else {
                this.supplySender.apply(routeId, streamId).doReset(routeId, streamId, sequence, acknowledge, traceId, maximum);
            }
        } else {
            this.handleBeginReply(begin);
        }
    }

    private void handleBeginInitial(BeginFW begin, NukleusServerChannel serverChannel) {
        long routeId = begin.routeId();
        long initialId = begin.streamId();
        long sequence = begin.sequence();
        long acknowledge = begin.acknowledge();
        long traceId = begin.traceId();
        int maximum = begin.maximum();
        long replyId = initialId & 0xFFFFFFFFFFFFFFFEL;
        NukleusChildChannel childChannel = this.doAccept(serverChannel, routeId, initialId, replyId);
        NukleusTarget sender = this.supplySender.apply(routeId, initialId);
        ChannelPipeline pipeline = childChannel.getPipeline();
        if (pipeline.get(RejectedHandler.class) != null) {
            OctetsFW beginExt = begin.extension();
            int beginExtBytes = beginExt.sizeof();
            if (beginExtBytes != 0) {
                DirectBuffer buffer = beginExt.buffer();
                int offset = beginExt.offset();
                byte[] beginExtCopy = new byte[beginExtBytes];
                buffer.getBytes(offset, beginExtCopy);
                childChannel.readExtBuffer(NukleusExtensionKind.BEGIN).writeBytes(beginExtCopy);
            }
            childChannel.setWriteClosed();
            Channels.fireChannelBound((Channel)childChannel, (SocketAddress)((Object)childChannel.getLocalAddress()));
            sender.doReset(routeId, initialId, sequence, acknowledge, traceId, maximum);
            childChannel.setReadClosed();
        } else {
            ChannelFuture beginFuture = Channels.future((Channel)childChannel);
            ChannelFuture windowFuture = Channels.future((Channel)childChannel);
            MessageHandler newStream = this.streamFactory.newStream(childChannel, sender, beginFuture);
            this.registerStream.accept(initialId, newStream);
            newStream.onMessage(begin.typeId(), (MutableDirectBuffer)begin.buffer(), begin.offset(), begin.sizeof());
            Channels.fireChannelBound((Channel)childChannel, (SocketAddress)((Object)childChannel.getLocalAddress()));
            ChannelFuture handshakeFuture = beginFuture;
            sender.doPrepareReply(childChannel, windowFuture, handshakeFuture);
            NukleusChannelConfig childConfig = (NukleusChannelConfig)childChannel.getConfig();
            switch (childConfig.getTransmission()) {
                case DUPLEX: {
                    sender.doBeginReply(childChannel);
                    break;
                }
                default: {
                    windowFuture.setSuccess();
                }
            }
            Channels.fireChannelConnected((Channel)childChannel, (SocketAddress)((Object)childChannel.getRemoteAddress()));
        }
    }

    private void handleBeginReply(BeginFW begin) {
        long routeId = begin.routeId();
        long replyId = begin.streamId();
        long sequence = begin.sequence();
        long acknowledge = begin.acknowledge();
        long traceId = begin.traceId();
        int maximum = begin.maximum();
        NukleusCorrelation correlation = this.correlateEstablished.apply(replyId);
        NukleusTarget sender = this.supplySender.apply(routeId, replyId);
        if (correlation != null) {
            ChannelFuture beginFuture = correlation.correlatedFuture();
            NukleusClientChannel clientChannel = (NukleusClientChannel)beginFuture.getChannel();
            MessageHandler newStream = this.streamFactory.newStream(clientChannel, sender, beginFuture);
            this.registerStream.accept(replyId, newStream);
            newStream.onMessage(begin.typeId(), (MutableDirectBuffer)begin.buffer(), begin.offset(), begin.sizeof());
        } else {
            sender.doReset(routeId, replyId, sequence, acknowledge, traceId, maximum);
        }
    }

    private NukleusChildChannel doAccept(NukleusServerChannel serverChannel, long routeId, long initialId, long replyId) {
        try {
            NukleusServerChannelConfig serverConfig = (NukleusServerChannelConfig)serverChannel.getConfig();
            ChannelPipelineFactory pipelineFactory = serverConfig.getPipelineFactory();
            ChannelPipeline pipeline = pipelineFactory.getPipeline();
            NukleusChannelAddress serverAddress = serverChannel.getLocalAddress();
            NukleusChannelAddress remoteAddress = serverAddress.newEphemeralAddress();
            ChannelFactory channelFactory = serverChannel.getFactory();
            NukleusChildChannelSink childSink = new NukleusChildChannelSink();
            NukleusChildChannel childChannel = new NukleusChildChannel(serverChannel, channelFactory, pipeline, (ChannelSink)childSink, initialId, replyId);
            NukleusChannelConfig childConfig = (NukleusChannelConfig)childChannel.getConfig();
            childConfig.setBufferFactory(serverConfig.getBufferFactory());
            childConfig.setTransmission(serverConfig.getTransmission());
            childConfig.setThrottle(serverConfig.getThrottle());
            childConfig.setWindow(serverConfig.getWindow());
            childConfig.setBudgetId(serverConfig.getBudgetId());
            childConfig.setPadding(serverConfig.getPadding());
            childConfig.setAlignment(serverConfig.getAlignment());
            childConfig.setCapabilities(serverConfig.getCapabilities());
            if (childConfig.getTransmission() == NukleusTransmission.SIMPLEX) {
                childChannel.setWriteClosed();
            }
            childChannel.routeId(routeId);
            childChannel.setLocalAddress(serverAddress);
            childChannel.setRemoteAddress(remoteAddress);
            long budgetId = childConfig.getBudgetId();
            if (budgetId != 0L) {
                long creditorId = budgetId | BudgetId.budgetMask((int)this.scopeIndex);
                DefaultBudgetCreditor creditor = serverChannel.reaktor.supplyCreditor(childChannel);
                childChannel.setCreditor(creditor, creditorId);
                int sharedWindow = childConfig.getSharedWindow();
                creditor.creditById(0L, budgetId, (long)sharedWindow);
            }
            return childChannel;
        }
        catch (Exception ex) {
            LangUtil.rethrowUnchecked((Throwable)ex);
            return null;
        }
    }
}

