/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.ArrayListUtil;
import io.aeron.ErrorCode;
import io.aeron.command.CorrelatedMessageFlyweight;
import io.aeron.command.PublicationMessageFlyweight;
import io.aeron.command.RemoveMessageFlyweight;
import io.aeron.command.SubscriptionMessageFlyweight;
import io.aeron.driver.AeronClient;
import io.aeron.driver.ClientProxy;
import io.aeron.driver.Configuration;
import io.aeron.driver.CongestionControl;
import io.aeron.driver.DataPacketDispatcher;
import io.aeron.driver.DirectPublication;
import io.aeron.driver.DriverConductorProxy;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.NetworkPublication;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.PublicationImage;
import io.aeron.driver.PublicationLink;
import io.aeron.driver.ReceiverProxy;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.SenderProxy;
import io.aeron.driver.SubscriberPosition;
import io.aeron.driver.SubscriptionLink;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.buffer.RawLogFactory;
import io.aeron.driver.cmd.DriverConductorCmd;
import io.aeron.driver.exceptions.ControlProtocolException;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.media.UdpChannel;
import io.aeron.driver.media.UdpChannelTransport;
import io.aeron.driver.status.PublisherLimit;
import io.aeron.driver.status.ReceiveChannelStatus;
import io.aeron.driver.status.ReceiverHwm;
import io.aeron.driver.status.ReceiverPos;
import io.aeron.driver.status.SendChannelStatus;
import io.aeron.driver.status.SenderLimit;
import io.aeron.driver.status.SenderPos;
import io.aeron.driver.status.SubscriberPos;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.uri.AeronUri;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.OneToOneConcurrentArrayQueue;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.errors.DistinctErrorLog;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;
import org.agrona.concurrent.status.UnsafeBufferPosition;

