/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dispatcher;

import io.camunda.zeebe.dispatcher.AtomicPosition;
import io.camunda.zeebe.dispatcher.BlockPeek;
import io.camunda.zeebe.dispatcher.FragmentHandler;
import io.camunda.zeebe.dispatcher.Loggers;
import io.camunda.zeebe.dispatcher.impl.PositionUtil;
import io.camunda.zeebe.dispatcher.impl.log.DataFrameDescriptor;
import io.camunda.zeebe.dispatcher.impl.log.LogBuffer;
import io.camunda.zeebe.dispatcher.impl.log.LogBufferPartition;
import io.camunda.zeebe.scheduler.ActorCondition;
import io.camunda.zeebe.scheduler.channel.ActorConditions;
import io.camunda.zeebe.scheduler.channel.ConsumableChannel;
import java.nio.ByteBuffer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

public class Subscription
implements ConsumableChannel {
    public static final Logger LOG = Loggers.DISPATCHER_LOGGER;
    protected final ActorConditions actorConditions = new ActorConditions();
    protected final AtomicPosition limit;
    protected final AtomicPosition position;
    protected final LogBuffer logBuffer;
    protected final int id;
    protected final String name;
    protected final ActorCondition dataConsumed;
    protected final ByteBuffer rawDispatcherBufferView;
    protected volatile boolean isClosed = false;

    public Subscription(AtomicPosition position, AtomicPosition limit, int id, String name, ActorCondition onConsumption, LogBuffer logBuffer) {
        this.position = position;
        this.id = id;
        this.name = name;
        this.limit = limit;
        this.logBuffer = logBuffer;
        this.dataConsumed = onConsumption;
        this.rawDispatcherBufferView = logBuffer.createRawBufferView();
    }

    public long getPosition() {
        return this.position.get();
    }

    public boolean hasAvailable() {
        return this.getLimit() > this.getPosition();
    }

    public void registerConsumer(ActorCondition consumer) {
        this.actorConditions.registerConsumer(consumer);
    }

    public void removeConsumer(ActorCondition consumer) {
        this.actorConditions.removeConsumer(consumer);
    }

    protected long getLimit() {
        return this.limit.get();
    }

    public int poll(FragmentHandler frgHandler, int maxNumOfFragments) {
        int fragmentsRead = 0;
        if (!this.isClosed) {
            long currentPosition = this.position.get();
            long limit = this.getLimit();
            if (limit > currentPosition) {
                int partitionId = PositionUtil.partitionId(currentPosition);
                int partitionOffset = PositionUtil.partitionOffset(currentPosition);
                LogBufferPartition partition = this.logBuffer.getPartition(partitionId);
                fragmentsRead = this.pollFragments(partition, frgHandler, partitionId, partitionOffset, maxNumOfFragments, limit, false);
            }
        }
        return fragmentsRead;
    }

    protected int pollFragments(LogBufferPartition partition, FragmentHandler frgHandler, int partitionId, int fragmentOffset, int maxNumOfFragments, long limit, boolean handlerControlled) {
        int framedLength;
        UnsafeBuffer buffer = partition.getDataBuffer();
        int fragmentsConsumed = 0;
        int fragmentResult = 0;
        while ((framedLength = buffer.getIntVolatile(DataFrameDescriptor.lengthOffset(fragmentOffset))) > 0) {
            short type = buffer.getShort(DataFrameDescriptor.typeOffset(fragmentOffset));
            if (type == 1) {
                if ((fragmentOffset += DataFrameDescriptor.alignedLength(framedLength)) >= partition.getPartitionSize()) {
                    ++partitionId;
                    fragmentOffset = 0;
                    break;
                }
            } else {
                int streamId = buffer.getInt(DataFrameDescriptor.streamIdOffset(fragmentOffset));
                int flagsOffset = DataFrameDescriptor.flagsOffset(fragmentOffset);
                byte flags = buffer.getByte(flagsOffset);
                try {
                    boolean isMarkedAsFailed = DataFrameDescriptor.flagFailed(flags);
                    int messageLength = DataFrameDescriptor.messageLength(framedLength);
                    int handlerResult = frgHandler.onFragment((DirectBuffer)buffer, DataFrameDescriptor.messageOffset(fragmentOffset), messageLength, streamId, isMarkedAsFailed);
                    if (handlerResult == 2 && !isMarkedAsFailed) {
                        buffer.putByte(flagsOffset, DataFrameDescriptor.enableFlagFailed(flags));
                    }
                    if (handlerControlled) {
                        fragmentResult = handlerResult;
                    }
                }
                catch (RuntimeException e) {
                    LOG.error("Failed to handle fragment", (Throwable)e);
                }
                if (fragmentResult != 1) {
                    ++fragmentsConsumed;
                    fragmentOffset += DataFrameDescriptor.alignedLength(framedLength);
                }
            }
            if (fragmentResult != 1 && fragmentsConsumed < maxNumOfFragments && PositionUtil.position(partitionId, fragmentOffset) < limit) continue;
        }
        this.position.set(PositionUtil.position(partitionId, fragmentOffset));
        this.dataConsumed.signal();
        return fragmentsConsumed;
    }

    public int peekAndConsume(FragmentHandler frgHandler, int maxNumOfFragments) {
        int fragmentsRead = 0;
        if (!this.isClosed) {
            long currentPosition = this.position.get();
            long limit = this.getLimit();
            if (limit > currentPosition) {
                int partitionId = PositionUtil.partitionId(currentPosition);
                int partitionOffset = PositionUtil.partitionOffset(currentPosition);
                LogBufferPartition partition = this.logBuffer.getPartition(partitionId);
                fragmentsRead = this.pollFragments(partition, frgHandler, partitionId, partitionOffset, maxNumOfFragments, limit, true);
            }
        }
        return fragmentsRead;
    }

    public int peekBlock(BlockPeek availableBlock, int maxBlockSize, boolean isStreamAware) {
        int bytesAvailable = 0;
        if (!this.isClosed) {
            long currentPosition = this.position.get();
            long limit = this.getLimit();
            if (limit > currentPosition) {
                int partitionId = PositionUtil.partitionId(currentPosition);
                int partitionOffset = PositionUtil.partitionOffset(currentPosition);
                LogBufferPartition partition = this.logBuffer.getPartition(partitionId);
                bytesAvailable = this.peekBlock(partition, availableBlock, partitionId, partitionOffset, maxBlockSize, limit, isStreamAware);
            }
        }
        return bytesAvailable;
    }

    protected int peekBlock(LogBufferPartition partition, BlockPeek availableBlock, int partitionId, int partitionOffset, int maxBlockSize, long limit, boolean isStreamAware) {
        int blockLength;
        int framedLength;
        UnsafeBuffer buffer = partition.getDataBuffer();
        int bufferOffset = partition.getUnderlyingBufferOffset();
        int firstFragmentOffset = partitionOffset;
        int readBytes = 0;
        int initialStreamId = -1;
        boolean isReadingBatch = false;
        int offset = partitionOffset;
        int offsetLimit = PositionUtil.partitionOffset(limit);
        if (PositionUtil.partitionId(limit) > partitionId) {
            offsetLimit = partition.getPartitionSize();
        }
        while ((framedLength = buffer.getIntVolatile(DataFrameDescriptor.lengthOffset(partitionOffset))) > 0) {
            short type = buffer.getShort(DataFrameDescriptor.typeOffset(partitionOffset));
            if (type == 1) {
                if ((partitionOffset += DataFrameDescriptor.alignedLength(framedLength)) >= partition.getPartitionSize()) {
                    ++partitionId;
                    partitionOffset = 0;
                }
                offset = partitionOffset;
                if (readBytes != 0) break;
                this.position.proposeMaxOrdered(PositionUtil.position(partitionId, partitionOffset));
                this.dataConsumed.signal();
                break;
            }
            if (isStreamAware) {
                int streamId = buffer.getInt(DataFrameDescriptor.streamIdOffset(partitionOffset));
                if (readBytes == 0) {
                    initialStreamId = streamId;
                } else if (streamId != initialStreamId) break;
            }
            byte flags = buffer.getByte(DataFrameDescriptor.flagsOffset(partitionOffset));
            isReadingBatch = !isReadingBatch ? DataFrameDescriptor.flagBatchBegin(flags) : !DataFrameDescriptor.flagBatchEnd(flags);
            int alignedFrameLength = DataFrameDescriptor.alignedLength(framedLength);
            if (alignedFrameLength > maxBlockSize - readBytes) break;
            partitionOffset += alignedFrameLength;
            readBytes += alignedFrameLength;
            if (!isReadingBatch) {
                offset = partitionOffset;
            }
            if (maxBlockSize - readBytes > DataFrameDescriptor.HEADER_LENGTH && partitionOffset < offsetLimit) continue;
        }
        if ((blockLength = readBytes + offset - partitionOffset) > 0) {
            int absoluteOffset = bufferOffset + firstFragmentOffset;
            availableBlock.setBlock(this.rawDispatcherBufferView, this.position, this.dataConsumed, initialStreamId, absoluteOffset, blockLength, partitionId, offset);
        }
        return blockLength;
    }

    public int getId() {
        return this.id;
    }

    public String getName() {
        return this.name;
    }

    protected ActorConditions getActorConditions() {
        return this.actorConditions;
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Subscription [id=");
        builder.append(this.id);
        builder.append(", name=");
        builder.append(this.name);
        builder.append("]");
        return builder.toString();
    }
}

