/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.AbstractMinMulticastFlowControlRhsPadding;
import io.aeron.driver.FlowControl;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.media.UdpChannel;
import io.aeron.driver.status.FlowControlReceivers;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.util.Arrays;
import org.agrona.AsciiEncoding;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.SystemUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;

/**
 * Base class for multicast flow control strategies that hold the sender back to the minimum
 * "position plus window" reported by the receivers currently being tracked.
 * <p>
 * Receivers are added when a matching status message arrives (see
 * {@link #processStatusMessage(StatusMessageFlyweight, long, int, int, long, boolean)}) and removed in
 * {@link #onIdle(long, long, long, boolean)} once they have timed out or flagged end-of-stream.
 * <p>
 * The {@code receivers} array is read and replaced without synchronisation; only
 * {@code hasRequiredReceivers} is {@code volatile}. NOTE(review): this presumably relies on all mutating
 * calls arriving on a single thread - confirm against the callers.
 */
public abstract class AbstractMinMulticastFlowControl
extends AbstractMinMulticastFlowControlRhsPadding
implements FlowControl {
    /** Shared zero-length array used whenever no receivers are tracked, to avoid allocating empties. */
    static final Receiver[] EMPTY_RECEIVERS = new Receiver[0];
    /** Whether the {@code g:} URI parameter and the context group tag are honoured by this strategy. */
    private final boolean isGroupTagAware;
    /** Published result of {@code receivers.length >= groupMinSize}; volatile so other threads may poll it. */
    private volatile boolean hasRequiredReceivers;
    private int groupMinSize;
    private long groupTag;
    private long receiverTimeoutNs;
    private Receiver[] receivers = EMPTY_RECEIVERS;
    private String channel;
    private AtomicCounter receiverCount;
    private ErrorHandler errorHandler;

    /**
     * @param isGroupTagAware {@code true} if the strategy should read a group tag from the context and
     *                        from the {@code fc} channel URI parameter.
     */
    protected AbstractMinMulticastFlowControl(boolean isGroupTagAware) {
        this.isGroupTagAware = isGroupTagAware;
    }

    /**
     * Load defaults from the driver context, apply any overrides found in the channel's {@code fc} URI
     * parameter, allocate the receiver-count counter and reset the setup tracking state.
     */
    @Override
    public void initialize(MediaDriver.Context context, CountersManager countersManager, UdpChannel udpChannel, int streamId, int sessionId, long registrationId, int initialTermId, int termBufferLength) {
        this.receiverTimeoutNs = context.flowControlReceiverTimeoutNs();
        this.groupTag = this.isGroupTagAware ? context.flowControlGroupTag() : 0L;
        this.groupMinSize = context.flowControlGroupMinSize();
        this.channel = udpChannel.originalUriString();
        // URI parameters are parsed after the context defaults so that they take precedence.
        this.parseUriParam(udpChannel.channelUri().get("fc"));
        this.hasRequiredReceivers = this.receivers.length >= this.groupMinSize;
        this.errorHandler = context.errorHandler();
        this.receiverCount = FlowControlReceivers.allocate(context.tempBuffer(), countersManager, registrationId, sessionId, streamId, this.channel);
        // -1 marks "no setup limit recorded"; see lastSetupSenderLimit(long).
        this.timeOfLastSetupNs = 0L;
        this.lastSetupSenderLimit = -1L;
    }

    /** Close the receiver-count counter, routing any failure to the configured error handler. */
    @Override
    public void close() {
        CloseHelper.close(this.errorHandler, this.receiverCount);
    }

    /**
     * Record the sender limit and time of a setup, but only while at least one receiver is tracked.
     * The recorded limit then caps the minimum computed by {@code onIdle} and
     * {@code processStatusMessage} until it expires or a new receiver is added.
     *
     * @return {@code senderLimit} unchanged.
     */
    @Override
    public long onSetup(SetupFlyweight flyweight, long senderLimit, long senderPosition, int positionBitsToShift, long timeNs) {
        if (this.receivers.length > 0) {
            this.timeOfLastSetupNs = timeNs;
            this.lastSetupSenderLimit = senderLimit;
        }
        return senderLimit;
    }

    /**
     * Drop receivers that have timed out or flagged end-of-stream and recompute the sender limit from
     * those that remain.
     *
     * @param timeNs         current time in nanoseconds.
     * @param senderLimit    current sender limit, returned unchanged when there are too few receivers.
     * @param senderPosition not used by this implementation.
     * @param isEos          not used by this implementation.
     * @return {@code senderLimit} if no receivers remain or fewer than {@code groupMinSize} remain;
     * otherwise the minimum of the remaining receivers' position-plus-window and any unexpired setup limit.
     */
    @Override
    public long onIdle(long timeNs, long senderLimit, long senderPosition, boolean isEos) {
        long minLimitPosition = this.lastSetupSenderLimit(timeNs);
        int removed = 0;
        Receiver[] receivers = this.receivers;
        // Index of the last slot still holding a retained receiver. Everything above it is dead.
        int lastIndex = receivers.length - 1;
        for (int i = lastIndex; i >= 0; --i) {
            Receiver receiver = receivers[i];
            // Written as a subtraction compared against zero rather than a direct "<" between timestamps.
            if (receiver.timeOfLastStatusMessageNs + this.receiverTimeoutNs - timeNs < 0L || receiver.eosFlagged) {
                // Unordered removal: fill the hole with the last retained receiver.
                if (i != lastIndex) {
                    receivers[i] = receivers[lastIndex];
                }
                // Always shrink the retained range, including when the removed receiver was itself the
                // last one. Otherwise a later removal would copy this dead entry back into the live range
                // and truncation would discard a live receiver in its place.
                --lastIndex;
                this.receiverRemoved(receiver.receiverId, receiver.sessionId, receiver.streamId, this.channel, receivers.length - ++removed);
                continue;
            }
            minLimitPosition = Math.min(minLimitPosition, receiver.lastPositionPlusWindow);
        }
        if (removed > 0) {
            receivers = AbstractMinMulticastFlowControl.truncateReceivers(receivers, removed);
            this.hasRequiredReceivers = receivers.length >= this.groupMinSize;
            this.receivers = receivers;
            this.receiverCount.setOrdered(receivers.length);
        }
        return receivers.length < this.groupMinSize || receivers.length == 0 ? senderLimit : minLimitPosition;
    }

    /** @return whether the number of tracked receivers was at least {@code groupMinSize} when last updated. */
    @Override
    public boolean hasRequiredReceivers() {
        return this.hasRequiredReceivers;
    }

    /**
     * Apply a status message: refresh the matching tracked receiver, or start tracking a new one, and
     * compute the resulting sender limit.
     *
     * @param flyweight           the status message being processed.
     * @param senderLimit         current sender limit; the result is never below this value.
     * @param initialTermId       initial term id used to convert the consumption term id/offset to a position.
     * @param positionBitsToShift shift used to convert the consumption term id/offset to a position.
     * @param timeNs              current time in nanoseconds.
     * @param matchesTag          whether the caller considers this receiver part of the tracked group;
     *                            when {@code false} the message neither updates nor adds a receiver.
     * @return {@code senderLimit} when fewer than {@code groupMinSize} receivers are tracked; the larger of
     * {@code senderLimit} and this message's position-plus-window when none are tracked; otherwise the
     * larger of {@code senderLimit} and the minimum position-plus-window across tracked receivers.
     */
    protected final long processStatusMessage(StatusMessageFlyweight flyweight, long senderLimit, int initialTermId, int positionBitsToShift, long timeNs, boolean matchesTag) {
        long position = LogBufferDescriptor.computePosition(flyweight.consumptionTermId(), flyweight.consumptionTermOffset(), positionBitsToShift, initialTermId);
        long windowLength = flyweight.receiverWindowLength();
        long receiverId = flyweight.receiverId();
        long lastPositionPlusWindow = position + windowLength;
        // Bit 0x40 of the flags - presumably the end-of-stream flag, matching the variable name; confirm.
        boolean eosFlagged = 64 == (flyweight.flags() & 0x40);
        boolean isExisting = false;
        long minPosition = this.lastSetupSenderLimit(timeNs);
        Receiver[] receivers = this.receivers;
        for (Receiver receiver : receivers) {
            if (matchesTag && receiverId == receiver.receiverId) {
                receiver.eosFlagged = eosFlagged;
                // lastPosition never moves backwards; the window edge simply follows the latest message.
                receiver.lastPosition = Math.max(position, receiver.lastPosition);
                receiver.lastPositionPlusWindow = lastPositionPlusWindow;
                receiver.timeOfLastStatusMessageNs = timeNs;
                isExisting = true;
            }
            minPosition = Math.min(minPosition, receiver.lastPositionPlusWindow);
        }
        // A new receiver is only admitted when others are already tracked if its window edge is no more
        // than one window length behind the current minimum.
        boolean isWithinJoinRange = 0 == receivers.length || lastPositionPlusWindow >= minPosition - windowLength;
        boolean shouldAdd = !isExisting && !eosFlagged && matchesTag && isWithinJoinRange;
        if (shouldAdd) {
            Receiver receiver = new Receiver(receiverId, flyweight.sessionId(), flyweight.streamId(), position, lastPositionPlusWindow, timeNs);
            receivers = ArrayUtil.add(receivers, receiver);
            this.hasRequiredReceivers = receivers.length >= this.groupMinSize;
            this.receivers = receivers;
            minPosition = Math.min(minPosition, lastPositionPlusWindow);
            this.receiverAdded(receiver.receiverId, receiver.sessionId, receiver.streamId, this.channel, receivers.length);
            this.receiverCount.setOrdered(receivers.length);
            // Adding a receiver clears any limit recorded by onSetup.
            this.lastSetupSenderLimit = -1L;
        }
        if (receivers.length < this.groupMinSize) {
            return senderLimit;
        }
        if (0 == receivers.length) {
            return Math.max(senderLimit, lastPositionPlusWindow);
        }
        return Math.max(senderLimit, minPosition);
    }

    /** @return the receiver timeout in nanoseconds, from the context or the {@code t:} URI parameter. */
    protected final long receiverTimeoutNs() {
        return this.receiverTimeoutNs;
    }

    /** @return {@code true} if this strategy was constructed as group-tag aware. */
    protected final boolean hasGroupTag() {
        return this.isGroupTagAware;
    }

    /** @return the group tag in effect; {@code 0} unless the strategy is group-tag aware and one was configured. */
    protected final long groupTag() {
        return this.groupTag;
    }

    /** @return the minimum number of tracked receivers required before their positions limit the sender. */
    protected final int groupMinSize() {
        return this.groupMinSize;
    }

    /**
     * Return a copy of {@code receivers} with the last {@code removed} slots dropped.
     *
     * @param receivers array whose retained entries occupy the leading slots.
     * @param removed   number of trailing slots to drop.
     * @return {@link #EMPTY_RECEIVERS} if nothing remains, otherwise a new, shorter array.
     */
    static Receiver[] truncateReceivers(Receiver[] receivers, int removed) {
        int length = receivers.length;
        int newLength = length - removed;
        if (0 == newLength) {
            return EMPTY_RECEIVERS;
        }
        return Arrays.copyOf(receivers, newLength);
    }

    /**
     * Apply overrides from the comma separated {@code fc} channel URI parameter.
     * <ul>
     * <li>{@code t:<duration>} sets the receiver timeout.</li>
     * <li>{@code g:<tag>}, {@code g:<tag>/<minSize>} or {@code g:/<minSize>} sets the group tag (only when
     * this strategy is group-tag aware) and/or the group minimum size.</li>
     * </ul>
     * Arguments with any other prefix are ignored.
     *
     * @param fcValue value of the {@code fc} parameter, or {@code null} if absent.
     */
    private void parseUriParam(String fcValue) {
        if (null == fcValue) {
            return;
        }
        for (String arg : fcValue.split(",")) {
            if (arg.startsWith("t:")) {
                this.receiverTimeoutNs = SystemUtil.parseDuration("fc receiver timeout", arg.substring(2));
            } else if (arg.startsWith("g:")) {
                int groupMinSizeIndex = arg.indexOf('/');
                // An index of 2 means the '/' directly follows "g:", i.e. there is no tag to parse.
                if (2 != groupMinSizeIndex && this.isGroupTagAware) {
                    int lengthToParse = -1 == groupMinSizeIndex ? arg.length() - 2 : groupMinSizeIndex - 2;
                    this.groupTag = AsciiEncoding.parseLongAscii(arg, 2, lengthToParse);
                }
                if (-1 != groupMinSizeIndex) {
                    this.groupMinSize = AsciiEncoding.parseIntAscii(arg, groupMinSizeIndex + 1, arg.length() - (groupMinSizeIndex + 1));
                }
            }
        }
    }

    /**
     * Called after a receiver has been added; {@code receiverCount} is the count after the addition.
     * Intentionally empty. NOTE(review): presumably a hook for external instrumentation - confirm before removing.
     */
    private void receiverAdded(long receiverId, int sessionId, int streamId, String channel, int receiverCount) {
    }

    /**
     * Called for each receiver removed in {@code onIdle}; {@code receiverCount} is the count after the removal.
     * Intentionally empty. NOTE(review): presumably a hook for external instrumentation - confirm before removing.
     */
    private void receiverRemoved(long receiverId, int sessionId, int streamId, String channel, int receiverCount) {
    }

    /**
     * Return the sender limit recorded by the last setup if it is still within the receiver timeout.
     * An expired limit is cleared as a side effect.
     *
     * @param nowNs current time in nanoseconds.
     * @return the recorded limit, or {@link Long#MAX_VALUE} when none is set or it has expired, so that it
     * has no effect when combined with {@code Math.min}.
     */
    private long lastSetupSenderLimit(long nowNs) {
        if (-1L != this.lastSetupSenderLimit) {
            if (this.timeOfLastSetupNs + this.receiverTimeoutNs - nowNs < 0L) {
                this.lastSetupSenderLimit = -1L;
            } else {
                return this.lastSetupSenderLimit;
            }
        }
        return Long.MAX_VALUE;
    }

    /** Mutable tracking state for a single receiver, keyed by {@code receiverId}. */
    static final class Receiver {
        final int sessionId;
        final int streamId;
        final long receiverId;
        /** Highest consumption position reported so far. */
        long lastPosition;
        /** Position plus receiver window length from the most recent status message. */
        long lastPositionPlusWindow;
        /** Time of the most recent status message, used for timeout detection. */
        long timeOfLastStatusMessageNs;
        /** Set from the most recent status message; causes removal on the next idle pass. */
        boolean eosFlagged;

        Receiver(long receiverId, int sessionId, int streamId, long lastPosition, long lastPositionPlusWindow, long timeNs) {
            this.receiverId = receiverId;
            this.sessionId = sessionId;
            this.streamId = streamId;
            this.lastPosition = lastPosition;
            this.lastPositionPlusWindow = lastPositionPlusWindow;
            this.timeOfLastStatusMessageNs = timeNs;
            this.eosFlagged = false;
        }
    }
}

