/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.nio.file.Path;
import java.util.function.LongSupplier;
import java.util.function.ToIntFunction;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetDebitor;
import org.reaktivity.reaktor.internal.layouts.BudgetsLayout;
import org.reaktivity.reaktor.internal.router.BudgetId;
import org.reaktivity.reaktor.internal.types.stream.FlushFW;
import org.reaktivity.reaktor.internal.types.stream.WindowFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.NukleusExtConfiguration;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.LabelManager;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelAddress;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusClientChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusCorrelation;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusServerChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusSource;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTarget;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTransmission;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.StreamsLayout;

public final class NukleusScope
implements AutoCloseable {
    private final WindowFW windowRO = new WindowFW();
    private final FlushFW flushRO = new FlushFW();
    private final Int2ObjectHashMap<NukleusTarget> targetsByIndex;
    private final Int2ObjectHashMap<DefaultBudgetDebitor> debitorsByIndex;
    private final NukleusExtConfiguration config;
    private final LabelManager labels;
    private final MutableDirectBuffer writeBuffer;
    private final Long2ObjectHashMap<MessageHandler> streamsById;
    private final Long2ObjectHashMap<MessageHandler> throttlesById;
    private final Long2ObjectHashMap<NukleusCorrelation> correlations;
    private final ToIntFunction<String> lookupTargetIndex;
    private final LongSupplier supplyTimestamp;
    private final LongSupplier supplyTraceId;
    private final NukleusSource source;
    private NukleusTarget[] targets = new NukleusTarget[0];

    public NukleusScope(NukleusExtConfiguration config, LabelManager labels, int scopeIndex, ToIntFunction<String> lookupTargetIndex, LongSupplier supplyTimestamp, LongSupplier supplyTraceId) {
        this.config = config;
        this.labels = labels;
        this.writeBuffer = new UnsafeBuffer(new byte[config.streamsBufferCapacity() / 8]);
        this.streamsById = new Long2ObjectHashMap();
        this.throttlesById = new Long2ObjectHashMap();
        this.correlations = new Long2ObjectHashMap();
        this.targetsByIndex = new Int2ObjectHashMap();
        this.debitorsByIndex = new Int2ObjectHashMap();
        this.lookupTargetIndex = lookupTargetIndex;
        this.supplyTimestamp = supplyTimestamp;
        this.supplyTraceId = supplyTraceId;
        this.source = new NukleusSource(config, labels, scopeIndex, supplyTraceId, arg_0 -> this.correlations.remove(arg_0), this::supplySender, this::supplyTarget, this::doSystemFlush, this.streamsById, this.throttlesById);
        this.streamsById.put(0L, this::onSystemMessage);
        this.throttlesById.put(0L, this::onSystemMessage);
    }

    public String toString() {
        return String.format("%s [%s]", this.getClass().getSimpleName(), this.source.streamsPath());
    }

    public void doRoute(String receiverAddress, long authorization, NukleusServerChannel serverChannel) {
        this.source.doRoute(receiverAddress, authorization, serverChannel);
    }

    public void doUnroute(String receiverAddress, long authorization, NukleusServerChannel serverChannel) {
        this.source.doUnroute(receiverAddress, authorization, serverChannel);
    }

    public void doConnect(NukleusClientChannel clientChannel, NukleusChannelAddress localAddress, NukleusChannelAddress remoteAddress, ChannelFuture connectFuture) {
        String receiverAddress = remoteAddress.getReceiverAddress();
        int targetIndex = this.lookupTargetIndex.applyAsInt(receiverAddress);
        NukleusTarget target = this.supplyTarget(targetIndex);
        clientChannel.setRemoteScope(targetIndex);
        clientChannel.routeId(this.routeId(remoteAddress));
        target.doConnect(clientChannel, localAddress, remoteAddress, connectFuture);
    }

    public void doConnectAbort(NukleusClientChannel clientChannel, NukleusChannelAddress remoteAddress) {
        String receiverAddress = remoteAddress.getReceiverAddress();
        int targetIndex = this.lookupTargetIndex.applyAsInt(receiverAddress);
        NukleusTarget target = this.supplyTarget(targetIndex);
        target.doConnectAbort(clientChannel);
    }

    public void doAbortOutput(NukleusChannel channel, ChannelFuture abortFuture) {
        NukleusTarget target = this.supplyTarget(channel);
        target.doAbortOutput(channel, abortFuture);
    }

    public void doAbortInput(NukleusChannel channel, ChannelFuture abortFuture) {
        this.source.doAbortInput(channel, abortFuture);
    }

    public void doWrite(NukleusChannel channel, MessageEvent writeRequest) {
        NukleusTarget target = this.supplyTarget(channel);
        target.doWrite(channel, writeRequest);
    }

    public void doFlush(NukleusChannel channel, ChannelFuture flushFuture) {
        NukleusTarget target = this.supplyTarget(channel);
        target.doFlush(channel, flushFuture);
    }

    public void doShutdownOutput(NukleusChannel channel, ChannelFuture shutdownFuture) {
        NukleusTarget target = this.supplyTarget(channel);
        target.doShutdownOutput(channel, shutdownFuture);
    }

    public void doClose(NukleusChannel channel, ChannelFuture handlerFuture) {
        boolean readClosed = channel.getCloseFuture().isDone() || channel.isReadClosed();
        NukleusTarget target = this.supplyTarget(channel);
        target.doClose(channel, handlerFuture);
        if (!readClosed && ((NukleusChannelConfig)channel.getConfig()).getTransmission() == NukleusTransmission.HALF_DUPLEX) {
            ChannelFuture abortFuture = Channels.future((Channel)channel);
            this.source.doAbortInput(channel, abortFuture);
            assert (abortFuture.isSuccess());
        }
    }

    public void doSystemFlush(NukleusChannel channel, ChannelFuture flushFuture) {
        NukleusTarget target = this.supplyTarget(channel);
        target.doSystemFlush(channel, flushFuture);
    }

    public int process() {
        return this.source.process();
    }

    @Override
    public void close() {
        CloseHelper.quietClose((AutoCloseable)this.source);
        for (NukleusTarget target : this.targetsByIndex.values()) {
            CloseHelper.quietClose((AutoCloseable)target);
        }
        for (DefaultBudgetDebitor debitor : this.debitorsByIndex.values()) {
            CloseHelper.quietClose((AutoCloseable)debitor);
        }
    }

    public NukleusTarget supplySender(long routeId, long streamId) {
        int targetIndex = NukleusScope.replyToIndex(streamId);
        return this.supplyTarget(targetIndex);
    }

    public DefaultBudgetDebitor supplyDebitor(long budgetId) {
        int ownerIndex = BudgetId.ownerIndex((long)budgetId);
        return (DefaultBudgetDebitor)this.debitorsByIndex.computeIfAbsent(ownerIndex, this::newDebitor);
    }

    public DefaultBudgetCreditor creditor() {
        return this.source.creditor();
    }

    private DefaultBudgetDebitor newDebitor(int ownerIndex) {
        int watcherIndex = this.source.scopeIndex();
        BudgetsLayout layout = new BudgetsLayout.Builder().path(this.config.directory().resolve(String.format("budgets%d", ownerIndex))).owner(false).build();
        return new DefaultBudgetDebitor(watcherIndex, ownerIndex, layout);
    }

    private void onSystemMessage(int msgTypeId, DirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 0x40000002: {
                WindowFW window = this.windowRO.wrap(buffer, index, index + length);
                this.onSystemWindow(window);
                break;
            }
            case 5: {
                FlushFW flush = this.flushRO.wrap(buffer, index, index + length);
                this.onSystemFlush(flush);
            }
        }
    }

    private void onSystemWindow(WindowFW window) {
        long traceId = window.traceId();
        long budgetId = window.budgetId();
        int credit = window.credit();
        this.creditor().creditById(traceId, budgetId, (long)credit);
    }

    private void onSystemFlush(FlushFW flush) {
        long traceId = flush.traceId();
        long budgetId = flush.budgetId();
        int ownerIndex = BudgetId.ownerIndex((long)budgetId);
        DefaultBudgetDebitor debitor = (DefaultBudgetDebitor)this.debitorsByIndex.get(ownerIndex);
        if (debitor != null) {
            debitor.flush(traceId, budgetId);
        }
    }

    private void doSystemFlush(long traceId, long budgetId, long watchers) {
        for (int watcherIndex = 0; watcherIndex < 64; ++watcherIndex) {
            if ((watchers & 1L << watcherIndex) == 0L) continue;
            NukleusTarget target = this.supplyTarget(watcherIndex);
            target.doSystemFlush(traceId, budgetId);
        }
    }

    private NukleusTarget supplyTarget(NukleusChannel channel) {
        return this.supplyTarget(channel.getRemoteScope());
    }

    private NukleusTarget supplyTarget(int targetIndex) {
        return (NukleusTarget)this.targetsByIndex.computeIfAbsent(targetIndex, this::newTarget);
    }

    private NukleusTarget newTarget(int targetIndex) {
        Path targetPath = this.config.directory().resolve(String.format("data%d", targetIndex));
        StreamsLayout layout = new StreamsLayout.Builder().path(targetPath).readonly(true).build();
        NukleusTarget target = new NukleusTarget(this.source.scopeIndex(), targetPath, layout, this.writeBuffer, (arg_0, arg_1) -> this.throttlesById.put(arg_0, arg_1), arg_0 -> this.throttlesById.remove(arg_0), (arg_0, arg_1) -> this.correlations.put(arg_0, arg_1), this.supplyTimestamp, this.supplyTraceId);
        this.targets = (NukleusTarget[])ArrayUtil.add((Object[])this.targets, (Object)target);
        return target;
    }

    private long routeId(NukleusChannelAddress remoteAddress) {
        long localId = this.labels.supplyLabelId(remoteAddress.getSenderAddress());
        long remoteId = this.labels.supplyLabelId(remoteAddress.getReceiverAddress());
        return localId << 48 | remoteId << 32 | 0xF0000000L | (long)System.identityHashCode((Object)remoteAddress) & 0xFFFFFFFL;
    }

    private static int replyToIndex(long streamId) {
        return NukleusScope.isInitial(streamId) ? NukleusScope.localIndex(streamId) : NukleusScope.remoteIndex(streamId);
    }

    private static int localIndex(long streamId) {
        return (int)(streamId >> 56) & 0x7F;
    }

    private static int remoteIndex(long streamId) {
        return (int)(streamId >> 48) & 0x7F;
    }

    private static boolean isInitial(long streamId) {
        return (streamId & 1L) != 0L;
    }
}

