/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.internal.agent;

import java.net.InetAddress;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.agrona.CloseHelper;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.hints.ThreadHints;
import org.reaktivity.nukleus.AgentBuilder;
import org.reaktivity.nukleus.Elektron;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.budget.BudgetDebitor;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.buffer.CountingBufferPool;
import org.reaktivity.nukleus.concurrent.Signaler;
import org.reaktivity.nukleus.concurrent.SignalingExecutor;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.Address;
import org.reaktivity.nukleus.route.AddressFactory;
import org.reaktivity.nukleus.route.AddressFactoryBuilder;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.stream.StreamFactoryBuilder;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.Counters;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetDebitor;
import org.reaktivity.reaktor.internal.layouts.BudgetsLayout;
import org.reaktivity.reaktor.internal.layouts.BufferPoolLayout;
import org.reaktivity.reaktor.internal.layouts.MetricsLayout;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.router.BudgetId;
import org.reaktivity.reaktor.internal.router.Resolver;
import org.reaktivity.reaktor.internal.router.RouteId;
import org.reaktivity.reaktor.internal.router.StreamId;
import org.reaktivity.reaktor.internal.router.Target;
import org.reaktivity.reaktor.internal.router.WriteCounters;
import org.reaktivity.reaktor.internal.types.Flyweight;
import org.reaktivity.reaktor.internal.types.OctetsFW;
import org.reaktivity.reaktor.internal.types.control.Role;
import org.reaktivity.reaktor.internal.types.control.RouteFW;
import org.reaktivity.reaktor.internal.types.control.UnrouteFW;
import org.reaktivity.reaktor.internal.types.stream.AbortFW;
import org.reaktivity.reaktor.internal.types.stream.BeginFW;
import org.reaktivity.reaktor.internal.types.stream.DataFW;
import org.reaktivity.reaktor.internal.types.stream.ExtensionFW;
import org.reaktivity.reaktor.internal.types.stream.FlushFW;
import org.reaktivity.reaktor.internal.types.stream.FrameFW;
import org.reaktivity.reaktor.internal.types.stream.ReaktorSignalExFW;
import org.reaktivity.reaktor.internal.types.stream.ResetFW;
import org.reaktivity.reaktor.internal.types.stream.SignalFW;
import org.reaktivity.reaktor.internal.types.stream.WindowFW;

