/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.internal.context;

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.util.BitSet;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.CloseHelper;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.hints.ThreadHints;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.config.Binding;
import org.reaktivity.reaktor.config.Namespace;
import org.reaktivity.reaktor.internal.Counters;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetDebitor;
import org.reaktivity.reaktor.internal.context.BindingContext;
import org.reaktivity.reaktor.internal.context.ConfigurationContext;
import org.reaktivity.reaktor.internal.context.NamespaceTask;
import org.reaktivity.reaktor.internal.context.VaultContext;
import org.reaktivity.reaktor.internal.layouts.BudgetsLayout;
import org.reaktivity.reaktor.internal.layouts.BufferPoolLayout;
import org.reaktivity.reaktor.internal.layouts.MetricsLayout;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.poller.Poller;
import org.reaktivity.reaktor.internal.stream.BudgetId;
import org.reaktivity.reaktor.internal.stream.NamespacedId;
import org.reaktivity.reaktor.internal.stream.StreamId;
import org.reaktivity.reaktor.internal.stream.Target;
import org.reaktivity.reaktor.internal.stream.WriteCounters;
import org.reaktivity.reaktor.internal.types.stream.AbortFW;
import org.reaktivity.reaktor.internal.types.stream.BeginFW;
import org.reaktivity.reaktor.internal.types.stream.DataFW;
import org.reaktivity.reaktor.internal.types.stream.FlushFW;
import org.reaktivity.reaktor.internal.types.stream.FrameFW;
import org.reaktivity.reaktor.internal.types.stream.ResetFW;
import org.reaktivity.reaktor.internal.types.stream.SignalFW;
import org.reaktivity.reaktor.internal.types.stream.WindowFW;
import org.reaktivity.reaktor.nukleus.Elektron;
import org.reaktivity.reaktor.nukleus.ElektronContext;
import org.reaktivity.reaktor.nukleus.Nukleus;
import org.reaktivity.reaktor.nukleus.budget.BudgetCreditor;
import org.reaktivity.reaktor.nukleus.budget.BudgetDebitor;
import org.reaktivity.reaktor.nukleus.buffer.BufferPool;
import org.reaktivity.reaktor.nukleus.concurrent.Signaler;
import org.reaktivity.reaktor.nukleus.function.MessageConsumer;
import org.reaktivity.reaktor.nukleus.poller.PollerKey;
import org.reaktivity.reaktor.nukleus.stream.StreamFactory;
import org.reaktivity.reaktor.nukleus.vault.BindingVault;

