/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Counter;
import io.aeron.cluster.LogPublisher;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ClusterClock;
import io.aeron.exceptions.AeronException;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableRingBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.MutableInteger;

/**
 * Tracks the messages queued for a single service until they have been appended to the log via the
 * {@link LogPublisher} and the commit position has passed them.
 * <p>
 * Each entry held in {@link #pendingMessages} is laid out as:
 * <pre>
 *   [0, 8)    leading header (NOTE(review): presumably the SBE message header - confirm)
 *   [8, 32)   session message header block holding the cluster session id and timestamp fields
 *   [32, ...) message payload
 * </pre>
 * The timestamp field of a queued entry is reused as bookkeeping: it holds {@link Long#MAX_VALUE} while the
 * message has not been appended to the log, and the position returned by the log publisher once it has.
 */
final class PendingServiceMessageTracker {
    /** Maximum number of pending messages offered to the log publisher per {@link #poll()} call. */
    private static final int SERVICE_MESSAGE_LIMIT = 20;

    /** Bytes preceding the session header block at the start of each stored entry. */
    private static final int MESSAGE_HEADER_LENGTH = 8;

    /** Bytes between the start of the session header block and the start of the payload. */
    private static final int SESSION_HEADER_BLOCK_LENGTH = 24;

    /** Total bytes preceding the payload in each stored entry. */
    private static final int SESSION_HEADER_LENGTH = MESSAGE_HEADER_LENGTH + SESSION_HEADER_BLOCK_LENGTH;

    private final int serviceId;

    /** Head offset handed to the ring buffer so {@link #poll()} resumes after the last appended entry. */
    private int pendingMessageHeadOffset = 0;

    /** Count of entries appended to the log that have not yet been swept as committed. */
    private int uncommittedMessages = 0;

    /** Session id that will be stamped on the next message passed to {@link #enqueueMessage}. */
    private long nextServiceSessionId;

    /** Session id of the most recent message swept from the queue. */
    private long logServiceSessionId;

    private long leadershipTermId = -1L;
    private final Counter commitPosition;
    private final LogPublisher logPublisher;
    private final ClusterClock clusterClock;
    private final ExpandableRingBuffer pendingMessages = new ExpandableRingBuffer();

    // Method references are bound once and reused on every ring buffer call.
    private final ExpandableRingBuffer.MessageConsumer messageAppender = this::messageAppender;
    private final ExpandableRingBuffer.MessageConsumer leaderMessageSweeper = this::leaderMessageSweeper;
    private final ExpandableRingBuffer.MessageConsumer followerMessageSweeper = this::followerMessageSweeper;

    /**
     * Create a tracker for one service.
     *
     * @param serviceId      id of the service, encoded into the top byte of every session id it generates.
     * @param commitPosition counter read to decide which appended messages can be swept.
     * @param logPublisher   publisher used to append pending messages to the log.
     * @param clusterClock   clock providing the timestamp passed to the log publisher.
     */
    PendingServiceMessageTracker(int serviceId, Counter commitPosition, LogPublisher logPublisher, ClusterClock clusterClock) {
        this.serviceId = serviceId;
        this.commitPosition = commitPosition;
        this.logPublisher = logPublisher;
        this.clusterClock = clusterClock;
        this.logServiceSessionId = serviceSessionId(serviceId, Long.MIN_VALUE);
        this.nextServiceSessionId = this.logServiceSessionId + 1L;
    }

    /**
     * Set the leadership term id passed to the log publisher when appending messages.
     *
     * @param leadershipTermId the leadership term id to use from now on.
     */
    void leadershipTermId(long leadershipTermId) {
        this.leadershipTermId = leadershipTermId;
    }

    /**
     * @return the id of the service this tracker was created for.
     */
    int serviceId() {
        return serviceId;
    }

    /**
     * @return the session id that will be assigned to the next enqueued message.
     */
    long nextServiceSessionId() {
        return nextServiceSessionId;
    }

