/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.CongestionControl;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FeedbackDelayGenerator;
import io.aeron.driver.LossDetector;
import io.aeron.driver.LossHandler;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.PublicationImagePadding3;
import io.aeron.driver.Subscribable;
import io.aeron.driver.SubscriberPosition;
import io.aeron.driver.SubscriptionLink;
import io.aeron.driver.UntetheredSubscription;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.ImageConnection;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.ReceiveDestinationTransport;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.TermGapFiller;
import io.aeron.logbuffer.TermRebuilder;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.CachedNanoClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.MemoryAccess;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

public final class PublicationImage
extends PublicationImagePadding3
implements LossHandler,
DriverManagedResource,
Subscribable {
    // NOTE(review): not referenced by name in this listing; onTimeEvent multiplies smTimeoutNs by a
    // literal 5L, presumably this constant inlined by the compiler.
    private static final long SM_EOS_MULTIPLE = 5L;
    // Updaters used by scheduleStatusMessage to lazySet the begin/end change numbers below.
    private static final AtomicLongFieldUpdater<PublicationImage> BEGIN_SM_CHANGE_UPDATER = AtomicLongFieldUpdater.newUpdater(PublicationImage.class, "beginSmChange");
    private static final AtomicLongFieldUpdater<PublicationImage> END_SM_CHANGE_UPDATER = AtomicLongFieldUpdater.newUpdater(PublicationImage.class, "endSmChange");
    // Change numbers bracketing an update of nextSmPosition/nextSmReceiverWindowLength:
    // scheduleStatusMessage bumps "begin", writes the values, then sets "end";
    // sendPendingStatusMessage only uses the values when both numbers agree.
    private volatile long beginSmChange = -1L;
    private volatile long endSmChange = -1L;
    // Pending status message values, written by scheduleStatusMessage.
    private long nextSmPosition;
    private int nextSmReceiverWindowLength;
    // Bookkeeping recorded by sendPendingStatusMessage each time a status message goes out.
    private long lastSmChangeNumber = -1L;
    // Lower bound checked by isFlowControlUnderRun.
    private long lastSmPosition;
    // Upper bound checked by isFlowControlOverRun.
    private long lastOverrunThreshold;
    private long timeOfLastSmNs;
    private final long smTimeoutNs;
    private final long maxReceiverWindowLength;
    // Change numbers bracketing an update of the loss* fields: written in onGapDetected,
    // validated in processPendingLoss.
    private volatile long beginLossChange = -1L;
    private volatile long endLossChange = -1L;
    private int lossTermId;
    private int lossTermOffset;
    private int lossLength;
    private long lastLossChangeNumber = -1L;
    // Timestamp of the most recent write to "state" (also refreshed by activate()).
    private volatile long timeOfLastStateChangeNs;
    private final long correlationId;
    private final long imageLivenessTimeoutNs;
    private final long untetheredWindowLimitTimeoutNs;
    private final long untetheredRestingTimeoutNs;
    private final int sessionId;
    private final int streamId;
    private final int positionBitsToShift;
    // termLength - 1; used to mask a position down to an offset within its term.
    private final int termLengthMask;
    private final int initialTermId;
    // Taken from the first subscription at construction; selects NAK vs gap-fill in processPendingLoss.
    private final boolean isReliable;
    // Gates trackRebuild; cleared by removeSubscriber (no subscribers left), deactivate and
    // checkEosForDrainTransition.
    private boolean isRebuilding = true;
    // Set by onTimeEvent on the DRAINING -> LINGER transition.
    private volatile boolean isReceiverReleaseTriggered = false;
    // Set by receiverRelease(); required by hasReachedEndOfLife().
    private volatile boolean hasReceiverReleased = false;
    private volatile State state = State.INIT;
    private final CachedNanoClock cachedNanoClock;
    private final ReceiveChannelEndpoint channelEndpoint;
    private final UnsafeBuffer[] termBuffers;
    private final Position hwmPosition;
    private final LossDetector lossDetector;
    private final CongestionControl congestionControl;
    private final ErrorHandler errorHandler;
    private final Position rebuildPosition;
    private final InetSocketAddress sourceAddress;
    private final String sourceIdentity;
    // System counters looked up once in the constructor.
    private final AtomicCounter heartbeatsReceived;
    private final AtomicCounter statusMessagesSent;
    private final AtomicCounter nakMessagesSent;
    private final AtomicCounter flowControlUnderRuns;
    private final AtomicCounter flowControlOverRuns;
    private final AtomicCounter lossGapFills;
    private final EpochClock epochClock;
    // Non-cached clock; in this listing it is only used for RTT measurement timestamps.
    private final NanoClock nanoClock;
    private final RawLog rawLog;

    /**
     * Construct an image of a remote publication as seen by this receiver.
     *
     * @param correlationId              id returned by {@link #correlationId()} and {@link #subscribableRegistrationId()}.
     * @param ctx                        driver context supplying timeouts, clocks, counters, error handler and loss report.
     * @param channelEndpoint            endpoint the image receives on and sends control messages through.
     * @param transportIndex             index of the transport on which the stream was first seen.
     * @param controlAddress             address recorded for the initial image connection.
     * @param sessionId                  session id of the stream.
     * @param streamId                   stream id of the stream.
     * @param initialTermId              initial term id of the stream.
     * @param activeTermId               term id at which this image starts.
     * @param initialTermOffset          offset within the active term at which this image starts.
     * @param rawLog                     log buffers backing the image.
     * @param lossFeedbackDelayGenerator delay generator handed to the {@link LossDetector}.
     * @param subscriberPositions        initial subscribers; must contain at least one entry because the
     *                                   first one decides whether the image is reliable.
     * @param hwmPosition                high-water-mark position counter, initialised to the start position.
     * @param rebuildPosition            rebuild position counter, initialised to the start position.
     * @param sourceAddress              address of the source.
     * @param sourceIdentity             textual identity of the source.
     * @param congestionControl          congestion control strategy supplying the window lengths.
     */
    PublicationImage(
        long correlationId,
        MediaDriver.Context ctx,
        ReceiveChannelEndpoint channelEndpoint,
        int transportIndex,
        InetSocketAddress controlAddress,
        int sessionId,
        int streamId,
        int initialTermId,
        int activeTermId,
        int initialTermOffset,
        RawLog rawLog,
        FeedbackDelayGenerator lossFeedbackDelayGenerator,
        ArrayList<SubscriberPosition> subscriberPositions,
        Position hwmPosition,
        Position rebuildPosition,
        InetSocketAddress sourceAddress,
        String sourceIdentity,
        CongestionControl congestionControl) {
        this.correlationId = correlationId;
        this.imageLivenessTimeoutNs = ctx.imageLivenessTimeoutNs();
        this.untetheredWindowLimitTimeoutNs = ctx.untetheredWindowLimitTimeoutNs();
        this.untetheredRestingTimeoutNs = ctx.untetheredRestingTimeoutNs();
        this.smTimeoutNs = ctx.statusMessageTimeoutNs();
        this.channelEndpoint = channelEndpoint;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.rawLog = rawLog;
        this.hwmPosition = hwmPosition;
        this.rebuildPosition = rebuildPosition;
        this.sourceAddress = sourceAddress;
        this.sourceIdentity = sourceIdentity;
        this.initialTermId = initialTermId;
        this.congestionControl = congestionControl;
        this.errorHandler = ctx.errorHandler();
        this.lossReport = ctx.lossReport();
        this.nanoClock = ctx.nanoClock();
        this.epochClock = ctx.epochClock();
        this.cachedNanoClock = ctx.receiverCachedNanoClock();

        final long nowNs = this.cachedNanoClock.nanoTime();
        this.timeOfLastStateChangeNs = nowNs;
        this.timeOfLastPacketNs = nowNs;

        this.subscriberPositions = positionArray(subscriberPositions, nowNs);
        this.isReliable = subscriberPositions.get(0).subscription().isReliable();

        final SystemCounters systemCounters = ctx.systemCounters();
        this.heartbeatsReceived = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_RECEIVED);
        this.statusMessagesSent = systemCounters.get(SystemCounterDescriptor.STATUS_MESSAGES_SENT);
        this.nakMessagesSent = systemCounters.get(SystemCounterDescriptor.NAK_MESSAGES_SENT);
        this.flowControlUnderRuns = systemCounters.get(SystemCounterDescriptor.FLOW_CONTROL_UNDER_RUNS);
        this.flowControlOverRuns = systemCounters.get(SystemCounterDescriptor.FLOW_CONTROL_OVER_RUNS);
        this.lossGapFills = systemCounters.get(SystemCounterDescriptor.LOSS_GAP_FILLS);

        this.imageConnections = ArrayUtil.ensureCapacity(this.imageConnections, transportIndex + 1);
        this.imageConnections[transportIndex] = new ImageConnection(nowNs, controlAddress);

        this.termBuffers = rawLog.termBuffers();
        this.lossDetector = new LossDetector(lossFeedbackDelayGenerator, this);

        final int termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        this.positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
        this.nextSmReceiverWindowLength = congestionControl.initialWindowLength();
        this.maxReceiverWindowLength = congestionControl.maxWindowLength();

        final long position = LogBufferDescriptor.computePosition(
            activeTermId, initialTermOffset, this.positionBitsToShift, initialTermId);
        this.nextSmPosition = position;
        this.lastSmPosition = position;
        // Same formula sendPendingStatusMessage applies after every status message, so the over-run
        // check is not stricter before the first status message than it is afterwards.
        this.lastOverrunThreshold = position + this.maxReceiverWindowLength;
        this.cleanPosition = position;

        hwmPosition.setOrdered(position);
        rebuildPosition.setOrdered(position);
    }

    /**
     * Attempt to free the underlying raw log.
     *
     * @return whatever {@code rawLog.free()} reports.
     */
    @Override
    public boolean free() {
        final boolean isFreed = rawLog.free();
        return isFreed;
    }

    /**
     * Close the position counters and the congestion control strategy owned by this image.
     * Every close goes through {@link CloseHelper} with the image's error handler.
     */
    @Override
    public void close() {
        CloseHelper.close(errorHandler, hwmPosition);
        CloseHelper.close(errorHandler, rebuildPosition);
        CloseHelper.closeAll(errorHandler, (AutoCloseable[])subscriberPositions);

        // Only RESTING untethered positions are closed here; checkUntetheredSubscriptions removes a
        // position from subscriberPositions when it enters RESTING, so closeAll above did not cover it.
        for (int i = 0, size = untetheredSubscriptions.size(); i < size; i++) {
            final UntetheredSubscription untethered = (UntetheredSubscription)untetheredSubscriptions.get(i);
            if (UntetheredSubscription.State.RESTING == untethered.state) {
                CloseHelper.close(errorHandler, (AutoCloseable)untethered.position);
            }
        }

        CloseHelper.close(errorHandler, congestionControl);
    }

    /**
     * @return the correlation id supplied at construction.
     */
    public long correlationId() {
        return correlationId;
    }

    /**
     * @return the session id supplied at construction.
     */
    public int sessionId() {
        return sessionId;
    }

    /**
     * @return the stream id supplied at construction.
     */
    public int streamId() {
        return streamId;
    }

    /**
     * @return the original URI string of the channel endpoint this image receives on.
     */
    public String channel() {
        final String uri = channelEndpoint.originalUriString();
        return uri;
    }

    /**
     * @return the registration id of this subscribable, which is the image's correlation id.
     */
    @Override
    public long subscribableRegistrationId() {
        return correlationId;
    }

    /**
     * Register a subscriber position with the image.
     *
     * @param subscriptionLink   subscription the position belongs to.
     * @param subscriberPosition position to track.
     * @param nowNs              current time, recorded on the untethered entry when one is created.
     */
    @Override
    public void addSubscriber(SubscriptionLink subscriptionLink, ReadablePosition subscriberPosition, long nowNs) {
        subscriberPositions = ArrayUtil.add(subscriberPositions, subscriberPosition);

        final boolean isTethered = subscriptionLink.isTether();
        if (isTethered) {
            return;
        }

        // Untethered subscriptions are additionally tracked so they can be timed out and rested.
        untetheredSubscriptions.add(new UntetheredSubscription(subscriptionLink, subscriberPosition, nowNs));
    }

    /**
     * Remove and close a subscriber position. Rebuilding stops once no subscribers remain.
     *
     * @param subscriptionLink   subscription the position belongs to.
     * @param subscriberPosition position to remove; it is closed by this call.
     */
    @Override
    public void removeSubscriber(SubscriptionLink subscriptionLink, ReadablePosition subscriberPosition) {
        subscriberPositions = ArrayUtil.remove(subscriberPositions, subscriberPosition);
        subscriberPosition.close();

        if (!subscriptionLink.isTether()) {
            // Scan from the tail and drop the first entry that belongs to this link.
            final int lastIndex = untetheredSubscriptions.size() - 1;
            int index = lastIndex;
            while (index >= 0) {
                final UntetheredSubscription candidate = (UntetheredSubscription)untetheredSubscriptions.get(index);
                if (candidate.subscriptionLink == subscriptionLink) {
                    ArrayListUtil.fastUnorderedRemove(untetheredSubscriptions, index, lastIndex);
                    break;
                }
                index--;
            }
        }

        if (0 == subscriberPositions.length) {
            isRebuilding = false;
        }
    }

    /**
     * Record a detected gap so that {@link #processPendingLoss()} can act on it, and account for it
     * in the loss report when one is available.
     *
     * @param termId     term id in which the gap was detected.
     * @param termOffset offset of the gap within the term.
     * @param length     length of the gap in bytes.
     */
    @Override
    public void onGapDetected(int termId, int termOffset, int length) {
        // The loss fields are bracketed by the begin/end change numbers; the write order must not change.
        final long changeNumber = beginLossChange + 1L;
        beginLossChange = changeNumber;
        lossTermId = termId;
        lossTermOffset = termOffset;
        lossLength = length;
        endLossChange = changeNumber;

        if (null == reportEntry) {
            if (null != lossReport) {
                reportEntry = lossReport.createEntry(
                    length, epochClock.time(), sessionId, streamId, channel(), sourceIdentity);

                // No entry could be created: stop asking the report on subsequent gaps.
                if (null == reportEntry) {
                    lossReport = null;
                }
            }
        } else {
            reportEntry.recordObservation(length, epochClock.time());
        }
    }

    /**
     * @return the source address supplied at construction.
     */
    InetSocketAddress sourceAddress() {
        return sourceAddress;
    }

    /**
     * @return the source identity string supplied at construction.
     */
    String sourceIdentity() {
        return sourceIdentity;
    }

    /**
     * @return the receive channel endpoint supplied at construction.
     */
    ReceiveChannelEndpoint channelEndpoint() {
        return channelEndpoint;
    }

    /**
     * Ask the channel endpoint's dispatcher to remove this image.
     */
    void removeFromDispatcher() {
        channelEndpoint.dispatcher().removePublicationImage(this);
    }

    /**
     * @return the raw log backing this image.
     */
    RawLog rawLog() {
        return rawLog;
    }

    /**
     * Move the image to {@link State#ACTIVE}, stamping the state change time from the cached clock first.
     */
    void activate() {
        final long nowNs = cachedNanoClock.nanoTime();
        timeOfLastStateChangeNs = nowNs;
        state = State.ACTIVE;
    }

    /**
     * Move an {@link State#ACTIVE} image to {@link State#DRAINING}; a no-op in any other state.
     * Rebuilding stops, and an end-of-stream status message is armed unless the stream has ended
     * and the rebuild position has not yet reached the high-water mark.
     */
    void deactivate() {
        if (State.ACTIVE != state) {
            return;
        }

        final long nowNs = cachedNanoClock.nanoTime();
        isRebuilding = false;
        timeOfLastStateChangeNs = nowNs;

        if (!isSendingEosSm) {
            isSendingEosSm = !isEndOfStream || rebuildPosition.getVolatile() == hwmPosition.get();
        }

        if (isSendingEosSm) {
            // Back-date the last status message so the timeout check in sendPendingStatusMessage fires.
            timeOfLastSmNs = nowNs - smTimeoutNs - 1L;
        }

        state = State.DRAINING;
    }

    /**
     * Flag that the receiver has released this image; {@link #hasReachedEndOfLife()} requires it.
     */
    void receiverRelease() {
        hasReceiverReleased = true;
    }

    /**
     * Make room for a destination transport and, when its control address is known up front,
     * create the image connection for it.
     *
     * @param transportIndex index of the transport being added.
     * @param transport      the destination transport.
     */
    void addDestination(int transportIndex, ReceiveDestinationTransport transport) {
        imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);

        if (transport.isMulticast()) {
            imageConnections[transportIndex] =
                new ImageConnection(cachedNanoClock.nanoTime(), transport.udpChannel().remoteControl());
            return;
        }

        if (transport.hasExplicitControl()) {
            imageConnections[transportIndex] =
                new ImageConnection(cachedNanoClock.nanoTime(), transport.explicitControlAddress());
        }
        // Otherwise the slot is left as-is; trackConnection fills it in when a connection is tracked.
    }

    /**
     * Forget the connection for a transport and refresh the active transport count.
     *
     * @param transportIndex index of the transport being removed; must be within the current array.
     */
    void removeDestination(int transportIndex) {
        final ImageConnection[] connections = imageConnections;
        connections[transportIndex] = null;
        updateActiveTransportCount();
    }

    /**
     * Track a connection for the transport, creating it if none exists yet.
     * Note that an existing connection also has its activity timestamps refreshed.
     *
     * @param transportIndex index of the transport.
     * @param remoteAddress  address used if a new connection has to be created.
     */
    void addDestinationConnectionIfUnknown(int transportIndex, InetSocketAddress remoteAddress) {
        final long nowNs = cachedNanoClock.nanoTime();
        trackConnection(transportIndex, remoteAddress, nowNs);
    }

    /**
     * Scan for loss from the rebuild position, advance the rebuild position, consult congestion
     * control and schedule a status message when one is warranted.
     *
     * @param nowNs current time.
     * @return 1 if a status message was scheduled, otherwise 0.
     */
    int trackRebuild(long nowNs) {
        if (!isRebuilding) {
            return 0;
        }

        final long hwm = hwmPosition.getVolatile();
        long minPosition = Long.MAX_VALUE;
        long maxPosition = 0L;
        for (final ReadablePosition subscriberPosition : subscriberPositions) {
            final long position = subscriberPosition.getVolatile();
            if (position < minPosition) {
                minPosition = position;
            }
            if (position > maxPosition) {
                maxPosition = position;
            }
        }

        // Never rescan below what a subscriber has already consumed.
        final long scanFrom = Math.max(rebuildPosition.get(), maxPosition);
        final UnsafeBuffer termBuffer =
            termBuffers[LogBufferDescriptor.indexByPosition(scanFrom, positionBitsToShift)];
        final long scanOutcome = lossDetector.scan(
            termBuffer, scanFrom, hwm, nowNs, termLengthMask, positionBitsToShift, initialTermId);

        final int scanTermOffset = (int)scanFrom & termLengthMask;
        final long newRebuildPosition = scanFrom - scanTermOffset + LossDetector.rebuildOffset(scanOutcome);
        rebuildPosition.proposeMaxOrdered(newRebuildPosition);

        final long ccOutcome = congestionControl.onTrackRebuild(
            nowNs,
            minPosition,
            nextSmPosition,
            hwm,
            scanFrom,
            newRebuildPosition,
            LossDetector.lossFound(scanOutcome));
        final int windowLength = CongestionControl.receiverWindowLength(ccOutcome);
        final int threshold = CongestionControl.threshold(windowLength);

        final boolean isStatusMessageDue =
            CongestionControl.shouldForceStatusMessage(ccOutcome) ||
            minPosition > nextSmPosition + threshold ||
            windowLength != nextSmReceiverWindowLength;
        if (!isStatusMessageDue) {
            return 0;
        }

        // Clean up to one term length behind the slowest subscriber.
        cleanBufferTo(minPosition - (termLengthMask + 1));
        scheduleStatusMessage(minPosition, windowLength);

        return 1;
    }

    /**
     * Insert a received data frame or heartbeat into the image, subject to flow control limits.
     *
     * @param termId         term id of the packet.
     * @param termOffset     offset of the packet within its term.
     * @param buffer         buffer holding the packet.
     * @param length         length of the packet in bytes.
     * @param transportIndex index of the transport the packet arrived on.
     * @param srcAddress     address the packet came from.
     * @return {@code length}, whether or not the packet was accepted.
     */
    int insertPacket(int termId, int termOffset, UnsafeBuffer buffer, int length, int transportIndex, InetSocketAddress srcAddress) {
        final boolean isHeartbeat = DataHeaderFlyweight.isHeartbeat(buffer, length);
        final long packetPosition =
            LogBufferDescriptor.computePosition(termId, termOffset, positionBitsToShift, initialTermId);
        // A heartbeat carries no payload so it does not advance the position.
        final long proposedPosition = isHeartbeat ? packetPosition : packetPosition + length;

        if (isFlowControlOverRun(proposedPosition)) {
            return length;
        }

        if (isFlowControlUnderRun(packetPosition)) {
            // Still counts as connection activity when within one max window behind the last status message.
            if (packetPosition >= lastSmPosition - maxReceiverWindowLength) {
                trackConnection(transportIndex, srcAddress, cachedNanoClock.nanoTime());
            }
            return length;
        }

        final long nowNs = cachedNanoClock.nanoTime();
        timeOfLastPacketNs = nowNs;
        trackConnection(transportIndex, srcAddress, nowNs);

        if (isHeartbeat) {
            if (DataHeaderFlyweight.isEndOfStream(buffer) && !isEndOfStream && allEos(transportIndex)) {
                LogBufferDescriptor.endOfStreamPosition(rawLog.metaData(), proposedPosition);
                isEndOfStream = true;
            }
            heartbeatsReceived.incrementOrdered();
        } else {
            final int termIndex = LogBufferDescriptor.indexByPosition(packetPosition, positionBitsToShift);
            TermRebuilder.insert(termBuffers[termIndex], termOffset, buffer, length);
        }

        hwmPosition.proposeMaxOrdered(proposedPosition);

        return length;
    }

    /**
     * Decide whether the image is still considered connected.
     *
     * @param nowNs current time.
     * @return true while a packet was seen within the liveness timeout, the endpoint is open, and it
     *         is not the case that the stream has ended and the receiver release has been triggered.
     */
    boolean isConnected(long nowNs) {
        if (timeOfLastPacketNs + imageLivenessTimeoutNs - nowNs < 0L) {
            return false;
        }

        if (channelEndpoint.isClosed()) {
            return false;
        }

        return !(isEndOfStream && isReceiverReleaseTriggered);
    }

    /**
     * Move an ACTIVE image to DRAINING once the stream has ended and the rebuild position has
     * caught up with the high-water mark, arming the end-of-stream status message.
     *
     * @param nowNs current time.
     */
    void checkEosForDrainTransition(long nowNs) {
        if (isSendingEosSm || !isEndOfStream) {
            return;
        }

        if (rebuildPosition.getVolatile() != hwmPosition.get() || State.ACTIVE != state) {
            return;
        }

        isRebuilding = false;
        timeOfLastStateChangeNs = nowNs;
        isSendingEosSm = true;
        // Back-date the last status message so the timeout check in sendPendingStatusMessage fires.
        timeOfLastSmNs = nowNs - smTimeoutNs - 1L;
        state = State.DRAINING;
    }

    /**
     * Send a status message when a new one has been scheduled or the status message timeout has expired.
     *
     * @param nowNs current time.
     * @return 1 if a send was attempted (even when skipped because an update was in flight), otherwise 0.
     */
    int sendPendingStatusMessage(long nowNs) {
        final long changeNumber = endSmChange;
        final boolean isUnchanged = changeNumber == lastSmChangeNumber;
        if (isUnchanged && timeOfLastSmNs + smTimeoutNs - nowNs >= 0L) {
            return 0;
        }

        // Read the values, fence, then confirm no update started in between; this order must not change.
        final long smPosition = nextSmPosition;
        final int receiverWindowLength = nextSmReceiverWindowLength;
        MemoryAccess.acquireFence();

        if (changeNumber == beginSmChange) {
            final int termId =
                LogBufferDescriptor.computeTermIdFromPosition(smPosition, positionBitsToShift, initialTermId);
            final int termOffset = (int)smPosition & termLengthMask;
            // 64 is the flag value sent while an end-of-stream status message is being signalled.
            final short flags = (short)(isSendingEosSm ? 64 : 0);

            channelEndpoint.sendStatusMessage(
                imageConnections, sessionId, streamId, termId, termOffset, receiverWindowLength, flags);
            statusMessagesSent.incrementOrdered();

            lastSmPosition = smPosition;
            lastOverrunThreshold = smPosition + maxReceiverWindowLength;
            lastSmChangeNumber = changeNumber;
            timeOfLastSmNs = nowNs;
            updateActiveTransportCount();
        }

        return 1;
    }

    /**
     * Act on a gap recorded by {@link #onGapDetected(int, int, int)}: send a NAK for a reliable
     * stream, otherwise try to fill the gap locally.
     *
     * @return 1 if a new loss change was observed (even when skipped because an update was in
     *         flight), otherwise 0.
     */
    int processPendingLoss() {
        final long changeNumber = endLossChange;
        if (changeNumber == lastLossChangeNumber) {
            return 0;
        }

        // Read the values, fence, then confirm no update started in between; this order must not change.
        final int termId = lossTermId;
        final int termOffset = lossTermOffset;
        final int length = lossLength;
        MemoryAccess.acquireFence();

        if (changeNumber == beginLossChange) {
            if (!isReliable) {
                final UnsafeBuffer termBuffer = termBuffers[LogBufferDescriptor.indexByTerm(initialTermId, termId)];
                if (TermGapFiller.tryFillGap(rawLog.metaData(), termBuffer, termId, termOffset, length)) {
                    lossGapFills.incrementOrdered();
                }
            } else {
                channelEndpoint.sendNakMessage(imageConnections, sessionId, streamId, termId, termOffset, length);
                nakMessagesSent.incrementOrdered();
            }

            lastLossChangeNumber = changeNumber;
        }

        return 1;
    }

    /**
     * Send an RTT measurement request when congestion control asks for one.
     *
     * @param nowNs current time passed to the congestion control strategy.
     * @return 1 if a measurement was sent, otherwise 0.
     */
    int initiateAnyRttMeasurements(long nowNs) {
        if (!congestionControl.shouldMeasureRtt(nowNs)) {
            return 0;
        }

        // The non-cached clock is read here and the same value is reported to congestion control.
        final long sendTimeNs = nanoClock.nanoTime();
        channelEndpoint.sendRttMeasurement(imageConnections, sessionId, streamId, sendTimeNs, 0L, true);
        congestionControl.onRttMeasurementSent(sendTimeNs);

        return 1;
    }

    /**
     * Handle an RTT measurement reply and pass the computed round trip time to congestion control.
     *
     * @param header         the received RTT measurement frame.
     * @param transportIndex index of the transport the frame arrived on (not used here).
     * @param srcAddress     address the frame came from.
     */
    void onRttMeasurement(RttMeasurementFlyweight header, int transportIndex, InetSocketAddress srcAddress) {
        final long nowNs = nanoClock.nanoTime();
        // Elapsed time since the echoed timestamp, less the delay the peer reports.
        final long elapsedNs = nowNs - header.echoTimestampNs();
        final long rttInNs = elapsedNs - header.receptionDelta();

        congestionControl.onRttMeasurement(nowNs, rttInNs, srcAddress);
    }

    /**
     * @return true when the image has at least one subscriber and is INIT, ACTIVE, or DRAINING
     *         but not yet drained.
     */
    boolean isAcceptingSubscriptions() {
        if (0 == subscriberPositions.length) {
            return false;
        }

        if (State.INIT == state || State.ACTIVE == state) {
            return true;
        }

        return State.DRAINING == state && !isDrained();
    }

    /**
     * @return the minimum of the rebuild position and every current subscriber position.
     */
    long joinPosition() {
        long lowest = rebuildPosition.get();

        for (final ReadablePosition subscriberPosition : subscriberPositions) {
            final long candidate = subscriberPosition.getVolatile();
            if (candidate < lowest) {
                lowest = candidate;
            }
        }

        return lowest;
    }

    /**
     * Periodic duty cycle hook driving the image lifecycle.
     *
     * @param timeNs    current time in nanoseconds.
     * @param timesMs   current time in milliseconds (not used here).
     * @param conductor conductor to notify of transitions and clean-up.
     */
    @Override
    public void onTimeEvent(long timeNs, long timesMs, DriverConductor conductor) {
        switch (state) {
            case ACTIVE:
                checkUntetheredSubscriptions(timeNs, conductor);
                break;

            case DRAINING:
                // Wait until every subscriber has drained and SM_EOS_MULTIPLE status message
                // timeouts have passed since the state changed.
                if (isDrained() && timeOfLastStateChangeNs + SM_EOS_MULTIPLE * smTimeoutNs - timeNs < 0L) {
                    conductor.transitionToLinger(this);
                    channelEndpoint.decRefImages();
                    conductor.tryCloseReceiveChannelEndpoint(channelEndpoint);
                    timeOfLastStateChangeNs = timeNs;
                    isReceiverReleaseTriggered = true;
                    state = State.LINGER;
                }
                break;

            case LINGER:
                if (hasNoSubscribers() || timeOfLastStateChangeNs + imageLivenessTimeoutNs - timeNs < 0L) {
                    conductor.cleanupImage(this);
                    timeOfLastStateChangeNs = timeNs;
                    state = State.DONE;
                }
                break;

            default:
                // INIT and DONE need no periodic work.
                break;
        }
    }

    /**
     * @return true once the receiver has released the image and its state is {@link State#DONE}.
     */
    @Override
    public boolean hasReachedEndOfLife() {
        if (!hasReceiverReleased) {
            return false;
        }

        return State.DONE == state;
    }

    /**
     * @return true when no subscriber position is behind the rebuild position (vacuously true
     *         with no subscribers).
     */
    private boolean isDrained() {
        final long rebuilt = rebuildPosition.get();
        final ReadablePosition[] positions = subscriberPositions;

        boolean isDrained = true;
        for (int i = 0; i < positions.length && isDrained; i++) {
            isDrained = positions[i].getVolatile() >= rebuilt;
        }

        return isDrained;
    }

    /**
     * @return true when no subscriber positions are being tracked.
     */
    private boolean hasNoSubscribers() {
        return 0 == subscriberPositions.length;
    }

    /**
     * Check for, and count, a packet that starts before the position of the last status message.
     *
     * @param packetPosition position at which the packet starts.
     * @return true if the packet under-runs the flow control window.
     */
    private boolean isFlowControlUnderRun(long packetPosition) {
        if (packetPosition < lastSmPosition) {
            flowControlUnderRuns.incrementOrdered();
            return true;
        }

        return false;
    }

    /**
     * Check for, and count, a packet that would end beyond the current over-run threshold.
     *
     * @param proposedPosition position the packet would advance the stream to.
     * @return true if the packet over-runs the flow control window.
     */
    private boolean isFlowControlOverRun(long proposedPosition) {
        if (proposedPosition > lastOverrunThreshold) {
            flowControlOverRuns.incrementOrdered();
            return true;
        }

        return false;
    }

    /**
     * Zero term buffer content from the current clean position towards {@code position}.
     * A single call never cleans past the end of the term containing the clean position.
     *
     * @param position position up to which cleaning is requested.
     */
    private void cleanBufferTo(long position) {
        final long cleanFrom = cleanPosition;
        if (position <= cleanFrom) {
            return;
        }

        final UnsafeBuffer dirtyTerm = termBuffers[LogBufferDescriptor.indexByPosition(cleanFrom, positionBitsToShift)];
        final int termOffset = (int)cleanFrom & termLengthMask;
        final int remainingInTerm = dirtyTerm.capacity() - termOffset;
        final int length = Math.min((int)(position - cleanFrom), remainingInTerm);

        // Bulk-zero all but the final 8 bytes, then zero those with an ordered store.
        dirtyTerm.setMemory(termOffset, length - 8, (byte)0);
        dirtyTerm.putLongOrdered(termOffset + (length - 8), 0L);
        cleanPosition = cleanFrom + length;
    }

    /**
     * Record activity for the connection on a transport, creating the connection if it is missing.
     *
     * @param transportIndex index of the transport.
     * @param srcAddress     address used when a new connection has to be created.
     * @param nowNs          timestamp recorded as last activity and last frame time.
     */
    private void trackConnection(int transportIndex, InetSocketAddress srcAddress, long nowNs) {
        imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);

        if (null == imageConnections[transportIndex]) {
            imageConnections[transportIndex] = new ImageConnection(nowNs, srcAddress);
        }

        final ImageConnection connection = imageConnections[transportIndex];
        connection.timeOfLastActivityNs = nowNs;
        connection.timeOfLastFrameNs = nowNs;
    }

    /**
     * Mark the given transport's connection as end-of-stream and report whether every transport has ended.
     *
     * @param transportIndex index of the transport that signalled end-of-stream; its connection must exist.
     * @return false if any existing connection has not signalled end-of-stream, or if a slot without
     *         a connection still has a destination on the endpoint; otherwise true.
     */
    private boolean allEos(int transportIndex) {
        imageConnections[transportIndex].isEos = true;

        for (int i = 0, length = imageConnections.length; i < length; i++) {
            final ImageConnection connection = imageConnections[i];

            if (null == connection) {
                if (channelEndpoint.hasDestination(i)) {
                    return false;
                }
            } else if (!connection.isEos) {
                return false;
            }
        }

        return true;
    }

    /**
     * Publish the values for the next status message so that {@link #sendPendingStatusMessage(long)}
     * can pick them up.
     * <p>
     * The values are bracketed by two change numbers: the begin number is bumped first, then the
     * values are written, then the end number is set to the same value. The reader takes the end
     * number, reads the values, and only uses them if the begin number still matches. The statement
     * order and the fence below are therefore essential.
     *
     * @param smPosition           position to report in the next status message.
     * @param receiverWindowLength receiver window length to report in the next status message.
     */
    private void scheduleStatusMessage(long smPosition, int receiverWindowLength) {
        long changeNumber = this.beginSmChange + 1L;
        // Announce that an update is in progress before touching the values.
        BEGIN_SM_CHANGE_UPDATER.lazySet(this, changeNumber);
        MemoryAccess.releaseFence();
        this.nextSmPosition = smPosition;
        this.nextSmReceiverWindowLength = receiverWindowLength;
        // Mark the update complete; begin == end now signals a consistent pair of values.
        END_SM_CHANGE_UPDATER.lazySet(this, changeNumber);
    }

    /**
     * Walk the untethered subscriptions and advance each through its ACTIVE, LINGER, RESTING cycle.
     * <ul>
     * <li>ACTIVE: a position above the untethered window limit refreshes its timestamp; otherwise,
     * once the window limit timeout has elapsed, the link is notified unavailable and moves to LINGER.</li>
     * <li>LINGER: once the window limit timeout has elapsed, the position is removed from the
     * tracked subscriber positions and the subscription moves to RESTING.</li>
     * <li>RESTING: once the resting timeout has elapsed, the position is tracked again, the link is
     * notified available at the image's join position, and the subscription returns to ACTIVE.</li>
     * </ul>
     *
     * @param nowNs     current time.
     * @param conductor conductor used to notify clients of image availability changes.
     */
    private void checkUntetheredSubscriptions(long nowNs, DriverConductor conductor) {
        final int size = untetheredSubscriptions.size();
        if (0 == size) {
            return;
        }

        final long untetheredWindowLimit = untetheredWindowLimit();

        for (int i = size - 1; i >= 0; i--) {
            final UntetheredSubscription untethered = (UntetheredSubscription)untetheredSubscriptions.get(i);

            if (UntetheredSubscription.State.ACTIVE == untethered.state) {
                if (untethered.position.getVolatile() > untetheredWindowLimit) {
                    untethered.timeOfLastUpdateNs = nowNs;
                } else if (untethered.timeOfLastUpdateNs + untetheredWindowLimitTimeoutNs - nowNs <= 0L) {
                    conductor.notifyUnavailableImageLink(correlationId, untethered.subscriptionLink);
                    untethered.state(UntetheredSubscription.State.LINGER, nowNs, streamId, sessionId);
                }
            } else if (UntetheredSubscription.State.LINGER == untethered.state) {
                if (untethered.timeOfLastUpdateNs + untetheredWindowLimitTimeoutNs - nowNs <= 0L) {
                    subscriberPositions = ArrayUtil.remove(subscriberPositions, untethered.position);
                    untethered.state(UntetheredSubscription.State.RESTING, nowNs, streamId, sessionId);
                }
            } else if (UntetheredSubscription.State.RESTING == untethered.state) {
                if (untethered.timeOfLastUpdateNs + untetheredRestingTimeoutNs - nowNs <= 0L) {
                    // Take the join position BEFORE re-adding the rested position: joinPosition() is the
                    // minimum over subscriberPositions, so including the stale rested position would
                    // drag the join point back to where the subscription fell behind.
                    final long joinPosition = joinPosition();
                    subscriberPositions = ArrayUtil.add(subscriberPositions, untethered.position);
                    conductor.notifyAvailableImageLink(
                        correlationId,
                        sessionId,
                        untethered.subscriptionLink,
                        untethered.position.id(),
                        joinPosition,
                        rawLog.fileName(),
                        sourceIdentity);
                    untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, streamId, sessionId);
                }
            }
        }
    }

    /**
     * Compute the position an untethered subscriber must stay above to be considered keeping up.
     *
     * @return the highest subscriber position minus three quarters of the pending receiver window
     *         (window length less a quarter of it, using an integer shift).
     */
    private long untetheredWindowLimit() {
        final int windowLength = nextSmReceiverWindowLength;

        long leadingPosition = 0L;
        for (final ReadablePosition subscriberPosition : subscriberPositions) {
            leadingPosition = Math.max(leadingPosition, subscriberPosition.getVolatile());
        }

        final long windowStart = leadingPosition - windowLength;
        return windowStart + (windowLength >> 2);
    }

    /**
     * Count the connections that received a frame within the liveness timeout and store the count
     * in the log meta data when it differs from the value already there.
     */
    private void updateActiveTransportCount() {
        final long nowNs = cachedNanoClock.nanoTime();

        int liveCount = 0;
        for (final ImageConnection connection : imageConnections) {
            if (null != connection && connection.timeOfLastFrameNs + imageLivenessTimeoutNs - nowNs > 0L) {
                liveCount++;
            }
        }

        final UnsafeBuffer metaData = rawLog.metaData();
        final int recordedCount = metaData.getInt(LogBufferDescriptor.LOG_ACTIVE_TRANSPORT_COUNT);
        if (recordedCount != liveCount) {
            LogBufferDescriptor.activeTransportCount(metaData, liveCount);
        }
    }

    /**
     * Extract the positions of the initial subscribers, registering an untethered entry for each
     * subscription that is not tethered.
     *
     * @param subscriberPositions initial subscriber positions.
     * @param nowNs               timestamp recorded on each untethered entry.
     * @return an array with one position per subscriber, in list order.
     */
    private ReadablePosition[] positionArray(ArrayList<SubscriberPosition> subscriberPositions, long nowNs) {
        final int size = subscriberPositions.size();
        final ReadablePosition[] positions = new ReadablePosition[size];

        int index = 0;
        while (index < size) {
            final SubscriberPosition entry = subscriberPositions.get(index);
            final ReadablePosition position = entry.position();
            positions[index] = position;

            if (!entry.subscription().isTether()) {
                untetheredSubscriptions.add(new UntetheredSubscription(entry.subscription(), position, nowNs));
            }

            index++;
        }

        return positions;
    }

    /**
     * Lifecycle states of a publication image, in the order they are entered:
     * {@code INIT} is the initial value, {@code ACTIVE} is set by {@code activate()},
     * {@code DRAINING} by {@code deactivate()} or {@code checkEosForDrainTransition(long)},
     * and {@code LINGER} then {@code DONE} by {@code onTimeEvent}.
     */
    enum State {
        INIT, ACTIVE, DRAINING, LINGER, DONE
    }
}

