/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.common;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import java.util.List;
import java.util.stream.IntStream;

public final class CommandDistributionBehavior {
    private final StateWriter stateWriter;
    private final SideEffectWriter sideEffectWriter;
    private final List<Integer> otherPartitions;
    private final InterPartitionCommandSender interPartitionCommandSender;
    private final int currentPartitionId;

    public CommandDistributionBehavior(Writers writers, int currentPartition, int partitionsCount, InterPartitionCommandSender partitionCommandSender) {
        this.stateWriter = writers.state();
        this.sideEffectWriter = writers.sideEffect();
        this.interPartitionCommandSender = partitionCommandSender;
        this.otherPartitions = IntStream.range(1, 1 + partitionsCount).filter(partition -> partition != currentPartition).boxed().toList();
        this.currentPartitionId = currentPartition;
    }

    public <T extends UnifiedRecordValue> void distributeCommand(long distributionKey, TypedRecord<T> command) {
        if (this.otherPartitions.isEmpty()) {
            return;
        }
        CommandDistributionRecord distributionRecord = new CommandDistributionRecord().setPartitionId(this.currentPartitionId).setValueType(command.getValueType()).setIntent(command.getIntent()).setCommandValue((UnifiedRecordValue)command.getValue());
        this.stateWriter.appendFollowUpEvent(distributionKey, CommandDistributionIntent.STARTED, distributionRecord);
        this.otherPartitions.forEach(partition -> this.distributeToPartition(command, (int)partition, distributionRecord, distributionKey));
    }

    private <T extends UnifiedRecordValue> void distributeToPartition(TypedRecord<T> command, int partition, CommandDistributionRecord distributionRecord, long distributionKey) {
        ValueType valueType = distributionRecord.getValueType();
        UnifiedRecordValue commandValue = distributionRecord.getCommandValue();
        this.stateWriter.appendFollowUpEvent(distributionKey, CommandDistributionIntent.DISTRIBUTING, new CommandDistributionRecord().setPartitionId(partition).setValueType(valueType).setIntent(command.getIntent()));
        this.sideEffectWriter.appendSideEffect(() -> {
            this.interPartitionCommandSender.sendCommand(partition, command.getValueType(), command.getIntent(), distributionKey, commandValue);
            return true;
        });
    }

    public <T extends UnifiedRecordValue> void acknowledgeCommand(long distributionKey, TypedRecord<T> command) {
        CommandDistributionRecord distributionRecord = new CommandDistributionRecord().setPartitionId(this.currentPartitionId).setValueType(command.getValueType()).setIntent(command.getIntent());
        int receiverPartitionId = Protocol.decodePartitionId(command.getKey());
        this.sideEffectWriter.appendSideEffect(() -> {
            this.interPartitionCommandSender.sendCommand(receiverPartitionId, ValueType.COMMAND_DISTRIBUTION, CommandDistributionIntent.ACKNOWLEDGE, distributionKey, distributionRecord);
            return true;
        });
    }
}