    /**
     * @return the session id of the most recent message swept from the queue.
     */
    long logServiceSessionId() {
        return logServiceSessionId;
    }

    /**
     * Assign the next session id to a service message and queue it for appending to the log.
     * <p>
     * The session id counter always advances. The message is only queued when its id is greater than
     * {@link #logServiceSessionId()}; otherwise it is dropped.
     * <p>
     * The session id and timestamp fields are written into {@code buffer} in the bytes immediately before
     * {@code offset}, so the caller must supply a buffer with the header region in front of the payload.
     *
     * @param buffer containing the message, with header space preceding {@code offset}.
     * @param offset at which the message payload begins.
     * @param length of the message payload in bytes.
     * @throws ClusterException if the pending message buffer rejects the append.
     */
    void enqueueMessage(MutableDirectBuffer buffer, int offset, int length) {
        final long clusterSessionId = nextServiceSessionId++;
        if (clusterSessionId > logServiceSessionId) {
            final int headerOffset = offset - SESSION_HEADER_BLOCK_LENGTH;
            final int clusterSessionIdOffset = headerOffset + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
            final int timestampOffset = headerOffset + SessionMessageHeaderDecoder.timestampEncodingOffset();

            buffer.putLong(clusterSessionIdOffset, clusterSessionId, SessionMessageHeaderDecoder.BYTE_ORDER);
            // Long.MAX_VALUE marks the entry as not yet appended to the log.
            buffer.putLong(timestampOffset, Long.MAX_VALUE, SessionMessageHeaderDecoder.BYTE_ORDER);

            if (!pendingMessages.append(buffer, offset - SESSION_HEADER_LENGTH, length + SESSION_HEADER_LENGTH)) {
                throw new ClusterException("pending service message buffer at capacity=" + pendingMessages.size() + " for serviceId=" + serviceId);
            }
        }
    }

    /**
     * Record the latest session id seen and discard every queued message at or below it.
     *
     * @param clusterSessionId session id up to and including which queued messages are discarded.
     */
    void sweepFollowerMessages(long clusterSessionId) {
        logServiceSessionId = clusterSessionId;
        pendingMessages.consume(followerMessageSweeper, Integer.MAX_VALUE);
    }

    /**
     * Remove queued messages whose append position is at or below the commit position.
     * <p>
     * The poll head offset is moved back by the value returned from the ring buffer's {@code consume} and
     * clamped so it never becomes negative.
     */
    void sweepLeaderMessages() {
        if (uncommittedMessages > 0) {
            pendingMessageHeadOffset -= pendingMessages.consume(leaderMessageSweeper, Integer.MAX_VALUE);
            pendingMessageHeadOffset = Math.max(pendingMessageHeadOffset, 0);
        }
    }

    /**
     * Sweep whatever has been committed, then mark the remaining appended messages as not appended so that
     * {@link #poll()} offers them to the log publisher again from the head of the queue.
     */
    void restoreUncommittedMessages() {
        if (uncommittedMessages > 0) {
            pendingMessages.consume(leaderMessageSweeper, Integer.MAX_VALUE);
            pendingMessages.forEach(PendingServiceMessageTracker::messageReset, Integer.MAX_VALUE);
            uncommittedMessages = 0;
            pendingMessageHeadOffset = 0;
        }
    }

    /**
     * Append an already encoded entry (headers included) directly to the pending queue without assigning a
     * session id.
     *
     * @param buffer containing the encoded entry.
     * @param offset at which the entry begins.
     * @param length of the entry in bytes.
     * @throws ClusterException if the pending message buffer rejects the append.
     */
    void appendMessage(DirectBuffer buffer, int offset, int length) {
        // A rejected append would silently lose the message and leave the queue out of step with the
        // session id counters checked by verify(), so fail in the same way enqueueMessage does.
        if (!pendingMessages.append(buffer, offset, length)) {
            throw new ClusterException("pending service message buffer at capacity=" + pendingMessages.size() + " for serviceId=" + serviceId);
        }
    }

