/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributionCommandSender;
import io.camunda.zeebe.engine.state.immutable.DeploymentState;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically re-sends deployments whose distribution to another partition is still pending.
 *
 * <p>After recovery, the instance running on partition {@value #DEPLOYMENT_PARTITION_ID}
 * schedules {@link #runRetryCycle()} at a fixed rate of {@link
 * #DEPLOYMENT_REDISTRIBUTION_INTERVAL}. Each cycle walks all pending distributions reported by
 * the {@link DeploymentState} and retries them with a back-off: a distribution is retried on the
 * cycles whose per-distribution counter is a power of two (1, 2, 4, ...) until the counter
 * reaches {@code MAX_RETRY_CYCLES}; from then on it is retried on every multiple of {@code
 * MAX_RETRY_CYCLES}.
 *
 * <p>The cycle counters are kept only in memory, in a plain {@link HashMap}.
 */
public class DeploymentRedistributor implements StreamProcessorLifecycleAware {
    /** Period between two consecutive retry cycles. */
    public static final Duration DEPLOYMENT_REDISTRIBUTION_INTERVAL = Duration.ofSeconds(10L);

    /** Upper bound for the time between two retries of the same distribution. */
    private static final Duration RETRY_MAX_BACKOFF_DURATION = Duration.ofMinutes(5L);

    /** Number of retry cycles that fit into {@link #RETRY_MAX_BACKOFF_DURATION}. */
    private static final long MAX_RETRY_CYCLES =
        RETRY_MAX_BACKOFF_DURATION.dividedBy(DEPLOYMENT_REDISTRIBUTION_INTERVAL);

    /**
     * Only the stream processor of this partition schedules redistribution.
     *
     * <p>NOTE(review): presumably this mirrors the protocol-level "deployment partition" constant;
     * confirm and reference that constant instead if it is available to this module.
     */
    private static final int DEPLOYMENT_PARTITION_ID = 1;

    private static final Logger LOG = LoggerFactory.getLogger(DeploymentRedistributor.class);

    private final DeploymentDistributionCommandSender deploymentDistributionCommandSender;
    private final DeploymentState deploymentState;

    /** Number of retry cycles each still-pending distribution has been observed in, minus one. */
    private final Map<PendingDistribution, Long> retryCyclesPerDistribution = new HashMap<>();

    /**
     * @param deploymentDistributionCommandSender used to re-send a deployment to a partition
     * @param deploymentState source of the distributions that are still pending
     */
    public DeploymentRedistributor(
            final DeploymentDistributionCommandSender deploymentDistributionCommandSender,
            final DeploymentState deploymentState) {
        this.deploymentDistributionCommandSender = deploymentDistributionCommandSender;
        this.deploymentState = deploymentState;
    }

    /**
     * Schedules the periodic retry cycle, but only when running on partition {@value
     * #DEPLOYMENT_PARTITION_ID}; on every other partition this is a no-op.
     *
     * @param context the recovered stream processor context providing partition id and scheduler
     */
    @Override
    public void onRecovered(final ReadonlyStreamProcessorContext context) {
        if (context.getPartitionId() != DEPLOYMENT_PARTITION_ID) {
            return;
        }
        context
            .getScheduleService()
            .runAtFixedRate(DEPLOYMENT_REDISTRIBUTION_INTERVAL, this::runRetryCycle);
    }

    /**
     * Runs one retry cycle: visits every pending distribution, retries those that are due, and
     * afterwards forgets the counters of distributions that are no longer pending.
     */
    private void runRetryCycle() {
        // Parameterized (not raw) so that add/contains are type-checked against the map's key type.
        final var pendingDistributions = new HashSet<PendingDistribution>();

        deploymentState.foreachPendingDeploymentDistribution(
            (deploymentKey, partitionId, directBuffer) -> {
                final var pending = new PendingDistribution(deploymentKey, partitionId);
                pendingDistributions.add(pending);
                retryDistribution(pending, directBuffer);
            });

        // Drop tracking for everything that was not reported as pending in this cycle, so the
        // map does not grow without bound.
        retryCyclesPerDistribution
            .keySet()
            .removeIf(Predicate.not(pendingDistributions::contains));
    }

    /**
     * Re-sends the deployment to the target partition if the back-off schedule says it is due.
     *
     * @param pending the distribution (deployment key and target partition) to retry
     * @param copiedDeploymentBuffer serialized deployment that is wrapped into a {@link
     *     DeploymentRecord} before sending
     */
    private void retryDistribution(
            final PendingDistribution pending, final DirectBuffer copiedDeploymentBuffer) {
        if (!shouldRetryNow(pending)) {
            return;
        }

        LOG.info(
            "Retrying to distribute deployment {} to partition {}",
            pending.deploymentKey(),
            pending.partitionId());

        final var deploymentRecord = new DeploymentRecord();
        deploymentRecord.wrap(copiedDeploymentBuffer);
        deploymentDistributionCommandSender.distributeToPartition(
            pending.deploymentKey(), pending.partitionId(), deploymentRecord);
    }

    /**
     * Advances the cycle counter of the given distribution and decides whether to retry now.
     *
     * <p>The counter starts at 0 the first time a distribution is seen (no retry on that cycle)
     * and is incremented on every later call.
     *
     * @param pendingDistribution the distribution whose counter is advanced
     * @return {@code true} if the counter is a power of two below {@code MAX_RETRY_CYCLES}, or a
     *     multiple of {@code MAX_RETRY_CYCLES} once it has reached that value
     */
    private boolean shouldRetryNow(final PendingDistribution pendingDistribution) {
        final long retryCycle =
            retryCyclesPerDistribution.compute(
                pendingDistribution,
                (key, retryCycles) -> retryCycles != null ? retryCycles + 1L : 0L);

        if (retryCycle >= MAX_RETRY_CYCLES) {
            // Back-off is capped: retry once per MAX_RETRY_CYCLES cycles.
            return retryCycle % MAX_RETRY_CYCLES == 0L;
        }
        // Exactly one bit set <=> power of two; 0 has no bits set and therefore never matches.
        return Long.bitCount(retryCycle) == 1;
    }

    /**
     * Identifies one pending distribution; used as map key and set element.
     *
     * @param deploymentKey key of the deployment being distributed
     * @param partitionId partition the deployment still has to be distributed to
     */
    private record PendingDistribution(long deploymentKey, int partitionId) {
    }
}