public class DriverConductor
implements Agent {
    /** Liveness timeout (ns) handed to every {@link PublicationImage} this conductor creates. */
    private final long imageLivenessTimeoutNs;
    /** Client liveness timeout (ns); also used as the stall threshold before unblocking the command buffer. */
    private final long clientLivenessTimeoutNs;
    /** Unblock timeout (ns) handed to every {@link PublicationLink} this conductor creates. */
    private final long publicationUnblockTimeoutNs;
    /** Nano-clock time at which the command buffer consumer position was last seen to move. */
    private long timeOfLastToDriverPositionChange;
    /** Command buffer consumer position observed at the last blocked-command check. */
    private long lastConsumerCommandPosition;
    /** Nano-clock time at which the periodic timer checks last ran. */
    private long timeOfLastTimeoutCheck;
    /** Next session id to assign to a new network publication; starts from a randomised value. */
    private int nextSessionId = BitUtil.generateRandomisedId();
    private final NetworkPublicationThreadLocals networkPublicationThreadLocals = new NetworkPublicationThreadLocals();
    private final MediaDriver.Context context;
    private final RawLogFactory rawLogFactory;
    private final ReceiverProxy receiverProxy;
    private final SenderProxy senderProxy;
    private final ClientProxy clientProxy;
    private final DriverConductorProxy fromReceiverConductorProxy;
    /** Ring buffer from which client commands are read. */
    private final RingBuffer toDriverCommands;
    private final OneToOneConcurrentArrayQueue<DriverConductorCmd> fromReceiverDriverConductorCmdQueue;
    private final OneToOneConcurrentArrayQueue<DriverConductorCmd> fromSenderDriverConductorCmdQueue;
    /** Send endpoints keyed by {@code UdpChannel.canonicalForm()}. */
    private final HashMap<String, SendChannelEndpoint> sendChannelEndpointByChannelMap = new HashMap<>();
    /** Receive endpoints keyed by {@code UdpChannel.canonicalForm()}. */
    private final HashMap<String, ReceiveChannelEndpoint> receiveChannelEndpointByChannelMap = new HashMap<>();
    // Managed resources; each list is swept for end-of-life entries by the heartbeat timer.
    private final ArrayList<PublicationLink> publicationLinks = new ArrayList<>();
    private final ArrayList<NetworkPublication> networkPublications = new ArrayList<>();
    private final ArrayList<SubscriptionLink> subscriptionLinks = new ArrayList<>();
    private final ArrayList<PublicationImage> publicationImages = new ArrayList<>();
    private final ArrayList<AeronClient> clients = new ArrayList<>();
    private final ArrayList<DirectPublication> directPublications = new ArrayList<>();
    // Reusable flyweights wrapped over each incoming client command.
    private final PublicationMessageFlyweight publicationMsgFlyweight = new PublicationMessageFlyweight();
    private final SubscriptionMessageFlyweight subscriptionMsgFlyweight = new SubscriptionMessageFlyweight();
    private final CorrelatedMessageFlyweight correlatedMsgFlyweight = new CorrelatedMessageFlyweight();
    private final RemoveMessageFlyweight removeMsgFlyweight = new RemoveMessageFlyweight();
    private final EpochClock epochClock;
    private final NanoClock nanoClock;
    private final DistinctErrorLog errorLog;
    // Handlers bound once as fields and reused on every doWork() call.
    private final Consumer<DriverConductorCmd> onDriverConductorCmdFunc = this::onDriverConductorCmd;
    private final MessageHandler onClientCommandFunc = this::onClientCommand;
    private final CountersManager countersManager;
    private final AtomicCounter clientKeepAlives;
    private final AtomicCounter errors;

    public DriverConductor(MediaDriver.Context ctx) {
        long now;
        this.context = ctx;
        this.imageLivenessTimeoutNs = ctx.imageLivenessTimeoutNs();
        this.clientLivenessTimeoutNs = ctx.clientLivenessTimeoutNs();
        this.publicationUnblockTimeoutNs = ctx.publicationUnblockTimeoutNs();
        this.fromReceiverDriverConductorCmdQueue = ctx.toConductorFromReceiverCommandQueue();
        this.fromSenderDriverConductorCmdQueue = ctx.toConductorFromSenderCommandQueue();
        this.receiverProxy = ctx.receiverProxy();
        this.senderProxy = ctx.senderProxy();
        this.rawLogFactory = ctx.rawLogBuffersFactory();
        this.epochClock = ctx.epochClock();
        this.nanoClock = ctx.nanoClock();
        this.toDriverCommands = ctx.toDriverCommands();
        this.clientProxy = ctx.clientProxy();
        this.fromReceiverConductorProxy = ctx.fromReceiverDriverConductorProxy();
        this.errorLog = ctx.errorLog();
        this.countersManager = this.context.countersManager();
        this.clientKeepAlives = this.context.systemCounters().get(SystemCounterDescriptor.CLIENT_KEEP_ALIVES);
        this.errors = this.context.systemCounters().get(SystemCounterDescriptor.ERRORS);
        this.toDriverCommands.consumerHeartbeatTime(this.epochClock.time());
        this.timeOfLastTimeoutCheck = now = this.nanoClock.nanoTime();
        this.timeOfLastToDriverPositionChange = now;
        this.lastConsumerCommandPosition = this.toDriverCommands.consumerPosition();
    }

    /**
     * Closes every network publication, publication image, direct publication and
     * channel endpoint currently tracked by this conductor.
     */
    public void onClose() {
        for (final NetworkPublication publication : networkPublications) {
            publication.close();
        }
        for (final PublicationImage image : publicationImages) {
            image.close();
        }
        for (final DirectPublication publication : directPublications) {
            publication.close();
        }
        for (final SendChannelEndpoint endpoint : sendChannelEndpointByChannelMap.values()) {
            endpoint.close();
        }
        for (final ReceiveChannelEndpoint endpoint : receiveChannelEndpointByChannelMap.values()) {
            endpoint.close();
        }
    }

    /**
     * Name identifying this agent.
     *
     * @return the fixed string {@code "driver-conductor"}.
     */
    public String roleName() {
        return "driver-conductor";
    }

    /**
     * Looks up the send endpoint registered for a channel.
     *
     * @param channel channel whose canonical form is used as the lookup key.
     * @return the registered endpoint, or {@code null} if none is mapped.
     */
    SendChannelEndpoint senderChannelEndpoint(UdpChannel channel) {
        final String canonicalForm = channel.canonicalForm();
        return sendChannelEndpointByChannelMap.get(canonicalForm);
    }

    /**
     * Looks up the receive endpoint registered for a channel.
     *
     * @param channel channel whose canonical form is used as the lookup key.
     * @return the registered endpoint, or {@code null} if none is mapped.
     */
    ReceiveChannelEndpoint receiverChannelEndpoint(UdpChannel channel) {
        final String canonicalForm = channel.canonicalForm();
        return receiveChannelEndpointByChannelMap.get(canonicalForm);
    }

    /**
     * Finds the direct publication for a stream by delegating to
     * {@code findDirectPublication} over the tracked direct publications.
     *
     * @param streamId stream identifier to search for.
     * @return whatever {@code findDirectPublication} yields for that stream.
     */
    DirectPublication getDirectPublication(long streamId) {
        final ArrayList<DirectPublication> candidates = directPublications;
        return findDirectPublication(candidates, streamId);
    }

    /**
     * Runs one duty cycle: reads client commands, drains the receiver and sender
     * command queues, runs the periodic timers, tracks rebuild on every image and
     * updates the publisher limit on every network and direct publication.
     *
     * @return the accumulated work count reported by the steps above.
     * @throws Exception declared by the signature; propagated from the work performed.
     */
    public int doWork() throws Exception {
        int workCount = toDriverCommands.read(onClientCommandFunc);
        workCount += fromReceiverDriverConductorCmdQueue.drain(onDriverConductorCmdFunc);
        workCount += fromSenderDriverConductorCmdQueue.drain(onDriverConductorCmdFunc);

        final long nowNs = nanoClock.nanoTime();
        workCount += processTimers(nowNs);

        // Indexed loops with the size captured up front, as in the rest of this class.
        final ArrayList<PublicationImage> images = publicationImages;
        for (int i = 0, imageCount = images.size(); i < imageCount; i++) {
            images.get(i).trackRebuild(nowNs, context.statusMessageTimeout());
        }

        final ArrayList<NetworkPublication> netPublications = networkPublications;
        for (int i = 0, netCount = netPublications.size(); i < netCount; i++) {
            workCount += netPublications.get(i).updatePublishersLimit();
        }

        final ArrayList<DirectPublication> ipcPublications = directPublications;
        for (int i = 0, ipcCount = ipcPublications.size(); i < ipcCount; i++) {
            workCount += ipcPublications.get(i).updatePublishersLimit(toDriverCommands.consumerHeartbeatTime());
        }

        return workCount;
    }

    /**
     * Creates a {@link PublicationImage} for a newly observed stream session, provided at
     * least one subscription link matches the endpoint and stream.
     *
     * <p>The sender MTU and initial window are validated against the endpoint first. An image
     * correlation id is taken and subscriber positions are allocated at the joining position;
     * when no subscription matches, nothing further is created.
     *
     * @param sessionId         session of the stream.
     * @param streamId          stream identifier.
     * @param initialTermId     initial term id of the stream.
     * @param activeTermId      term id in which the stream is being joined.
     * @param initialTermOffset offset within the active term at which the stream is joined.
     * @param termBufferLength  term buffer length; its trailing-zero count is used as the position shift.
     * @param senderMtuLength   MTU length reported by the sender.
     * @param controlAddress    control address passed through to the image.
     * @param sourceAddress     source address passed through to the image and used for the source identity.
     * @param channelEndpoint   endpoint on which the stream was observed.
     */
    public void onCreatePublicationImage(int sessionId, int streamId, int initialTermId, int activeTermId, int initialTermOffset, int termBufferLength, int senderMtuLength, InetSocketAddress controlAddress, InetSocketAddress sourceAddress, ReceiveChannelEndpoint channelEndpoint) {
        channelEndpoint.validateSenderMtuLength(senderMtuLength);
        channelEndpoint.validateWindowMaxLength(context.initialWindowLength());

        final UdpChannel udpChannel = channelEndpoint.udpChannel();
        final String channel = udpChannel.originalUriString();
        final long registrationId = nextImageCorrelationId();
        final int positionBitsToShift = Integer.numberOfTrailingZeros(termBufferLength);
        final long joiningPosition = LogBufferDescriptor.computePosition(activeTermId, initialTermOffset, positionBitsToShift, initialTermId);

        final List<SubscriberPosition> subscriberPositions = createSubscriberPositions(sessionId, streamId, channelEndpoint, joiningPosition);
        if (subscriberPositions.isEmpty()) {
            return;
        }

        final RawLog rawLog = newPublicationImageLog(sessionId, streamId, initialTermId, termBufferLength, senderMtuLength, udpChannel, registrationId);
        final CongestionControl congestionControl = context.congestionControlSupplier().newInstance(registrationId, udpChannel, streamId, sessionId, termBufferLength, senderMtuLength, nanoClock, context, countersManager);

        // Arguments are kept inline and in their original order so counter allocation order is unchanged.
        final PublicationImage image = new PublicationImage(
            registrationId,
            imageLivenessTimeoutNs,
            channelEndpoint,
            controlAddress,
            sessionId,
            streamId,
            initialTermId,
            activeTermId,
            initialTermOffset,
            rawLog,
            udpChannel.isMulticast() ? Configuration.NAK_MULTICAST_DELAY_GENERATOR : Configuration.NAK_UNICAST_DELAY_GENERATOR,
            subscriberPositions.stream().map(SubscriberPosition::position).collect(Collectors.toList()),
            (Position)ReceiverHwm.allocate(countersManager, registrationId, sessionId, streamId, channel),
            (Position)ReceiverPos.allocate(countersManager, registrationId, sessionId, streamId, channel),
            nanoClock,
            context.epochClock(),
            context.systemCounters(),
            sourceAddress,
            congestionControl,
            context.lossReport(),
            subscriberPositions.get(0).subscription().isReliable());

        for (final SubscriberPosition subscriberPosition : subscriberPositions) {
            subscriberPosition.addImage(image);
        }

        publicationImages.add(image);
        receiverProxy.newPublicationImage(channelEndpoint, image);
        clientProxy.onAvailableImage(registrationId, streamId, sessionId, rawLog.fileName(), subscriberPositions, generateSourceIdentity(sourceAddress));
    }

    /**
     * Closes the given network publication.
     *
     * @param publication the publication to close.
     */
    public void onClosePublication(NetworkPublication publication) {
        publication.close();
    }

    /**
     * Releases the driver-side state attached to a network publication.
     *
     * <p>If the publication has spies, clients are told the image is unavailable and every
     * matching subscription link drops its spied publication. The publication is then removed
     * from the sender, and its send endpoint is closed and unmapped once it carries no sessions
     * and its status indicator is still open.
     *
     * @param publication the publication being cleaned up.
     */
    void cleanupPublication(NetworkPublication publication) {
        if (publication.hasSpies()) {
            final long correlationId = LogBufferDescriptor.correlationId(publication.rawLog().metaData());
            clientProxy.onUnavailableImage(correlationId, publication.streamId(), publication.sendChannelEndpoint().originalUriString());

            for (final SubscriptionLink link : subscriptionLinks) {
                if (link.matches(publication)) {
                    link.removeSpiedPublication();
                }
            }
        }

        senderProxy.removeNetworkPublication(publication);

        final SendChannelEndpoint channelEndpoint = publication.sendChannelEndpoint();
        if (0 != channelEndpoint.sessionCount() || channelEndpoint.isStatusIndicatorClosed()) {
            return;
        }

        channelEndpoint.closeStatusIndicator();
        sendChannelEndpointByChannelMap.remove(channelEndpoint.udpChannel().canonicalForm());
        senderProxy.closeSendChannelEndpoint(channelEndpoint);
    }

    /**
     * Releases the receive-side state held on behalf of a subscription link.
     *
     * <p>Links without a channel endpoint are ignored. Otherwise the stream reference count
     * is decremented and the subscription is removed from the receiver when it reaches zero.
     * The endpoint itself is closed and unmapped once it has no streams and its status
     * indicator is still open.
     *
     * @param link the subscription link being cleaned up.
     */
    void cleanupSubscriptionLink(SubscriptionLink link) {
        final ReceiveChannelEndpoint channelEndpoint = link.channelEndpoint();
        if (null == channelEndpoint) {
            return;
        }

        final int streamId = link.streamId();
        final int remainingRefs = channelEndpoint.decRefToStream(streamId);
        if (0 == remainingRefs) {
            receiverProxy.removeSubscription(channelEndpoint, streamId);
        }

        if (0 != channelEndpoint.streamCount() || channelEndpoint.isStatusIndicatorClosed()) {
            return;
        }

        channelEndpoint.closeStatusIndicator();
        receiveChannelEndpointByChannelMap.remove(channelEndpoint.udpChannel().canonicalForm());
        receiverProxy.closeReceiveChannelEndpoint(channelEndpoint);
    }

    /**
     * Notifies clients that the image is no longer available and asks the receiver to
     * remove the cool-down entry for the image's endpoint, session and stream.
     *
     * @param image the image moving to linger.
     */
    void transitionToLinger(PublicationImage image) {
        clientProxy.onUnavailableImage(
            image.correlationId(),
            image.streamId(),
            image.channelUriString());

        receiverProxy.removeCoolDown(
            image.channelEndpoint(),
            image.sessionId(),
            image.streamId());
    }

    /**
     * Detaches the image from every subscription link whose endpoint and stream it matches.
     *
     * @param image the image being cleaned up.
     */
    void cleanupImage(PublicationImage image) {
        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            final SubscriptionLink candidate = links.get(i);
            if (image.matches(candidate.channelEndpoint(), candidate.streamId())) {
                candidate.removeImage(image);
            }
        }
    }

    /**
     * Allocates a subscriber position counter, set to the joining position, for every
     * subscription link that matches the endpoint and stream.
     *
     * @param sessionId       session of the stream being joined.
     * @param streamId        stream identifier.
     * @param channelEndpoint endpoint the stream arrived on.
     * @param joiningPosition position at which subscribers join.
     * @return one {@link SubscriberPosition} per matching link; empty when none match.
     */
    private List<SubscriberPosition> createSubscriberPositions(int sessionId, int streamId, ReceiveChannelEndpoint channelEndpoint, long joiningPosition) {
        final ArrayList<SubscriberPosition> result = new ArrayList<>();
        final ArrayList<SubscriptionLink> links = subscriptionLinks;

        for (int i = 0, count = links.size(); i < count; i++) {
            final SubscriptionLink candidate = links.get(i);
            if (candidate.matches(channelEndpoint, streamId)) {
                final UnsafeBufferPosition counter = SubscriberPos.allocate(countersManager, candidate.registrationId(), sessionId, streamId, candidate.channelUri(), joiningPosition);
                counter.setOrdered(joiningPosition);
                result.add(new SubscriberPosition(candidate, (Position)counter));
            }
        }

        return result;
    }

    /**
     * Delivers a time event to every resource in the list and removes and deletes those
     * that report end of life.
     *
     * <p>The list is walked from the back so the unordered removal (which uses the tracked
     * last index) never disturbs an element that has not yet been visited.
     *
     * @param list resources to check; modified in place.
     * @param time nano-clock time passed to each resource.
     */
    private <T extends DriverManagedResource> void onCheckManagedResources(ArrayList<T> list, long time) {
        int lastIndex = list.size() - 1;
        for (int i = lastIndex; i >= 0; i--) {
            final DriverManagedResource resource = list.get(i);
            resource.onTimeEvent(time, this);

            if (resource.hasReachedEndOfLife()) {
                ArrayListUtil.fastUnorderedRemove(list, i, lastIndex);
                lastIndex--;
                resource.delete();
            }
        }
    }

    /**
     * Refreshes the command buffer consumer heartbeat with the current epoch time, then
     * sweeps every managed resource list for timeouts.
     *
     * @param nanoTimeNow nano-clock time passed to each managed resource.
     */
    private void onHeartbeatCheckTimeouts(long nanoTimeNow) {
        final long epochNow = epochClock.time();
        toDriverCommands.consumerHeartbeatTime(epochNow);

        // Sweep order matches the original sequence.
        onCheckManagedResources(clients, nanoTimeNow);
        onCheckManagedResources(publicationLinks, nanoTimeNow);
        onCheckManagedResources(networkPublications, nanoTimeNow);
        onCheckManagedResources(subscriptionLinks, nanoTimeNow);
        onCheckManagedResources(publicationImages, nanoTimeNow);
        onCheckManagedResources(directPublications, nanoTimeNow);
    }

    /**
     * Detects a stalled client command buffer and attempts to unblock it.
     *
     * <p>If the consumer position has moved since the last check, the new position and time
     * are recorded. If it has not moved, the producer is ahead of it, and more than
     * {@code clientLivenessTimeoutNs} has elapsed since the last movement, the ring buffer is
     * asked to unblock; a successful unblock increments the {@code UNBLOCKED_COMMANDS} counter.
     *
     * @param nanoTimeNow current nano-clock time.
     */
    private void onCheckForBlockedToDriverCommands(long nanoTimeNow) {
        final long consumerPosition = toDriverCommands.consumerPosition();

        if (consumerPosition != lastConsumerCommandPosition) {
            timeOfLastToDriverPositionChange = nanoTimeNow;
            lastConsumerCommandPosition = consumerPosition;
            return;
        }

        // Compare elapsed time, not absolute values: nanosecond clock readings are only
        // meaningful as differences, and "last + timeout" can overflow and invert the test.
        if (toDriverCommands.producerPosition() > consumerPosition
            && nanoTimeNow - timeOfLastToDriverPositionChange > clientLivenessTimeoutNs
            && toDriverCommands.unblock()) {
            context.systemCounters().get(SystemCounterDescriptor.UNBLOCKED_COMMANDS).orderedIncrement();
        }
    }

    /**
     * Dispatches one client command read from the command buffer.
     *
     * <p>The message type id selects the flyweight that is wrapped over the buffer and the
     * handler that is called. Unrecognised ids are ignored. Any exception thrown by a handler
     * is reported to the client, with the correlation id read so far (0 if none), and recorded.
     *
     * @param msgTypeId numeric command type; the literals below are compiler-inlined constants.
     * @param buffer    buffer holding the command.
     * @param index     offset of the command within the buffer.
     * @param length    length of the command (not used here).
     */
    private void onClientCommand(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        long correlationId = 0L;

        try {
            switch (msgTypeId) {
                // Add publication.
                case 1: {
                    final PublicationMessageFlyweight addPublication = publicationMsgFlyweight;
                    addPublication.wrap(buffer, index);
                    correlationId = addPublication.correlationId();
                    final int streamId = addPublication.streamId();
                    final long clientId = addPublication.clientId();
                    final String channel = addPublication.channel();

                    if (channel.startsWith("aeron:ipc")) {
                        onAddDirectPublication(channel, streamId, correlationId, clientId);
                    } else {
                        onAddNetworkPublication(channel, streamId, correlationId, clientId);
                    }
                    break;
                }

                // Remove publication.
                case 2: {
                    final RemoveMessageFlyweight removePublication = removeMsgFlyweight;
                    removePublication.wrap(buffer, index);
                    correlationId = removePublication.correlationId();
                    onRemovePublication(removePublication.registrationId(), correlationId);
                    break;
                }

                // Add subscription: IPC, spy, or network depending on the channel prefix.
                case 4: {
                    final SubscriptionMessageFlyweight addSubscription = subscriptionMsgFlyweight;
                    addSubscription.wrap(buffer, index);
                    correlationId = addSubscription.correlationId();
                    final int streamId = addSubscription.streamId();
                    final long clientId = addSubscription.clientId();
                    final String channel = addSubscription.channel();

                    if (channel.startsWith("aeron:ipc")) {
                        onAddDirectSubscription(channel, streamId, correlationId, clientId);
                    } else if (channel.startsWith("aeron-spy:")) {
                        // The spy prefix is stripped before the channel is handed on.
                        onAddSpySubscription(channel.substring("aeron-spy:".length()), streamId, correlationId, clientId);
                    } else {
                        onAddNetworkSubscription(channel, streamId, correlationId, clientId);
                    }
                    break;
                }

                // Remove subscription.
                case 5: {
                    final RemoveMessageFlyweight removeSubscription = removeMsgFlyweight;
                    removeSubscription.wrap(buffer, index);
                    correlationId = removeSubscription.correlationId();
                    onRemoveSubscription(removeSubscription.registrationId(), correlationId);
                    break;
                }

                // Client keepalive.
                case 6: {
                    final CorrelatedMessageFlyweight keepalive = correlatedMsgFlyweight;
                    keepalive.wrap(buffer, index);
                    correlationId = keepalive.correlationId();
                    onClientKeepalive(keepalive.clientId());
                    break;
                }
            }
        }
        catch (ControlProtocolException ex) {
            clientProxy.onError(ex.errorCode(), ex.getMessage(), correlationId);
            recordError(ex);
        }
        catch (Exception ex) {
            clientProxy.onError(ErrorCode.GENERIC_ERROR, ex.getMessage(), correlationId);
            recordError(ex);
        }
    }

    /**
     * Counts the error and records it in the distinct error log.
     *
     * @param ex the exception to record.
     */
    private void recordError(Exception ex) {
        errors.increment();
        errorLog.record(ex);
    }

    /**
     * Runs the periodic heartbeat and blocked-command checks once more than
     * {@code Configuration.HEARTBEAT_TIMEOUT_NS} has elapsed since they last ran.
     *
     * @param now current nano-clock time.
     * @return 1 if the checks ran on this call, otherwise 0.
     */
    private int processTimers(long now) {
        // Compare elapsed time, not absolute values: nanosecond clock readings are only
        // meaningful as differences, and "last + timeout" can overflow and invert the test.
        if (now - timeOfLastTimeoutCheck <= Configuration.HEARTBEAT_TIMEOUT_NS) {
            return 0;
        }

        onHeartbeatCheckTimeouts(now);
        onCheckForBlockedToDriverCommands(now);
        timeOfLastTimeoutCheck = now;

        return 1;
    }

    /**
     * Handles a client request to add a publication on a network channel.
     *
     * <p>If the send endpoint has no publication for the stream, a new one is created with a
     * fresh session id and randomised initial term id, registered with the endpoint and the
     * sender, and linked to any matching spy subscriptions. In either case the publication is
     * then linked to the client and the client is told it is ready.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identifier.
     * @param registrationId registration id for the new publication link.
     * @param clientId       id of the requesting client.
     */
    private void onAddNetworkPublication(String channel, int streamId, long registrationId, long clientId) {
        final UdpChannel udpChannel = UdpChannel.parse(channel);
        final SendChannelEndpoint channelEndpoint = getOrCreateSendChannelEndpoint(udpChannel);

        NetworkPublication publication = channelEndpoint.getPublication(streamId);
        if (null == publication) {
            final int termLength = getTermBufferLength(udpChannel.aeronUri(), context.publicationTermBufferLength());
            final int sessionId = nextSessionId++;
            final int initialTermId = BitUtil.generateRandomisedId();

            final RetransmitHandler retransmitHandler = new RetransmitHandler(
                nanoClock,
                context.systemCounters(),
                Configuration.RETRANSMIT_UNICAST_DELAY_GENERATOR,
                Configuration.RETRANSMIT_UNICAST_LINGER_GENERATOR);

            final FlowControl flowControl = udpChannel.isMulticast()
                ? context.multicastFlowControlSupplier().newInstance(udpChannel, streamId, registrationId)
                : context.unicastFlowControlSupplier().newInstance(udpChannel, streamId, registrationId);

            // Arguments are kept inline and in their original order so log and counter
            // allocation happen in the same sequence.
            publication = new NetworkPublication(
                channelEndpoint,
                nanoClock,
                () -> toDriverCommands.consumerHeartbeatTime(),
                newNetworkPublicationLog(sessionId, streamId, initialTermId, udpChannel, registrationId, termLength),
                (Position)PublisherLimit.allocate(countersManager, registrationId, sessionId, streamId, channel),
                (Position)SenderPos.allocate(countersManager, registrationId, sessionId, streamId, channel),
                (Position)SenderLimit.allocate(countersManager, registrationId, sessionId, streamId, channel),
                sessionId,
                streamId,
                initialTermId,
                context.mtuLength(),
                context.systemCounters(),
                flowControl,
                retransmitHandler,
                networkPublicationThreadLocals);

            channelEndpoint.addPublication(publication);
            networkPublications.add(publication);
            senderProxy.newNetworkPublication(publication);
            linkSpies(publication);
        }

        linkPublication(registrationId, publication, getOrAddClient(clientId));
        clientProxy.onPublicationReady(registrationId, streamId, publication.sessionId(), publication.rawLog().fileName(), publication.publisherLimitId());
    }

    /**
     * Links the publication to every subscription link that matches it.
     *
     * @param publication the publication to link spies to.
     */
    private void linkSpies(NetworkPublication publication) {
        final ArrayList<SubscriptionLink> candidates = subscriptionLinks;
        for (int i = 0, count = candidates.size(); i < count; i++) {
            final SubscriptionLink candidate = candidates.get(i);
            if (candidate.matches(publication)) {
                linkSpy(publication, candidate);
            }
        }
    }

    /**
     * Resolves the term buffer length for a channel.
     *
     * @param aeronUri          URI whose {@code term-length} parameter is consulted.
     * @param defaultTermLength value returned, unvalidated, when the parameter is absent.
     * @return the parsed and validated {@code term-length} parameter, or the default.
     * @throws NumberFormatException if the parameter is present but not a valid integer.
     */
    private static int getTermBufferLength(AeronUri aeronUri, int defaultTermLength) {
        final String termLengthParam = aeronUri.get("term-length");
        if (null == termLengthParam) {
            return defaultTermLength;
        }

        final int parsedTermLength = Integer.parseInt(termLengthParam);
        Configuration.validateTermBufferLength(parsedTermLength);

        return parsedTermLength;
    }

    /**
     * Handles a client request to add a publication on an IPC channel: obtains the direct
     * publication for the stream, links it to the client and reports it ready.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identifier.
     * @param registrationId registration id for the new publication link.
     * @param clientId       id of the requesting client.
     */
    private void onAddDirectPublication(String channel, int streamId, long registrationId, long clientId) {
        final DirectPublication publication = getOrAddDirectPublication(streamId, channel);
        final AeronClient client = getOrAddClient(clientId);
        linkPublication(registrationId, publication, client);

        clientProxy.onPublicationReady(
            registrationId,
            streamId,
            publication.sessionId(),
            publication.rawLog().fileName(),
            publication.publisherLimitId());
    }

    /**
     * Records a link between a client registration and a publication.
     *
     * @param registrationId registration id; must not already have a publication link.
     * @param publication    the publication being linked.
     * @param client         the owning client.
     * @throws ControlProtocolException if the registration id already has a link.
     */
    private void linkPublication(long registrationId, DriverManagedResource publication, AeronClient client) {
        final boolean isAlreadyLinked = null != findPublicationLink(publicationLinks, registrationId);
        if (isAlreadyLinked) {
            throw new ControlProtocolException(ErrorCode.GENERIC_ERROR, "registration id already in use.");
        }

        final PublicationLink link = new PublicationLink(
            registrationId,
            publication,
            client,
            nanoClock.nanoTime(),
            publicationUnblockTimeoutNs,
            context.systemCounters());
        publicationLinks.add(link);
    }

    /**
     * Creates the raw log for a network publication and initialises its metadata: default
     * frame header, tail of partition 0, initial term id, MTU length from the context,
     * correlation id, and a zeroed time of last status message.
     *
     * @param sessionId        session of the publication.
     * @param streamId         stream identifier.
     * @param initialTermId    initial term id.
     * @param udpChannel       channel whose canonical form names the log.
     * @param registrationId   stored as the log's correlation id.
     * @param termBufferLength term buffer length for the log.
     * @return the initialised raw log.
     */
    private RawLog newNetworkPublicationLog(int sessionId, int streamId, int initialTermId, UdpChannel udpChannel, long registrationId, int termBufferLength) {
        final RawLog log = rawLogFactory.newNetworkPublication(udpChannel.canonicalForm(), sessionId, streamId, registrationId, termBufferLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(metaData, (DirectBuffer)DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialiseTailWithTermId(metaData, 0, initialTermId);
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, context.mtuLength());
        LogBufferDescriptor.correlationId(metaData, registrationId);
        LogBufferDescriptor.timeOfLastStatusMessage(metaData, 0L);

        return log;
    }

    /**
     * Creates the raw log for a publication image and initialises its metadata: default
     * frame header, initial term id, the sender's MTU length, correlation id, and a zeroed
     * time of last status message.
     *
     * @param sessionId        session of the image.
     * @param streamId         stream identifier.
     * @param initialTermId    initial term id.
     * @param termBufferLength term buffer length for the log.
     * @param senderMtuLength  MTU length stored in the metadata.
     * @param udpChannel       channel whose canonical form names the log.
     * @param correlationId    stored as the log's correlation id.
     * @return the initialised raw log.
     */
    private RawLog newPublicationImageLog(int sessionId, int streamId, int initialTermId, int termBufferLength, int senderMtuLength, UdpChannel udpChannel, long correlationId) {
        final RawLog log = rawLogFactory.newNetworkedImage(udpChannel.canonicalForm(), sessionId, streamId, correlationId, termBufferLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(metaData, (DirectBuffer)DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, senderMtuLength);
        LogBufferDescriptor.correlationId(metaData, correlationId);
        LogBufferDescriptor.timeOfLastStatusMessage(metaData, 0L);

        return log;
    }

    /**
     * Creates the raw log for a direct publication and initialises its metadata: default
     * frame header, tail of partition 0, initial term id, an MTU length set to the maximum
     * message length computed from the term buffer length, correlation id, and a zeroed time
     * of last status message.
     *
     * @param termBufferLength term buffer length for the log.
     * @param sessionId        session of the publication.
     * @param streamId         stream identifier.
     * @param initialTermId    initial term id.
     * @param registrationId   stored as the log's correlation id.
     * @return the initialised raw log.
     */
    private RawLog newDirectPublicationLog(int termBufferLength, int sessionId, int streamId, int initialTermId, long registrationId) {
        final RawLog log = rawLogFactory.newDirectPublication(sessionId, streamId, registrationId, termBufferLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(metaData, (DirectBuffer)DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialiseTailWithTermId(metaData, 0, initialTermId);
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, FrameDescriptor.computeMaxMessageLength(termBufferLength));
        LogBufferDescriptor.correlationId(metaData, registrationId);
        LogBufferDescriptor.timeOfLastStatusMessage(metaData, 0L);

        return log;
    }

    /**
     * Returns the send endpoint mapped to the channel's canonical form, creating, mapping
     * and registering a new one with the sender when none exists yet.
     *
     * @param udpChannel channel the endpoint serves.
     * @return the existing or newly created endpoint.
     */
    private SendChannelEndpoint getOrCreateSendChannelEndpoint(UdpChannel udpChannel) {
        final SendChannelEndpoint existing = sendChannelEndpointByChannelMap.get(udpChannel.canonicalForm());
        if (null != existing) {
            return existing;
        }

        final SendChannelEndpoint created = context.sendChannelEndpointSupplier().newInstance(
            udpChannel,
            SendChannelStatus.allocate(countersManager, udpChannel.originalUriString()),
            context);
        sendChannelEndpointByChannelMap.put(udpChannel.canonicalForm(), created);
        senderProxy.registerSendChannelEndpoint(created);

        return created;
    }

    /**
     * Handles a client request to remove a publication: removes the link with the given
     * registration id, closes it and acknowledges the operation.
     *
     * @param registrationId registration id of the publication link to remove.
     * @param correlationId  correlation id acknowledged back to the client.
     * @throws ControlProtocolException with {@code UNKNOWN_PUBLICATION} if no link has that id.
     */
    private void onRemovePublication(long registrationId, long correlationId) {
        final ArrayList<PublicationLink> links = publicationLinks;
        PublicationLink removed = null;

        for (int i = 0, count = links.size(); i < count; i++) {
            final PublicationLink candidate = links.get(i);
            if (registrationId == candidate.registrationId()) {
                removed = candidate;
                ArrayListUtil.fastUnorderedRemove(links, i, count - 1);
                break;
            }
        }

        if (null == removed) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_PUBLICATION, "Unknown publication: " + registrationId);
        }

        removed.close();
        clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Handles a client request to add a subscription on a network channel.
     *
     * <p>The subscription is treated as reliable unless the URI's {@code reliable} parameter
     * is exactly {@code "false"}. After checking for a clash with existing subscriptions, the
     * stream is reference-counted on the receive endpoint (and added to the receiver on the
     * first reference), a subscription link is recorded, success is acknowledged, and any
     * matching existing images are linked.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identifier.
     * @param registrationId registration id of the subscription; also used for the acknowledgement.
     * @param clientId       id of the requesting client.
     */
    private void onAddNetworkSubscription(String channel, int streamId, long registrationId, long clientId) {
        final UdpChannel udpChannel = UdpChannel.parse(channel);
        final boolean isReliable = !"false".equals(udpChannel.aeronUri().get("reliable", "true"));
        checkForClashingSubscription(isReliable, udpChannel, streamId);

        final ReceiveChannelEndpoint channelEndpoint = getOrCreateReceiveChannelEndpoint(udpChannel);
        if (1 == channelEndpoint.incRefToStream(streamId)) {
            receiverProxy.addSubscription(channelEndpoint, streamId);
        }

        final AeronClient client = getOrAddClient(clientId);
        final SubscriptionLink link = new SubscriptionLink(registrationId, channelEndpoint, streamId, channel, client, context.clientLivenessTimeoutNs(), isReliable);
        subscriptionLinks.add(link);

        clientProxy.operationSucceeded(registrationId);
        linkMatchingImages(streamId, registrationId, channelEndpoint, link);
    }

    /**
     * Rejects a new subscription whose reliability setting differs from an existing
     * subscription on the same endpoint and stream. Does nothing when no receive endpoint
     * is mapped for the channel yet.
     *
     * @param isReliable reliability requested for the new subscription.
     * @param udpChannel channel of the new subscription.
     * @param streamId   stream identifier.
     * @throws IllegalStateException if a matching link has a different reliability setting.
     */
    private void checkForClashingSubscription(boolean isReliable, UdpChannel udpChannel, int streamId) {
        final ReceiveChannelEndpoint channelEndpoint = receiveChannelEndpointByChannelMap.get(udpChannel.canonicalForm());
        if (null == channelEndpoint) {
            return;
        }

        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            final SubscriptionLink existing = links.get(i);
            if (existing.matches(channelEndpoint, streamId) && isReliable != existing.isReliable()) {
                throw new IllegalStateException("Settings do not match existing subscriptions: reliable=" + isReliable);
            }
        }
    }

    /**
     * Attaches a new subscription to every existing image that matches the endpoint and
     * stream and is accepting subscriptions. For each such image a subscriber position is
     * allocated at the image's rebuild position, registered with both the image and the
     * subscription, and the client is notified that the image is available.
     *
     * @param streamId        stream identifier.
     * @param registrationId  registration id of the subscription.
     * @param channelEndpoint endpoint the subscription is bound to.
     * @param subscription    the subscription link being attached.
     */
    private void linkMatchingImages(int streamId, long registrationId, ReceiveChannelEndpoint channelEndpoint, SubscriptionLink subscription) {
        final ArrayList<PublicationImage> images = publicationImages;
        for (int i = 0, count = images.size(); i < count; i++) {
            final PublicationImage image = images.get(i);
            if (image.matches(channelEndpoint, streamId) && image.isAcceptingSubscriptions()) {
                final long rebuildPosition = image.rebuildPosition();
                final int sessionId = image.sessionId();

                final UnsafeBufferPosition counter = SubscriberPos.allocate(countersManager, registrationId, sessionId, streamId, subscription.channelUri(), rebuildPosition);
                counter.setOrdered(rebuildPosition);

                image.addSubscriber((ReadablePosition)counter);
                subscription.addImage(image, (ReadablePosition)counter);

                clientProxy.onAvailableImage(image.correlationId(), streamId, sessionId, image.rawLog().fileName(), Collections.singletonList(new SubscriberPosition(subscription, (Position)counter)), generateSourceIdentity(image.sourceAddress()));
            }
        }
    }

    /**
     * Handles a request to add a subscription to a direct publication.
     *
     * <p>Looks up (or creates) the direct publication for the stream, allocates a subscriber
     * position set to the publication's joining position, records the link, attaches the position
     * to the publication, acknowledges the request and announces the image to the client.
     *
     * @param channel        channel URI as supplied by the client
     * @param streamId       stream to subscribe to
     * @param registrationId id of the request; also used to acknowledge it
     * @param clientId       id of the requesting client
     */
    private void onAddDirectSubscription(String channel, int streamId, long registrationId, long clientId) {
        DirectPublication directPublication = this.getOrAddDirectPublication(streamId, channel);
        AeronClient owner = this.getOrAddClient(clientId);

        long startPosition = directPublication.joiningPosition();
        int publicationSessionId = directPublication.sessionId();
        UnsafeBufferPosition subscriberPos = SubscriberPos.allocate(
            this.countersManager, registrationId, publicationSessionId, streamId, channel, startPosition);
        subscriberPos.setOrdered(startPosition);

        SubscriptionLink newLink = new SubscriptionLink(
            registrationId,
            streamId,
            channel,
            directPublication,
            (ReadablePosition)subscriberPos,
            owner,
            this.context.clientLivenessTimeoutNs());
        this.subscriptionLinks.add(newLink);
        directPublication.addSubscription((ReadablePosition)subscriberPos);

        this.clientProxy.operationSucceeded(registrationId);
        this.clientProxy.onAvailableImage(
            directPublication.correlationId(),
            streamId,
            publicationSessionId,
            directPublication.rawLog().fileName(),
            Collections.singletonList(new SubscriberPosition(newLink, (Position)subscriberPos)),
            channel);
    }

    /**
     * Handles a request to add a spy subscription for a network publication.
     *
     * <p>The link is recorded and the request acknowledged first. The link is then attached via
     * {@link #linkSpy} only if a send channel endpoint exists for the channel and it already has
     * a publication for the stream; otherwise the method returns without linking.
     *
     * @param channel        channel URI as supplied by the client
     * @param streamId       stream to spy on
     * @param registrationId id of the request; also used to acknowledge it
     * @param clientId       id of the requesting client
     */
    private void onAddSpySubscription(String channel, int streamId, long registrationId, long clientId) {
        UdpChannel udpChannel = UdpChannel.parse(channel);
        AeronClient client = this.getOrAddClient(clientId);
        SubscriptionLink subscriptionLink = new SubscriptionLink(registrationId, udpChannel, streamId, channel, client, this.context.clientLivenessTimeoutNs());
        this.subscriptionLinks.add(subscriptionLink);
        this.clientProxy.operationSucceeded(registrationId);

        SendChannelEndpoint channelEndpoint = this.senderChannelEndpoint(udpChannel);
        if (null == channelEndpoint) {
            // No sender endpoint for this channel, so there is nothing to spy on at this point.
            return;
        }

        NetworkPublication publication = channelEndpoint.getPublication(streamId);
        if (null != publication) {
            this.linkSpy(publication, subscriptionLink);
        }
    }

    /**
     * Attaches a spy subscription link to a network publication.
     *
     * <p>Allocates a subscriber position set to the publication's spy joining position, registers
     * it with both the publication and the link, and tells the client the image is available.
     * The correlation id sent to the client is read from the publication's log meta data.
     *
     * @param publication publication being spied on
     * @param link        spy subscription link to attach
     */
    private void linkSpy(NetworkPublication publication, SubscriptionLink link) {
        long startPosition = publication.spyJoiningPosition();
        int publicationStreamId = publication.streamId();
        int publicationSessionId = publication.sessionId();

        UnsafeBufferPosition spyPos = SubscriberPos.allocate(
            this.countersManager,
            link.registrationId(),
            publicationSessionId,
            publicationStreamId,
            link.channelUri(),
            startPosition);
        spyPos.setOrdered(startPosition);

        publication.addSpyPosition((ReadablePosition)spyPos);
        link.addSpiedPublication(publication, (ReadablePosition)spyPos);

        this.clientProxy.onAvailableImage(
            LogBufferDescriptor.correlationId((UnsafeBuffer)publication.rawLog().metaData()),
            publicationStreamId,
            publicationSessionId,
            publication.rawLog().fileName(),
            Collections.singletonList(new SubscriberPosition(link, (Position)spyPos)),
            link.channelUri());
    }

    /**
     * Returns the receive channel endpoint registered under the channel's canonical form,
     * creating one when none exists.
     *
     * <p>A newly created endpoint is built by the context's endpoint supplier, stored in the
     * lookup map and registered with the receiver before being returned.
     *
     * @param udpChannel parsed channel to find or create an endpoint for
     * @return the existing or newly created endpoint
     */
    private ReceiveChannelEndpoint getOrCreateReceiveChannelEndpoint(UdpChannel udpChannel) {
        ReceiveChannelEndpoint existing = this.receiveChannelEndpointByChannelMap.get(udpChannel.canonicalForm());
        if (null != existing) {
            return existing;
        }

        ReceiveChannelEndpoint created = this.context.receiveChannelEndpointSupplier().newInstance(
            udpChannel,
            new DataPacketDispatcher(this.fromReceiverConductorProxy, this.receiverProxy.receiver()),
            ReceiveChannelStatus.allocate(this.countersManager, udpChannel.originalUriString()),
            this.context);
        this.receiveChannelEndpointByChannelMap.put(udpChannel.canonicalForm(), created);
        this.receiverProxy.registerReceiveChannelEndpoint(created);
        return created;
    }

    /**
     * Handles a request to remove a subscription.
     *
     * <p>The link is removed and closed. If it has a receive channel endpoint, the stream
     * reference is released (and the stream removed from the receiver when the count reaches
     * zero). When the endpoint has no streams left and its status indicator is still open, the
     * endpoint is torn down and this method spins, yielding, until the endpoint reports closed.
     * The request is acknowledged last.
     *
     * @param registrationId registration id of the subscription to remove
     * @param correlationId  correlation id used to acknowledge the request
     * @throws ControlProtocolException if no subscription link has the given registration id
     */
    private void onRemoveSubscription(long registrationId, long correlationId) {
        SubscriptionLink removed = DriverConductor.removeSubscriptionLink(this.subscriptionLinks, registrationId);
        if (null == removed) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_SUBSCRIPTION, "Unknown subscription link: " + registrationId);
        }
        removed.close();

        ReceiveChannelEndpoint endpoint = removed.channelEndpoint();
        if (null != endpoint) {
            if (0 == endpoint.decRefToStream(removed.streamId())) {
                this.receiverProxy.removeSubscription(endpoint, removed.streamId());
            }

            boolean shouldCloseEndpoint = 0 == endpoint.streamCount() && !endpoint.isStatusIndicatorClosed();
            if (shouldCloseEndpoint) {
                endpoint.closeStatusIndicator();
                this.receiveChannelEndpointByChannelMap.remove(endpoint.udpChannel().canonicalForm());
                this.receiverProxy.closeReceiveChannelEndpoint(endpoint);
                // Wait for the close to take effect before acknowledging the request.
                while (!endpoint.isClosed()) {
                    Thread.yield();
                }
            }
        }

        this.clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Records a keepalive from a client.
     *
     * <p>The keepalive counter is always incremented; the client's last-keepalive time is only
     * updated when the client id is known.
     *
     * @param clientId id of the client that sent the keepalive
     */
    private void onClientKeepalive(long clientId) {
        this.clientKeepAlives.addOrdered(1L);

        AeronClient knownClient = DriverConductor.findClient(this.clients, clientId);
        if (null == knownClient) {
            return;
        }
        knownClient.timeOfLastKeepalive(this.nanoClock.nanoTime());
    }

    /**
     * Runs a queued conductor command by passing this conductor to its {@code execute} method.
     *
     * @param cmd command to execute
     */
    private void onDriverConductorCmd(DriverConductorCmd cmd) {
        cmd.execute(this);
    }

    /**
     * Returns the tracked client with the given id, creating and tracking a new one when the id
     * is not yet known.
     *
     * <p>A new client is constructed with the conductor's liveness timeout and the current nano
     * clock reading.
     *
     * @param clientId id of the client to find or add
     * @return the existing or newly added client
     */
    private AeronClient getOrAddClient(long clientId) {
        AeronClient existing = DriverConductor.findClient(this.clients, clientId);
        if (null != existing) {
            return existing;
        }

        AeronClient added = new AeronClient(clientId, this.clientLivenessTimeoutNs, this.nanoClock.nanoTime());
        this.clients.add(added);
        return added;
    }

    /**
     * Returns the direct publication for a stream, creating it when none exists.
     *
     * <p>Creation resolves the term length from the channel URI (with the context's IPC term
     * buffer length as the second argument), takes a fresh correlation id and session id,
     * generates a randomised initial term id, creates the raw log and publisher limit counter,
     * and tracks the new publication.
     *
     * @param streamId stream the publication belongs to
     * @param channel  channel URI, used for term length resolution and counter allocation
     * @return the existing or newly created publication
     */
    private DirectPublication getOrAddDirectPublication(int streamId, String channel) {
        DirectPublication existing = DriverConductor.findDirectPublication(this.directPublications, streamId);
        if (null != existing) {
            return existing;
        }

        int termBufferLength = DriverConductor.getTermBufferLength(AeronUri.parse(channel), this.context.ipcTermBufferLength());
        long correlationId = this.nextImageCorrelationId();
        int newSessionId = this.nextSessionId++;
        int firstTermId = BitUtil.generateRandomisedId();

        RawLog log = this.newDirectPublicationLog(termBufferLength, newSessionId, streamId, firstTermId, correlationId);
        UnsafeBufferPosition limit = PublisherLimit.allocate(this.countersManager, correlationId, newSessionId, streamId, channel);

        DirectPublication created = new DirectPublication(correlationId, newSessionId, streamId, (Position)limit, log);
        this.directPublications.add(created);
        return created;
    }

    /**
     * Obtains a correlation id by asking {@code toDriverCommands} for its next correlation id.
     *
     * @return the value returned by {@code toDriverCommands.nextCorrelationId()}
     */
    private long nextImageCorrelationId() {
        return this.toDriverCommands.nextCorrelationId();
    }

    /**
     * Finds the first client in the list with the given id.
     *
     * @param clients  list of clients to search
     * @param clientId id to look for
     * @return the first matching client, or {@code null} when none matches
     */
    private static AeronClient findClient(ArrayList<AeronClient> clients, long clientId) {
        for (int index = 0, count = clients.size(); index < count; ++index) {
            AeronClient candidate = clients.get(index);
            if (clientId == candidate.clientId()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the first publication link in the list with the given registration id.
     *
     * @param publicationLinks list of links to search
     * @param registrationId   registration id to look for
     * @return the first matching link, or {@code null} when none matches
     */
    private static PublicationLink findPublicationLink(ArrayList<PublicationLink> publicationLinks, long registrationId) {
        for (int index = 0, count = publicationLinks.size(); index < count; ++index) {
            PublicationLink candidate = publicationLinks.get(index);
            if (candidate.registrationId() == registrationId) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Removes and returns the first subscription link with the given registration id.
     *
     * <p>The removal goes through {@code ArrayListUtil.fastUnorderedRemove}, which is given the
     * index of the match and the index of the last element.
     *
     * @param subscriptionLinks list of links to search and modify
     * @param registrationId    registration id to look for
     * @return the removed link, or {@code null} when none matches
     */
    private static SubscriptionLink removeSubscriptionLink(ArrayList<SubscriptionLink> subscriptionLinks, long registrationId) {
        int count = subscriptionLinks.size();
        int tailIndex = count - 1;
        for (int index = 0; index < count; ++index) {
            SubscriptionLink candidate = subscriptionLinks.get(index);
            if (registrationId == candidate.registrationId()) {
                ArrayListUtil.fastUnorderedRemove(subscriptionLinks, index, tailIndex);
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the first direct publication in the list for the given stream id.
     *
     * @param directPublications list of publications to search
     * @param streamId           stream id to look for
     * @return the first matching publication, or {@code null} when none matches
     */
    private static DirectPublication findDirectPublication(ArrayList<DirectPublication> directPublications, long streamId) {
        for (int index = 0, count = directPublications.size(); index < count; ++index) {
            DirectPublication candidate = directPublications.get(index);
            // The int stream id is widened to long for the comparison.
            if (streamId == candidate.streamId()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Builds the source identity string for an address in the form {@code host:port}.
     *
     * @param address socket address of the source
     * @return the address's host string, a colon, and its port
     */
    private static String generateSourceIdentity(InetSocketAddress address) {
        String host = address.getHostString();
        int port = address.getPort();
        return host + ':' + port;
    }
}