    /**
     * Replace the session id counters and reset the pending queue.
     *
     * @param nextServiceSessionId   session id to assign to the next enqueued message.
     * @param logServiceSessionId    session id of the most recent message swept from the queue.
     * @param pendingMessageCapacity capacity passed to the ring buffer's {@code reset}.
     */
    void loadState(long nextServiceSessionId, long logServiceSessionId, int pendingMessageCapacity) {
        this.nextServiceSessionId = nextServiceSessionId;
        this.logServiceSessionId = logServiceSessionId;
        pendingMessages.reset(pendingMessageCapacity);
    }

    /**
     * Offer up to {@link #SERVICE_MESSAGE_LIMIT} not yet appended messages to the log publisher, starting
     * after the last message that was successfully appended.
     *
     * @return the value returned by the ring buffer's {@code forEach} (bytes iterated, per Agrona's docs).
     */
    int poll() {
        return pendingMessages.forEach(pendingMessageHeadOffset, messageAppender, SERVICE_MESSAGE_LIMIT);
    }

    /**
     * @return the size reported by the pending message ring buffer.
     */
    int size() {
        return pendingMessages.size();
    }

    /**
     * Check that the queued messages and the session id counters agree.
     * <p>
     * The i-th queued message (1-based) must carry session id {@code logServiceSessionId + i}, and
     * {@code nextServiceSessionId} must be one greater than the id of the last queued message.
     *
     * @throws ClusterException with {@link AeronException.Category#FATAL} if either check fails.
     */
    void verify() {
        final MutableInteger messageCount = new MutableInteger();
        final ExpandableRingBuffer.MessageConsumer messageConsumer = (buffer, offset, length, headOffset) -> {
            messageCount.increment();
            final int headerOffset = offset + MESSAGE_HEADER_LENGTH;
            final int clusterSessionIdOffset = headerOffset + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
            final long clusterSessionId = buffer.getLong(clusterSessionIdOffset, SessionMessageHeaderDecoder.BYTE_ORDER);
            if (clusterSessionId != logServiceSessionId + (long)messageCount.get()) {
                throw new ClusterException("snapshot has incorrect pending message: serviceId=" + serviceId + " nextServiceSessionId=" + nextServiceSessionId + " logServiceSessionId=" + logServiceSessionId + " clusterSessionId=" + clusterSessionId + " pendingMessageIndex=" + messageCount.get(), AeronException.Category.FATAL);
            }
            return true;
        };

        pendingMessages.forEach(messageConsumer, Integer.MAX_VALUE);

        if (nextServiceSessionId != logServiceSessionId + (long)messageCount.get() + 1L) {
            throw new ClusterException("snapshot has incorrect pending message state: serviceId=" + serviceId + " nextServiceSessionId=" + nextServiceSessionId + " logServiceSessionId=" + logServiceSessionId + " pendingMessageCount=" + messageCount.get(), AeronException.Category.FATAL);
        }
    }

    /**
     * Mark queued messages that carry an append position as not appended, stopping at the first message
     * that is already marked as such.
     */
    void reset() {
        pendingMessages.forEach(PendingServiceMessageTracker::messageReset, Integer.MAX_VALUE);
    }

    /**
     * @return the underlying ring buffer of pending messages (the live instance, not a copy).
     */
    ExpandableRingBuffer pendingMessages() {
        return pendingMessages;
    }

