/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.Configuration;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverConductorProxy;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.NetworkPublicationPadding3;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.PublicationParams;
import io.aeron.driver.ReceiverLivenessTracker;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.RetransmitSender;
import io.aeron.driver.Subscribable;
import io.aeron.driver.SubscriptionLink;
import io.aeron.driver.UntetheredSubscription;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.LogBufferUnblocker;
import io.aeron.logbuffer.TermScanner;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.ErrorFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.CachedNanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

/**
 * Driver-side representation of a publication that is sent over a {@link SendChannelEndpoint}.
 * <p>
 * The class scans the term buffers of a {@link RawLog} and sends what it finds through the channel endpoint
 * ({@link #send(long)}), emits setup and heartbeat frames when their timeouts elapse, re-sends ranges on request
 * ({@link #resend(int, int, int)}), reacts to status, error, NAK and RTT messages, and maintains the publisher
 * limit and the connected flag of the log ({@link #updatePublisherPositionAndLimit()}). Its lifecycle is tracked
 * by {@link State} and advanced in {@link #onTimeEvent(long, long, DriverConductor)}.
 * <p>
 * Several fields used here (for example {@code spyPositions}, {@code untetheredSubscriptions},
 * {@code cleanPosition}, {@code lastSenderPosition}, {@code refCount}, {@code endpointAddress},
 * {@code isSetupElicited}, {@code hasInitialConnection}, {@code trackSenderLimits} and the {@code timeOfLast*}
 * timestamps) are not declared in this class; they are inherited through {@code NetworkPublicationPadding3}.
 */
