/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dispatcher;

import io.camunda.zeebe.dispatcher.AtomicPosition;
import io.camunda.zeebe.dispatcher.ClaimedFragment;
import io.camunda.zeebe.dispatcher.ClaimedFragmentBatch;
import io.camunda.zeebe.dispatcher.Loggers;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.dispatcher.impl.PositionUtil;
import io.camunda.zeebe.dispatcher.impl.log.LogBuffer;
import io.camunda.zeebe.dispatcher.impl.log.LogBufferAppender;
import io.camunda.zeebe.dispatcher.impl.log.LogBufferPartition;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorCondition;
import io.camunda.zeebe.scheduler.FutureUtil;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Arrays;
import java.util.function.BiFunction;
import org.slf4j.Logger;

public class Dispatcher
extends Actor {
    private static final Logger LOG = Loggers.DISPATCHER_LOGGER;
    private static final String ERROR_MESSAGE_CLAIM_FAILED = "Expected to claim segment of size %d, but can't claim more than %d bytes.";
    private final LogBuffer logBuffer;
    private final LogBufferAppender logAppender;
    private final AtomicPosition publisherLimit;
    private final AtomicPosition publisherPosition;
    private long recordPosition;
    private final String[] defaultSubscriptionNames;
    private final int maxFragmentLength;
    private final String name;
    private final int logWindowLength;
    private Subscription[] subscriptions;
    private final Runnable onClaimComplete = this::signalSubscriptions;
    private volatile boolean isClosed = false;
    private final Runnable backgroundTask = this::runBackgroundTask;
    private ActorCondition dataConsumed;

    Dispatcher(LogBuffer logBuffer, LogBufferAppender logAppender, AtomicPosition publisherLimit, AtomicPosition publisherPosition, long initialPosition, int logWindowLength, int maxFragmentLength, String[] subscriptionNames, String name) {
        this.logBuffer = logBuffer;
        this.logAppender = logAppender;
        this.publisherLimit = publisherLimit;
        this.publisherPosition = publisherPosition;
        this.recordPosition = initialPosition;
        this.name = name;
        this.logWindowLength = logWindowLength;
        this.maxFragmentLength = maxFragmentLength;
        this.subscriptions = new Subscription[0];
        this.defaultSubscriptionNames = subscriptionNames;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    protected void onActorStarted() {
        this.dataConsumed = this.actor.onCondition("data-consumed", this.backgroundTask);
        this.openDefaultSubscriptions();
    }

    @Override
    protected void onActorClosing() {
        Subscription[] subscriptionsCopy;
        this.publisherLimit.reset();
        this.publisherPosition.reset();
        for (Subscription subscription : subscriptionsCopy = Arrays.copyOf(this.subscriptions, this.subscriptions.length)) {
            this.doCloseSubscription(subscription);
        }
        this.logBuffer.close();
        this.isClosed = true;
        LOG.debug("Dispatcher closed");
    }

    private void runBackgroundTask() {
        this.updatePublisherLimit();
        this.logBuffer.cleanPartitions();
    }

    private void openDefaultSubscriptions() {
        int subscriptionSize = this.defaultSubscriptionNames == null ? 0 : this.defaultSubscriptionNames.length;
        for (int i = 0; i < subscriptionSize; ++i) {
            this.doOpenSubscription(this.defaultSubscriptionNames[i], this.dataConsumed);
        }
    }

    private void signalSubscriptions() {
        Subscription[] subscriptions = this.subscriptions;
        for (int i = 0; i < subscriptions.length; ++i) {
            subscriptions[i].getActorConditions().signalConsumers();
        }
    }

    public long claimSingleFragment(ClaimedFragment claim, int length) {
        return this.claimSingleFragment(claim, length, 0);
    }

    public long claimSingleFragment(ClaimedFragment claim, int length, int streamId) {
        return this.offer((partition, activePartitionId) -> this.logAppender.claim((LogBufferPartition)partition, (int)activePartitionId, claim, length, streamId, this.onClaimComplete), 1, LogBufferAppender.claimedFragmentLength(length));
    }

    public long claimFragmentBatch(ClaimedFragmentBatch batch, int fragmentCount, int batchLength) {
        return this.offer((partition, activePartitionId) -> this.logAppender.claim((LogBufferPartition)partition, (int)activePartitionId, batch, fragmentCount, batchLength, this.onClaimComplete), fragmentCount, LogBufferAppender.claimedBatchLength(fragmentCount, batchLength));
    }

    public boolean canClaimFragmentBatch(int fragmentCount, int batchLength) {
        int framedLength = LogBufferAppender.claimedBatchLength(fragmentCount, batchLength);
        return framedLength < this.maxFragmentLength;
    }

    private synchronized long offer(BiFunction<LogBufferPartition, Integer, Integer> claimer, int fragmentCount, int length) {
        long newPosition = -1L;
        if (!this.isClosed) {
            LogBufferPartition partition;
            int partitionOffset;
            long limit = this.publisherLimit.get();
            int activePartitionId = this.logBuffer.getActivePartitionIdVolatile();
            long position = PositionUtil.position(activePartitionId, partitionOffset = (partition = this.logBuffer.getPartition(activePartitionId)).getTailCounterVolatile());
            if (position < limit) {
                if (length >= this.maxFragmentLength) {
                    throw new IllegalArgumentException(String.format(ERROR_MESSAGE_CLAIM_FAILED, length, this.maxFragmentLength));
                }
                int newOffset = claimer.apply(partition, activePartitionId);
                newPosition = this.updatePublisherPosition(activePartitionId, newOffset);
                if (newPosition > 0L) {
                    newPosition = this.recordPosition;
                    this.recordPosition += (long)fragmentCount;
                }
                this.signalSubscriptions();
            }
        }
        return newPosition;
    }

    private long updatePublisherPosition(int activePartitionId, int newOffset) {
        long newPosition = -1L;
        if (newOffset > 0) {
            newPosition = PositionUtil.position(activePartitionId, newOffset);
        } else if (newOffset == -2) {
            this.logBuffer.onActivePartitionFilled(activePartitionId);
            newPosition = -2L;
        }
        if (this.publisherPosition.proposeMaxOrdered(newPosition)) {
            LOG.trace("Updated publisher position to {}", (Object)newPosition);
        }
        return newPosition;
    }

    public int updatePublisherLimit() {
        int isUpdated = 0;
        if (!this.isClosed) {
            long proposedPublisherLimit;
            long lastSubscriberPosition;
            if (this.subscriptions.length > 0) {
                lastSubscriberPosition = this.subscriptions[this.subscriptions.length - 1].getPosition();
                if (this.subscriptions.length > 1) {
                    for (int i = 0; i < this.subscriptions.length - 1; ++i) {
                        lastSubscriberPosition = Math.min(lastSubscriberPosition, this.subscriptions[i].getPosition());
                    }
                }
            } else {
                lastSubscriberPosition = Math.max(0L, this.publisherLimit.get() - (long)this.logWindowLength);
            }
            int partitionId = PositionUtil.partitionId(lastSubscriberPosition);
            int partitionOffset = PositionUtil.partitionOffset(lastSubscriberPosition) + this.logWindowLength;
            if (partitionOffset >= this.logBuffer.getPartitionSize()) {
                ++partitionId;
                partitionOffset = this.logWindowLength;
            }
            if (this.publisherLimit.proposeMaxOrdered(proposedPublisherLimit = PositionUtil.position(partitionId, partitionOffset))) {
                LOG.trace("Updated publisher limit to {}", (Object)proposedPublisherLimit);
                isUpdated = 1;
            }
        }
        return isUpdated;
    }

    public Subscription openSubscription(String subscriptionName) {
        return FutureUtil.join(this.openSubscriptionAsync(subscriptionName));
    }

    public ActorFuture<Subscription> openSubscriptionAsync(String subscriptionName) {
        return this.actor.call(() -> this.doOpenSubscription(subscriptionName, this.dataConsumed));
    }

    protected Subscription doOpenSubscription(String subscriptionName, ActorCondition onConsumption) {
        Subscription subscription;
        this.ensureUniqueSubscriptionName(subscriptionName);
        LOG.trace("Open subscription with name '{}'", (Object)subscriptionName);
        Subscription[] newSubscriptions = new Subscription[this.subscriptions.length + 1];
        System.arraycopy(this.subscriptions, 0, newSubscriptions, 0, this.subscriptions.length);
        int subscriberId = newSubscriptions.length - 1;
        newSubscriptions[subscriberId] = subscription = this.newSubscription(subscriberId, subscriptionName, onConsumption);
        this.subscriptions = newSubscriptions;
        onConsumption.signal();
        return subscription;
    }

    private void ensureUniqueSubscriptionName(String subscriptionName) {
        if (this.findSubscriptionByName(subscriptionName) != null) {
            throw new IllegalStateException("subscription with name '" + subscriptionName + "' already exists");
        }
    }

    protected Subscription newSubscription(int subscriptionId, String subscriptionName, ActorCondition onConsumption) {
        AtomicPosition position = new AtomicPosition();
        position.set(PositionUtil.position(this.logBuffer.getActivePartitionIdVolatile(), 0));
        AtomicPosition limit = this.determineLimit();
        return new Subscription(position, limit, subscriptionId, subscriptionName, onConsumption, this.logBuffer);
    }

    protected AtomicPosition determineLimit() {
        return this.publisherPosition;
    }

    private void doCloseSubscription(Subscription subscriptionToClose) {
        Subscription[] newSubscriptions;
        int numMoved;
        if (this.isClosed) {
            return;
        }
        subscriptionToClose.isClosed = true;
        subscriptionToClose.position.reset();
        int len = this.subscriptions.length;
        int index = 0;
        for (int i = 0; i < len; ++i) {
            if (subscriptionToClose != this.subscriptions[i]) continue;
            index = i;
            break;
        }
        if ((numMoved = len - index - 1) == 0) {
            newSubscriptions = Arrays.copyOf(this.subscriptions, len - 1);
        } else {
            newSubscriptions = new Subscription[len - 1];
            System.arraycopy(this.subscriptions, 0, newSubscriptions, 0, index);
            System.arraycopy(this.subscriptions, index + 1, newSubscriptions, index, numMoved);
        }
        this.subscriptions = newSubscriptions;
        this.dataConsumed.signal();
    }

    private Subscription findSubscriptionByName(String subscriptionName) {
        Subscription subscription = null;
        if (!this.isClosed) {
            for (int i = 0; i < this.subscriptions.length; ++i) {
                if (!this.subscriptions[i].getName().equals(subscriptionName)) continue;
                subscription = this.subscriptions[i];
                break;
            }
        }
        return subscription;
    }

    public boolean isClosed() {
        return this.isClosed;
    }

    public LogBuffer getLogBuffer() {
        return this.logBuffer;
    }

    public int getMaxFragmentLength() {
        return this.maxFragmentLength;
    }

    public long getPublisherPosition() {
        if (this.isClosed) {
            return -1L;
        }
        return this.publisherPosition.get();
    }

    public String toString() {
        return "Dispatcher [" + this.name + "]";
    }
}