    /**
     * Ring buffer callback used by {@link #poll()}: append one queued message to the log.
     * <p>
     * On success the returned position is stored in the entry's timestamp field and the poll head offset
     * advances to {@code headOffset}.
     *
     * @return {@code true} if the log publisher returned a positive position, otherwise {@code false}.
     */
    private boolean messageAppender(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int headerOffset = offset + MESSAGE_HEADER_LENGTH;
        final int clusterSessionIdOffset = headerOffset + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
        final int timestampOffset = headerOffset + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long clusterSessionId = buffer.getLong(clusterSessionIdOffset, SessionMessageHeaderDecoder.BYTE_ORDER);

        // Only the payload is handed to the publisher; the stored headers are skipped.
        final long appendPosition = logPublisher.appendMessage(
            leadershipTermId,
            clusterSessionId,
            clusterClock.time(),
            buffer,
            offset + SESSION_HEADER_LENGTH,
            length - SESSION_HEADER_LENGTH);

        if (appendPosition > 0L) {
            ++uncommittedMessages;
            pendingMessageHeadOffset = headOffset;
            buffer.putLong(timestampOffset, appendPosition, SessionMessageHeaderEncoder.BYTE_ORDER);
            return true;
        }

        return false;
    }

    /**
     * Ring buffer callback: overwrite a stored append position with {@link Long#MAX_VALUE}.
     *
     * @return {@code true} if the entry held an append position and was reset, {@code false} if it was
     *         already marked as not appended.
     */
    private static boolean messageReset(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int timestampOffset = offset + MESSAGE_HEADER_LENGTH + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long appendPosition = buffer.getLong(timestampOffset, SessionMessageHeaderDecoder.BYTE_ORDER);
        if (appendPosition < Long.MAX_VALUE) {
            buffer.putLong(timestampOffset, Long.MAX_VALUE, SessionMessageHeaderEncoder.BYTE_ORDER);
            return true;
        }

        return false;
    }

    /**
     * Ring buffer callback used on the leader: accept an entry for removal when the commit position has
     * reached its append position, recording its session id as the latest one swept.
     *
     * @return {@code true} if the entry is committed, otherwise {@code false}.
     */
    private boolean leaderMessageSweeper(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int headerOffset = offset + MESSAGE_HEADER_LENGTH;
        final int clusterSessionIdOffset = headerOffset + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
        final int timestampOffset = headerOffset + SessionMessageHeaderDecoder.timestampEncodingOffset();
        final long appendPosition = buffer.getLong(timestampOffset, SessionMessageHeaderDecoder.BYTE_ORDER);

        // Entries not yet appended hold Long.MAX_VALUE here, so they only match a commit position of Long.MAX_VALUE.
        if (commitPosition.getWeak() >= appendPosition) {
            logServiceSessionId = buffer.getLong(clusterSessionIdOffset, SessionMessageHeaderDecoder.BYTE_ORDER);
            --uncommittedMessages;
            return true;
        }

        return false;
    }

    /**
     * Ring buffer callback used on a follower: accept an entry for removal when its session id is at or
     * below {@link #logServiceSessionId()}.
     *
     * @return {@code true} if the entry should be removed, otherwise {@code false}.
     */
    private boolean followerMessageSweeper(MutableDirectBuffer buffer, int offset, int length, int headOffset) {
        final int clusterSessionIdOffset = offset + MESSAGE_HEADER_LENGTH + SessionMessageHeaderDecoder.clusterSessionIdEncodingOffset();
        return buffer.getLong(clusterSessionIdOffset, SessionMessageHeaderDecoder.BYTE_ORDER) <= logServiceSessionId;
    }

    /**
     * Extract the service id from a service session id.
     *
     * @param clusterSessionId session id produced by {@link #serviceSessionId(int, long)}.
     * @return the low seven bits of the top byte of {@code clusterSessionId}.
     */
    static int serviceId(long clusterSessionId) {
        return (int)(clusterSessionId >>> 56) & 0x7F;
    }

    /**
     * Combine a service id and a session id into a single value.
     *
     * @param serviceId to place in the top byte.
     * @param sessionId to OR into the result; any bits it has in the top byte are kept.
     * @return {@code serviceId} shifted left by 56 bits, ORed with {@code sessionId}.
     */
    static long serviceSessionId(int serviceId, long sessionId) {
        return (long)serviceId << 56 | sessionId;
    }
}