public class ElektronAgent
implements Agent {
    // NOTE(review): the second group starts with "(:?" (an optional literal ':'),
    // which may have been intended as the non-capturing group "(?:" - confirm before changing.
    private static final Pattern ADDRESS_PATTERN = Pattern.compile("^([^#]+)(:?#.*)$");

    // Signal ids carried by system signals (stream id 0) written by onRouted / onUnrouted.
    private static final int SYSTEM_SIGNAL_ROUTED = 1;
    private static final int SYSTEM_SIGNAL_UNROUTED = 2;

    // Reusable read-only flyweights, re-wrapped over each frame as it is read.
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final FlushFW flushRO = new FlushFW();
    private final WindowFW windowRO = new WindowFW();
    private final SignalFW signalRO = new SignalFW();
    private final ExtensionFW extensionRO = new ExtensionFW();
    private final ReaktorSignalExFW reaktorSignalExRO = new ReaktorSignalExFW();

    // Reusable flyweight builders, wrapped over writeBuffer / signalBuffer when a frame is produced.
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final FlushFW.Builder flushRW = new FlushFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final SignalFW.Builder signalRW = new SignalFW.Builder();
    private final ReaktorSignalExFW.Builder reaktorSignalExRW = new ReaktorSignalExFW.Builder();

    // Label id of the reaktor itself (config.name()); used as the type id of reaktor signal extensions.
    private final int reaktorTypeId;
    // Index of this agent, passed to the constructor; also encoded in the top bits of generated ids.
    private final int localIndex;
    private final ReaktorConfiguration config;
    private final LabelManager labels;
    // Maps a local address label to the set of agent indexes that should handle it.
    private final Function<String, BitSet> affinityMask;
    // Role name of this agent ("reaktor/data#<index>").
    private final String elektronName;
    private final Counters counters;
    private final Function<String, InetAddress[]> resolveHost;
    private final boolean timestamps;

    // Memory-mapped layouts owned by this agent; closed in onClose.
    private final MetricsLayout metricsLayout;
    private final StreamsLayout streamsLayout;
    private final BufferPoolLayout bufferPoolLayout;

    // Ring buffer this agent reads its frames from (and writes its own system signals to).
    private final RingBuffer streamsBuffer;
    private final MutableDirectBuffer writeBuffer;
    private final MutableDirectBuffer signalBuffer;

    // Per-index dispatch tables from stream instance id to the handler for that stream / throttle.
    private final Int2ObjectHashMap<MessageConsumer>[] streams;
    private final Int2ObjectHashMap<MessageConsumer>[] throttles;
    private final Long2ObjectHashMap<ReadCounters> countersByRouteId;
    private final Int2ObjectHashMap<MessageConsumer> writersByIndex;
    private final Int2ObjectHashMap<Target> targetsByIndex;
    // Concurrent because onRouteable/onRouted/onUnrouted and the read path both access it.
    // NOTE(review): presumably those callbacks run on a different thread than doWork - confirm.
    private final Map<String, ElektronRef> elektronByName;
    private final BufferPool bufferPool;

    // shift is 56 and mask is (index << 56) | low 56 bits, as set up in the constructor.
    private final int shift;
    private final long mask;

    // Method references cached once so the hot path does not allocate a new lambda per call.
    private final MessageHandler readHandler;
    private final DeadlineTimerWheel.TimerHandler expireHandler;
    // Upper bounds on the work done per doWork invocation.
    private final int readLimit;
    private final int expireLimit;
    private final LongFunction<? extends ReadCounters> newReadCounters;
    private final IntFunction<MessageConsumer> supplyWriter;
    private final IntFunction<Target> newTarget;
    private final LongFunction<WriteCounters> newWriteCounters;
    private final LongFunction<Affinity> resolveAffinity;

    // Addresses created for routed signals, removed again on the matching unrouted signal.
    private final Long2ObjectHashMap<Address> addressesByRouteId;
    private final RouteManager resolver;
    private final DefaultBudgetCreditor creditor;
    // Debitors keyed by the owner index extracted from a budget id.
    private final Int2ObjectHashMap<DefaultBudgetDebitor> debitorsByIndex;
    private final Int2ObjectHashMap<StreamFactory> streamFactoriesByAddressId;
    private final Long2ObjectHashMap<Affinity> affinityByRemoteId;
    private final Supplier<DirectBuffer> routesBufferRef;

    // Timer wheel polled from doWork; expired timer ids are looked up in tasksByTimerId.
    private final DeadlineTimerWheel timerWheel;
    private final Long2ObjectHashMap<Runnable> tasksByTimerId;
    // Pending futures keyed by cancel id; entries are dropped when the matching signal is read.
    private final Long2ObjectHashMap<Future<?>> futuresById;
    private final SignalingExecutor executor;
    private final ElektronSignaler signaler;

    // Id generators, each seeded with (index << 56) in the constructor.
    private long streamId;
    private long traceId;
    private long budgetId;
    // Nested agents driven from doWork / onClose.
    private volatile Agent[] agents;
    // Stream id of the most recently read frame; reported in the doWork failure diagnostic.
    private long lastReadStreamId;

    /**
     * Creates the agent with the given index, building its metrics, streams, buffer pool and
     * budgets layouts under {@code config.directory()} and, when a builder supplier is given,
     * one nested agent wired to this agent's suppliers.
     *
     * @param index              index of this agent; used in layout file names, the role name
     *                           and the top 8 bits of generated ids
     * @param count              not used by this constructor
     * @param config             reaktor configuration (capacities, limits, directory)
     * @param labels             label manager used to map names to ids and back
     * @param executorService    executor backing the agent's executor and signaler
     * @param affinityMask       maps a local address to the agent indexes that handle it
     * @param routesBufferRef    supplier of the routes buffer handed to new resolvers
     * @param supplyAgentBuilder supplier of a nested agent builder, or {@code null} for none
     */
    public ElektronAgent(int index, int count, ReaktorConfiguration config, LabelManager labels, ExecutorService executorService, Function<String, BitSet> affinityMask, Supplier<DirectBuffer> routesBufferRef, Supplier<AgentBuilder> supplyAgentBuilder) {
        this.reaktorTypeId = labels.supplyLabelId(config.name());
        this.localIndex = index;
        this.config = config;
        this.labels = labels;
        this.affinityMask = affinityMask;
        this.routesBufferRef = routesBufferRef;

        final MetricsLayout metrics = new MetricsLayout.Builder()
            .path(config.directory().resolve(String.format("metrics%d", index)))
            .labelsBufferCapacity(config.counterLabelsBufferCapacity())
            .valuesBufferCapacity(config.counterValuesBufferCapacity())
            .readonly(false)
            .build();
        final StreamsLayout streamsData = new StreamsLayout.Builder()
            .path(config.directory().resolve(String.format("data%d", index)))
            .streamsCapacity(config.streamsBufferCapacity())
            .readonly(false)
            .build();
        final BufferPoolLayout buffers = new BufferPoolLayout.Builder()
            .path(config.directory().resolve(String.format("buffers%d", index)))
            .slotCapacity(config.bufferSlotCapacity())
            .slotCount(config.bufferPoolCapacity() / config.bufferSlotCapacity())
            .readonly(false)
            .build();

        this.elektronName = String.format("reaktor/data#%d", index);
        this.metricsLayout = metrics;
        this.streamsLayout = streamsData;
        this.bufferPoolLayout = buffers;

        final CountersManager countersManager = new CountersManager(metrics.labelsBuffer(), metrics.valuesBuffer());
        this.counters = new Counters(countersManager);
        this.resolveHost = config.hostResolver();
        this.timestamps = config.timestamps();
        this.readLimit = config.maximumMessagesPerRead();
        this.expireLimit = config.maximumExpirationsPerPoll();
        this.streamsBuffer = streamsData.streamsBuffer();
        this.writeBuffer = new UnsafeBuffer(new byte[config.bufferSlotCapacity() + 1024]);
        this.signalBuffer = new UnsafeBuffer(new byte[config.bufferSlotCapacity() + 1024]);
        this.streams = this.initDispatcher();
        this.throttles = this.initDispatcher();
        this.countersByRouteId = new Long2ObjectHashMap<>();
        this.streamFactoriesByAddressId = new Int2ObjectHashMap<>();

        // Cache the method references once instead of re-creating them per call.
        this.readHandler = this::handleRead;
        this.expireHandler = this::handleExpire;
        this.newReadCounters = this::newReadCounters;
        this.supplyWriter = this::supplyWriter;
        this.newTarget = this::newTarget;
        this.newWriteCounters = this::newWriteCounters;
        this.resolveAffinity = this::resolveAffinity;

        this.addressesByRouteId = new Long2ObjectHashMap<>();
        this.elektronByName = new ConcurrentHashMap<>();
        this.affinityByRemoteId = new Long2ObjectHashMap<>();
        this.targetsByIndex = new Int2ObjectHashMap<>();
        this.writersByIndex = new Int2ObjectHashMap<>();
        this.agents = new Agent[0];
        this.timerWheel = new DeadlineTimerWheel(TimeUnit.MILLISECONDS, System.currentTimeMillis(), 512L, 1024);
        this.tasksByTimerId = new Long2ObjectHashMap<>();
        this.futuresById = new Long2ObjectHashMap<>();
        this.executor = new ElektronExecutor(executorService);
        this.signaler = new ElektronSignaler(executorService);
        this.resolver = new ResolverRef(this::newResolver);

        final BufferPool pool = buffers.bufferPool();

        // The top 8 bits of every generated id carry the agent index; the low 56 bits count up.
        final int indexShift = Long.SIZE - Byte.SIZE;
        final long initialId = (long) index << indexShift;
        this.shift = indexShift;
        this.mask = initialId | (-1L >>> Byte.SIZE);
        this.bufferPool = pool;
        this.streamId = initialId;
        this.traceId = initialId;
        this.budgetId = initialId;

        final BudgetsLayout budgets = new BudgetsLayout.Builder()
            .path(config.directory().resolve(String.format("budgets%d", index)))
            .capacity(config.budgetsBufferCapacity())
            .owner(true)
            .build();
        this.creditor = new DefaultBudgetCreditor(
            index,
            budgets,
            this::doSystemFlush,
            this::supplyBudgetId,
            this.signaler::executeTaskAt,
            config.childCleanupLingerMillis());
        this.debitorsByIndex = new Int2ObjectHashMap<>();

        if (supplyAgentBuilder != null) {
            final AgentBuilder nestedBuilder = supplyAgentBuilder.get();
            final Agent nested = nestedBuilder
                .setRouteManager(this.resolver)
                .setExecutor(this.executor)
                .setWriteBuffer(this.writeBuffer)
                .setAddressIdSupplier(labels::supplyLabelId)
                .setStreamFactorySupplier(this::supplyStreamFactory)
                .setThrottleSupplier(this::supplyThrottle)
                .setThrottleRemover(this::removeThrottle)
                .setInitialIdSupplier(this::supplyInitialId)
                .setReplyIdSupplier(this::supplyReplyId)
                .setTraceIdSupplier(this::supplyTraceId)
                .setGroupIdSupplier(this::supplyBudgetId)
                .setBufferPool(pool)
                .build();
            this.agents = ArrayUtil.add(this.agents, nested);
        }
    }

    /**
     * Dispatches a system frame (one read with stream id 0) by message type: 5 is handled as
     * a flush, 0x40000002 as a window and 0x40000003 as a signal. Other types are ignored.
     *
     * @param msgTypeId message type id of the frame
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void onSystemMessage(int msgTypeId, DirectBuffer buffer, int index, int length) {
        final int limit = index + length;
        if (msgTypeId == 5) {
            onSystemFlush(flushRO.wrap(buffer, index, limit));
        } else if (msgTypeId == 0x40000002) {
            onSystemWindow(windowRO.wrap(buffer, index, limit));
        } else if (msgTypeId == 0x40000003) {
            onSystemSignal(signalRO.wrap(buffer, index, limit));
        }
    }

    /**
     * Forwards a system flush to the debitor registered for the owner index encoded in the
     * flush's budget id, if there is one.
     *
     * @param flush the flush frame read from the streams buffer
     */
    private void onSystemFlush(FlushFW flush) {
        final long flushTraceId = flush.traceId();
        final long flushBudgetId = flush.budgetId();
        final int owner = BudgetId.ownerIndex(flushBudgetId);
        final DefaultBudgetDebitor target = debitorsByIndex.get(owner);

        if (ReaktorConfiguration.DEBUG_BUDGETS) {
            System.out.format("[%d] [0x%016x] [0x%016x] FLUSH %08x %s\n", System.nanoTime(), flushTraceId, flushBudgetId, owner, target);
        }

        if (target == null) {
            return;
        }
        target.flush(flushTraceId, flushBudgetId);
    }

    /**
     * Credits the budget named by a system window with the window's {@code maximum} value
     * and, when that budget has a parent (non-zero parent id), passes the same credit on to
     * the parent budget.
     *
     * @param window the window frame read from the streams buffer
     */
    private void onSystemWindow(WindowFW window) {
        final long windowTraceId = window.traceId();
        final long creditedBudgetId = window.budgetId();
        final int credit = window.maximum();

        creditor.creditById(windowTraceId, creditedBudgetId, credit);

        final long parentId = creditor.parentBudgetId(creditedBudgetId);
        if (parentId == 0L) {
            return;
        }
        doSystemWindowIfNecessary(windowTraceId, parentId, credit);
    }

    /**
     * Dispatches a system signal by signal id to the routed or unrouted handler.
     * Signals with any other id are ignored.
     *
     * @param signal the signal frame read from the streams buffer
     */
    private void onSystemSignal(SignalFW signal) {
        final int id = signal.signalId();
        if (id == SYSTEM_SIGNAL_ROUTED) {
            onSystemRoutedSignal(signal);
        } else if (id == SYSTEM_SIGNAL_UNROUTED) {
            onSystemUnroutedSignal(signal);
        }
    }

    /**
     * Handles a "routed" system signal: decodes the route from the signal extension, creates
     * an address for it via the address factory registered for the route's nukleus and kind,
     * remembers that address by route id and passes the route frame to its route handler.
     *
     * <p>The signal is dropped when no elektron is registered under the nukleus name, or when
     * that elektron has no address factory for the route kind.
     *
     * @param signal the routed signal read from the streams buffer
     */
    private void onSystemRoutedSignal(SignalFW signal) {
        final long routeId = signal.routeId();
        final OctetsFW extension = signal.extension();
        final ExtensionFW signalEx = extension.get(this.extensionRO::wrap);
        assert (signalEx.typeId() == this.reaktorTypeId);
        final ReaktorSignalExFW reaktorSignalEx = extension.get(this.reaktorSignalExRO::wrap);
        assert (reaktorSignalEx.kind() == 1);

        final RouteFW route = reaktorSignalEx.route();
        final RouteKind routeKind = RouteKind.valueOf(route.role().get().ordinal());
        final String nukleusName = route.nukleus().asString();
        final String localName = route.localAddress().asString();

        // onRouted writes this signal even when no elektron is registered under the nukleus
        // name (it only uses computeIfPresent), so the lookup here can legitimately miss.
        final ElektronRef elektronRef = this.elektronByName.get(nukleusName);
        if (elektronRef == null) {
            return;
        }

        final AddressFactory addressFactory = (AddressFactory) elektronRef.addressFactories.get(routeKind);
        if (addressFactory == null) {
            return;
        }

        final Address newAddress = addressFactory.newAddress(localName);
        assert (nukleusName.equals(newAddress.nukleus()));
        this.addressesByRouteId.put(routeId, newAddress);

        final MessageConsumer routeHandler = newAddress.routeHandler();
        assert (routeHandler != null);
        routeHandler.accept(route.typeId(), route.buffer(), route.offset(), route.sizeof());
    }

    /**
     * Handles an "unrouted" system signal: removes the address remembered for the signal's
     * route id and, if one was present, passes the decoded unroute frame to its route handler.
     *
     * @param signal the unrouted signal read from the streams buffer
     */
    private void onSystemUnroutedSignal(SignalFW signal) {
        final long routeId = signal.routeId();
        final Address removed = addressesByRouteId.remove(routeId);
        if (removed == null) {
            return;
        }

        final OctetsFW extension = signal.extension();
        final ExtensionFW signalEx = extension.get(extensionRO::wrap);
        assert (signalEx.typeId() == reaktorTypeId);
        final ReaktorSignalExFW reaktorSignalEx = extension.get(reaktorSignalExRO::wrap);
        assert (reaktorSignalEx.kind() == 2);

        final UnrouteFW unroute = reaktorSignalEx.unroute();
        final MessageConsumer routeHandler = removed.routeHandler();
        assert (routeHandler != null);
        routeHandler.accept(unroute.typeId(), unroute.buffer(), unroute.offset(), unroute.sizeof());
    }

    /**
     * Writes a system flush frame (route id and stream id 0) for the given budget to every
     * watcher whose bit is set in {@code watchers}, in ascending bit order.
     *
     * @param traceId  trace id to stamp on each flush
     * @param budgetId budget id the flush refers to
     * @param watchers bit set of watcher indexes; bit {@code i} selects the writer for index {@code i}
     */
    private void doSystemFlush(long traceId, long budgetId, long watchers) {
        // Visit the set bits from lowest to highest, clearing each one after it is handled.
        for (long pending = watchers; pending != 0L; pending &= pending - 1L) {
            final int watcherIndex = Long.numberOfTrailingZeros(pending);
            if (ReaktorConfiguration.DEBUG_BUDGETS) {
                System.out.format("[%d] [0x%016x] [0x%016x] flush %d\n", System.nanoTime(), traceId, budgetId, watcherIndex);
            }
            final MessageConsumer watcher = supplyWriter(watcherIndex);
            final FlushFW flush = flushRW.wrap(writeBuffer, 0, writeBuffer.capacity())
                .routeId(0L)
                .streamId(0L)
                .sequence(0L)
                .acknowledge(0L)
                .maximum(0)
                .traceId(traceId)
                .budgetId(budgetId)
                .reserved(0)
                .build();
            watcher.accept(flush.typeId(), flush.buffer(), flush.offset(), flush.sizeof());
        }
    }

    /**
     * Writes a system window frame (route id and stream id 0) carrying {@code reserved} as
     * its {@code maximum} to the writer for the owner index encoded in {@code budgetId}.
     *
     * @param traceId  trace id to stamp on the window
     * @param budgetId budget being credited; its owner index selects the target writer
     * @param reserved amount of credit to report
     */
    private void doSystemWindow(long traceId, long budgetId, int reserved) {
        if (ReaktorConfiguration.DEBUG_BUDGETS) {
            System.out.format("[%d] [0x%016x] [0x%016x] doSystemWindow credit=%d \n", System.nanoTime(), traceId, budgetId, reserved);
        }

        final int ownerIndex = BudgetId.ownerIndex(budgetId);
        final MessageConsumer owner = supplyWriter(ownerIndex);
        final WindowFW credit = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(0L)
            .streamId(0L)
            .sequence(0L)
            .acknowledge(0L)
            .maximum(reserved)
            .traceId(traceId)
            .budgetId(budgetId)
            .padding(0)
            .build();
        owner.accept(credit.typeId(), credit.buffer(), credit.offset(), credit.sizeof());
    }

    /**
     * Creates a resolver over the routes buffer that shares this agent's throttle dispatch
     * tables and obtains initial writers from this agent.
     *
     * @return a new resolver instance
     */
    private Resolver newResolver() {
        final Resolver created = new Resolver(routesBufferRef, throttles, this::supplyInitialWriter);
        return created;
    }

    /**
     * Returns the role name of this agent, {@code "reaktor/data#<index>"}.
     *
     * @return the agent's role name
     */
    public String roleName() {
        return elektronName;
    }

    /**
     * Performs one duty cycle: runs the nested agents, fires expired timers (at most
     * {@code expireLimit} per call) and reads up to {@code readLimit} frames from the
     * streams buffer.
     *
     * <p>Any failure in these steps, including one thrown by a read handler, is annotated
     * with the agent name, the id of the last stream read and the streams layout, and then
     * rethrown wrapped in an {@link AgentTerminationException}.
     *
     * @return the amount of work done in this cycle
     * @throws AgentTerminationException if any step of the cycle fails
     */
    public int doWork() throws Exception {
        int workDone = 0;
        try {
            for (Agent agent : this.agents) {
                workDone += agent.doWork();
            }

            if (this.timerWheel.timerCount() != 0L) {
                final long now = System.currentTimeMillis();
                int expiredMax = this.expireLimit;
                while (this.timerWheel.currentTickTime() <= now && expiredMax > 0) {
                    final int expired = this.timerWheel.poll(now, this.expireHandler, expiredMax);
                    workDone += expired;
                    expiredMax -= expired;
                }
            }

            // Read inside the try block: lastReadStreamId is only updated while reading, so
            // read failures are the ones the diagnostic below is meant to describe.
            workDone += this.streamsBuffer.read(this.readHandler, this.readLimit);
        }
        catch (Throwable ex) {
            ex.addSuppressed(new Exception(String.format("[%s]\t[0x%016x] %s", this.elektronName, this.lastReadStreamId, this.streamsLayout)));
            throw new AgentTerminationException(ex);
        }
        return workDone;
    }

    /**
     * Reads the current value of the named counter.
     *
     * @param name counter name
     * @return the counter's value, or {@code 0} when no read-only counter is available for the name
     */
    public long counter(String name) {
        final LongSupplier reader = counters.readonlyCounter(name);
        if (reader == null) {
            return 0L;
        }
        return reader.getAsLong();
    }

    /**
     * Shuts the agent down: optionally spins until the streams buffer has been drained,
     * closes nested agents, optionally sends a synthetic abort to every registered stream,
     * then detaches and closes targets, layouts, debitors and the creditor.
     *
     * @throws IllegalStateException if synthetic abort is enabled and buffers, creditors or
     *                               debitors are still acquired afterwards (thrown after all
     *                               resources have been closed)
     */
    public void onClose() {
        while (config.drainOnClose() && streamsBuffer.consumerPosition() < streamsBuffer.producerPosition()) {
            ThreadHints.onSpinWait();
        }

        for (Agent nested : agents) {
            nested.onClose();
        }

        int leakedBuffers = 0;
        int leakedCreditors = 0;
        long leakedDebitors = 0L;

        if (config.syntheticAbort()) {
            // Abort from a snapshot of each dispatch table rather than the live table itself.
            final Int2ObjectHashMap<MessageConsumer> snapshot = new Int2ObjectHashMap<>();
            for (int sender = 0; sender < streams.length; sender++) {
                final int senderIndex = sender;
                snapshot.clear();
                streams[senderIndex].forEach((id, handler) -> snapshot.put(id, handler));
                snapshot.forEach((id, handler) -> doSyntheticAbort(StreamId.streamId(localIndex, senderIndex, id), handler));
            }

            leakedBuffers = bufferPool.acquiredSlots();
            leakedCreditors = creditor.acquired();
            leakedDebitors = debitorsByIndex.values().stream().mapToInt(DefaultBudgetDebitor::acquired).sum();
        }

        targetsByIndex.forEach((targetIndex, target) -> target.detach());
        targetsByIndex.forEach((targetIndex, target) -> CloseHelper.quietClose(target));

        CloseHelper.quietClose(streamsLayout);
        CloseHelper.quietClose(metricsLayout);
        CloseHelper.quietClose(bufferPoolLayout);

        debitorsByIndex.forEach((debitorIndex, debitor) -> CloseHelper.quietClose(debitor));
        CloseHelper.quietClose(creditor);

        final boolean leaked = leakedBuffers != 0 || leakedCreditors != 0 || leakedDebitors != 0L;
        if (leaked) {
            throw new IllegalStateException(String.format("Some resources not released: %d buffers, %d creditors, %d debitors", leakedBuffers, leakedCreditors, leakedDebitors));
        }
    }

    /**
     * Returns the agent's role name, the same value as {@code roleName()}.
     *
     * @return the agent's role name
     */
    @Override
    public String toString() {
        return elektronName;
    }

    /**
     * Registers an elektron for the nukleus, if none is registered yet, when the affinity
     * mask for the route's local address includes this agent's index.
     *
     * @param routeId route id whose local address determines affinity
     * @param nukleus nukleus that supplies the elektron for this agent's index
     */
    public void onRouteable(long routeId, Nukleus nukleus) {
        final String nukleusName = nukleus.name();
        final String localAddress = labels.lookupLabel(RouteId.localId(routeId));
        final BitSet eligible = affinityMask.apply(localAddress);
        if (!eligible.get(localIndex)) {
            return;
        }
        elektronByName.computeIfAbsent(nukleusName, name -> new ElektronRef(name, nukleus.supplyElektron(localIndex)));
    }

    /**
     * Records a new route when the affinity mask for the route's local address includes this
     * agent's index: assigns the route kind and local address id on the nukleus's elektron
     * (if registered) and writes a "routed" system signal describing the route to this
     * agent's streams buffer.
     *
     * @param nukleus   nukleus the route belongs to
     * @param routeKind kind of the route
     * @param routeId   route id; encodes the local and remote address ids
     * @param extension route extension copied into the signal
     */
    public void onRouted(Nukleus nukleus, RouteKind routeKind, long routeId, OctetsFW extension) {
        final String nukleusName = nukleus.name();
        final int localAddressId = RouteId.localId(routeId);
        final int remoteAddressId = RouteId.remoteId(routeId);
        final String localAddress = labels.lookupLabel(localAddressId);
        final String remoteAddress = labels.lookupLabel(remoteAddressId);
        final BitSet eligible = affinityMask.apply(localAddress);
        if (!eligible.get(localIndex)) {
            return;
        }

        elektronByName.computeIfPresent(nukleusName, (name, ref) -> ref.assign(routeKind, localAddressId));

        final SignalFW routed = signalRW.wrap(signalBuffer, 0, signalBuffer.capacity())
            .routeId(routeId)
            .streamId(0L)
            .sequence(0L)
            .acknowledge(0L)
            .maximum(0)
            .cancelId(-1L)
            .signalId(SYSTEM_SIGNAL_ROUTED)
            .extension(ex -> ex.set(visitRoutedSignalEx(routeId, nukleusName, routeKind, localAddress, remoteAddress, extension)))
            .build();
        streamsBuffer.write(routed.typeId(), routed.buffer(), routed.offset(), routed.sizeof());
    }

    /**
     * Records removal of a route when the affinity mask for the route's local address
     * includes this agent's index: unassigns the route kind and local address id on the
     * nukleus's elektron (if registered) and writes an "unrouted" system signal to this
     * agent's streams buffer.
     *
     * @param nukleus   nukleus the route belonged to
     * @param routeKind kind of the route
     * @param routeId   route id; encodes the local address id
     */
    public void onUnrouted(Nukleus nukleus, RouteKind routeKind, long routeId) {
        final String nukleusName = nukleus.name();
        final int localAddressId = RouteId.localId(routeId);
        final String localAddress = labels.lookupLabel(localAddressId);
        final BitSet eligible = affinityMask.apply(localAddress);
        if (!eligible.get(localIndex)) {
            return;
        }

        elektronByName.computeIfPresent(nukleusName, (name, ref) -> ref.unassign(routeKind, localAddressId));

        final SignalFW unrouted = signalRW.wrap(signalBuffer, 0, signalBuffer.capacity())
            .routeId(routeId)
            .streamId(0L)
            .sequence(0L)
            .acknowledge(0L)
            .maximum(0)
            .cancelId(-1L)
            .signalId(SYSTEM_SIGNAL_UNROUTED)
            .extension(ex -> ex.set(visitUnroutedSignalEx(routeId, nukleusName)))
            .build();
        streamsBuffer.write(unrouted.typeId(), unrouted.buffer(), unrouted.offset(), unrouted.sizeof());
    }

    /**
     * Builds a visitor that encodes a reaktor signal extension carrying a route
     * (correlation id, nukleus, role, local and remote addresses, extension) and reports
     * the encoded size.
     *
     * @param routeId       route id, written as the route's correlation id
     * @param nukleusName   name of the nukleus the route belongs to
     * @param routeKind     route kind, mapped to a role by ordinal
     * @param localAddress  local address label
     * @param remoteAddress remote address label
     * @param extension     route extension to embed
     * @return a visitor writing the extension into the region it is given
     */
    private Flyweight.Builder.Visitor visitRoutedSignalEx(long routeId, String nukleusName, RouteKind routeKind, String localAddress, String remoteAddress, OctetsFW extension) {
        return (buffer, offset, limit) ->
            reaktorSignalExRW.wrap(buffer, offset, limit)
                .typeId(reaktorTypeId)
                .route(route -> route.correlationId(routeId)
                    .nukleus(nukleusName)
                    .role(role -> role.set(Role.valueOf(routeKind.ordinal())))
                    .localAddress(localAddress)
                    .remoteAddress(remoteAddress)
                    .extension(extension))
                .build()
                .sizeof();
    }

    /**
     * Builds a visitor that encodes a reaktor signal extension carrying an unroute
     * (correlation id, nukleus, route id) and reports the encoded size.
     *
     * @param routeId     route id, written as both the correlation id and the route id
     * @param nukleusName name of the nukleus the route belonged to
     * @return a visitor writing the extension into the region it is given
     */
    private Flyweight.Builder.Visitor visitUnroutedSignalEx(long routeId, String nukleusName) {
        return (buffer, offset, limit) ->
            reaktorSignalExRW.wrap(buffer, offset, limit)
                .typeId(reaktorTypeId)
                .unroute(unroute -> unroute.correlationId(routeId)
                    .nukleus(nukleusName)
                    .routeId(routeId))
                .build()
                .sizeof();
    }

    /**
     * Timer wheel callback: removes the task registered for the expired timer id and runs
     * it, if one is registered.
     *
     * @param timeUnit time unit of the timer wheel (unused)
     * @param now      time passed by the timer wheel (unused)
     * @param timerId  id of the expired timer
     * @return always {@code true}
     */
    private boolean handleExpire(TimeUnit timeUnit, long now, long timerId) {
        final Runnable expiredTask = tasksByTimerId.remove(timerId);
        if (expiredTask == null) {
            return true;
        }
        expiredTask.run();
        return true;
    }

    /**
     * Streams buffer read callback: records the frame's stream id as the last one read and
     * dispatches the frame as a system message (stream id 0), an initial-stream frame, or a
     * reply-stream frame.
     *
     * @param msgTypeId message type id of the frame
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        final FrameFW header = frameRO.wrap(buffer, index, index + length);
        final long frameStreamId = header.streamId();
        final long frameRouteId = header.routeId();
        final long frameSequence = header.sequence();
        final long frameAcknowledge = header.acknowledge();
        final int frameMaximum = header.maximum();

        lastReadStreamId = frameStreamId;

        if (frameStreamId == 0L) {
            onSystemMessage(msgTypeId, buffer, index, length);
            return;
        }

        if (StreamId.isInitial(frameStreamId)) {
            handleReadInitial(frameRouteId, frameStreamId, frameSequence, frameAcknowledge, frameMaximum, msgTypeId, buffer, index, length);
        } else {
            handleReadReply(frameRouteId, frameStreamId, frameSequence, frameAcknowledge, frameMaximum, msgTypeId, buffer, index, length);
        }
    }

    /**
     * Dispatches a frame read for an initial stream.
     *
     * <p>Frames without the 0x40000000 bit go to the registered stream handler: types 1, 2
     * and 5 are forwarded, types 3 and 4 are forwarded and then the handler is unregistered,
     * and any other type triggers a reset. When no handler is registered the frame goes to
     * {@code handleDefaultReadInitial}.
     *
     * <p>Frames with the 0x40000000 bit go to the registered throttle: 0x40000002 (window)
     * and 0x40000001 (reset) update the route's read counters, and a reset also unregisters
     * the throttle; 0x40000003 (signal) drops any pending future for its cancel id;
     * 0x40000004 is forwarded as-is. When no throttle is registered, only the pending future
     * of a signal is dropped.
     *
     * @param routeId     route id from the frame
     * @param streamId    stream id from the frame
     * @param sequence    sequence from the frame
     * @param acknowledge acknowledge from the frame
     * @param maximum     maximum from the frame
     * @param msgTypeId   message type id of the frame
     * @param buffer      buffer holding the frame
     * @param index       offset of the frame within {@code buffer}
     * @param length      length of the frame in bytes
     */
    private void handleReadInitial(long routeId, long streamId, long sequence, long acknowledge, int maximum, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        final int instanceId = StreamId.instanceId(streamId);
        final boolean throttleFrame = (msgTypeId & 0x40000000) != 0;

        if (!throttleFrame) {
            final Int2ObjectHashMap<MessageConsumer> streamsBySender = this.streams[StreamId.streamIndex(streamId)];
            final MessageConsumer stream = streamsBySender.get(instanceId);
            if (stream == null) {
                handleDefaultReadInitial(msgTypeId, buffer, index, length);
                return;
            }

            switch (msgTypeId) {
                case 1:
                case 2:
                case 5:
                    stream.accept(msgTypeId, buffer, index, length);
                    break;
                case 3:
                case 4:
                    // Terminal frame: deliver it, then stop dispatching to this stream.
                    stream.accept(msgTypeId, buffer, index, length);
                    streamsBySender.remove(instanceId);
                    break;
                default:
                    doReset(routeId, streamId, sequence, acknowledge, maximum);
                    break;
            }
            return;
        }

        final Int2ObjectHashMap<MessageConsumer> throttlesByTarget = this.throttles[StreamId.throttleIndex(streamId)];
        final MessageConsumer throttle = throttlesByTarget.get(instanceId);

        if (throttle == null) {
            if (msgTypeId == 0x40000003) {
                final SignalFW signal = signalRO.wrap(buffer, index, index + length);
                final long cancelId = signal.cancelId();
                if (cancelId != -1L) {
                    futuresById.remove(cancelId);
                }
            }
            return;
        }

        final ReadCounters routeCounters = countersByRouteId.computeIfAbsent(routeId, newReadCounters);

        if (msgTypeId == 0x40000002) {
            routeCounters.windows.increment();
            throttle.accept(msgTypeId, buffer, index, length);
        } else if (msgTypeId == 0x40000001) {
            routeCounters.resets.increment();
            throttle.accept(msgTypeId, buffer, index, length);
            throttlesByTarget.remove(instanceId);
        } else if (msgTypeId == 0x40000003) {
            final SignalFW signal = signalRO.wrap(buffer, index, index + length);
            final long cancelId = signal.cancelId();
            if (cancelId != -1L) {
                futuresById.remove(cancelId);
            }
            throttle.accept(msgTypeId, buffer, index, length);
        } else if (msgTypeId == 0x40000004) {
            throttle.accept(msgTypeId, buffer, index, length);
        }
    }

    /**
     * Handles an initial-stream frame for which no handler is registered. A type 1 frame is
     * offered to {@code handleBeginInitial}; the handler it returns receives the frame, and
     * if it returns none the stream is reset. A type 2 (data) frame is treated as dropped
     * data. Other types are ignored.
     *
     * @param msgTypeId message type id of the frame
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void handleDefaultReadInitial(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        if (msgTypeId == 1) {
            final MessageConsumer created = handleBeginInitial(msgTypeId, buffer, index, length);
            if (created != null) {
                created.accept(msgTypeId, buffer, index, length);
            } else {
                // No handler could be created, so reject the stream back to its sender.
                final FrameFW header = frameRO.wrap(buffer, index, index + length);
                final long rejectedStreamId = header.streamId();
                final long rejectedRouteId = header.routeId();
                final long rejectedSequence = header.sequence();
                final long rejectedAcknowledge = header.acknowledge();
                final int rejectedMaximum = header.maximum();
                doReset(rejectedRouteId, rejectedStreamId, rejectedSequence, rejectedAcknowledge, rejectedMaximum);
            }
        } else if (msgTypeId == 2) {
            handleDroppedReadData(msgTypeId, buffer, index, length);
        }
    }

    /**
     * Handles a frame that could not be delivered. Only type 2 (data) frames need any
     * follow-up; every other type is ignored.
     *
     * @param msgTypeId message type id of the frame
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void handleDroppedReadFrame(int msgTypeId, DirectBuffer buffer, int index, int length) {
        if (msgTypeId == 2) {
            handleDroppedReadData(msgTypeId, buffer, index, length);
        }
    }

    /**
     * Handles a data frame that could not be delivered by returning its reserved amount to
     * the frame's budget through a system window, when there is anything to return.
     *
     * @param msgTypeId message type id of the frame; must be 2
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void handleDroppedReadData(int msgTypeId, DirectBuffer buffer, int index, int length) {
        assert (msgTypeId == 2);
        final DataFW dropped = dataRO.wrap(buffer, index, index + length);
        final long droppedTraceId = dropped.traceId();
        final long droppedBudgetId = dropped.budgetId();
        final int droppedReserved = dropped.reserved();
        doSystemWindowIfNecessary(droppedTraceId, droppedBudgetId, droppedReserved);
    }

    /**
     * Sends a system window for the budget only when the budget id is non-zero and the
     * amount is positive.
     *
     * @param traceId  trace id to stamp on the window
     * @param budgetId budget to credit; 0 means nothing is sent
     * @param reserved amount to credit; values of 0 or less mean nothing is sent
     */
    private void doSystemWindowIfNecessary(long traceId, long budgetId, int reserved) {
        if (budgetId == 0L || reserved <= 0) {
            return;
        }
        doSystemWindow(traceId, budgetId, reserved);
    }

    /**
     * Dispatches one frame read for a reply stream.
     *
     * <p>Frames whose type id has bit {@code 0x40000000} set are routed to the
     * registered throttle for the stream (if any); all other frames are routed to
     * the registered stream handler, updating the per-route read counters first.
     *
     * <ul>
     *   <li>Stream types {@code 1..5} are delivered to the handler; types {@code 3}
     *       and {@code 4} additionally unregister the handler afterwards. Any other
     *       stream type is answered with a reset.</li>
     *   <li>When no stream handler is registered the frame is passed to
     *       {@link #handleDefaultReadReply}.</li>
     *   <li>Throttle type {@code 0x40000003} (wrapped as a signal) always clears the
     *       tracked future named by its cancel id, whether or not a throttle is
     *       registered. Type {@code 0x40000001} unregisters the throttle after
     *       delivery.</li>
     * </ul>
     *
     * @param routeId     route id of the frame
     * @param streamId    stream id of the frame
     * @param sequence    sequence of the frame, echoed in a reset
     * @param acknowledge acknowledge of the frame, echoed in a reset
     * @param maximum     maximum of the frame, echoed in a reset
     * @param msgTypeId   frame type id
     * @param buffer      buffer holding the frame
     * @param index       offset of the frame within {@code buffer}
     * @param length      length of the frame in bytes
     */
    private void handleReadReply(long routeId, long streamId, long sequence, long acknowledge, int maximum, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        final int instanceId = StreamId.instanceId(streamId);
        final DirectBuffer payload = buffer;

        if ((msgTypeId & 0x40000000) != 0) {
            Int2ObjectHashMap<MessageConsumer> throttlesById = this.throttles[StreamId.throttleIndex(streamId)];
            MessageConsumer throttle = throttlesById.get(instanceId);

            if (msgTypeId == 0x40000003) {
                // A signal releases its tracked future even when nobody is listening.
                SignalFW signal = this.signalRO.wrap(payload, index, index + length);
                long cancelId = signal.cancelId();
                if (cancelId != -1L) {
                    this.futuresById.remove(cancelId);
                }
            }

            if (throttle == null) {
                return;
            }

            switch (msgTypeId) {
                case 0x40000001:
                    throttle.accept(msgTypeId, payload, index, length);
                    throttlesById.remove(instanceId);
                    break;
                case 0x40000002:
                case 0x40000003:
                case 0x40000004:
                    throttle.accept(msgTypeId, payload, index, length);
                    break;
                default:
                    break;
            }
            return;
        }

        Int2ObjectHashMap<MessageConsumer> streamsById = this.streams[StreamId.streamIndex(streamId)];
        MessageConsumer stream = streamsById.get(instanceId);
        if (stream == null) {
            this.handleDefaultReadReply(msgTypeId, buffer, index, length);
            return;
        }

        ReadCounters counters = (ReadCounters)this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
        boolean unregisterAfterDelivery = false;
        switch (msgTypeId) {
            case 1:
                counters.opens.increment();
                break;
            case 2:
                counters.frames.increment();
                // Offset 73 is read as the byte count of the frame (kept as in the original).
                counters.bytes.getAndAdd((long)buffer.getInt(index + 73));
                break;
            case 3:
                counters.closes.increment();
                unregisterAfterDelivery = true;
                break;
            case 4:
                counters.aborts.increment();
                unregisterAfterDelivery = true;
                break;
            case 5:
                break;
            default:
                // Unknown stream frame type: reject instead of delivering.
                this.doReset(routeId, streamId, sequence, acknowledge, maximum);
                return;
        }

        stream.accept(msgTypeId, payload, index, length);
        if (unregisterAfterDelivery) {
            streamsById.remove(instanceId);
        }
    }

    /**
     * Handles a reply-stream frame that has no registered handler.
     *
     * <p>A type {@code 1} frame is offered to {@link #handleBeginReply}: when a
     * handler results, the route's {@code opens} counter is incremented and the
     * frame delivered; otherwise a reset is sent using the ids read from the frame.
     * A type {@code 2} frame is treated as dropped data. Other types are ignored.
     *
     * @param msgTypeId frame type id
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     */
    private void handleDefaultReadReply(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        final DirectBuffer payload = buffer;

        if (msgTypeId == 2) {
            this.handleDroppedReadData(msgTypeId, payload, index, length);
            return;
        }
        if (msgTypeId != 1) {
            return;
        }

        // Capture the ids before attempting to create the stream.
        FrameFW header = this.frameRO.wrap(payload, index, index + length);
        long routeId = header.routeId();
        long streamId = header.streamId();
        long sequence = header.sequence();
        long acknowledge = header.acknowledge();
        int maximum = header.maximum();

        MessageConsumer created = this.handleBeginReply(msgTypeId, payload, index, length);
        if (created == null) {
            this.doReset(routeId, streamId, sequence, acknowledge, maximum);
            return;
        }

        ReadCounters counters = (ReadCounters)this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
        counters.opens.increment();
        created.accept(msgTypeId, payload, index, length);
    }

    /**
     * Tries to create and register a stream handler for a begin frame, looking up
     * the stream factory by the remote id of the frame's route.
     *
     * @param msgTypeId frame type id, forwarded to the stream factory
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     * @return the newly registered handler, or {@code null} when there is no
     *         factory for the address or the factory declined to create a stream
     */
    private MessageConsumer handleBeginInitial(int msgTypeId, DirectBuffer buffer, int index, int length) {
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        int addressId = RouteId.remoteId(routeId);

        StreamFactory factory = (StreamFactory)this.streamFactoriesByAddressId.get(addressId);
        if (factory == null) {
            return null;
        }

        MessageConsumer replyTo = this.supplyReplyTo(streamId);
        MessageConsumer created = factory.newStream(msgTypeId, buffer, index, length, replyTo);
        if (created != null) {
            this.streams[StreamId.streamIndex(streamId)].put(StreamId.instanceId(streamId), created);
        }
        return created;
    }

    /**
     * Tries to create and register a stream handler for a begin frame, looking up
     * the stream factory by the local id of the frame's route.
     *
     * @param msgTypeId frame type id, forwarded to the stream factory
     * @param buffer    buffer holding the frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     * @return the newly registered handler, or {@code null} when there is no
     *         factory for the label or the factory declined to create a stream
     */
    private MessageConsumer handleBeginReply(int msgTypeId, DirectBuffer buffer, int index, int length) {
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        int labelId = RouteId.localId(routeId);

        StreamFactory factory = (StreamFactory)this.streamFactoriesByAddressId.get(labelId);
        if (factory == null) {
            return null;
        }

        MessageConsumer replyTo = this.supplyReplyTo(streamId);
        MessageConsumer created = factory.newStream(msgTypeId, buffer, index, length, replyTo);
        if (created != null) {
            this.streams[StreamId.streamIndex(streamId)].put(StreamId.instanceId(streamId), created);
        }
        return created;
    }

    /**
     * Builds a reset frame in the shared write buffer and sends it to the writer
     * selected by {@link #supplyReplyTo} for the stream.
     *
     * @param routeId     route id placed in the reset
     * @param streamId    stream id placed in the reset and used to pick the writer
     * @param sequence    sequence placed in the reset
     * @param acknowledge acknowledge placed in the reset
     * @param maximum     maximum placed in the reset
     */
    private void doReset(long routeId, long streamId, long sequence, long acknowledge, int maximum) {
        ResetFW reset = this.resetRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .sequence(sequence)
            .acknowledge(acknowledge)
            .maximum(maximum)
            .build();

        this.supplyReplyTo(streamId).accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    /**
     * Builds an abort frame in the shared write buffer and delivers it directly to
     * the given stream handler. The frame uses route id {@code 0}, sequence and
     * acknowledge {@code -1} and maximum {@code 0}.
     *
     * @param streamId stream id placed in the abort
     * @param stream   handler that receives the abort
     */
    private void doSyntheticAbort(long streamId, MessageConsumer stream) {
        final long syntheticAbortRouteId = 0L;
        AbortFW abort = this.abortRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(syntheticAbortRouteId)
            .streamId(streamId)
            .sequence(-1L)
            .acknowledge(-1L)
            .maximum(0)
            .build();
        stream.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
    }

    /**
     * Returns the writer keyed by the stream index of {@code streamId}, creating
     * it on first use.
     *
     * @param streamId stream id whose stream index selects the writer
     * @return the cached or newly created writer
     */
    private MessageConsumer supplyReplyTo(long streamId) {
        return (MessageConsumer)this.writersByIndex.computeIfAbsent(StreamId.streamIndex(streamId), this.supplyWriter);
    }

    /**
     * Returns the writer keyed by the remote index of {@code streamId}, creating
     * it on first use.
     *
     * @param streamId stream id whose remote index selects the writer
     * @return the cached or newly created writer
     */
    private MessageConsumer supplyInitialWriter(long streamId) {
        return (MessageConsumer)this.writersByIndex.computeIfAbsent(StreamId.remoteIndex(streamId), this.supplyWriter);
    }

    /**
     * Returns the write handler of the target for the given index.
     *
     * @param index target index
     * @return the target's write handler
     */
    private MessageConsumer supplyWriter(int index) {
        Target target = this.supplyTarget(index);
        return target.writeHandler();
    }

    /**
     * Returns the target cached for the given index, creating it on first use.
     *
     * @param index target index
     * @return the cached or newly created target
     */
    private Target supplyTarget(int index) {
        Object target = this.targetsByIndex.computeIfAbsent(index, this.newTarget);
        return (Target)target;
    }

    /**
     * Creates a target for the given index, sharing this agent's configuration,
     * write buffer, stream and throttle dispatch tables and write-counter factory.
     *
     * @param index target index
     * @return a new target
     */
    private Target newTarget(int index) {
        Target created = new Target(this.config, index, this.writeBuffer, this.streams, this.throttles, this.newWriteCounters);
        return created;
    }

    /**
     * Creates the read counters for a route, named after the nukleus resolved
     * from the route's local id.
     *
     * @param routeId route id
     * @return new read counters for the route
     */
    private ReadCounters newReadCounters(long routeId) {
        String nukleusName = this.nukleus(RouteId.localId(routeId));
        return new ReadCounters(this.counters, nukleusName, routeId);
    }

    /**
     * Creates the write counters for a route, named after the nukleus resolved
     * from the route's local id.
     *
     * @param routeId route id
     * @return new write counters for the route
     */
    private WriteCounters newWriteCounters(long routeId) {
        String nukleusName = this.nukleus(RouteId.localId(routeId));
        return new WriteCounters(this.counters, nukleusName, routeId);
    }

    /**
     * Resolves the label for {@code localId} and extracts capture group 1 of
     * {@code ADDRESS_PATTERN} from it.
     *
     * @param localId label id of the local address
     * @return capture group 1 of the matched address
     * @throws IllegalStateException if the label does not match
     *         {@code ADDRESS_PATTERN}; previously this surfaced as the opaque
     *         "No match found" error raised by {@link Matcher#group(int)}
     */
    private String nukleus(int localId) {
        String localAddress = this.labels.lookupLabel(localId);
        Matcher matcher = ADDRESS_PATTERN.matcher(localAddress);
        // The match result must be checked: group(1) is only valid after a successful match.
        if (!matcher.matches()) {
            throw new IllegalStateException(String.format("local address does not match expected pattern: %s (label id %d)", localAddress, localId));
        }
        return matcher.group(1);
    }

    /**
     * Configures the given builder with this agent's router, write buffer and id
     * suppliers, then builds the address factory.
     *
     * @param addressFactoryBuilder builder to configure
     * @return the built address factory
     */
    private AddressFactory newAddressFactory(AddressFactoryBuilder addressFactoryBuilder) {
        return addressFactoryBuilder
            .setRouter(this.resolver)
            .setWriteBuffer(this.writeBuffer)
            .setTypeIdSupplier(this.labels::supplyLabelId)
            .setTraceIdSupplier(this::supplyTraceId)
            .setInitialIdSupplier(this::supplyInitialId)
            .setReplyIdSupplier(this::supplyReplyId)
            .build();
    }

    /**
     * Configures the given builder with this agent's routing, signaling, id,
     * budget, counter and buffer-pool collaborators, then builds the stream
     * factory.
     *
     * @param supplyCounter            supplier of named counters
     * @param supplyAccumulator        supplier of named accumulators
     * @param supplyCountingBufferPool supplier of the buffer pool to use
     * @param streamFactoryBuilder     builder to configure
     * @return the built stream factory
     */
    private StreamFactory newStreamFactory(Function<String, LongSupplier> supplyCounter, Function<String, LongConsumer> supplyAccumulator, Supplier<BufferPool> supplyCountingBufferPool, StreamFactoryBuilder streamFactoryBuilder) {
        return streamFactoryBuilder
            .setRouteManager(this.resolver)
            .setExecutor(this.executor)
            .setSignaler(this.signaler)
            .setWriteBuffer(this.writeBuffer)
            .setTypeIdSupplier(this.labels::supplyLabelId)
            .setInitialIdSupplier(this::supplyInitialId)
            .setReplyIdSupplier(this::supplyReplyId)
            .setTraceIdSupplier(this::supplyTraceId)
            .setBudgetIdSupplier(this::supplyBudgetId)
            .setBudgetCreditor(this.creditor)
            .setBudgetDebitorSupplier(this::supplyBudgetDebitor)
            .setCounterSupplier(supplyCounter)
            .setAccumulatorSupplier(supplyAccumulator)
            .setBufferPoolSupplier(supplyCountingBufferPool)
            .setDroppedFrameConsumer(this::handleDroppedReadFrame)
            .setRemoteIndexSupplier(StreamId::remoteIndex)
            .setHostResolver(this.resolveHost)
            .build();
    }

    /**
     * Looks up the stream factory registered for an address id.
     *
     * @param addressId address id
     * @return the registered factory, or {@code null} when none is registered
     */
    private StreamFactory supplyStreamFactory(int addressId) {
        Object factory = this.streamFactoriesByAddressId.get(addressId);
        return (StreamFactory)factory;
    }

    /**
     * Looks up the throttle registered for a stream.
     *
     * @param streamId stream id
     * @return the registered throttle, or {@code null} when none is registered
     */
    private MessageConsumer supplyThrottle(long streamId) {
        Int2ObjectHashMap<MessageConsumer> throttlesById = this.throttles[StreamId.throttleIndex(streamId)];
        return throttlesById.get(StreamId.instanceId(streamId));
    }

    /**
     * Unregisters the throttle for a stream, if one is registered.
     *
     * @param streamId stream id
     */
    private void removeThrottle(long streamId) {
        Int2ObjectHashMap<MessageConsumer> throttlesById = this.throttles[StreamId.throttleIndex(streamId)];
        throttlesById.remove(StreamId.instanceId(streamId));
    }

    /**
     * Allocates the next initial stream id for a route.
     *
     * <p>The internal stream id sequence is advanced by two and masked. The result
     * carries the resolved remote index in bits 48-55, the sequence in the
     * remaining bits, and always has the lowest bit set.
     *
     * @param routeId route id whose remote id selects the remote index
     * @return the new initial stream id
     */
    private long supplyInitialId(long routeId) {
        int remoteIndex = this.resolveRemoteIndex(RouteId.remoteId(routeId));

        this.streamId += 2L;
        this.streamId &= this.mask;

        long remoteIndexBits = ((long)remoteIndex << 48) & 0xFF000000000000L;
        long sequenceBits = this.streamId & 0xFF00FFFFFFFFFFFFL;
        return remoteIndexBits | sequenceBits | 1L;
    }

    /**
     * Derives the reply stream id from an initial stream id by clearing the
     * lowest bit.
     *
     * @param initialId initial stream id; asserted to be initial
     * @return the matching reply stream id
     */
    private long supplyReplyId(long initialId) {
        assert (StreamId.isInitial(initialId));
        final long clearLowestBit = ~1L;
        return initialId & clearLowestBit;
    }

    /**
     * Advances the budget id sequence by one, applies the mask and returns the
     * stored result.
     *
     * @return the next budget id
     */
    private long supplyBudgetId() {
        this.budgetId += 1L;
        return this.budgetId &= this.mask;
    }

    /**
     * Returns the budget debitor for the owner encoded in a budget id, creating
     * it on first use. The owner index is the budget id shifted right by
     * {@code shift} and narrowed to an {@code int}.
     *
     * @param budgetId budget id
     * @return the cached or newly created debitor for the owner
     */
    private BudgetDebitor supplyBudgetDebitor(long budgetId) {
        long shifted = budgetId >> this.shift;
        int ownerIndex = (int)shifted;
        return (BudgetDebitor)this.debitorsByIndex.computeIfAbsent(ownerIndex, this::newBudgetDebitor);
    }

    /**
     * Creates a debitor over the budgets layout file {@code budgets<ownerIndex>}
     * in the configured directory, opened as a non-owner.
     *
     * @param ownerIndex index of the budget owner
     * @return a new debitor for that owner
     */
    private DefaultBudgetDebitor newBudgetDebitor(int ownerIndex) {
        String fileName = String.format("budgets%d", ownerIndex);
        BudgetsLayout layout = new BudgetsLayout.Builder()
            .path(this.config.directory().resolve(fileName))
            .owner(false)
            .build();
        return new DefaultBudgetDebitor(this.localIndex, ownerIndex, layout);
    }

    /**
     * Advances the trace id sequence by one, applies the mask and returns the
     * stored result.
     *
     * @return the next trace id
     */
    private long supplyTraceId() {
        this.traceId += 1L;
        return this.traceId &= this.mask;
    }

    /**
     * Picks the index to use for a remote id from its affinity state.
     *
     * <p>The current {@code nextIndex} is returned. Unless it equals the local
     * index, the affinity's {@code nextIndex} is then moved to the following set
     * bit of the mask, wrapping to the first set bit when there is none.
     *
     * @param remoteId remote id
     * @return the selected index
     */
    private int resolveRemoteIndex(int remoteId) {
        Affinity affinity = this.supplyAffinity(remoteId);
        BitSet candidates = affinity.mask;
        int selected = affinity.nextIndex;
        assert (candidates.cardinality() != 0);

        if (selected == this.localIndex) {
            // The local index stays selected; no rotation.
            return selected;
        }

        int following = candidates.nextSetBit(selected + 1);
        affinity.nextIndex = following != -1 ? following : candidates.nextSetBit(0);
        return selected;
    }

    /**
     * Returns the affinity state cached for a remote id, resolving it on first
     * use.
     *
     * @param remoteId remote id, widened to {@code long} for the cache key
     * @return the cached or newly resolved affinity
     */
    private Affinity supplyAffinity(int remoteId) {
        long key = (long)remoteId;
        return (Affinity)this.affinityByRemoteId.computeIfAbsent(key, this.resolveAffinity);
    }

    /**
     * Builds the affinity state for a remote id from the configured affinity
     * mask of its address.
     *
     * <p>The starting index is the local index when the mask includes it,
     * otherwise the first set bit of the mask.
     *
     * @param remoteIdAsLong remote id; only the low 32 bits are used
     * @return a new affinity holding the mask and the starting index
     * @throws IllegalStateException if the mask for the address has no bit set
     */
    public Affinity resolveAffinity(long remoteIdAsLong) {
        int remoteId = (int)(remoteIdAsLong & 0xFFFFFFFFL);
        String remoteAddress = this.labels.lookupLabel(remoteId);
        BitSet mask = this.affinityMask.apply(remoteAddress);
        if (mask.isEmpty()) {
            // The mask is a BitSet, so it must be formatted with %s: %d would raise
            // IllegalFormatConversionException and hide the intended error.
            throw new IllegalStateException(String.format("affinity mask must specify at least one bit: %s %s", remoteAddress, mask));
        }
        Affinity affinity = new Affinity();
        affinity.mask = mask;
        affinity.nextIndex = mask.get(this.localIndex) ? this.localIndex : mask.nextSetBit(0);
        return affinity;
    }

    /**
     * Creates a signal builder wrapped over its own freshly allocated 512-byte
     * buffer.
     *
     * @return a new signal builder
     */
    private static SignalFW.Builder newSignalRW() {
        MutableDirectBuffer scratch = new UnsafeBuffer(new byte[512]);
        SignalFW.Builder builder = new SignalFW.Builder();
        return builder.wrap(scratch, 0, scratch.capacity());
    }

    /**
     * Creates a dispatch table of 64 empty maps, one per slot.
     *
     * @return the new dispatch table
     */
    private Int2ObjectHashMap<MessageConsumer>[] initDispatcher() {
        Int2ObjectHashMap[] slots = new Int2ObjectHashMap[64];
        int slot = 0;
        while (slot < slots.length) {
            slots[slot] = new Int2ObjectHashMap();
            slot++;
        }
        return slots;
    }

    /**
     * Compiler-generated accessor, surfaced by the decompiler, that lets nested
     * classes assign the {@code agents} field.
     *
     * @param x0 agent whose {@code agents} field is assigned
     * @param x1 new value for the field
     * @return {@code x1}
     */
    static /* synthetic */ Agent[] access$1402(ElektronAgent x0, Agent[] x1) {
        return x0.agents = x1;
    }

    /**
     * {@link SignalingExecutor} that runs tasks (inline, or on an optional
     * {@link ExecutorService}) and reports completion by writing a signal frame to
     * the agent's streams buffer. Delayed signals are registered with the agent's
     * timer wheel.
     */
    private final class ElektronExecutor
    implements SignalingExecutor {
        // One signal builder per thread, because signals may be written from executor threads.
        // NOTE(review): access$2300 is a synthetic accessor not visible here - presumably newSignalRW; confirm.
        private final ThreadLocal<SignalFW.Builder> signalRW = ThreadLocal.withInitial(() -> ElektronAgent.access$2300());
        private final ExecutorService executorService;

        private ElektronExecutor(ExecutorService executorService) {
            this.executorService = executorService;
        }

        /**
         * Schedules a signal on the timer wheel after the given delay.
         *
         * @return a future for the pending signal; cancelling it removes the
         *         registered task and cancels the timer
         */
        @Override
        public Future<?> schedule(long delay, TimeUnit unit, long routeId, long streamId, long signalId) {
            long deadlineMillis = System.currentTimeMillis() + unit.toMillis(delay);
            final long timerId = ElektronAgent.this.timerWheel.scheduleTimer(deadlineMillis);
            Runnable emitSignal = () -> this.signal(routeId, streamId, signalId);
            FutureTask<Object> pending = new FutureTask<Object>(emitSignal, null){

                @Override
                protected void done() {
                    if (!this.isCancelled()) {
                        return;
                    }
                    ElektronAgent.this.tasksByTimerId.remove(timerId);
                    ElektronAgent.this.timerWheel.cancelTimer(timerId);
                }
            };
            ElektronAgent.this.tasksByTimerId.put(timerId, pending);
            return pending;
        }

        /**
         * Runs the task and then signals. With an executor service the work is
         * submitted to it; without one the task runs inline on the calling thread
         * and an already-completed future is returned.
         */
        @Override
        public Future<?> execute(Runnable task, long routeId, long streamId, long signalId) {
            if (this.executorService == null) {
                this.invokeAndSignal(task, routeId, streamId, signalId);
                return this.completedFuture();
            }
            return this.executorService.submit(() -> this.invokeAndSignal(task, routeId, streamId, signalId));
        }

        /** Returns a future that is already done, cannot be cancelled and yields {@code null}. */
        private Future<Void> completedFuture() {
            return new Future<Void>(){

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return false;
                }

                @Override
                public boolean isCancelled() {
                    return false;
                }

                @Override
                public boolean isDone() {
                    return true;
                }

                @Override
                public Void get() throws InterruptedException, ExecutionException {
                    return null;
                }

                @Override
                public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return null;
                }
            };
        }

        /** Runs the task and always writes the signal afterwards, even if the task throws. */
        private void invokeAndSignal(Runnable task, long routeId, long streamId, long signalId) {
            try {
                task.run();
            }
            finally {
                this.signal(routeId, streamId, signalId);
            }
        }

        /**
         * Writes a signal frame for the stream to the agent's streams buffer. The
         * signal id is narrowed to {@code int} and the cancel id is {@code -1}.
         */
        private void signal(long routeId, long streamId, long signalId) {
            long timestamp = ElektronAgent.this.timestamps ? System.nanoTime() : 0L;
            SignalFW signal = this.signalRW.get().rewrap()
                .routeId(routeId)
                .streamId(streamId)
                .sequence(0L)
                .acknowledge(0L)
                .maximum(0)
                .timestamp(timestamp)
                .traceId(ElektronAgent.this.supplyTraceId())
                .cancelId(-1L)
                .signalId((int)signalId)
                .build();
            ElektronAgent.this.streamsBuffer.write(signal.typeId(), signal.buffer(), signal.offset(), signal.sizeof());
        }
    }

    /**
     * {@link Signaler} backed by the agent's timer wheel (for timed signals) and an
     * optional {@link ExecutorService} (for task signals).
     *
     * <p>Cancel ids come from two spaces: timer ids returned by the timer wheel
     * (asserted non-negative) and future ids generated here, which always have the
     * highest and lowest bits set and are therefore negative. {@code -1} means
     * "nothing to cancel".
     */
    private final class ElektronSignaler
    implements Signaler {
        // One signal builder per thread, because signals may be written from executor threads.
        // NOTE(review): access$2300 is a synthetic accessor not visible here - presumably newSignalRW; confirm.
        private final ThreadLocal<SignalFW.Builder> signalRW = ThreadLocal.withInitial(() -> ElektronAgent.access$2300());
        private final ExecutorService executorService;
        private long nextFutureId;

        private ElektronSignaler(ExecutorService executorService) {
            this.executorService = executorService;
        }

        /** Registers an arbitrary task to run when the timer wheel reaches {@code timeMillis}. */
        public void executeTaskAt(long timeMillis, Runnable task) {
            this.scheduleTask(timeMillis, task);
        }

        /**
         * Registers a timer that invokes {@code handler} with {@code signalId}.
         *
         * @return the timer id, usable with {@link #cancel(long)}
         */
        @Override
        public long signalAt(long timeMillis, int signalId, IntConsumer handler) {
            return this.scheduleTask(timeMillis, () -> handler.accept(signalId));
        }

        /**
         * Registers a timer that writes a signal frame for the given stream.
         *
         * @return the timer id, usable with {@link #cancel(long)}
         */
        @Override
        public long signalAt(long timeMillis, long routeId, long streamId, int signalId) {
            return this.scheduleTask(timeMillis, () -> this.signal(routeId, streamId, 0L, 0L, -1L, signalId));
        }

        /** Schedules a timer and records its task; the timer id must be fresh and non-negative. */
        private long scheduleTask(long timeMillis, Runnable task) {
            long timerId = ElektronAgent.this.timerWheel.scheduleTimer(timeMillis);
            Runnable displaced = (Runnable)ElektronAgent.this.tasksByTimerId.put(timerId, task);
            assert (displaced == null);
            assert (timerId >= 0L);
            return timerId;
        }

        /**
         * Runs the task and then writes a signal frame for the given stream.
         *
         * @return a negative future id when the task was submitted to the executor
         *         service, or {@code -1} when it ran inline
         */
        @Override
        public long signalTask(Runnable task, long routeId, long streamId, int signalId) {
            if (this.executorService == null) {
                // No executor: run inline; there is nothing to cancel afterwards.
                this.invokeAndSignal(task, routeId, streamId, 0L, 0L, -1L, signalId);
                return -1L;
            }

            this.nextFutureId = (this.nextFutureId + 1L) & Long.MAX_VALUE;
            // Highest bit marks the id as a future id (negative); lowest bit is always set.
            final long futureId = (this.nextFutureId << 1) | 0x8000000000000001L;
            assert (futureId != -1L);

            Future<?> submitted = this.executorService.submit(() -> this.invokeAndSignal(task, routeId, streamId, 0L, 0L, futureId, signalId));
            Future<?> displaced = (Future<?>)ElektronAgent.this.futuresById.put(futureId, submitted);
            assert (displaced == null);
            assert (futureId < 0L);
            return futureId;
        }

        /** Writes a signal frame for the given stream immediately. */
        @Override
        public void signalNow(long routeId, long streamId, int signalId) {
            this.signal(routeId, streamId, 0L, 0L, -1L, signalId);
        }

        /**
         * Cancels a pending timer (positive id) or a submitted task (any other id
         * except {@code -1}).
         *
         * <p>NOTE(review): scheduling asserts {@code timerId >= 0}, but id {@code 0}
         * is not treated as a timer here, so such a timer would not be cancelled -
         * confirm whether the timer wheel can return {@code 0}.
         *
         * @return {@code true} when something was actually cancelled
         */
        @Override
        public boolean cancel(long cancelId) {
            if (cancelId > 0L) {
                boolean timerCancelled = ElektronAgent.this.timerWheel.cancelTimer(cancelId);
                ElektronAgent.this.tasksByTimerId.remove(cancelId);
                return timerCancelled;
            }
            if (cancelId == -1L) {
                return false;
            }
            Future<?> tracked = (Future<?>)ElektronAgent.this.futuresById.remove(cancelId);
            return tracked != null && tracked.cancel(true);
        }

        /** Runs the task and always writes the signal afterwards, even if the task throws. */
        private void invokeAndSignal(Runnable task, long routeId, long streamId, long sequence, long acknowledge, long cancelId, int signalId) {
            try {
                task.run();
            }
            finally {
                this.signal(routeId, streamId, sequence, acknowledge, cancelId, signalId);
            }
        }

        /** Builds a signal frame with the given fields and writes it to the agent's streams buffer. */
        private void signal(long routeId, long streamId, long sequence, long acknowledge, long cancelId, int signalId) {
            long timestamp = ElektronAgent.this.timestamps ? System.nanoTime() : 0L;
            SignalFW signal = this.signalRW.get().rewrap()
                .routeId(routeId)
                .streamId(streamId)
                .sequence(sequence)
                .acknowledge(acknowledge)
                .maximum(0)
                .timestamp(timestamp)
                .traceId(ElektronAgent.this.supplyTraceId())
                .cancelId(cancelId)
                .signalId(signalId)
                .build();
            ElektronAgent.this.streamsBuffer.write(signal.typeId(), signal.buffer(), signal.offset(), signal.sizeof());
        }
    }

    /**
     * {@link RouteManager} that forwards every call to a per-thread
     * {@link Resolver}, created lazily for each thread from the supplier given at
     * construction.
     */
    private static class ResolverRef
    implements RouteManager {
        private final ThreadLocal<Resolver> resolver;

        ResolverRef(Supplier<Resolver> supplyResolver) {
            this.resolver = ThreadLocal.withInitial(supplyResolver);
        }

        /** Returns the calling thread's resolver. */
        private Resolver current() {
            return this.resolver.get();
        }

        @Override
        public <R> R resolveExternal(long authorization, MessagePredicate filter, MessageFunction<R> mapper) {
            Resolver delegate = this.current();
            return delegate.resolveExternal(authorization, filter, mapper);
        }

        @Override
        public <R> R resolve(long routeId, long authorization, MessagePredicate filter, MessageFunction<R> mapper) {
            Resolver delegate = this.current();
            return delegate.resolve(routeId, authorization, filter, mapper);
        }

        @Override
        public void forEach(MessageConsumer consumer) {
            this.current().forEach(consumer);
        }

        @Override
        public MessageConsumer supplyReceiver(long streamId) {
            Resolver delegate = this.current();
            return delegate.supplyReceiver(streamId);
        }

        @Override
        public void setThrottle(long streamId, MessageConsumer throttle) {
            this.current().setThrottle(streamId, throttle);
        }

        @Override
        public void clearThrottle(long streamId) {
            this.current().clearThrottle(streamId);
        }
    }

    /**
     * Reference-counted holder for one {@link Elektron}: it pre-builds the
     * address and stream factories per route kind, and adds or removes the
     * elektron's agent from the enclosing agent's list as the assignment count
     * moves from and to zero.
     */
    private final class ElektronRef {
        private final Elektron elektron;
        private final Map<RouteKind, StreamFactory> streamFactories;
        private final Map<RouteKind, AddressFactory> addressFactories;
        private int count;

        /**
         * Builds the factories for every route kind the elektron supports.
         *
         * @param nukleusName name used to prefix the buffer-pool counters
         * @param elekron     elektron to wrap; must not be {@code null}
         */
        private ElektronRef(String nukleusName, Elektron elekron) {
            this.elektron = Objects.requireNonNull(elekron);
            this.streamFactories = new EnumMap<RouteKind, StreamFactory>(RouteKind.class);
            this.addressFactories = new EnumMap<RouteKind, AddressFactory>(RouteKind.class);

            // Counters obtained through supplyCounter are cached per name for this elektron.
            Map<String, AtomicCounter> countersByName = new HashMap<String, AtomicCounter>();
            Function<String, AtomicCounter> newCounter = ElektronAgent.this.counters::counter;
            Function<String, LongSupplier> supplyCounter = name -> () -> countersByName.computeIfAbsent(name, newCounter).increment() + 1L;
            Function<String, LongConsumer> supplyAccumulator = name -> inc -> ElektronAgent.this.counters.counter(name).getAndAdd(inc);

            AtomicCounter acquires = ElektronAgent.this.counters.counter(String.format("%s.acquires", nukleusName));
            AtomicCounter releases = ElektronAgent.this.counters.counter(String.format("%s.releases", nukleusName));
            CountingBufferPool countingPool = new CountingBufferPool(ElektronAgent.this.bufferPool, () -> acquires.increment(), () -> releases.increment());
            Supplier<BufferPool> supplyCountingBufferPool = () -> countingPool;

            for (RouteKind routeKind : RouteKind.values()) {
                AddressFactoryBuilder addressFactoryBuilder = this.elektron.addressFactoryBuilder(routeKind);
                if (addressFactoryBuilder != null) {
                    this.addressFactories.put(routeKind, ElektronAgent.this.newAddressFactory(addressFactoryBuilder));
                }

                StreamFactoryBuilder streamFactoryBuilder = this.elektron.streamFactoryBuilder(routeKind);
                if (streamFactoryBuilder != null) {
                    this.streamFactories.put(routeKind, ElektronAgent.this.newStreamFactory(supplyCounter, supplyAccumulator, supplyCountingBufferPool, streamFactoryBuilder));
                }
            }
        }

        /**
         * Records one more assignment. On the first assignment the elektron's agent
         * (if any) is added to the enclosing agent's list. When a stream factory
         * exists for the route kind it is registered under {@code labelId}.
         *
         * @return this reference
         */
        public ElektronRef assign(RouteKind routeKind, int labelId) {
            synchronized (this) {
                if (this.count == 0) {
                    Agent agent = this.elektron.agent();
                    if (agent != null) {
                        ElektronAgent.access$1402(ElektronAgent.this, ArrayUtil.add(ElektronAgent.this.agents, agent));
                    }
                }

                StreamFactory streamFactory = this.streamFactories.get(routeKind);
                if (streamFactory != null) {
                    ElektronAgent.this.streamFactoriesByAddressId.put(labelId, streamFactory);
                }

                this.count++;
            }
            return this;
        }

        /**
         * Records one fewer assignment. When the count reaches zero the stream
         * factory registered under {@code labelId} is removed and the elektron's
         * agent (if any) is replaced in the enclosing agent's list by a one-shot
         * agent that closes it quietly and then removes itself.
         *
         * @return this reference
         */
        public ElektronRef unassign(RouteKind routeKind, int labelId) {
            synchronized (this) {
                this.count--;
                if (this.count != 0) {
                    return this;
                }

                StreamFactory streamFactory = (StreamFactory)ElektronAgent.this.streamFactoriesByAddressId.remove(labelId);
                assert (streamFactory == this.streamFactories.get(routeKind));

                final Agent agent = this.elektron.agent();
                if (agent == null) {
                    return this;
                }

                ElektronAgent.access$1402(ElektronAgent.this, ArrayUtil.remove(ElektronAgent.this.agents, agent));

                // The close is deferred to a later doWork() call instead of running here.
                Agent closeAgent = new Agent(){

                    public int doWork() throws Exception {
                        CloseHelper.quietClose(() -> agent.onClose());
                        ElektronAgent.access$1402(ElektronAgent.this, ArrayUtil.remove(ElektronAgent.this.agents, this));
                        return 1;
                    }

                    public String roleName() {
                        return String.format("%s (deferred close)", agent.roleName());
                    }
                };
                ElektronAgent.access$1402(ElektronAgent.this, ArrayUtil.add(ElektronAgent.this.agents, closeAgent));
            }
            return this;
        }
    }

    /**
     * The set of read-side counters kept for one route. Each counter is named
     * {@code <nukleus>.<routeId>.<metric>.read}.
     */
    private static final class ReadCounters {
        private final AtomicCounter opens;
        private final AtomicCounter closes;
        private final AtomicCounter aborts;
        private final AtomicCounter windows;
        private final AtomicCounter resets;
        private final AtomicCounter bytes;
        private final AtomicCounter frames;

        /**
         * Obtains all counters for the route from the given registry.
         *
         * @param counters registry that supplies counters by name
         * @param nukleus  nukleus name used as the first name segment
         * @param routeId  route id used as the second name segment
         */
        ReadCounters(Counters counters, String nukleus, long routeId) {
            this.opens = ReadCounters.named(counters, "%s.%d.opens.read", nukleus, routeId);
            this.closes = ReadCounters.named(counters, "%s.%d.closes.read", nukleus, routeId);
            this.aborts = ReadCounters.named(counters, "%s.%d.aborts.read", nukleus, routeId);
            this.windows = ReadCounters.named(counters, "%s.%d.windows.read", nukleus, routeId);
            this.resets = ReadCounters.named(counters, "%s.%d.resets.read", nukleus, routeId);
            this.bytes = ReadCounters.named(counters, "%s.%d.bytes.read", nukleus, routeId);
            this.frames = ReadCounters.named(counters, "%s.%d.frames.read", nukleus, routeId);
        }

        /** Formats the counter name from the pattern and looks the counter up. */
        private static AtomicCounter named(Counters counters, String pattern, String nukleus, long routeId) {
            return counters.counter(String.format(pattern, nukleus, routeId));
        }
    }

    /**
     * Mutable per-remote selection state: the set of candidate indexes and the
     * index to hand out next.
     */
    private static class Affinity {
        /** Index returned by the next selection. */
        int nextIndex;
        /** Candidate indexes; each set bit is an eligible index. */
        BitSet mask;

        private Affinity() {
            // Fields are populated by the creator after construction.
        }
    }
}