public class DispatchAgent
implements ElektronContext,
Agent {
    private static final int SIGNAL_TASK_QUEUED = 1;
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final FlushFW flushRO = new FlushFW();
    private final WindowFW windowRO = new WindowFW();
    private final SignalFW signalRO = new SignalFW();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final FlushFW.Builder flushRW = new FlushFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final int localIndex;
    private final ReaktorConfiguration config;
    private final URL configURL;
    private final LabelManager labels;
    private final String agentName;
    private final Counters counters;
    private final Function<String, InetAddress[]> resolveHost;
    private final boolean timestamps;
    private final MetricsLayout metricsLayout;
    private final StreamsLayout streamsLayout;
    private final BufferPoolLayout bufferPoolLayout;
    private final RingBuffer streamsBuffer;
    private final MutableDirectBuffer writeBuffer;
    private final Int2ObjectHashMap<MessageConsumer>[] streams;
    private final Int2ObjectHashMap<MessageConsumer>[] throttles;
    private final Long2ObjectHashMap<ReadCounters> countersByRouteId;
    private final Int2ObjectHashMap<MessageConsumer> writersByIndex;
    private final Int2ObjectHashMap<Target> targetsByIndex;
    private final BufferPool bufferPool;
    private final int shift;
    private final long mask;
    private final MessageHandler readHandler;
    private final DeadlineTimerWheel.TimerHandler expireHandler;
    private final int readLimit;
    private final int expireLimit;
    private final LongFunction<? extends ReadCounters> newReadCounters;
    private final IntFunction<MessageConsumer> supplyWriter;
    private final IntFunction<Target> newTarget;
    private final LongFunction<WriteCounters> newWriteCounters;
    private final LongFunction<Affinity> resolveAffinity;
    private final Poller poller;
    private final DefaultBudgetCreditor creditor;
    private final Int2ObjectHashMap<DefaultBudgetDebitor> debitorsByIndex;
    private final Map<String, AtomicCounter> countersByName;
    private final Long2ObjectHashMap<Affinity> affinityByRouteId;
    private final DeadlineTimerWheel timerWheel;
    private final Long2ObjectHashMap<Runnable> tasksByTimerId;
    private final Long2ObjectHashMap<Future<?>> futuresById;
    private final ElektronSignaler signaler;
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    private final ConfigurationContext configuration;
    private final Deque<Runnable> taskQueue;
    private final LongFunction<BitSet> affinityMask;
    private final AgentRunner runner;
    private long iniitalId;
    private long traceId;
    private long budgetId;
    private long lastReadStreamId;

    public DispatchAgent(ReaktorConfiguration config, URL configURL, ExecutorService executor, LabelManager labels, ErrorHandler errorHandler, LongFunction<BitSet> affinityMask, Collection<Nukleus> nuklei, int index) {
        this.localIndex = index;
        this.config = config;
        this.configURL = configURL;
        this.labels = labels;
        this.affinityMask = affinityMask;
        BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(config.maxSpins(), config.maxYields(), config.minParkNanos(), config.maxParkNanos());
        MetricsLayout metricsLayout = new MetricsLayout.Builder().path(config.directory().resolve(String.format("metrics%d", index))).labelsBufferCapacity(config.counterLabelsBufferCapacity()).valuesBufferCapacity(config.counterValuesBufferCapacity()).build();
        StreamsLayout streamsLayout = new StreamsLayout.Builder().path(config.directory().resolve(String.format("data%d", index))).streamsCapacity(config.streamsBufferCapacity()).readonly(false).build();
        BufferPoolLayout bufferPoolLayout = new BufferPoolLayout.Builder().path(config.directory().resolve(String.format("buffers%d", index))).slotCapacity(config.bufferSlotCapacity()).slotCount(config.bufferPoolCapacity() / config.bufferSlotCapacity()).readonly(false).build();
        this.agentName = String.format("reaktor/data#%d", index);
        this.metricsLayout = metricsLayout;
        this.streamsLayout = streamsLayout;
        this.bufferPoolLayout = bufferPoolLayout;
        this.runner = new AgentRunner((IdleStrategy)idleStrategy, errorHandler, null, (Agent)this);
        CountersManager countersManager = new CountersManager(metricsLayout.labelsBuffer(), metricsLayout.valuesBuffer());
        this.counters = new Counters(countersManager);
        this.resolveHost = config.hostResolver();
        this.timestamps = config.timestamps();
        this.readLimit = config.maximumMessagesPerRead();
        this.expireLimit = config.maximumExpirationsPerPoll();
        this.streamsBuffer = streamsLayout.streamsBuffer();
        this.writeBuffer = new UnsafeBuffer(new byte[config.bufferSlotCapacity() + 1024]);
        this.streams = this.initDispatcher();
        this.throttles = this.initDispatcher();
        this.countersByRouteId = new Long2ObjectHashMap();
        this.readHandler = this::handleRead;
        this.expireHandler = this::handleExpire;
        this.newReadCounters = this::newReadCounters;
        this.supplyWriter = this::supplyWriter;
        this.newTarget = this::newTarget;
        this.newWriteCounters = this::newWriteCounters;
        this.resolveAffinity = this::resolveAffinity;
        this.affinityByRouteId = new Long2ObjectHashMap();
        this.targetsByIndex = new Int2ObjectHashMap();
        this.writersByIndex = new Int2ObjectHashMap();
        this.timerWheel = new DeadlineTimerWheel(TimeUnit.MILLISECONDS, System.currentTimeMillis(), 512L, 1024);
        this.tasksByTimerId = new Long2ObjectHashMap();
        this.futuresById = new Long2ObjectHashMap();
        this.signaler = new ElektronSignaler(executor);
        this.poller = new Poller();
        BufferPool bufferPool = bufferPoolLayout.bufferPool();
        int reserved = 8;
        int shift = 56;
        long initial = (long)index << 56;
        long mask = initial | 0xFFFFFFFFFFFFFFL;
        this.shift = 56;
        this.mask = mask;
        this.bufferPool = bufferPool;
        this.iniitalId = initial;
        this.traceId = initial;
        this.budgetId = initial;
        BudgetsLayout budgetsLayout = new BudgetsLayout.Builder().path(config.directory().resolve(String.format("budgets%d", index))).capacity(config.budgetsBufferCapacity()).owner(true).build();
        this.creditor = new DefaultBudgetCreditor(index, budgetsLayout, this::doSystemFlush, this::supplyBudgetId, this.signaler::executeTaskAt, config.childCleanupLingerMillis());
        this.debitorsByIndex = new Int2ObjectHashMap();
        this.countersByName = new HashMap<String, AtomicCounter>();
        LinkedHashMap<String, Elektron> elektronsByName = new LinkedHashMap<String, Elektron>();
        for (Nukleus nukleus : nuklei) {
            String name = nukleus.name();
            elektronsByName.put(name, nukleus.supplyElektron(this));
        }
        this.configuration = new ConfigurationContext(elektronsByName::get, labels::supplyLabelId);
        this.taskQueue = new ConcurrentLinkedDeque<Runnable>();
        this.correlations = new Long2ObjectHashMap();
    }

    @Override
    public int index() {
        return this.localIndex;
    }

    @Override
    public Signaler signaler() {
        return this.signaler;
    }

    @Override
    public String supplyNamespace(long routeId) {
        return this.labels.lookupLabel(NamespacedId.namespaceId(routeId));
    }

    @Override
    public String supplyLocalName(long routeId) {
        return this.labels.lookupLabel(NamespacedId.localId(routeId));
    }

    @Override
    public int supplyTypeId(String name) {
        return this.labels.supplyLabelId(name);
    }

    @Override
    public long supplyInitialId(long routeId) {
        int remoteIndex = this.resolveRemoteIndex(routeId);
        this.iniitalId += 2L;
        this.iniitalId &= this.mask;
        return (long)remoteIndex << 48 & 0xFF000000000000L | this.iniitalId & 0xFF00FFFFFFFFFFFFL | 1L;
    }

    @Override
    public long supplyReplyId(long initialId) {
        assert (StreamId.isInitial(initialId));
        return initialId & 0xFFFFFFFFFFFFFFFEL;
    }

    @Override
    public long supplyBudgetId() {
        ++this.budgetId;
        this.budgetId &= this.mask;
        return this.budgetId;
    }

    @Override
    public long supplyTraceId() {
        ++this.traceId;
        this.traceId &= this.mask;
        return this.traceId;
    }

    @Override
    public void detachSender(long streamId) {
        this.throttles[StreamId.throttleIndex(streamId)].remove(StreamId.instanceId(streamId));
    }

    @Override
    public BudgetCreditor creditor() {
        return this.creditor;
    }

    @Override
    public BudgetDebitor supplyDebitor(long budgetId) {
        int ownerIndex = (int)(budgetId >> this.shift & 0xFFFFFFFFFFFFFFFFL);
        return (BudgetDebitor)this.debitorsByIndex.computeIfAbsent(ownerIndex, this::newBudgetDebitor);
    }

    @Override
    public MutableDirectBuffer writeBuffer() {
        return this.writeBuffer;
    }

    @Override
    public BufferPool bufferPool() {
        return this.bufferPool;
    }

    @Override
    public LongSupplier supplyCounter(String name) {
        return () -> this.supplyAtomicCounter(name).increment() + 1L;
    }

    @Override
    public LongConsumer supplyAccumulator(String name) {
        return increment -> this.supplyAtomicCounter(name).getAndAdd(increment);
    }

    @Override
    public MessageConsumer droppedFrameHandler() {
        return this::handleDroppedReadFrame;
    }

    @Override
    public int supplyRemoteIndex(long streamId) {
        return StreamId.remoteIndex(streamId);
    }

    @Override
    public InetAddress[] resolveHost(String host) {
        return this.resolveHost.apply(host);
    }

    @Override
    public PollerKey supplyPollerKey(SelectableChannel channel) {
        return this.poller.register(channel);
    }

    @Override
    public long supplyRouteId(Namespace namespace, Binding binding) {
        int namespaceId = this.labels.supplyLabelId(namespace.name);
        int bindingId = this.labels.supplyLabelId(binding.entry);
        return NamespacedId.id(namespaceId, bindingId);
    }

    @Override
    public StreamFactory streamFactory() {
        return this::newStream;
    }

    @Override
    public BindingVault supplyVault(long vaultId) {
        VaultContext vault = this.configuration.resolveVault(vaultId);
        return vault != null ? vault.vaultFactory() : null;
    }

    @Override
    public URL resolvePath(String path) {
        URL resolved = null;
        try {
            resolved = new URL(this.configURL, path);
        }
        catch (MalformedURLException ex) {
            LangUtil.rethrowUnchecked((Throwable)ex);
        }
        return resolved;
    }

    public String roleName() {
        return this.agentName;
    }

    public int doWork() throws Exception {
        int workDone = 0;
        try {
            workDone += this.poller.doWork();
            if (this.timerWheel.timerCount() != 0L) {
                int expired;
                long now = System.currentTimeMillis();
                for (int expiredMax = this.expireLimit; this.timerWheel.currentTickTime() <= now && expiredMax > 0; expiredMax -= expired) {
                    expired = this.timerWheel.poll(now, this.expireHandler, expiredMax);
                    workDone += expired;
                }
            }
        }
        catch (Throwable ex) {
            ex.addSuppressed(new Exception(String.format("[%s]\t[0x%016x] %s", this.agentName, this.lastReadStreamId, this.streamsLayout)));
            throw new AgentTerminationException(ex);
        }
        return workDone += this.streamsBuffer.read(this.readHandler, this.readLimit);
    }

    public void onClose() {
        while (this.config.drainOnClose() && this.streamsBuffer.consumerPosition() < this.streamsBuffer.producerPosition()) {
            ThreadHints.onSpinWait();
        }
        this.configuration.detachAll();
        this.poller.onClose();
        int acquiredBuffers = 0;
        int acquiredCreditors = 0;
        long acquiredDebitors = 0L;
        if (this.config.syntheticAbort()) {
            Int2ObjectHashMap handlers = new Int2ObjectHashMap();
            int senderIndex = 0;
            while (senderIndex < this.streams.length) {
                handlers.clear();
                this.streams[senderIndex].forEach((arg_0, arg_1) -> ((Int2ObjectHashMap)handlers).put(arg_0, arg_1));
                int senderIndex0 = senderIndex++;
                handlers.forEach((id, handler) -> this.doSyntheticAbort(StreamId.streamId(this.localIndex, senderIndex0, id), (MessageConsumer)handler));
            }
            acquiredBuffers = this.bufferPool.acquiredSlots();
            acquiredCreditors = this.creditor.acquired();
            acquiredDebitors = this.debitorsByIndex.values().stream().mapToInt(DefaultBudgetDebitor::acquired).sum();
        }
        this.targetsByIndex.forEach((k, v) -> v.detach());
        this.targetsByIndex.forEach((k, v) -> CloseHelper.quietClose((AutoCloseable)v));
        CloseHelper.quietClose((AutoCloseable)this.streamsLayout);
        CloseHelper.quietClose((AutoCloseable)this.metricsLayout);
        CloseHelper.quietClose((AutoCloseable)this.bufferPoolLayout);
        this.debitorsByIndex.forEach((k, v) -> CloseHelper.quietClose((AutoCloseable)v));
        CloseHelper.quietClose((AutoCloseable)this.creditor);
        if (acquiredBuffers != 0 || acquiredCreditors != 0 || acquiredDebitors != 0L) {
            throw new IllegalStateException(String.format("Some resources not released: %d buffers, %d creditors, %d debitors", acquiredBuffers, acquiredCreditors, acquiredDebitors));
        }
    }

    public String toString() {
        return this.agentName;
    }

    public CompletableFuture<Void> attach(Namespace namespace) {
        NamespaceTask attachTask = this.configuration.attach(namespace);
        this.taskQueue.offer(attachTask);
        this.signaler.signalNow(0L, 0L, 1);
        return attachTask.future();
    }

    public CompletableFuture<Void> detach(Namespace namespace) {
        NamespaceTask detachTask = this.configuration.detach(namespace);
        this.taskQueue.offer(detachTask);
        this.signaler.signalNow(0L, 0L, 1);
        return detachTask.future();
    }

    public AgentRunner runner() {
        return this.runner;
    }

    public long counter(String name) {
        LongSupplier counter = this.counters.readonlyCounter(name);
        return counter != null ? counter.getAsLong() : 0L;
    }

    private AtomicCounter supplyAtomicCounter(String name) {
        return this.countersByName.computeIfAbsent(name, this.counters::counter);
    }

    private void onSystemMessage(int msgTypeId, DirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 5: {
                FlushFW flush = this.flushRO.wrap(buffer, index, index + length);
                this.onSystemFlush(flush);
                break;
            }
            case 0x40000002: {
                WindowFW window = this.windowRO.wrap(buffer, index, index + length);
                this.onSystemWindow(window);
                break;
            }
            case 0x40000003: {
                SignalFW signal = this.signalRO.wrap(buffer, index, index + length);
                this.onSystemSignal(signal);
            }
        }
    }

    private void onSystemFlush(FlushFW flush) {
        long traceId = flush.traceId();
        long budgetId = flush.budgetId();
        int ownerIndex = BudgetId.ownerIndex(budgetId);
        DefaultBudgetDebitor debitor = (DefaultBudgetDebitor)this.debitorsByIndex.get(ownerIndex);
        if (ReaktorConfiguration.DEBUG_BUDGETS) {
            System.out.format("[%d] [0x%016x] [0x%016x] FLUSH %08x %s\n", System.nanoTime(), traceId, budgetId, ownerIndex, debitor);
        }
        if (debitor != null) {
            debitor.flush(traceId, budgetId);
        }
    }

    private void onSystemWindow(WindowFW window) {
        long traceId = window.traceId();
        long budgetId = window.budgetId();
        int reserved = window.maximum();
        this.creditor.creditById(traceId, budgetId, reserved);
        long parentBudgetId = this.creditor.parentBudgetId(budgetId);
        if (parentBudgetId != 0L) {
            this.doSystemWindowIfNecessary(traceId, parentBudgetId, reserved);
        }
    }

    private void onSystemSignal(SignalFW signal) {
        int signalId = signal.signalId();
        switch (signalId) {
            case 1: {
                this.taskQueue.poll().run();
            }
        }
    }

    private void doSystemFlush(long traceId, long budgetId, long watchers) {
        for (int watcherIndex = 0; watcherIndex < 64; ++watcherIndex) {
            if ((watchers & 1L << watcherIndex) == 0L) continue;
            if (ReaktorConfiguration.DEBUG_BUDGETS) {
                System.out.format("[%d] [0x%016x] [0x%016x] flush %d\n", System.nanoTime(), traceId, budgetId, watcherIndex);
            }
            MessageConsumer writer = this.supplyWriter(watcherIndex);
            FlushFW flush = this.flushRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(0L).streamId(0L).sequence(0L).acknowledge(0L).maximum(0).traceId(traceId).budgetId(budgetId).reserved(0).build();
            writer.accept(flush.typeId(), flush.buffer(), flush.offset(), flush.sizeof());
        }
    }

    private void doSystemWindow(long traceId, long budgetId, int reserved) {
        if (ReaktorConfiguration.DEBUG_BUDGETS) {
            System.out.format("[%d] [0x%016x] [0x%016x] doSystemWindow credit=%d \n", System.nanoTime(), traceId, budgetId, reserved);
        }
        int targetIndex = BudgetId.ownerIndex(budgetId);
        MessageConsumer writer = this.supplyWriter(targetIndex);
        WindowFW window = this.windowRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(0L).streamId(0L).sequence(0L).acknowledge(0L).maximum(reserved).traceId(traceId).budgetId(budgetId).padding(0).build();
        writer.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof());
    }

    private boolean handleExpire(TimeUnit timeUnit, long now, long timerId) {
        Runnable task = (Runnable)this.tasksByTimerId.remove(timerId);
        if (task != null) {
            task.run();
        }
        return true;
    }

    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        FrameFW frame = this.frameRO.wrap((DirectBuffer)buffer, index, index + length);
        long streamId = frame.streamId();
        long routeId = frame.routeId();
        long sequence = frame.sequence();
        long acknowledge = frame.acknowledge();
        int maximum = frame.maximum();
        this.lastReadStreamId = streamId;
        if (streamId == 0L) {
            this.onSystemMessage(msgTypeId, (DirectBuffer)buffer, index, length);
        } else if (StreamId.isInitial(streamId)) {
            this.handleReadInitial(routeId, streamId, sequence, acknowledge, maximum, msgTypeId, buffer, index, length);
        } else {
            this.handleReadReply(routeId, streamId, sequence, acknowledge, maximum, msgTypeId, buffer, index, length);
        }
    }

    private void handleReadInitial(long routeId, long streamId, long sequence, long acknowledge, int maximum, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        int instanceId = StreamId.instanceId(streamId);
        if ((msgTypeId & 0x40000000) == 0) {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.streams[StreamId.streamIndex(streamId)];
            MessageConsumer handler = (MessageConsumer)dispatcher.get(instanceId);
            if (handler != null) {
                switch (msgTypeId) {
                    case 1: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 2: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 3: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 4: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 5: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    default: {
                        this.doReset(routeId, streamId, sequence, acknowledge, maximum);
                        break;
                    }
                }
            } else {
                this.handleDefaultReadInitial(msgTypeId, buffer, index, length);
            }
        } else {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
            MessageConsumer throttle = (MessageConsumer)dispatcher.get(instanceId);
            if (throttle != null) {
                ReadCounters counters = (ReadCounters)this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                switch (msgTypeId) {
                    case 0x40000002: {
                        counters.windows.increment();
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 0x40000001: {
                        counters.resets.increment();
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 0x40000003: {
                        SignalFW signal = this.signalRO.wrap((DirectBuffer)buffer, index, index + length);
                        long cancelId = signal.cancelId();
                        if (cancelId != -1L) {
                            this.futuresById.remove(cancelId);
                        }
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 0x40000004: {
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                }
            } else {
                switch (msgTypeId) {
                    case 0x40000003: {
                        SignalFW signal = this.signalRO.wrap((DirectBuffer)buffer, index, index + length);
                        long cancelId = signal.cancelId();
                        if (cancelId == -1L) break;
                        this.futuresById.remove(cancelId);
                    }
                }
            }
        }
    }

    private void handleDefaultReadInitial(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 1: {
                MessageConsumer newHandler = this.handleBeginInitial(msgTypeId, (DirectBuffer)buffer, index, length);
                if (newHandler != null) {
                    newHandler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                    break;
                }
                FrameFW frame = this.frameRO.wrap((DirectBuffer)buffer, index, index + length);
                long streamId = frame.streamId();
                long routeId = frame.routeId();
                long sequence = frame.sequence();
                long acknowledge = frame.acknowledge();
                int maximum = frame.maximum();
                this.doReset(routeId, streamId, sequence, acknowledge, maximum);
                break;
            }
            case 2: {
                this.handleDroppedReadData(msgTypeId, (DirectBuffer)buffer, index, length);
            }
        }
    }

    private void handleDroppedReadFrame(int msgTypeId, DirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case 2: {
                this.handleDroppedReadData(msgTypeId, buffer, index, length);
            }
        }
    }

    private void handleDroppedReadData(int msgTypeId, DirectBuffer buffer, int index, int length) {
        assert (msgTypeId == 2);
        DataFW data = this.dataRO.wrap(buffer, index, index + length);
        long traceId = data.traceId();
        long budgetId = data.budgetId();
        int reserved = data.reserved();
        this.doSystemWindowIfNecessary(traceId, budgetId, reserved);
    }

    private void doSystemWindowIfNecessary(long traceId, long budgetId, int reserved) {
        if (budgetId != 0L && reserved > 0) {
            this.doSystemWindow(traceId, budgetId, reserved);
        }
    }

    private void handleReadReply(long routeId, long streamId, long sequence, long acknowledge, int maximum, int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        int instanceId = StreamId.instanceId(streamId);
        if ((msgTypeId & 0x40000000) == 0) {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.streams[StreamId.streamIndex(streamId)];
            MessageConsumer handler = (MessageConsumer)dispatcher.get(instanceId);
            if (handler != null) {
                ReadCounters counters = (ReadCounters)this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                switch (msgTypeId) {
                    case 1: {
                        counters.opens.increment();
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 2: {
                        counters.frames.increment();
                        counters.bytes.getAndAdd((long)buffer.getInt(index + 73));
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 3: {
                        counters.closes.increment();
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 4: {
                        counters.aborts.increment();
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 5: {
                        handler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    default: {
                        this.doReset(routeId, streamId, sequence, acknowledge, maximum);
                        break;
                    }
                }
            } else {
                this.handleDefaultReadReply(msgTypeId, buffer, index, length);
            }
        } else {
            Int2ObjectHashMap<MessageConsumer> dispatcher = this.throttles[StreamId.throttleIndex(streamId)];
            MessageConsumer throttle = (MessageConsumer)dispatcher.get(instanceId);
            if (throttle != null) {
                switch (msgTypeId) {
                    case 0x40000002: {
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 0x40000001: {
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        dispatcher.remove(instanceId);
                        break;
                    }
                    case 0x40000003: {
                        SignalFW signal = this.signalRO.wrap((DirectBuffer)buffer, index, index + length);
                        long cancelId = signal.cancelId();
                        if (cancelId != -1L) {
                            this.futuresById.remove(cancelId);
                        }
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                    case 0x40000004: {
                        throttle.accept(msgTypeId, (DirectBuffer)buffer, index, length);
                        break;
                    }
                }
            } else {
                switch (msgTypeId) {
                    case 0x40000003: {
                        SignalFW signal = this.signalRO.wrap((DirectBuffer)buffer, index, index + length);
                        long cancelId = signal.cancelId();
                        if (cancelId == -1L) break;
                        this.futuresById.remove(cancelId);
                    }
                }
            }
        }
    }

    private void handleDefaultReadReply(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        if (msgTypeId == 1) {
            FrameFW frame = this.frameRO.wrap((DirectBuffer)buffer, index, index + length);
            long routeId = frame.routeId();
            long streamId = frame.streamId();
            long sequence = frame.sequence();
            long acknowledge = frame.acknowledge();
            int maximum = frame.maximum();
            MessageConsumer newHandler = this.handleBeginReply(msgTypeId, (DirectBuffer)buffer, index, length);
            if (newHandler != null) {
                ReadCounters counters = (ReadCounters)this.countersByRouteId.computeIfAbsent(routeId, this.newReadCounters);
                counters.opens.increment();
                newHandler.accept(msgTypeId, (DirectBuffer)buffer, index, length);
            } else {
                this.doReset(routeId, streamId, sequence, acknowledge, maximum);
            }
        } else if (msgTypeId == 2) {
            this.handleDroppedReadData(msgTypeId, (DirectBuffer)buffer, index, length);
        }
    }

    private MessageConsumer handleBeginInitial(int msgTypeId, DirectBuffer buffer, int index, int length) {
        MessageConsumer replyTo;
        StreamFactory streamFactory;
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long routeId = begin.routeId();
        long initialId = begin.streamId();
        MessageConsumer newStream = null;
        BindingContext binding = this.configuration.resolveBinding(routeId);
        StreamFactory streamFactory2 = streamFactory = binding != null ? binding.streamFactory() : null;
        if (streamFactory != null && (newStream = streamFactory.newStream(msgTypeId, buffer, index, length, replyTo = this.supplyReplyTo(initialId))) != null) {
            long replyId = this.supplyReplyId(initialId);
            this.streams[StreamId.streamIndex(initialId)].put(StreamId.instanceId(initialId), (Object)newStream);
            this.throttles[StreamId.throttleIndex(replyId)].put(StreamId.instanceId(replyId), (Object)newStream);
        }
        return newStream;
    }

    private MessageConsumer handleBeginReply(int msgTypeId, DirectBuffer buffer, int index, int length) {
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long streamId = begin.streamId();
        MessageConsumer newStream = null;
        newStream = (MessageConsumer)this.correlations.remove(streamId);
        if (newStream != null) {
            this.streams[StreamId.streamIndex(streamId)].put(StreamId.instanceId(streamId), (Object)newStream);
        }
        return newStream;
    }

    private void doReset(long routeId, long streamId, long sequence, long acknowledge, int maximum) {
        ResetFW reset = this.resetRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(routeId).streamId(streamId).sequence(sequence).acknowledge(acknowledge).maximum(maximum).build();
        MessageConsumer replyTo = this.supplyReplyTo(streamId);
        replyTo.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    private void doSyntheticAbort(long streamId, MessageConsumer stream) {
        long syntheticAbortRouteId = 0L;
        AbortFW abort = this.abortRW.wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(0L).streamId(streamId).sequence(Long.MAX_VALUE).acknowledge(0L).maximum(0).build();
        stream.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
    }

    private MessageConsumer supplyReplyTo(long streamId) {
        int index = StreamId.streamIndex(streamId);
        return (MessageConsumer)this.writersByIndex.computeIfAbsent(index, this.supplyWriter);
    }

    private MessageConsumer newStream(int msgTypeId, DirectBuffer buffer, int index, int length, MessageConsumer sender) {
        FrameFW frame = this.frameRO.wrap(buffer, index, length);
        long streamId = frame.streamId();
        assert (StreamId.isInitial(streamId));
        this.throttles[StreamId.throttleIndex(streamId)].put(StreamId.instanceId(streamId), (Object)sender);
        long replyId = this.supplyReplyId(streamId);
        this.correlations.put(replyId, (Object)sender);
        return this.supplyReceiver(streamId);
    }

    @Override
    public MessageConsumer supplyReceiver(long streamId) {
        int remoteIndex = StreamId.remoteIndex(streamId);
        return (MessageConsumer)this.writersByIndex.computeIfAbsent(remoteIndex, this.supplyWriter);
    }

    private MessageConsumer supplyWriter(int index) {
        return this.supplyTarget(index).writeHandler();
    }

    private Target supplyTarget(int index) {
        return (Target)this.targetsByIndex.computeIfAbsent(index, this.newTarget);
    }

    private Target newTarget(int index) {
        return new Target(this.config, index, this.writeBuffer, this.correlations, this.streams, this.throttles, this.newWriteCounters);
    }

    private ReadCounters newReadCounters(long routeId) {
        int namespaceId = NamespacedId.namespaceId(routeId);
        int bindingId = NamespacedId.localId(routeId);
        String namespace = this.labels.lookupLabel(namespaceId);
        String binding = this.labels.lookupLabel(bindingId);
        return new ReadCounters(this.counters, namespace, binding);
    }

    private WriteCounters newWriteCounters(long routeId) {
        int namespaceId = NamespacedId.namespaceId(routeId);
        int bindingId = NamespacedId.localId(routeId);
        String namespace = this.labels.lookupLabel(namespaceId);
        String binding = this.labels.lookupLabel(bindingId);
        return new WriteCounters(this.counters, namespace, binding);
    }

    private DefaultBudgetDebitor newBudgetDebitor(int ownerIndex) {
        BudgetsLayout layout = new BudgetsLayout.Builder().path(this.config.directory().resolve(String.format("budgets%d", ownerIndex))).owner(false).build();
        return new DefaultBudgetDebitor(this.localIndex, ownerIndex, layout);
    }

    private int resolveRemoteIndex(long routeId) {
        Affinity affinity = this.supplyAffinity(routeId);
        BitSet mask = affinity.mask;
        int remoteIndex = affinity.nextIndex;
        assert (mask.cardinality() != 0);
        if (remoteIndex != this.localIndex) {
            int nextIndex = affinity.mask.nextSetBit(remoteIndex + 1);
            if (nextIndex == -1) {
                nextIndex = affinity.mask.nextSetBit(0);
            }
            affinity.nextIndex = nextIndex;
        }
        return remoteIndex;
    }

    private Affinity supplyAffinity(long routeId) {
        return (Affinity)this.affinityByRouteId.computeIfAbsent(routeId, this.resolveAffinity);
    }

    public Affinity resolveAffinity(long routeId) {
        BitSet mask = this.affinityMask.apply(routeId);
        if (mask.cardinality() == 0) {
            int namespaceId = NamespacedId.namespaceId(routeId);
            int bindingId = NamespacedId.localId(routeId);
            String namespace = this.labels.lookupLabel(namespaceId);
            String binding = this.labels.lookupLabel(bindingId);
            throw new IllegalStateException(String.format("affinity mask must specify at least one bit: %s.%s %d", namespace, binding, mask));
        }
        Affinity affinity = new Affinity();
        affinity.mask = mask;
        affinity.nextIndex = mask.get(this.localIndex) ? this.localIndex : mask.nextSetBit(0);
        return affinity;
    }

    private static SignalFW.Builder newSignalRW() {
        UnsafeBuffer buffer = new UnsafeBuffer(new byte[512]);
        return new SignalFW.Builder().wrap((MutableDirectBuffer)buffer, 0, buffer.capacity());
    }

    private Int2ObjectHashMap<MessageConsumer>[] initDispatcher() {
        Int2ObjectHashMap[] dispatcher = new Int2ObjectHashMap[64];
        for (int i = 0; i < dispatcher.length; ++i) {
            dispatcher[i] = new Int2ObjectHashMap();
        }
        return dispatcher;
    }

    private final class ElektronSignaler
    implements Signaler {
        private final ThreadLocal<SignalFW.Builder> signalRW = ThreadLocal.withInitial(() -> DispatchAgent.newSignalRW());
        private final ExecutorService executorService;
        private long nextFutureId;

        private ElektronSignaler(ExecutorService executorService) {
            this.executorService = executorService;
        }

        public void executeTaskAt(long timeMillis, Runnable task) {
            long timerId = DispatchAgent.this.timerWheel.scheduleTimer(timeMillis);
            Runnable oldTask = (Runnable)DispatchAgent.this.tasksByTimerId.put(timerId, (Object)task);
            assert (oldTask == null);
            assert (timerId >= 0L);
        }

        @Override
        public long signalAt(long timeMillis, int signalId, IntConsumer handler) {
            long timerId = DispatchAgent.this.timerWheel.scheduleTimer(timeMillis);
            Runnable task = () -> handler.accept(signalId);
            Runnable oldTask = (Runnable)DispatchAgent.this.tasksByTimerId.put(timerId, (Object)task);
            assert (oldTask == null);
            assert (timerId >= 0L);
            return timerId;
        }

        @Override
        public long signalAt(long timeMillis, long routeId, long streamId, int signalId) {
            long timerId = DispatchAgent.this.timerWheel.scheduleTimer(timeMillis);
            Runnable task = () -> this.signal(routeId, streamId, 0L, 0L, -1L, signalId);
            Runnable oldTask = (Runnable)DispatchAgent.this.tasksByTimerId.put(timerId, (Object)task);
            assert (oldTask == null);
            assert (timerId >= 0L);
            return timerId;
        }

        @Override
        public long signalTask(Runnable task, long routeId, long streamId, int signalId) {
            long cancelId;
            if (this.executorService != null) {
                this.nextFutureId = this.nextFutureId + 1L & Long.MAX_VALUE;
                long newFutureId = this.nextFutureId << 1 | 0x8000000000000001L;
                assert (newFutureId != -1L);
                Future<?> newFuture = this.executorService.submit(() -> this.invokeAndSignal(task, routeId, streamId, 0L, 0L, newFutureId, signalId));
                Future oldFuture = (Future)DispatchAgent.this.futuresById.put(newFutureId, newFuture);
                assert (oldFuture == null);
                cancelId = newFutureId;
            } else {
                cancelId = -1L;
                this.invokeAndSignal(task, routeId, streamId, 0L, 0L, cancelId, signalId);
            }
            assert (cancelId < 0L);
            return cancelId;
        }

        @Override
        public void signalNow(long routeId, long streamId, int signalId) {
            this.signal(routeId, streamId, 0L, 0L, -1L, signalId);
        }

        @Override
        public boolean cancel(long cancelId) {
            boolean cancelled = false;
            if (cancelId > 0L) {
                long timerId = cancelId;
                cancelled = DispatchAgent.this.timerWheel.cancelTimer(timerId);
                DispatchAgent.this.tasksByTimerId.remove(timerId);
            } else if (cancelId != -1L) {
                long futureId = cancelId;
                Future future = (Future)DispatchAgent.this.futuresById.remove(futureId);
                cancelled = future != null && future.cancel(true);
            }
            return cancelled;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void invokeAndSignal(Runnable task, long routeId, long streamId, long sequence, long acknowledge, long cancelId, int signalId) {
            try {
                task.run();
            }
            finally {
                this.signal(routeId, streamId, sequence, acknowledge, cancelId, signalId);
            }
        }

        private void signal(long routeId, long streamId, long sequence, long acknowledge, long cancelId, int signalId) {
            long timestamp = DispatchAgent.this.timestamps ? System.nanoTime() : 0L;
            SignalFW signal = this.signalRW.get().rewrap().routeId(routeId).streamId(streamId).sequence(sequence).acknowledge(acknowledge).maximum(0).timestamp(timestamp).traceId(DispatchAgent.this.supplyTraceId()).cancelId(cancelId).signalId(signalId).build();
            DispatchAgent.this.streamsBuffer.write(signal.typeId(), signal.buffer(), signal.offset(), signal.sizeof());
        }
    }

    private static final class ReadCounters {
        private final AtomicCounter opens;
        private final AtomicCounter closes;
        private final AtomicCounter aborts;
        private final AtomicCounter windows;
        private final AtomicCounter resets;
        private final AtomicCounter bytes;
        private final AtomicCounter frames;

        ReadCounters(Counters counters, String namespace, String binding) {
            this.opens = counters.counter(String.format("%s.%s.opens.read", namespace, binding));
            this.closes = counters.counter(String.format("%s.%s.closes.read", namespace, binding));
            this.aborts = counters.counter(String.format("%s.%s.aborts.read", namespace, binding));
            this.windows = counters.counter(String.format("%s.%s.windows.read", namespace, binding));
            this.resets = counters.counter(String.format("%s.%s.resets.read", namespace, binding));
            this.bytes = counters.counter(String.format("%s.%s.bytes.read", namespace, binding));
            this.frames = counters.counter(String.format("%s.%s.frames.read", namespace, binding));
        }
    }

    private static class Affinity {
        BitSet mask;
        int nextIndex;

        private Affinity() {
        }
    }
}

