/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.nio.file.Path;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.CloseHelper;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.layouts.BudgetsLayout;
import org.reaktivity.reaktor.test.internal.k3po.ext.NukleusExtConfiguration;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusChannelConfig;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusCorrelation;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusPartition;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusServerChannel;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusStreamFactory;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTarget;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.NukleusTransmission;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.StreamsLayout;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.NukleusTypeSystem;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongLongFunction;

/**
 * Source side of a single scope: wires together the streams layout partition,
 * the stream factory and the budget creditor created for one {@code scopeIndex},
 * and keeps the routing table that maps a route id and authorization to the
 * {@link NukleusServerChannel} registered for it.
 */
public final class NukleusSource
implements AutoCloseable {
    private final Path streamsPath;
    private final NukleusStreamFactory streamFactory;
    private final LongSupplier supplyTraceId;
    private final NukleusPartition partition;
    // Outer key: route id; inner key: authorization.
    private final Long2ObjectHashMap<Long2ObjectHashMap<NukleusServerChannel>> routesByIdAndAuth;
    private final DefaultBudgetCreditor creditor;

    /**
     * Builds the budgets and streams layouts under {@code config.directory()}
     * (files {@code budgets<scopeIndex>} and {@code data<scopeIndex>}) and creates
     * the partition, stream factory and budget creditor on top of them.
     *
     * @param config               supplies the directory and the buffer capacities
     * @param scopeIndex           index used to name the layout files; also passed to the partition and creditor
     * @param supplyTraceId        source of trace ids for frames originated by this class
     * @param correlateEstablished passed through to the partition
     * @param supplySender         passed through to the stream factory and the partition
     * @param supplyTarget         passed through to the partition
     * @param flushWatchers        passed through to the budget creditor
     * @param streamsById          stream handler registry; read, written and pruned via the partition and stream factory
     * @param throttlesById        throttle handler registry; only looked up here
     */
    public NukleusSource(NukleusExtConfiguration config, int scopeIndex, LongSupplier supplyTraceId, LongFunction<NukleusCorrelation> correlateEstablished, LongLongFunction<NukleusTarget> supplySender, IntFunction<NukleusTarget> supplyTarget, DefaultBudgetCreditor.BudgetFlusher flushWatchers, Long2ObjectHashMap<MessageHandler> streamsById, Long2ObjectHashMap<MessageHandler> throttlesById) {
        this.streamsPath = config.directory().resolve(String.format("data%d", scopeIndex));
        this.streamFactory = new NukleusStreamFactory(supplySender, streamId -> streamsById.remove(streamId));
        this.routesByIdAndAuth = new Long2ObjectHashMap<>();
        BudgetsLayout budgets = new BudgetsLayout.Builder()
                .path(config.directory().resolve(String.format("budgets%d", scopeIndex)))
                .capacity(config.budgetsBufferCapacity())
                .owner(true)
                .build();
        StreamsLayout streams = new StreamsLayout.Builder()
                .path(this.streamsPath)
                .streamsCapacity(config.streamsBufferCapacity())
                .readonly(false)
                .build();
        this.supplyTraceId = supplyTraceId;
        this.partition = new NukleusPartition(
                this.streamsPath,
                scopeIndex,
                streams,
                this::lookupRoute,
                streamId -> streamsById.get(streamId),
                (streamId, handler) -> streamsById.put(streamId, handler),
                throttleId -> throttlesById.get(throttleId),
                this.streamFactory,
                correlateEstablished,
                supplySender,
                supplyTarget);
        this.creditor = new DefaultBudgetCreditor(scopeIndex, budgets, flushWatchers);
    }

    /**
     * @return the simple class name followed by the streams path in brackets
     */
    @Override
    public String toString() {
        return String.format("%s [%s]", this.getClass().getSimpleName(), this.streamsPath);
    }

    /**
     * Registers {@code serverChannel} for the given route id and authorization,
     * replacing any channel previously registered under the same pair.
     */
    public void doRoute(long routeId, long authorization, NukleusServerChannel serverChannel) {
        this.routesByAuth(routeId).put(authorization, serverChannel);
    }

    /**
     * Removes the registration for the given route id and authorization, and drops
     * the per-route map once its last entry has been removed.
     *
     * <p>{@code serverChannel} is not consulted: whatever channel is registered under
     * the pair is removed.
     */
    public void doUnroute(long routeId, long authorization, NukleusServerChannel serverChannel) {
        Long2ObjectHashMap<NukleusServerChannel> channels = this.routesByIdAndAuth.get(routeId);
        if (channels != null && channels.remove(authorization) != null && channels.isEmpty()) {
            this.routesByIdAndAuth.remove(routeId);
        }
    }

    /**
     * Handles an input advisory. Only {@link NukleusTypeSystem#ADVISORY_CHALLENGE}
     * (compared by identity) is supported: a challenge is issued through the stream
     * factory and {@code adviseFuture} succeeds. Any other value fails the future
     * with a {@link ChannelException}.
     */
    public void doAdviseInput(NukleusChannel channel, ChannelFuture adviseFuture, Object value) {
        if (value == NukleusTypeSystem.ADVISORY_CHALLENGE) {
            long traceId = this.supplyTraceId.getAsLong();
            this.streamFactory.doChallenge(channel, traceId);
            adviseFuture.setSuccess();
        } else {
            adviseFuture.setFailure(new ChannelException("unexpected: " + value));
        }
    }

    /**
     * Aborts the input side of {@code channel} once its begin has completed.
     *
     * <p>The begin future that gates the abort is the output begin future for a
     * parentless (client) channel configured as half-duplex, and the input begin
     * future otherwise. If that future has already succeeded the abort is performed
     * immediately; otherwise it is deferred until the future completes, and a failed
     * begin fails {@code abortFuture} with the same cause.
     */
    public void doAbortInput(final NukleusChannel channel, final ChannelFuture abortFuture) {
        boolean isClientChannel = channel.getParent() == null;
        boolean isHalfDuplex = ((NukleusChannelConfig)channel.getConfig()).getTransmission() == NukleusTransmission.HALF_DUPLEX;
        ChannelFuture beginFuture = isClientChannel && isHalfDuplex ? channel.beginOutputFuture() : channel.beginInputFuture();
        if (beginFuture.isSuccess()) {
            this.doAbortInputAfterBegin(channel, abortFuture);
        } else {
            beginFuture.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        NukleusSource.this.doAbortInputAfterBegin(channel, abortFuture);
                    } else {
                        abortFuture.setFailure(future.getCause());
                    }
                }
            });
        }
    }

    /**
     * Performs the abort itself: sends a reset via the stream factory, issues a
     * system window via the partition, completes {@code abortFuture}, and fires the
     * disconnected / unbound / closed events when both {@code setReadAborted()} and
     * {@code setReadClosed()} return {@code true}.
     */
    private void doAbortInputAfterBegin(NukleusChannel channel, ChannelFuture abortFuture) {
        long traceId = this.supplyTraceId.getAsLong();
        this.streamFactory.doReset(channel, traceId);
        this.partition.doSystemWindow(channel, traceId);
        // The future is completed before the channel state events are fired.
        abortFuture.setSuccess();
        if (channel.setReadAborted() && channel.setReadClosed()) {
            Channels.fireChannelDisconnected(channel);
            Channels.fireChannelUnbound(channel);
            Channels.fireChannelClosed(channel);
        }
    }

    /**
     * Delegates to the partition.
     *
     * @return the value returned by {@code partition.process()}
     */
    public int process() {
        return this.partition.process();
    }

    /**
     * Quietly closes the budget creditor, then closes the partition.
     */
    @Override
    public void close() {
        CloseHelper.quietClose(this.creditor);
        this.partition.close();
    }

    Path streamsPath() {
        return this.streamsPath;
    }

    int scopeIndex() {
        return this.partition.scopeIndex();
    }

    DefaultBudgetCreditor creditor() {
        return this.creditor;
    }

    /**
     * Finds the server channel registered for the given route id and authorization.
     *
     * <p>This is a pure read: an unknown route id must not create an entry in the
     * routing table, otherwise every failed lookup would leave an empty per-route
     * map behind that {@link #doUnroute} never prunes.
     *
     * @return the registered channel, or {@code null} when there is none
     */
    private NukleusServerChannel lookupRoute(long routeId, long authorization) {
        Long2ObjectHashMap<NukleusServerChannel> routesByAuth = this.routesByIdAndAuth.get(routeId);
        return routesByAuth != null ? routesByAuth.get(authorization) : null;
    }

    /**
     * Returns the per-route map for {@code routeId}, creating it if absent.
     * Intended for the registration path only.
     */
    private Long2ObjectHashMap<NukleusServerChannel> routesByAuth(long routeId) {
        return this.routesByIdAndAuth.computeIfAbsent(routeId, this::newRoutesByAuth);
    }

    private Long2ObjectHashMap<NukleusServerChannel> newRoutesByAuth(long routeId) {
        return new Long2ObjectHashMap<>();
    }
}

