/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CommandRedistributor
implements StreamProcessorLifecycleAware {
    public static final Duration COMMAND_REDISTRIBUTION_INTERVAL = Duration.ofSeconds(10L);
    private static final Duration RETRY_MAX_BACKOFF_DURATION = Duration.ofMinutes(5L);
    private static final long MAX_RETRY_CYCLES = RETRY_MAX_BACKOFF_DURATION.dividedBy(COMMAND_REDISTRIBUTION_INTERVAL);
    private static final Logger LOG = LoggerFactory.getLogger(CommandRedistributor.class);
    private final DistributionState distributionState;
    private final InterPartitionCommandSender commandSender;
    private final Map<PendingDistribution, Long> retryCyclesPerDistribution = new HashMap<PendingDistribution, Long>();

    public CommandRedistributor(DistributionState distributionState, InterPartitionCommandSender commandSender) {
        this.distributionState = distributionState;
        this.commandSender = commandSender;
    }

    @Override
    public void onRecovered(ReadonlyStreamProcessorContext context) {
        context.getScheduleService().runAtFixedRate(COMMAND_REDISTRIBUTION_INTERVAL, this::runRetryCycle);
    }

    private void runRetryCycle() {
        HashSet pendingDistributions = new HashSet();
        this.distributionState.foreachPendingDistribution((distributionKey, record) -> {
            PendingDistribution pending = new PendingDistribution(distributionKey, record.getPartitionId());
            pendingDistributions.add(pending);
            this.retryDistribution(pending, record);
        });
        this.retryCyclesPerDistribution.keySet().removeIf(Predicate.not(pendingDistributions::contains));
    }

    private void retryDistribution(PendingDistribution pending, CommandDistributionRecord commandDistributionRecord) {
        if (!this.shouldRetryNow(pending)) {
            return;
        }
        LOG.info("Retrying to distribute pending command {} to partition {}", (Object)pending.distributionKey, (Object)pending.partitionId);
        this.commandSender.sendCommand(pending.partitionId, commandDistributionRecord.getValueType(), commandDistributionRecord.getIntent(), pending.distributionKey, commandDistributionRecord.getCommandValue());
    }

    private boolean shouldRetryNow(PendingDistribution pendingDistribution) {
        long retryCycle = this.retryCyclesPerDistribution.compute(pendingDistribution, (k, retryCycles) -> retryCycles != null ? retryCycles + 1L : 0L);
        if (retryCycle >= MAX_RETRY_CYCLES) {
            return retryCycle % MAX_RETRY_CYCLES == 0L;
        }
        return Long.bitCount(retryCycle) == 1;
    }

    private record PendingDistribution(long distributionKey, int partitionId) {
    }
}

