/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentDistributionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;

public final class DeploymentDistributionBehavior {
    private final DeploymentDistributionRecord deploymentDistributionRecord = new DeploymentDistributionRecord();
    private final DeploymentRecord emptyDeploymentRecord = new DeploymentRecord();
    private final List<Integer> otherPartitions;
    private final DeploymentDistributor deploymentDistributor;
    private final ActorControl processingActor;
    private final StateWriter stateWriter;
    private final TypedCommandWriter commandWriter;

    public DeploymentDistributionBehavior(Writers writers, int partitionsCount, DeploymentDistributor deploymentDistributor, ActorControl processingActor) {
        this.otherPartitions = IntStream.range(1, 1 + partitionsCount).filter(partition -> partition != 1).boxed().collect(Collectors.toList());
        this.deploymentDistributor = deploymentDistributor;
        this.processingActor = processingActor;
        this.stateWriter = writers.state();
        this.commandWriter = writers.command();
    }

    public void distributeDeployment(DeploymentRecord deploymentEvent, long key) {
        DirectBuffer copiedDeploymentBuffer = BufferUtil.createCopy((BufferWriter)deploymentEvent);
        this.otherPartitions.forEach(partitionId -> {
            this.deploymentDistributionRecord.setPartition(partitionId.intValue());
            this.stateWriter.appendFollowUpEvent(key, (Intent)DeploymentDistributionIntent.DISTRIBUTING, (RecordValue)this.deploymentDistributionRecord);
            this.distributeDeploymentToPartition((int)partitionId, key, copiedDeploymentBuffer);
        });
        if (this.otherPartitions.isEmpty()) {
            this.stateWriter.appendFollowUpEvent(key, (Intent)DeploymentIntent.FULLY_DISTRIBUTED, (RecordValue)this.emptyDeploymentRecord);
        }
    }

    public void distributeDeploymentToPartition(int partitionId, long key, DirectBuffer copiedDeploymentBuffer) {
        ActorFuture<Void> deploymentPushedFuture = this.deploymentDistributor.pushDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
        deploymentPushedFuture.onComplete((BiConsumer)new WriteDeploymentDistributionCompleteTask(partitionId, key));
    }

    private final class WriteDeploymentDistributionCompleteTask
    implements Runnable,
    BiConsumer<Void, Throwable> {
        private final int partitionId;
        private final long key;

        private WriteDeploymentDistributionCompleteTask(int partitionId, long key) {
            this.partitionId = partitionId;
            this.key = key;
        }

        @Override
        public void run() {
            DeploymentDistributionBehavior.this.deploymentDistributionRecord.setPartition(this.partitionId);
            DeploymentDistributionBehavior.this.commandWriter.reset();
            DeploymentDistributionBehavior.this.commandWriter.appendFollowUpCommand(this.key, (Intent)DeploymentDistributionIntent.COMPLETE, (RecordValue)DeploymentDistributionBehavior.this.deploymentDistributionRecord);
            long pos = DeploymentDistributionBehavior.this.commandWriter.flush();
            if (pos < 0L) {
                DeploymentDistributionBehavior.this.processingActor.runDelayed(Duration.ofMillis(100L), (Runnable)this);
            }
        }

        @Override
        public void accept(Void unused, Throwable throwable) {
            this.run();
        }
    }
}