public final class NetworkPublication
extends NetworkPublicationPadding3
implements RetransmitSender,
DriverManagedResource,
Subscribable {
    private final long registrationId;
    private final long unblockTimeoutNs;
    private final long connectionTimeoutNs;
    private final long lingerTimeoutNs;
    private final long untetheredWindowLimitTimeoutNs;
    private final long untetheredLingerTimeoutNs;
    private final long untetheredRestingTimeoutNs;
    private final long tag;
    private final long responseCorrelationId;
    private final int positionBitsToShift;
    private final int initialTermId;
    private final int startingTermId;
    private final int startingTermOffset;
    private final int termBufferLength;
    // termBufferLength - 1; used to extract the offset within a term from a position.
    private final int termLengthMask;
    private final int mtuLength;
    private final int termWindowLength;
    private final int sessionId;
    private final int streamId;
    private final boolean isExclusive;
    private final boolean signalEos;
    private final boolean isResponse;
    private final boolean spiesSimulateConnection;
    private volatile boolean hasSpies;
    private volatile boolean hasReceivers;
    private volatile boolean isConnected;
    private volatile boolean isEndOfStream;
    private volatile boolean hasSenderReleased;
    private volatile boolean hasReceivedUnicastEos;
    private State state = State.ACTIVE;
    private final FlowControl flowControl;
    private final UnsafeBuffer[] termBuffers;
    // Slices of the terms from rawLog.sliceTerms(), indexed the same way as termBuffers.
    private final ByteBuffer[] sendBuffers;
    private final ErrorHandler errorHandler;
    private final Position publisherPos;
    private final Position publisherLimit;
    private final Position senderPosition;
    private final Position senderLimit;
    private final SendChannelEndpoint channelEndpoint;
    private final ByteBuffer heartbeatBuffer;
    private final DataHeaderFlyweight heartbeatDataHeader;
    private final ByteBuffer setupBuffer;
    private final SetupFlyweight setupHeader;
    private final ByteBuffer rttMeasurementBuffer;
    private final RttMeasurementFlyweight rttMeasurementHeader;
    private final CachedNanoClock cachedNanoClock;
    private final RetransmitHandler retransmitHandler;
    private final UnsafeBuffer metaDataBuffer;
    private final RawLog rawLog;
    private final AtomicCounter heartbeatsSent;
    private final AtomicCounter retransmitsSent;
    private final AtomicCounter retransmittedBytes;
    private final AtomicCounter senderFlowControlLimits;
    private final AtomicCounter senderBpe;
    private final AtomicCounter senderNaksReceived;
    private final AtomicCounter shortSends;
    private final AtomicCounter unblockedPublications;
    private final AtomicCounter publicationsRevoked;
    private final ReceiverLivenessTracker livenessTracker = new ReceiverLivenessTracker();

    /**
     * Create the publication from the driver context, the publication parameters and its pre-allocated resources.
     * <p>
     * Timeouts are read from {@code ctx} and {@code params}, shared frame buffers and flyweights are taken from
     * {@code threadLocals}, system counters are looked up from {@code ctx.systemCounters()}, and every term buffer
     * of {@code rawLog} has its alignment verified.
     *
     * @param registrationId     registration id of the publication.
     * @param ctx                driver context supplying timeouts, clock, counters and the error handler.
     * @param params             publication parameters (timeouts, MTU, starting term id/offset, flags).
     * @param channelEndpoint    endpoint used for sending.
     * @param rawLog             log holding the term buffers and meta data buffer.
     * @param termWindowLength   window added to the minimum consumer position to form the publisher limit.
     * @param publisherPos       position counter updated with the producer position.
     * @param publisherLimit     position counter holding the publisher limit.
     * @param senderPosition     position counter holding the sender position.
     * @param senderLimit        position counter holding the sender limit.
     * @param senderBpe          counter incremented together with the flow control limit counter.
     * @param senderNaksReceived counter incremented for each NAK received.
     * @param sessionId          session id of the stream.
     * @param streamId           stream id of the stream.
     * @param initialTermId      initial term id of the log.
     * @param flowControl        flow control strategy.
     * @param retransmitHandler  handler that processes NAKs and retransmit timeouts.
     * @param threadLocals       holder of the shared setup, heartbeat and RTT buffers and flyweights.
     * @param isExclusive        whether the publication is exclusive.
     */
    NetworkPublication(long registrationId, MediaDriver.Context ctx, PublicationParams params, SendChannelEndpoint channelEndpoint, RawLog rawLog, int termWindowLength, Position publisherPos, Position publisherLimit, Position senderPosition, Position senderLimit, AtomicCounter senderBpe, AtomicCounter senderNaksReceived, int sessionId, int streamId, int initialTermId, FlowControl flowControl, RetransmitHandler retransmitHandler, NetworkPublicationThreadLocals threadLocals, boolean isExclusive) {
        int termLength;
        this.registrationId = registrationId;
        this.unblockTimeoutNs = ctx.publicationUnblockTimeoutNs();
        this.connectionTimeoutNs = ctx.publicationConnectionTimeoutNs();
        this.lingerTimeoutNs = params.lingerTimeoutNs;
        this.untetheredWindowLimitTimeoutNs = params.untetheredWindowLimitTimeoutNs;
        this.untetheredLingerTimeoutNs = params.untetheredLingerTimeoutNs;
        this.untetheredRestingTimeoutNs = params.untetheredRestingTimeoutNs;
        this.tag = params.entityTag;
        this.channelEndpoint = channelEndpoint;
        this.rawLog = rawLog;
        this.cachedNanoClock = ctx.senderCachedNanoClock();
        this.senderPosition = senderPosition;
        this.senderLimit = senderLimit;
        this.senderNaksReceived = senderNaksReceived;
        this.flowControl = flowControl;
        this.retransmitHandler = retransmitHandler;
        this.publisherPos = publisherPos;
        this.publisherLimit = publisherLimit;
        this.mtuLength = params.mtuLength;
        this.initialTermId = initialTermId;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.spiesSimulateConnection = params.spiesSimulateConnection;
        this.signalEos = params.signalEos;
        this.isExclusive = isExclusive;
        this.startingTermId = params.termId;
        this.startingTermOffset = params.termOffset;
        this.isResponse = params.isResponse;
        this.responseCorrelationId = params.responseCorrelationId;
        this.metaDataBuffer = rawLog.metaData();
        this.setupBuffer = threadLocals.setupBuffer();
        this.setupHeader = threadLocals.setupHeader();
        this.heartbeatBuffer = threadLocals.heartbeatBuffer();
        this.heartbeatDataHeader = threadLocals.heartbeatDataHeader();
        this.rttMeasurementBuffer = threadLocals.rttMeasurementBuffer();
        this.rttMeasurementHeader = threadLocals.rttMeasurementHeader();
        SystemCounters systemCounters = ctx.systemCounters();
        this.heartbeatsSent = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_SENT);
        this.shortSends = systemCounters.get(SystemCounterDescriptor.SHORT_SENDS);
        this.retransmitsSent = systemCounters.get(SystemCounterDescriptor.RETRANSMITS_SENT);
        this.retransmittedBytes = systemCounters.get(SystemCounterDescriptor.RETRANSMITTED_BYTES);
        this.senderFlowControlLimits = systemCounters.get(SystemCounterDescriptor.SENDER_FLOW_CONTROL_LIMITS);
        this.unblockedPublications = systemCounters.get(SystemCounterDescriptor.UNBLOCKED_PUBLICATIONS);
        this.publicationsRevoked = systemCounters.get(SystemCounterDescriptor.PUBLICATIONS_REVOKED);
        this.senderBpe = senderBpe;
        for (UnsafeBuffer termBuffer : this.termBuffers = rawLog.termBuffers()) {
            termBuffer.verifyAlignment();
        }
        this.sendBuffers = rawLog.sliceTerms();
        this.errorHandler = ctx.errorHandler();
        this.termBufferLength = termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        long nowNs = this.cachedNanoClock.nanoTime();
        // Back-date both timestamps by (timeout + 1) so the first heartbeat/setup timeout check already passes.
        this.timeOfLastDataOrHeartbeatNs = nowNs - Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - 1L;
        this.timeOfLastSetupNs = nowNs - Configuration.PUBLICATION_SETUP_TIMEOUT_NS - 1L;
        this.positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
        this.termWindowLength = termWindowLength;
        this.cleanPosition = this.lastSenderPosition = senderPosition.get();
        this.timeOfLastActivityNs = nowNs;
    }

    /**
     * Free the underlying log.
     *
     * @return the result of {@code rawLog.free()}.
     */
    @Override
    public boolean free() {
        return this.rawLog.free();
    }

    /**
     * Close the position counters, the NAK and BPE counters, all spy positions, the positions of untethered
     * subscriptions that are in the {@code RESTING} state, and finally the flow control strategy.
     * <p>
     * Every close is routed through the {@link ErrorHandler} so a failure closing one resource is reported there
     * and the remaining resources are still closed.
     */
    @Override
    public void close() {
        CloseHelper.close(this.errorHandler, this.publisherPos);
        CloseHelper.close(this.errorHandler, this.publisherLimit);
        CloseHelper.close(this.errorHandler, this.senderPosition);
        CloseHelper.close(this.errorHandler, this.senderLimit);
        CloseHelper.close(this.errorHandler, this.senderBpe);
        CloseHelper.close(this.errorHandler, this.senderNaksReceived);
        CloseHelper.closeAll(this.errorHandler, (AutoCloseable[])this.spyPositions);
        int size = this.untetheredSubscriptions.size();
        for (int i = 0; i < size; ++i) {
            UntetheredSubscription untetheredSubscription = (UntetheredSubscription)this.untetheredSubscriptions.get(i);
            // A RESTING subscription's position was removed from spyPositions in
            // checkUntetheredSubscriptions, so closeAll above did not cover it.
            if (UntetheredSubscription.State.RESTING != untetheredSubscription.state) continue;
            CloseHelper.close(this.errorHandler, untetheredSubscription.position);
        }
        // Use the error-handler overload like every other resource above, for consistent failure reporting.
        CloseHelper.close(this.errorHandler, this.flowControl);
    }

    /**
     * @return the clock time recorded when the last status message (or setup trigger) was processed.
     */
    public long timeOfLastStatusMessageNs() {
        return this.timeOfLastStatusMessageNs;
    }

    /**
     * @return the original URI string of the channel endpoint.
     */
    public String channel() {
        return this.channelEndpoint.originalUriString();
    }

    /**
     * @return the session id of the stream.
     */
    public int sessionId() {
        return this.sessionId;
    }

    /**
     * @return the stream id of the stream.
     */
    public int streamId() {
        return this.streamId;
    }

    /**
     * Request that a setup frame be sent, unless the publication has reached end of stream.
     * <p>
     * Records the current time as the last status message time, marks setup as elicited, informs flow control and,
     * for a response publication, remembers {@code srcAddress} as the address to send to.
     *
     * @param msg        status message that triggered the request.
     * @param srcAddress address the message came from.
     */
    public void triggerSendSetupFrame(StatusMessageFlyweight msg, InetSocketAddress srcAddress) {
        if (!this.isEndOfStream) {
            this.timeOfLastStatusMessageNs = this.cachedNanoClock.nanoTime();
            this.isSetupElicited = true;
            this.flowControl.onTriggerSendSetup(msg, srcAddress, this.timeOfLastStatusMessageNs);
            if (this.isResponse) {
                this.endpointAddress = srcAddress;
            }
        }
    }

    /**
     * @return the registration id of this publication.
     */
    @Override
    public long subscribableRegistrationId() {
        return this.registrationId;
    }

    /**
     * Add a spy subscriber position; untethered links are additionally tracked for window-limit checks.
     * When spies simulate a connection the publication is marked connected.
     *
     * @param subscriptionLink link of the subscriber.
     * @param position         position of the subscriber.
     * @param nowNs            current time used to initialise the untethered tracking entry.
     */
    @Override
    public void addSubscriber(SubscriptionLink subscriptionLink, ReadablePosition position, long nowNs) {
        this.addSpyPosition(position);
        if (!subscriptionLink.isTether()) {
            this.untetheredSubscriptions.add(new UntetheredSubscription(subscriptionLink, position, nowNs));
        }
        if (this.spiesSimulateConnection) {
            this.updateConnectedState(true);
        }
    }

    /**
     * Remove a spy subscriber position and close it; for an untethered link the matching tracking entry is
     * removed as well. When spies simulate a connection the connected state is re-evaluated.
     *
     * @param subscriptionLink link of the subscriber.
     * @param position         position of the subscriber.
     */
    @Override
    public void removeSubscriber(SubscriptionLink subscriptionLink, ReadablePosition position) {
        this.removeSpyPosition(position);
        position.close();
        if (!subscriptionLink.isTether()) {
            int lastIndex;
            for (int i = lastIndex = this.untetheredSubscriptions.size() - 1; i >= 0; --i) {
                if (((UntetheredSubscription)this.untetheredSubscriptions.get((int)i)).subscriptionLink != subscriptionLink) continue;
                ArrayListUtil.fastUnorderedRemove(this.untetheredSubscriptions, i, lastIndex);
                break;
            }
        }
        if (this.spiesSimulateConnection) {
            this.updateConnectedState(this.hasSubscribers());
        }
    }

    /**
     * Count a received NAK and hand it to the retransmit handler.
     *
     * @param termId     term id of the NAK'd range.
     * @param termOffset offset within the term of the NAK'd range.
     * @param length     length of the NAK'd range.
     */
    public void onNak(int termId, int termOffset, int length) {
        this.senderNaksReceived.incrementRelease();
        this.retransmitHandler.onNak(termId, termOffset, length, this.termBufferLength, this.mtuLength, this.flowControl, this);
    }

    /**
     * Process a status message from a receiver.
     * <p>
     * Updates receiver liveness (an end-of-stream message counts as a remote close), notifies the conductor when
     * the publication transitions from having no receivers to having some, records the message time, applies the
     * sender limit returned by flow control and re-evaluates the connected state.
     *
     * @param msg            status message.
     * @param srcAddress     address the message came from.
     * @param conductorProxy proxy used to notify the conductor.
     */
    public void onStatusMessage(StatusMessageFlyweight msg, InetSocketAddress srcAddress, DriverConductorProxy conductorProxy) {
        // Bit 0x40 of the flags marks the status message as end-of-stream.
        boolean isEos = 64 == (msg.flags() & 0x40);
        long timeNs = this.cachedNanoClock.nanoTime();
        if (isEos) {
            this.livenessTracker.onRemoteClose(msg.receiverId());
            if (!this.channelEndpoint.udpChannel().isMulticast() && !this.channelEndpoint.udpChannel().isMultiDestination()) {
                // Consulted in the LINGER state to allow clean-up before the linger timeout expires.
                this.hasReceivedUnicastEos = true;
            }
        } else {
            this.livenessTracker.onStatusMessage(msg.receiverId(), timeNs);
        }
        boolean isLive = this.livenessTracker.hasReceivers();
        boolean existingHasReceivers = this.hasReceivers;
        if (!existingHasReceivers && isLive) {
            conductorProxy.responseConnected(this.responseCorrelationId);
        }
        // Only write the volatile field when the value actually changes.
        if (existingHasReceivers != isLive) {
            this.hasReceivers = isLive;
        }
        if (!this.hasInitialConnection) {
            this.hasInitialConnection = true;
        }
        this.timeOfLastStatusMessageNs = timeNs;
        this.senderLimit.setRelease(this.flowControl.onStatusMessage(msg, srcAddress, this.senderLimit.get(), this.initialTermId, this.positionBitsToShift, timeNs));
        this.updateConnectedState(this.hasSubscribers());
    }

    /**
     * Process an error message from a receiver: inform flow control and, if the liveness tracker reports that the
     * receiver was removed by the remote close, forward the error to the conductor.
     *
     * @param msg                       error message.
     * @param srcAddress                address the message came from.
     * @param destinationRegistrationId registration id of the destination the error relates to.
     * @param conductorProxy            proxy used to notify the conductor.
     */
    public void onError(ErrorFlyweight msg, InetSocketAddress srcAddress, long destinationRegistrationId, DriverConductorProxy conductorProxy) {
        this.flowControl.onError(msg, srcAddress, this.cachedNanoClock.nanoTime());
        if (this.livenessTracker.onRemoteClose(msg.receiverId())) {
            conductorProxy.onPublicationError(this.registrationId, destinationRegistrationId, msg.sessionId(), msg.streamId(), msg.receiverId(), msg.groupTag(), srcAddress, msg.errorCode(), msg.errorMessage());
        }
    }

    /**
     * Answer an RTT measurement message that has bit 0x80 set in its flags by echoing the receiver id and echo
     * timestamp back with a reception delta of zero and cleared flags.
     *
     * @param msg        RTT measurement message.
     * @param srcAddress address the message came from (not used).
     */
    public void onRttMeasurement(RttMeasurementFlyweight msg, InetSocketAddress srcAddress) {
        // NOTE(review): 0x80 is presumably the RTT "reply requested" flag - confirm against RttMeasurementFlyweight.
        if (128 == (msg.flags() & 0x80)) {
            this.rttMeasurementBuffer.clear();
            this.rttMeasurementHeader.receiverId(msg.receiverId()).echoTimestampNs(msg.echoTimestampNs()).receptionDelta(0L).sessionId(this.sessionId).streamId(this.streamId).flags((short)0);
            int bytesSent = this.doSend(this.rttMeasurementBuffer);
            // NOTE(review): 40 is presumably the RTT measurement frame length - confirm.
            if (40 != bytesSent) {
                this.shortSends.increment();
            }
        }
    }

    /**
     * Send a buffer through the channel endpoint.
     * <p>
     * A response publication sends to the remembered endpoint address and sends nothing until one is known;
     * otherwise the endpoint's default send is used.
     *
     * @param message buffer to send.
     * @return number of bytes reported sent, or 0 if a response publication has no endpoint address yet.
     */
    private int doSend(ByteBuffer message) {
        if (this.isResponse) {
            if (null != this.endpointAddress) {
                return this.channelEndpoint.send(message, this.endpointAddress);
            }
            return 0;
        }
        return this.channelEndpoint.send(message);
    }

    /**
     * Re-send a range of a term.
     * <p>
     * The range is only re-sent when its starting position lies behind the sender position but no further back
     * than half a term plus the maximum message length. Frames are scanned and sent in chunks of at most the MTU
     * until the requested length is covered, nothing more is available, or a send comes up short. Retransmit
     * counters are updated when anything was sent.
     *
     * @param termId     term id of the range.
     * @param termOffset offset within the term at which the range starts.
     * @param length     length of the range.
     */
    @Override
    public void resend(int termId, int termOffset, int length) {
        this.channelEndpoint.resendHook(this.sessionId, this.streamId, termId, termOffset, length);
        long senderPosition = this.senderPosition.get();
        long resendPosition = LogBufferDescriptor.computePosition(termId, termOffset, this.positionBitsToShift, this.initialTermId);
        long bottomResendWindow = senderPosition - (long)(this.termBufferLength >> 1) - (long)FrameDescriptor.computeMaxMessageLength(this.termBufferLength);
        if (bottomResendWindow <= resendPosition && resendPosition < senderPosition) {
            int activeIndex = LogBufferDescriptor.indexByPosition(resendPosition, this.positionBitsToShift);
            UnsafeBuffer termBuffer = this.termBuffers[activeIndex];
            ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
            int remainingBytes = length;
            int totalBytesSent = 0;
            int bytesSent = 0;
            int offset = termOffset;
            // Stop as soon as the requested range is covered. Looping back to the scan with a non-positive
            // remaining length would scan at an offset at or beyond the end of the requested range.
            do {
                offset += bytesSent;
                long scanOutcome = TermScanner.scanForAvailability(termBuffer, offset, Math.min(this.mtuLength, remainingBytes));
                int available = TermScanner.available(scanOutcome);
                if (available <= 0) {
                    break;
                }
                sendBuffer.limit(offset + available).position(offset);
                if (available != this.doSend(sendBuffer)) {
                    this.shortSends.increment();
                    break;
                }
                // Padding is skipped over but never put on the wire.
                bytesSent = available + TermScanner.padding(scanOutcome);
                totalBytesSent += bytesSent;
                remainingBytes -= bytesSent;
            } while (remainingBytes > 0);
            if (totalBytesSent > 0) {
                this.retransmitsSent.incrementRelease();
                this.retransmittedBytes.getAndAddRelease(totalBytesSent);
            }
        }
    }

    /**
     * Perform one sending duty cycle.
     * <p>
     * Sends a setup frame if one is due, then tries to send data. If no data was sent, a heartbeat is sent when
     * due, flow control is given an idle callback to produce the sender limit (advancing the sender position to
     * the furthest spy when spies simulate the connection and there are no receivers), and receiver liveness is
     * refreshed. Retransmit timeouts are processed on every call.
     *
     * @param nowNs current time.
     * @return number of bytes sent as data, or the heartbeat send result when no data was sent.
     */
    int send(long nowNs) {
        int bytesSent;
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(senderPosition, this.positionBitsToShift, this.initialTermId);
        int termOffset = (int)senderPosition & this.termLengthMask;
        if (!this.hasInitialConnection || this.isSetupElicited) {
            this.setupMessageCheck(nowNs, activeTermId, termOffset);
        }
        if (0 == (bytesSent = this.sendData(nowNs, senderPosition, termOffset))) {
            bytesSent = this.heartbeatMessageCheck(nowNs, activeTermId, termOffset);
            if (this.spiesSimulateConnection && this.hasSpies && !this.hasReceivers) {
                long newSenderPosition = this.maxSpyPosition(senderPosition);
                this.senderPosition.setRelease(newSenderPosition);
                this.senderLimit.setRelease(this.flowControl.onIdle(nowNs, newSenderPosition, newSenderPosition, this.isEndOfStream));
            } else {
                this.senderLimit.setRelease(this.flowControl.onIdle(nowNs, this.senderLimit.get(), senderPosition, this.isEndOfStream));
            }
            this.updateHasReceivers(nowNs);
        }
        this.retransmitHandler.processTimeouts(nowNs, this);
        return bytesSent;
    }

    /**
     * @return the channel endpoint used for sending.
     */
    SendChannelEndpoint channelEndpoint() {
        return this.channelEndpoint;
    }

    /**
     * @return the log backing this publication.
     */
    RawLog rawLog() {
        return this.rawLog;
    }

    /**
     * @return the id of the publisher limit position.
     */
    int publisherLimitId() {
        return this.publisherLimit.id();
    }

    /**
     * @return the entity tag taken from the publication parameters.
     */
    long tag() {
        return this.tag;
    }

    /**
     * @return the length of a term buffer.
     */
    int termBufferLength() {
        return this.termBufferLength;
    }

    /**
     * @return the MTU length.
     */
    int mtuLength() {
        return this.mtuLength;
    }

    /**
     * @return the registration id of this publication.
     */
    long registrationId() {
        return this.registrationId;
    }

    /**
     * @return whether the publication is exclusive.
     */
    boolean isExclusive() {
        return this.isExclusive;
    }

    /**
     * @return whether spy subscribers are treated as a connection.
     */
    boolean spiesSimulateConnection() {
        return this.spiesSimulateConnection;
    }

    /**
     * @return the initial term id of the log.
     */
    int initialTermId() {
        return this.initialTermId;
    }

    /**
     * @return the starting term id taken from the publication parameters.
     */
    int startingTermId() {
        return this.startingTermId;
    }

    /**
     * @return the starting term offset taken from the publication parameters.
     */
    int startingTermOffset() {
        return this.startingTermOffset;
    }

    /**
     * @return true when the publication is {@code ACTIVE}, or when it is {@code DRAINING}, has spies and the
     * producer position is still ahead of the sender position.
     */
    boolean isAcceptingSubscriptions() {
        return State.ACTIVE == this.state || State.DRAINING == this.state && this.hasSpies && this.producerPosition() > this.senderPosition.getVolatile();
    }

    /**
     * Publish the producer position and recompute the publisher limit while the publication is {@code ACTIVE}.
     * <p>
     * With subscribers present the limit candidate is the minimum of the sender and spy positions plus the term
     * window; when it exceeds the current limit the buffer behind the consumers is cleaned and the limit is only
     * advanced if cleaning has kept sufficiently close (see the term gap check). Without subscribers the limit is
     * pulled back to the sender position and the publication is marked not connected.
     *
     * @return 1 if work was done, otherwise 0.
     */
    int updatePublisherPositionAndLimit() {
        int workCount = 0;
        if (State.ACTIVE == this.state) {
            long producerPosition = this.producerPosition();
            long senderPosition = this.senderPosition.getVolatile();
            this.publisherPos.setRelease(producerPosition);
            if (this.hasSubscribers()) {
                long minConsumerPosition = senderPosition;
                for (ReadablePosition spyPosition : this.spyPositions) {
                    minConsumerPosition = Math.min(minConsumerPosition, spyPosition.getVolatile());
                }
                long newLimitPosition = minConsumerPosition + (long)this.termWindowLength;
                if (newLimitPosition > this.publisherLimit.get()) {
                    this.cleanBufferTo(minConsumerPosition - (long)this.termBufferLength);
                    // cleanBufferTo cleans at most to the end of one term per call, so re-read how far it got.
                    long cleanPosition = this.cleanPosition;
                    int dirtyTermId = LogBufferDescriptor.computeTermIdFromPosition(cleanPosition, this.positionBitsToShift, this.initialTermId);
                    int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(newLimitPosition, this.positionBitsToShift, this.initialTermId);
                    int termGap = activeTermId - dirtyTermId;
                    // Hold the limit back while the clean position trails the new limit by too many terms.
                    if (termGap < 2 || 2 == termGap && 0 != (int)(cleanPosition & (long)this.termLengthMask)) {
                        this.publisherLimit.setRelease(newLimitPosition);
                    }
                    workCount = 1;
                }
            } else if (this.publisherLimit.get() > senderPosition) {
                this.updateConnectedState(false);
                this.publisherLimit.setRelease(senderPosition);
                this.cleanBufferTo(senderPosition - (long)this.termBufferLength);
                workCount = 1;
            }
        }
        return workCount;
    }

    /**
     * @return whether any spy positions are currently registered.
     */
    boolean hasSpies() {
        return this.hasSpies;
    }

    /**
     * Let the liveness tracker time out receivers and refresh the {@code hasReceivers} flag from it.
     *
     * @param timeNs current time.
     */
    void updateHasReceivers(long timeNs) {
        this.livenessTracker.onIdle(timeNs, this.connectionTimeoutNs);
        boolean isLive = this.livenessTracker.hasReceivers();
        // Only write the volatile field when the value actually changes.
        if (this.hasReceivers != isLive) {
            this.hasReceivers = isLive;
        }
        this.timeOfLastUpdateReceivers = timeNs;
    }

    /**
     * Scan the active term from the sender position and send what is available, bounded by the sender limit
     * window and the MTU.
     * <p>
     * On a complete send the sender position advances by the bytes sent plus any padding. When the window is
     * closed, or the scan reports a negative availability, the flow control limit counters are incremented once
     * until data is successfully sent again.
     *
     * @param nowNs          current time.
     * @param senderPosition current sender position.
     * @param termOffset     offset of the sender position within its term.
     * @return bytes consumed from the term (sent bytes plus padding), or 0 if nothing was sent.
     */
    private int sendData(long nowNs, long senderPosition, int termOffset) {
        int bytesSent = 0;
        int availableWindow = (int)(this.senderLimit.get() - senderPosition);
        if (availableWindow > 0) {
            int scanLimit = Math.min(availableWindow, this.mtuLength);
            int activeIndex = LogBufferDescriptor.indexByPosition(senderPosition, this.positionBitsToShift);
            long scanOutcome = TermScanner.scanForAvailability(this.termBuffers[activeIndex], termOffset, scanLimit);
            int available = TermScanner.available(scanOutcome);
            if (available > 0) {
                ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
                sendBuffer.limit(termOffset + available).position(termOffset);
                if (available == this.doSend(sendBuffer)) {
                    this.timeOfLastDataOrHeartbeatNs = nowNs;
                    // Re-arm limit tracking so the next time sending is limited it is counted again.
                    this.trackSenderLimits = true;
                    bytesSent = available + TermScanner.padding(scanOutcome);
                    this.senderPosition.setRelease(senderPosition + (long)bytesSent);
                } else {
                    this.shortSends.increment();
                }
            } else if (available < 0 && this.trackSenderLimits) {
                this.trackSenderLimits = false;
                this.senderBpe.incrementRelease();
                this.senderFlowControlLimits.incrementRelease();
            }
        } else if (this.trackSenderLimits) {
            this.trackSenderLimits = false;
            this.senderBpe.incrementRelease();
            this.senderFlowControlLimits.incrementRelease();
        }
        return bytesSent;
    }

    /**
     * Send a setup frame if the setup timeout has elapsed since the last one.
     * <p>
     * When the setup was elicited, flow control is informed before sending and the elicited flag is cleared once
     * receivers are present.
     *
     * @param nowNs        current time.
     * @param activeTermId term id of the sender position.
     * @param termOffset   offset of the sender position within its term.
     */
    private void setupMessageCheck(long nowNs, int activeTermId, int termOffset) {
        // Subtracting before comparing keeps the check correct if the nano clock value wraps.
        if (this.timeOfLastSetupNs + Configuration.PUBLICATION_SETUP_TIMEOUT_NS - nowNs < 0L) {
            this.timeOfLastSetupNs = nowNs;
            // 0x80 is set when a response setup should be requested, 0x40 when the channel has group semantics.
            int flags = (this.isSendResponseSetupFlag() ? 128 : 0) | (this.hasGroupSemantics() ? 64 : 0);
            this.setupBuffer.clear();
            this.setupHeader.activeTermId(activeTermId).termOffset(termOffset).sessionId(this.sessionId).streamId(this.streamId).initialTermId(this.initialTermId).termLength(this.termBufferLength).mtuLength(this.mtuLength).ttl(this.channelEndpoint.multicastTtl()).flags((short)(flags & 0xFFFF));
            if (this.isSetupElicited) {
                this.flowControl.onSetup(this.setupHeader, this.senderLimit.get(), this.senderPosition.get(), this.positionBitsToShift, nowNs);
            }
            // NOTE(review): 40 is presumably the setup frame length - confirm against SetupFlyweight.
            if (40 != this.doSend(this.setupBuffer)) {
                this.shortSends.increment();
            }
            if (this.isSetupElicited && this.hasReceivers) {
                this.isSetupElicited = false;
            }
        }
    }

    /**
     * Send a heartbeat frame if there has been an initial connection and the heartbeat timeout has elapsed since
     * the last data or heartbeat.
     *
     * @param nowNs        current time.
     * @param activeTermId term id of the sender position.
     * @param termOffset   offset of the sender position within its term.
     * @return the send result for the heartbeat, or 0 if none was due.
     */
    private int heartbeatMessageCheck(long nowNs, int activeTermId, int termOffset) {
        int bytesSent = 0;
        if (this.hasInitialConnection && this.timeOfLastDataOrHeartbeatNs + Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - nowNs < 0L) {
            // 0xF0 when the publication is revoked, 0xE0 when signalling end of stream, otherwise 0xC0.
            short flags = LogBufferDescriptor.isPublicationRevoked(this.metaDataBuffer) ? (short)240 : (this.signalEos && this.isEndOfStream ? (short)224 : 192);
            this.heartbeatBuffer.clear();
            this.heartbeatDataHeader.sessionId(this.sessionId).streamId(this.streamId).termId(activeTermId).termOffset(termOffset).flags(flags);
            bytesSent = this.doSend(this.heartbeatBuffer);
            // NOTE(review): 32 is presumably the data header length - confirm against DataHeaderFlyweight.
            if (32 != bytesSent) {
                this.shortSends.increment();
            }
            // The timestamp and counter are updated even when the send came up short.
            this.timeOfLastDataOrHeartbeatNs = nowNs;
            this.heartbeatsSent.incrementRelease();
        }
        return bytesSent;
    }

    /**
     * Zero the log from the current clean position towards {@code position}, stopping at the end of the term that
     * contains the clean position, and advance the clean position by the amount cleaned.
     *
     * @param position position up to which the log may be cleaned.
     */
    private void cleanBufferTo(long position) {
        long cleanPosition = this.cleanPosition;
        if (position > cleanPosition) {
            UnsafeBuffer dirtyTermBuffer = this.termBuffers[LogBufferDescriptor.indexByPosition(cleanPosition, this.positionBitsToShift)];
            int bytesForCleaning = (int)(position - cleanPosition);
            int termOffset = (int)cleanPosition & this.termLengthMask;
            int length = Math.min(bytesForCleaning, this.termBufferLength - termOffset);
            // Zero everything after the first 8 bytes, then zero the first 8 bytes last with a release store.
            dirtyTermBuffer.setMemory(termOffset + 8, length - 8, (byte)0);
            dirtyTermBuffer.putLongRelease(termOffset, 0L);
            this.cleanPosition = cleanPosition + (long)length;
        }
    }

    /**
     * Attempt to unblock the log when the sender position has not moved since the last check, the publisher looks
     * blocked, and the unblock timeout has elapsed since the last recorded activity. Any other observation resets
     * the activity time and remembered sender position.
     *
     * @param producerPosition current producer position.
     * @param senderPosition   current sender position.
     * @param nowNs            current time.
     */
    private void checkForBlockedPublisher(long producerPosition, long senderPosition, long nowNs) {
        if (senderPosition == this.lastSenderPosition && this.isPossiblyBlocked(producerPosition, senderPosition)) {
            if (this.timeOfLastActivityNs + this.unblockTimeoutNs - nowNs < 0L && LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, senderPosition, this.termBufferLength)) {
                this.unblockedPublications.incrementRelease();
            }
        } else {
            this.timeOfLastActivityNs = nowNs;
            this.lastSenderPosition = senderPosition;
        }
    }

    /**
     * @param producerPosition current producer position.
     * @param consumerPosition current consumer position.
     * @return true if the log's active term count differs from the term count of the consumer position, or the
     * producer position is ahead of the consumer position.
     */
    private boolean isPossiblyBlocked(long producerPosition, long consumerPosition) {
        int expectedTermCount;
        int producerTermCount = LogBufferDescriptor.activeTermCount(this.metaDataBuffer);
        if (producerTermCount != (expectedTermCount = (int)(consumerPosition >> this.positionBitsToShift))) {
            return true;
        }
        return producerPosition > consumerPosition;
    }

    /**
     * Check whether every spy has reached the end-of-stream position; once they all have, the spies are handed
     * to the conductor for clean-up.
     *
     * @param conductor   conductor used to clean up the spies.
     * @param eosPosition position the spies must have reached.
     * @return false if any spy is still behind {@code eosPosition}, otherwise true.
     */
    private boolean spiesFinishedConsuming(DriverConductor conductor, long eosPosition) {
        if (this.hasSpies) {
            for (ReadablePosition spyPosition : this.spyPositions) {
                if (spyPosition.getVolatile() >= eosPosition) continue;
                return false;
            }
            this.hasSpies = false;
            conductor.cleanupSpies(this);
        }
        return true;
    }

    /**
     * @param senderPosition current sender position.
     * @return the maximum of {@code senderPosition} and all spy positions.
     */
    private long maxSpyPosition(long senderPosition) {
        long position = senderPosition;
        for (ReadablePosition spyPosition : this.spyPositions) {
            position = Math.max(position, spyPosition.getVolatile());
        }
        return position;
    }

    /**
     * Write the connected flag to the log meta data and the local field, but only when it changes.
     *
     * @param newConnectedState new connected state.
     */
    private void updateConnectedState(boolean newConnectedState) {
        if (newConnectedState != this.isConnected) {
            LogBufferDescriptor.isConnected(this.metaDataBuffer, newConnectedState);
            this.isConnected = newConnectedState;
        }
    }

    /**
     * @return true if spies simulate a connection and there are spies, or there are receivers and flow control
     * reports that its required receivers are present.
     */
    private boolean hasSubscribers() {
        return this.spiesSimulateConnection && this.hasSpies || this.hasReceivers && this.flowControl.hasRequiredReceivers();
    }

    /**
     * Drive the state machine of each untethered subscription.
     * <p>
     * {@code ACTIVE}: a subscription whose position is ahead of the untethered window limit has its update time
     * refreshed; one that has been behind for the window-limit timeout is reported unavailable and moves to
     * {@code LINGER}. {@code LINGER}: after the linger timeout its position stops being a spy, and it either moves
     * to {@code RESTING} (rejoining links) or is removed and its position closed. {@code RESTING}: after the
     * resting timeout its position is re-added as a spy, the image is reported available and it becomes
     * {@code ACTIVE} again.
     *
     * @param nowNs     current time.
     * @param conductor conductor used for availability notifications.
     */
    private void checkUntetheredSubscriptions(long nowNs, DriverConductor conductor) {
        ArrayList untetheredSubscriptions = this.untetheredSubscriptions;
        int untetheredSubscriptionsSize = untetheredSubscriptions.size();
        if (untetheredSubscriptionsSize > 0) {
            int lastIndex;
            long senderPosition = this.senderPosition.getVolatile();
            // The limit sits three quarters of a term window behind the sender position.
            long untetheredWindowLimit = senderPosition - (long)this.termWindowLength + (long)(this.termWindowLength >> 2);
            // Iterate backwards so fastUnorderedRemove (swap with last) never skips an unvisited element.
            for (int i = lastIndex = untetheredSubscriptionsSize - 1; i >= 0; --i) {
                UntetheredSubscription untethered = (UntetheredSubscription)untetheredSubscriptions.get(i);
                if (UntetheredSubscription.State.ACTIVE == untethered.state) {
                    if (untethered.position.getVolatile() > untetheredWindowLimit) {
                        untethered.timeOfLastUpdateNs = nowNs;
                        continue;
                    }
                    if (untethered.timeOfLastUpdateNs + this.untetheredWindowLimitTimeoutNs - nowNs > 0L) continue;
                    conductor.notifyUnavailableImageLink(this.registrationId, untethered.subscriptionLink);
                    untethered.state(UntetheredSubscription.State.LINGER, nowNs, this.streamId, this.sessionId);
                    continue;
                }
                if (UntetheredSubscription.State.LINGER == untethered.state) {
                    if (untethered.timeOfLastUpdateNs + this.untetheredLingerTimeoutNs - nowNs > 0L) continue;
                    this.removeSpyPosition(untethered.position);
                    if (untethered.subscriptionLink.isRejoin()) {
                        untethered.state(UntetheredSubscription.State.RESTING, nowNs, this.streamId, this.sessionId);
                        continue;
                    }
                    ArrayListUtil.fastUnorderedRemove(untetheredSubscriptions, i, lastIndex--);
                    untethered.position.close();
                    continue;
                }
                if (UntetheredSubscription.State.RESTING != untethered.state || untethered.timeOfLastUpdateNs + this.untetheredRestingTimeoutNs - nowNs > 0L) continue;
                this.addSpyPosition(untethered.position);
                conductor.notifyAvailableImageLink(this.registrationId, this.sessionId, untethered.subscriptionLink, untethered.position.id(), senderPosition, this.rawLog.fileName(), "aeron:ipc");
                untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, this.streamId, this.sessionId);
            }
        }
    }

    /**
     * Advance the lifecycle of the publication.
     * <p>
     * {@code ACTIVE}: a revoked publication has its limit and end-of-stream position fixed at the producer
     * position, is disconnected and moves straight to {@code LINGER}; otherwise untethered subscriptions, the
     * connected state, the publisher position and (for non-exclusive publications) blocked publishers are checked.
     * {@code DRAINING}: waits for the sender to catch up with the producer, unblocking the log if needed, then
     * waits for spies to finish before moving to {@code LINGER}. {@code LINGER}: once there are no references and
     * either a unicast end-of-stream was received or the linger timeout expired, the endpoint reference is
     * released, the conductor cleans up and the state becomes {@code DONE}.
     *
     * @param timeNs    current time in nanoseconds.
     * @param timeMs    current time in milliseconds (not used).
     * @param conductor conductor used for clean-up and notifications.
     */
    @Override
    public void onTimeEvent(long timeNs, long timeMs, DriverConductor conductor) {
        switch (this.state) {
            case ACTIVE: {
                if (LogBufferDescriptor.isPublicationRevoked(this.metaDataBuffer)) {
                    long revokedPos = this.producerPosition();
                    this.publisherLimit.setRelease(revokedPos);
                    LogBufferDescriptor.endOfStreamPosition(this.metaDataBuffer, revokedPos);
                    this.updateConnectedState(false);
                    this.isConnected = false;
                    this.isEndOfStream = true;
                    conductor.cleanupSpies(this);
                    this.state = State.LINGER;
                    NetworkPublication.logRevoke(revokedPos, this.sessionId(), this.streamId(), this.channel());
                    this.publicationsRevoked.increment();
                    break;
                }
                this.checkUntetheredSubscriptions(timeNs, conductor);
                this.updateConnectedState(this.hasSubscribers());
                long producerPosition = this.producerPosition();
                this.publisherPos.setRelease(producerPosition);
                if (this.isExclusive) break;
                this.checkForBlockedPublisher(producerPosition, this.senderPosition.getVolatile(), timeNs);
                break;
            }
            case DRAINING: {
                long producerPosition = this.producerPosition();
                this.publisherPos.setRelease(producerPosition);
                long senderPosition = this.senderPosition.getVolatile();
                if (producerPosition > senderPosition) {
                    if (LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, senderPosition, this.termBufferLength)) {
                        this.unblockedPublications.incrementRelease();
                        break;
                    }
                    // Keep draining while receivers are still present.
                    if (this.hasReceivers) {
                        break;
                    }
                } else {
                    this.isEndOfStream = true;
                }
                if (!this.spiesFinishedConsuming(conductor, producerPosition)) break;
                this.timeOfLastActivityNs = timeNs;
                this.state = State.LINGER;
                break;
            }
            case LINGER: {
                if (0 != this.refCount || !this.hasReceivedUnicastEos && this.timeOfLastActivityNs + this.lingerTimeoutNs - timeNs >= 0L) break;
                this.channelEndpoint.decRef();
                conductor.cleanupPublication(this);
                this.timeOfLastActivityNs = timeNs;
                this.state = State.DONE;
                break;
            }
        }
    }

    /**
     * @return true once {@link #senderRelease()} has been called.
     */
    @Override
    public boolean hasReachedEndOfLife() {
        return this.hasSenderReleased;
    }

    /**
     * @return the response correlation id taken from the publication parameters.
     */
    public long responseCorrelationId() {
        return this.responseCorrelationId;
    }

    /**
     * Mark the publication as revoked in the log meta data; the state change itself is applied by
     * {@link #onTimeEvent(long, long, DriverConductor)}.
     */
    void revoke() {
        LogBufferDescriptor.isPublicationRevoked(this.metaDataBuffer, true);
    }

    /**
     * Decrement the reference count. When it reaches zero the publisher limit and end-of-stream position are
     * fixed at the producer position and, unless the publication was revoked, the state becomes {@code DRAINING}
     * (with end of stream flagged immediately if the sender has already caught up).
     */
    void decRef() {
        if (0 == --this.refCount) {
            long producerPosition = this.producerPosition();
            this.publisherLimit.setRelease(producerPosition);
            LogBufferDescriptor.endOfStreamPosition(this.metaDataBuffer, producerPosition);
            if (!LogBufferDescriptor.isPublicationRevoked(this.metaDataBuffer)) {
                if (this.senderPosition.getVolatile() >= producerPosition) {
                    this.isEndOfStream = true;
                }
                this.state = State.DRAINING;
            }
        }
    }

    /**
     * Increment the reference count.
     */
    void incRef() {
        ++this.refCount;
    }

    /**
     * @return the current lifecycle state.
     */
    State state() {
        return this.state;
    }

    /**
     * Record that the sender has released this publication, which makes {@link #hasReachedEndOfLife()} true.
     */
    void senderRelease() {
        this.hasSenderReleased = true;
    }

    /**
     * @return the producer position computed from the raw tail read from the log meta data.
     */
    long producerPosition() {
        long rawTail = LogBufferDescriptor.rawTailVolatile(this.metaDataBuffer);
        int termOffset = LogBufferDescriptor.termOffset(rawTail, this.termBufferLength);
        return LogBufferDescriptor.computePosition(LogBufferDescriptor.termId(rawTail), termOffset, this.positionBitsToShift, this.initialTermId);
    }

    /**
     * @return the sender position read with volatile semantics.
     */
    long consumerPosition() {
        return this.senderPosition.getVolatile();
    }

    /**
     * Append a position to the spy positions array and flag that spies are present.
     *
     * @param position position to add.
     */
    private void addSpyPosition(ReadablePosition position) {
        this.spyPositions = ArrayUtil.add(this.spyPositions, position);
        this.hasSpies = true;
    }

    /**
     * Remove a position from the spy positions array and update the spies-present flag.
     *
     * @param position position to remove.
     */
    private void removeSpyPosition(ReadablePosition position) {
        this.spyPositions = ArrayUtil.remove(this.spyPositions, position);
        this.hasSpies = 0 != this.spyPositions.length;
    }

    /**
     * @return true when this is not a response publication and a response correlation id is set (not -1).
     */
    private boolean isSendResponseSetupFlag() {
        return !this.isResponse && -1L != this.responseCorrelationId;
    }

    /**
     * @return whether the endpoint's UDP channel has group semantics.
     */
    private boolean hasGroupSemantics() {
        return this.channelEndpoint().udpChannel().hasGroupSemantics();
    }

    /**
     * Intentionally empty; called when a revoke is applied.
     * NOTE(review): presumably an interception point for event logging - confirm.
     *
     * @param revokedPos position at which the publication was revoked.
     * @param sessionId  session id of the stream.
     * @param streamId   stream id of the stream.
     * @param channel    channel URI string.
     */
    private static void logRevoke(long revokedPos, int sessionId, int streamId, String channel) {
    }

    /**
     * Lifecycle states of the publication, in the order they are normally traversed.
     */
    static enum State {
        ACTIVE,
        DRAINING,
        LINGER,
        DONE;

    }
}

