/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.Configuration;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.NetworkPublicationPadding3;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.PublicationParams;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.RetransmitSender;
import io.aeron.driver.Subscribable;
import io.aeron.driver.SubscriptionLink;
import io.aeron.driver.UntetheredSubscription;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.LogBufferUnblocker;
import io.aeron.logbuffer.TermScanner;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.CachedNanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

public final class NetworkPublication
extends NetworkPublicationPadding3
implements RetransmitSender,
DriverManagedResource,
Subscribable {
    private final long registrationId;
    private final long unblockTimeoutNs;
    private final long connectionTimeoutNs;
    private final long lingerTimeoutNs;
    private final long untetheredWindowLimitTimeoutNs;
    private final long untetheredRestingTimeoutNs;
    private final long tag;
    private final int positionBitsToShift;
    private final int initialTermId;
    private final int termBufferLength;
    private final int termLengthMask;
    private final int mtuLength;
    private final int termWindowLength;
    private final int sessionId;
    private final int streamId;
    private final boolean isExclusive;
    private final boolean spiesSimulateConnection;
    private final boolean signalEos;
    private volatile boolean hasReceivers;
    private volatile boolean hasSpies;
    private volatile boolean isConnected;
    private volatile boolean isEndOfStream;
    private volatile boolean hasSenderReleased;
    private State state = State.ACTIVE;
    private final UnsafeBuffer[] termBuffers;
    private final ByteBuffer[] sendBuffers;
    private final ErrorHandler errorHandler;
    private final Position publisherPos;
    private final Position publisherLimit;
    private final Position senderPosition;
    private final Position senderLimit;
    private final SendChannelEndpoint channelEndpoint;
    private final ByteBuffer heartbeatBuffer;
    private final DataHeaderFlyweight heartbeatDataHeader;
    private final ByteBuffer setupBuffer;
    private final SetupFlyweight setupHeader;
    private final ByteBuffer rttMeasurementBuffer;
    private final RttMeasurementFlyweight rttMeasurementHeader;
    private final FlowControl flowControl;
    private final CachedNanoClock cachedNanoClock;
    private final RetransmitHandler retransmitHandler;
    private final UnsafeBuffer metaDataBuffer;
    private final RawLog rawLog;
    private final AtomicCounter heartbeatsSent;
    private final AtomicCounter retransmitsSent;
    private final AtomicCounter senderFlowControlLimits;
    private final AtomicCounter senderBpe;
    private final AtomicCounter shortSends;
    private final AtomicCounter unblockedPublications;

    NetworkPublication(long registrationId, MediaDriver.Context ctx, PublicationParams params, SendChannelEndpoint channelEndpoint, RawLog rawLog, int termWindowLength, Position publisherPos, Position publisherLimit, Position senderPosition, Position senderLimit, AtomicCounter senderBpe, int sessionId, int streamId, int initialTermId, FlowControl flowControl, RetransmitHandler retransmitHandler, NetworkPublicationThreadLocals threadLocals, boolean isExclusive) {
        int termLength;
        this.registrationId = registrationId;
        this.unblockTimeoutNs = ctx.publicationUnblockTimeoutNs();
        this.connectionTimeoutNs = ctx.publicationConnectionTimeoutNs();
        this.lingerTimeoutNs = params.lingerTimeoutNs;
        this.untetheredWindowLimitTimeoutNs = ctx.untetheredWindowLimitTimeoutNs();
        this.untetheredRestingTimeoutNs = ctx.untetheredRestingTimeoutNs();
        this.tag = params.entityTag;
        this.channelEndpoint = channelEndpoint;
        this.rawLog = rawLog;
        this.cachedNanoClock = ctx.senderCachedNanoClock();
        this.senderPosition = senderPosition;
        this.senderLimit = senderLimit;
        this.flowControl = flowControl;
        this.retransmitHandler = retransmitHandler;
        this.publisherPos = publisherPos;
        this.publisherLimit = publisherLimit;
        this.mtuLength = params.mtuLength;
        this.initialTermId = initialTermId;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.spiesSimulateConnection = params.spiesSimulateConnection;
        this.signalEos = params.signalEos;
        this.isExclusive = isExclusive;
        this.metaDataBuffer = rawLog.metaData();
        this.setupBuffer = threadLocals.setupBuffer();
        this.setupHeader = threadLocals.setupHeader();
        this.heartbeatBuffer = threadLocals.heartbeatBuffer();
        this.heartbeatDataHeader = threadLocals.heartbeatDataHeader();
        this.rttMeasurementBuffer = threadLocals.rttMeasurementBuffer();
        this.rttMeasurementHeader = threadLocals.rttMeasurementHeader();
        SystemCounters systemCounters = ctx.systemCounters();
        this.heartbeatsSent = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_SENT);
        this.shortSends = systemCounters.get(SystemCounterDescriptor.SHORT_SENDS);
        this.retransmitsSent = systemCounters.get(SystemCounterDescriptor.RETRANSMITS_SENT);
        this.senderFlowControlLimits = systemCounters.get(SystemCounterDescriptor.SENDER_FLOW_CONTROL_LIMITS);
        this.unblockedPublications = systemCounters.get(SystemCounterDescriptor.UNBLOCKED_PUBLICATIONS);
        this.senderBpe = senderBpe;
        this.termBuffers = rawLog.termBuffers();
        this.sendBuffers = rawLog.sliceTerms();
        this.errorHandler = ctx.errorHandler();
        this.termBufferLength = termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        long nowNs = this.cachedNanoClock.nanoTime();
        this.timeOfLastDataOrHeartbeatNs = nowNs - Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - 1L;
        this.timeOfLastSetupNs = nowNs - Configuration.PUBLICATION_SETUP_TIMEOUT_NS - 1L;
        this.timeOfLastStatusMessageNs = nowNs;
        this.positionBitsToShift = LogBufferDescriptor.positionBitsToShift((int)termLength);
        this.termWindowLength = termWindowLength;
        this.cleanPosition = this.lastSenderPosition = senderPosition.get();
        this.timeOfLastActivityNs = nowNs;
    }

    @Override
    public boolean free() {
        return this.rawLog.free();
    }

    @Override
    public void close() {
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.publisherPos);
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.publisherLimit);
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.senderPosition);
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.senderLimit);
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.senderBpe);
        CloseHelper.closeAll((ErrorHandler)this.errorHandler, (AutoCloseable[])this.spyPositions);
        int size = this.untetheredSubscriptions.size();
        for (int i = 0; i < size; ++i) {
            UntetheredSubscription untetheredSubscription = (UntetheredSubscription)this.untetheredSubscriptions.get(i);
            if (UntetheredSubscription.State.RESTING != untetheredSubscription.state) continue;
            CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)untetheredSubscription.position);
        }
        CloseHelper.close((ErrorHandler)this.errorHandler, (AutoCloseable)this.rawLog);
    }

    public long timeOfLastStatusMessageNs() {
        return this.timeOfLastStatusMessageNs;
    }

    public String channel() {
        return this.channelEndpoint.originalUriString();
    }

    public int sessionId() {
        return this.sessionId;
    }

    public int streamId() {
        return this.streamId;
    }

    public void triggerSendSetupFrame() {
        if (!this.isEndOfStream) {
            this.timeOfLastStatusMessageNs = this.cachedNanoClock.nanoTime();
            this.isSetupElicited = true;
        }
    }

    @Override
    public void addSubscriber(SubscriptionLink subscriptionLink, ReadablePosition position, long nowNs) {
        this.spyPositions = (ReadablePosition[])ArrayUtil.add((Object[])this.spyPositions, (Object)position);
        this.hasSpies = true;
        if (!subscriptionLink.isTether()) {
            this.untetheredSubscriptions.add(new UntetheredSubscription(subscriptionLink, position, nowNs));
        }
        if (this.spiesSimulateConnection) {
            LogBufferDescriptor.isConnected((UnsafeBuffer)this.metaDataBuffer, (boolean)true);
            this.isConnected = true;
        }
    }

    @Override
    public void removeSubscriber(SubscriptionLink subscriptionLink, ReadablePosition position) {
        this.spyPositions = (ReadablePosition[])ArrayUtil.remove((Object[])this.spyPositions, (Object)position);
        this.hasSpies = this.spyPositions.length > 0;
        position.close();
        if (!subscriptionLink.isTether()) {
            int lastIndex;
            for (int i = lastIndex = this.untetheredSubscriptions.size() - 1; i >= 0; --i) {
                if (((UntetheredSubscription)this.untetheredSubscriptions.get((int)i)).subscriptionLink != subscriptionLink) continue;
                ArrayListUtil.fastUnorderedRemove((ArrayList)this.untetheredSubscriptions, (int)i, (int)lastIndex);
                break;
            }
        }
    }

    public void onNak(int termId, int termOffset, int length) {
        this.retransmitHandler.onNak(termId, termOffset, length, this.termBufferLength, this);
    }

    public void onStatusMessage(StatusMessageFlyweight msg, InetSocketAddress srcAddress) {
        long timeNs;
        if (!this.hasReceivers) {
            this.hasReceivers = true;
        }
        if (!this.hasInitialConnection) {
            this.hasInitialConnection = true;
        }
        this.timeOfLastStatusMessageNs = timeNs = this.cachedNanoClock.nanoTime();
        this.senderLimit.setOrdered(this.flowControl.onStatusMessage(msg, srcAddress, this.senderLimit.get(), this.initialTermId, this.positionBitsToShift, timeNs));
        if (!this.isConnected && this.flowControl.hasRequiredReceivers()) {
            LogBufferDescriptor.isConnected((UnsafeBuffer)this.metaDataBuffer, (boolean)true);
            this.isConnected = true;
        }
    }

    public void onRttMeasurement(RttMeasurementFlyweight msg, InetSocketAddress srcAddress) {
        if (128 == (msg.flags() & 0x80)) {
            this.rttMeasurementBuffer.clear();
            this.rttMeasurementHeader.receiverId(msg.receiverId()).echoTimestampNs(msg.echoTimestampNs()).receptionDelta(0L).sessionId(this.sessionId).streamId(this.streamId).flags((short)0);
            int bytesSent = this.channelEndpoint.send(this.rttMeasurementBuffer);
            if (40 != bytesSent) {
                this.shortSends.increment();
            }
        }
    }

    @Override
    public void resend(int termId, int termOffset, int length) {
        long resendPosition;
        long senderPosition = this.senderPosition.get();
        long bottomResendWindow = senderPosition - (long)(this.termBufferLength >> 1);
        if (bottomResendWindow <= (resendPosition = LogBufferDescriptor.computePosition((int)termId, (int)termOffset, (int)this.positionBitsToShift, (int)this.initialTermId)) && resendPosition < senderPosition) {
            long scanOutcome;
            int available;
            int activeIndex = LogBufferDescriptor.indexByPosition((long)resendPosition, (int)this.positionBitsToShift);
            UnsafeBuffer termBuffer = this.termBuffers[activeIndex];
            ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
            int remainingBytes = length;
            int bytesSent = 0;
            int offset = termOffset;
            while ((available = TermScanner.available((long)(scanOutcome = TermScanner.scanForAvailability((UnsafeBuffer)termBuffer, (int)(offset += bytesSent), (int)Math.min(this.mtuLength, remainingBytes))))) > 0) {
                sendBuffer.limit(offset + available).position(offset);
                if (available != this.channelEndpoint.send(sendBuffer)) {
                    this.shortSends.increment();
                    break;
                }
                bytesSent = available + TermScanner.padding((long)scanOutcome);
                if ((remainingBytes -= bytesSent) > 0) continue;
            }
            this.retransmitsSent.incrementOrdered();
        }
    }

    int send(long nowNs) {
        int bytesSent;
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition((long)senderPosition, (int)this.positionBitsToShift, (int)this.initialTermId);
        int termOffset = (int)senderPosition & this.termLengthMask;
        if (!this.hasInitialConnection || this.isSetupElicited) {
            this.setupMessageCheck(nowNs, activeTermId, termOffset);
        }
        if (0 == (bytesSent = this.sendData(nowNs, senderPosition, termOffset))) {
            bytesSent = this.heartbeatMessageCheck(nowNs, activeTermId, termOffset, this.signalEos && this.isEndOfStream);
            if (this.spiesSimulateConnection && this.hasSpies && !this.hasReceivers) {
                long newSenderPosition = this.maxSpyPosition(senderPosition);
                this.senderPosition.setOrdered(newSenderPosition);
                this.senderLimit.setOrdered(this.flowControl.onIdle(nowNs, newSenderPosition, newSenderPosition, this.isEndOfStream));
            } else {
                this.senderLimit.setOrdered(this.flowControl.onIdle(nowNs, this.senderLimit.get(), senderPosition, this.isEndOfStream));
            }
        }
        this.updateHasReceivers(nowNs);
        this.retransmitHandler.processTimeouts(nowNs, this);
        return bytesSent;
    }

    SendChannelEndpoint channelEndpoint() {
        return this.channelEndpoint;
    }

    RawLog rawLog() {
        return this.rawLog;
    }

    int publisherLimitId() {
        return this.publisherLimit.id();
    }

    long tag() {
        return this.tag;
    }

    int termBufferLength() {
        return this.termBufferLength;
    }

    int mtuLength() {
        return this.mtuLength;
    }

    long registrationId() {
        return this.registrationId;
    }

    boolean isExclusive() {
        return this.isExclusive;
    }

    boolean spiesSimulateConnection() {
        return this.spiesSimulateConnection;
    }

    boolean isAcceptingSubscriptions() {
        return State.ACTIVE == this.state || State.DRAINING == this.state && this.spyPositions.length > 0 && this.producerPosition() > this.senderPosition.getVolatile();
    }

    int updatePublisherLimit() {
        int workCount = 0;
        if (State.ACTIVE == this.state) {
            long senderPosition = this.senderPosition.getVolatile();
            if (this.hasRequiredReceivers() || this.spiesSimulateConnection && this.spyPositions.length > 0) {
                long minConsumerPosition = senderPosition;
                for (ReadablePosition spyPosition : this.spyPositions) {
                    minConsumerPosition = Math.min(minConsumerPosition, spyPosition.getVolatile());
                }
                long proposedPublisherLimit = minConsumerPosition + (long)this.termWindowLength;
                long publisherLimit = this.publisherLimit.get();
                if (proposedPublisherLimit > publisherLimit) {
                    this.cleanBufferTo(minConsumerPosition - (long)this.termBufferLength);
                    this.publisherLimit.setOrdered(proposedPublisherLimit);
                    workCount = 1;
                }
            } else if (this.publisherLimit.get() > senderPosition) {
                this.publisherLimit.setOrdered(senderPosition);
                this.cleanBufferTo(senderPosition - (long)this.termBufferLength);
                workCount = 1;
            }
        }
        return workCount;
    }

    boolean hasSpies() {
        return this.hasSpies;
    }

    void updateHasReceivers(long timeNs) {
        if (this.timeOfLastStatusMessageNs + this.connectionTimeoutNs - timeNs < 0L && this.hasReceivers) {
            this.hasReceivers = false;
        }
    }

    private int sendData(long nowNs, long senderPosition, int termOffset) {
        int bytesSent = 0;
        int availableWindow = (int)(this.senderLimit.get() - senderPosition);
        if (availableWindow > 0) {
            int scanLimit = Math.min(availableWindow, this.mtuLength);
            int activeIndex = LogBufferDescriptor.indexByPosition((long)senderPosition, (int)this.positionBitsToShift);
            long scanOutcome = TermScanner.scanForAvailability((UnsafeBuffer)this.termBuffers[activeIndex], (int)termOffset, (int)scanLimit);
            int available = TermScanner.available((long)scanOutcome);
            if (available > 0) {
                ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
                sendBuffer.limit(termOffset + available).position(termOffset);
                if (available == this.channelEndpoint.send(sendBuffer)) {
                    this.timeOfLastDataOrHeartbeatNs = nowNs;
                    this.trackSenderLimits = true;
                    bytesSent = available;
                    this.senderPosition.setOrdered(senderPosition + (long)bytesSent + (long)TermScanner.padding((long)scanOutcome));
                } else {
                    this.shortSends.increment();
                }
            }
        } else if (this.trackSenderLimits) {
            this.trackSenderLimits = false;
            this.senderBpe.incrementOrdered();
            this.senderFlowControlLimits.incrementOrdered();
        }
        return bytesSent;
    }

    private void setupMessageCheck(long nowNs, int activeTermId, int termOffset) {
        if (this.timeOfLastSetupNs + Configuration.PUBLICATION_SETUP_TIMEOUT_NS - nowNs < 0L) {
            this.timeOfLastSetupNs = nowNs;
            this.setupBuffer.clear();
            this.setupHeader.activeTermId(activeTermId).termOffset(termOffset).sessionId(this.sessionId).streamId(this.streamId).initialTermId(this.initialTermId).termLength(this.termBufferLength).mtuLength(this.mtuLength).ttl(this.channelEndpoint.multicastTtl());
            if (40 != this.channelEndpoint.send(this.setupBuffer)) {
                this.shortSends.increment();
            }
            if (this.isSetupElicited && this.hasReceivers) {
                this.isSetupElicited = false;
            }
        }
    }

    private int heartbeatMessageCheck(long nowNs, int activeTermId, int termOffset, boolean signalEos) {
        int bytesSent = 0;
        if (this.hasInitialConnection && this.timeOfLastDataOrHeartbeatNs + Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - nowNs < 0L) {
            this.heartbeatBuffer.clear();
            this.heartbeatDataHeader.sessionId(this.sessionId).streamId(this.streamId).termId(activeTermId).termOffset(termOffset).flags((short)((byte)(signalEos ? 224 : 192)));
            bytesSent = this.channelEndpoint.send(this.heartbeatBuffer);
            if (32 != bytesSent) {
                this.shortSends.increment();
            }
            this.timeOfLastDataOrHeartbeatNs = nowNs;
            this.heartbeatsSent.incrementOrdered();
        }
        return bytesSent;
    }

    private void cleanBufferTo(long position) {
        long cleanPosition = this.cleanPosition;
        if (position > cleanPosition) {
            UnsafeBuffer dirtyTerm = this.termBuffers[LogBufferDescriptor.indexByPosition((long)cleanPosition, (int)this.positionBitsToShift)];
            int bytesForCleaning = (int)(position - cleanPosition);
            int termOffset = (int)cleanPosition & this.termLengthMask;
            int length = Math.min(bytesForCleaning, this.termBufferLength - termOffset);
            dirtyTerm.setMemory(termOffset + 8, length - 8, (byte)0);
            dirtyTerm.putLongOrdered(termOffset, 0L);
            this.cleanPosition = cleanPosition + (long)length;
        }
    }

    private void checkForBlockedPublisher(long producerPosition, long senderPosition, long nowNs) {
        if (senderPosition == this.lastSenderPosition && this.isPossiblyBlocked(producerPosition, senderPosition)) {
            if (this.timeOfLastActivityNs + this.unblockTimeoutNs - nowNs < 0L && LogBufferUnblocker.unblock((UnsafeBuffer[])this.termBuffers, (UnsafeBuffer)this.metaDataBuffer, (long)senderPosition, (int)this.termBufferLength)) {
                this.unblockedPublications.incrementOrdered();
            }
        } else {
            this.timeOfLastActivityNs = nowNs;
            this.lastSenderPosition = senderPosition;
        }
    }

    private boolean isPossiblyBlocked(long producerPosition, long consumerPosition) {
        int expectedTermCount;
        int producerTermCount = LogBufferDescriptor.activeTermCount((UnsafeBuffer)this.metaDataBuffer);
        if (producerTermCount != (expectedTermCount = (int)(consumerPosition >> this.positionBitsToShift))) {
            return true;
        }
        return producerPosition > consumerPosition;
    }

    private boolean spiesFinishedConsuming(DriverConductor conductor, long eosPosition) {
        if (this.spyPositions.length > 0) {
            for (ReadablePosition spyPosition : this.spyPositions) {
                if (spyPosition.getVolatile() >= eosPosition) continue;
                return false;
            }
            this.hasSpies = false;
            conductor.cleanupSpies(this);
        }
        return true;
    }

    private long maxSpyPosition(long senderPosition) {
        long position = senderPosition;
        for (ReadablePosition spyPosition : this.spyPositions) {
            position = Math.max(position, spyPosition.getVolatile());
        }
        return position;
    }

    private void updateConnectedStatus() {
        boolean currentConnectedState;
        boolean bl = currentConnectedState = this.hasRequiredReceivers() || this.spiesSimulateConnection && this.spyPositions.length > 0;
        if (currentConnectedState != this.isConnected) {
            LogBufferDescriptor.isConnected((UnsafeBuffer)this.metaDataBuffer, (boolean)currentConnectedState);
            this.isConnected = currentConnectedState;
        }
    }

    private boolean hasRequiredReceivers() {
        return this.hasReceivers && this.flowControl.hasRequiredReceivers();
    }

    private void checkUntetheredSubscriptions(long nowNs, DriverConductor conductor) {
        ArrayList untetheredSubscriptions = this.untetheredSubscriptions;
        int untetheredSubscriptionsSize = untetheredSubscriptions.size();
        if (untetheredSubscriptionsSize > 0) {
            int lastIndex;
            long senderPosition = this.senderPosition.getVolatile();
            long untetheredWindowLimit = senderPosition - (long)this.termWindowLength + (long)(this.termWindowLength >> 2);
            for (int i = lastIndex = untetheredSubscriptionsSize - 1; i >= 0; --i) {
                UntetheredSubscription untethered = (UntetheredSubscription)untetheredSubscriptions.get(i);
                if (UntetheredSubscription.State.ACTIVE == untethered.state) {
                    if (untethered.position.getVolatile() > untetheredWindowLimit) {
                        untethered.timeOfLastUpdateNs = nowNs;
                        continue;
                    }
                    if (untethered.timeOfLastUpdateNs + this.untetheredWindowLimitTimeoutNs - nowNs > 0L) continue;
                    conductor.notifyUnavailableImageLink(this.registrationId, untethered.subscriptionLink);
                    untethered.state(UntetheredSubscription.State.LINGER, nowNs, this.streamId, this.sessionId);
                    continue;
                }
                if (UntetheredSubscription.State.LINGER == untethered.state) {
                    if (untethered.timeOfLastUpdateNs + this.untetheredWindowLimitTimeoutNs - nowNs > 0L) continue;
                    this.spyPositions = (ReadablePosition[])ArrayUtil.remove((Object[])this.spyPositions, (Object)untethered.position);
                    untethered.state(UntetheredSubscription.State.RESTING, nowNs, this.streamId, this.sessionId);
                    continue;
                }
                if (UntetheredSubscription.State.RESTING != untethered.state || untethered.timeOfLastUpdateNs + this.untetheredRestingTimeoutNs - nowNs > 0L) continue;
                this.spyPositions = (ReadablePosition[])ArrayUtil.add((Object[])this.spyPositions, (Object)untethered.position);
                conductor.notifyAvailableImageLink(this.registrationId, this.sessionId, untethered.subscriptionLink, untethered.position.id(), senderPosition, this.rawLog.fileName(), "aeron:ipc");
                untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, this.streamId, this.sessionId);
                LogBufferDescriptor.isConnected((UnsafeBuffer)this.metaDataBuffer, (boolean)true);
            }
        }
    }

    @Override
    public void onTimeEvent(long timeNs, long timeMs, DriverConductor conductor) {
        switch (this.state) {
            case ACTIVE: {
                this.updateConnectedStatus();
                long producerPosition = this.producerPosition();
                this.publisherPos.setOrdered(producerPosition);
                if (!this.isExclusive) {
                    this.checkForBlockedPublisher(producerPosition, this.senderPosition.getVolatile(), timeNs);
                }
                this.checkUntetheredSubscriptions(timeNs, conductor);
                break;
            }
            case DRAINING: {
                long producerPosition = this.producerPosition();
                this.publisherPos.setOrdered(producerPosition);
                long senderPosition = this.senderPosition.getVolatile();
                if (producerPosition > senderPosition) {
                    if (LogBufferUnblocker.unblock((UnsafeBuffer[])this.termBuffers, (UnsafeBuffer)this.metaDataBuffer, (long)senderPosition, (int)this.termBufferLength)) {
                        this.unblockedPublications.incrementOrdered();
                        break;
                    }
                    if (this.hasReceivers) {
                        break;
                    }
                } else {
                    this.isEndOfStream = true;
                }
                if (!this.spiesFinishedConsuming(conductor, producerPosition)) break;
                this.timeOfLastActivityNs = timeNs;
                this.state = State.LINGER;
                break;
            }
            case LINGER: {
                if (this.timeOfLastActivityNs + this.lingerTimeoutNs - timeNs >= 0L) break;
                this.channelEndpoint.decRef();
                conductor.cleanupPublication(this);
                this.timeOfLastActivityNs = timeNs;
                this.state = State.DONE;
            }
        }
    }

    @Override
    public boolean hasReachedEndOfLife() {
        return this.hasSenderReleased;
    }

    void decRef() {
        if (0 == --this.refCount) {
            long producerPosition = this.producerPosition();
            this.publisherLimit.setOrdered(producerPosition);
            LogBufferDescriptor.endOfStreamPosition((UnsafeBuffer)this.metaDataBuffer, (long)producerPosition);
            if (this.senderPosition.getVolatile() >= producerPosition) {
                this.isEndOfStream = true;
            }
            this.state = State.DRAINING;
        }
    }

    void incRef() {
        ++this.refCount;
    }

    State state() {
        return this.state;
    }

    void senderRelease() {
        this.hasSenderReleased = true;
    }

    long producerPosition() {
        long rawTail = LogBufferDescriptor.rawTailVolatile((UnsafeBuffer)this.metaDataBuffer);
        int termOffset = LogBufferDescriptor.termOffset((long)rawTail, (long)this.termBufferLength);
        return LogBufferDescriptor.computePosition((int)LogBufferDescriptor.termId((long)rawTail), (int)termOffset, (int)this.positionBitsToShift, (int)this.initialTermId);
    }

    long consumerPosition() {
        return this.senderPosition.getVolatile();
    }

    static enum State {
        ACTIVE,
        DRAINING,
        LINGER,
        DONE;

    }
}

