/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.client.impl;

import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.BrokerTopologyManager;
import io.camunda.zeebe.broker.client.api.RequestDispatchStrategy;
import io.camunda.zeebe.dynamic.config.state.RoutingState;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * {@link RequestDispatchStrategy} that hands out partitions in round-robin order.
 *
 * <p>The set of partitions to cycle through is cached as a {@link PartitionRing} together with the
 * version of the routing state it was derived from, and is rebuilt whenever the topology manager
 * reports a routing state with a higher version. When no routing state is available, the ring
 * covers partitions {@code 1..partitionsCount} of the current topology.
 *
 * <p>All mutable state is held in an {@link AtomicReference} and an {@link AtomicInteger}.
 */
public final class RoundRobinDispatchStrategy implements RequestDispatchStrategy {
    /** Sentinel returned by {@link #determinePartition} when no partition could be selected. */
    private static final int NO_PARTITION = -3;

    /**
     * Leader value for which a partition is skipped during selection.
     * NOTE(review): presumably the topology's "no leader known" marker - confirm against
     * BrokerClusterState.
     */
    private static final int NO_LEADER = -2;

    private final AtomicReference<VersionedPartitionRing> partitionRing = new AtomicReference<>(VersionedPartitionRing.uninitialized());
    private final AtomicInteger offset = new AtomicInteger(0);

    /**
     * Selects the next partition in round-robin order for which the topology reports a leader.
     *
     * <p>At most {@code topology.getPartitionsCount()} consecutive ring positions are probed; each
     * probe advances the shared offset by one.
     *
     * @param topologyManager source of the current topology and cluster configuration
     * @return the selected partition id, or {@code -3} if the topology is {@code null} or not
     *     initialized, or if every probed partition was skipped
     */
    @Override
    public int determinePartition(BrokerTopologyManager topologyManager) {
        BrokerClusterState topology = topologyManager.getTopology();
        if (topology == null || !topology.isInitialized()) {
            return NO_PARTITION;
        }
        PartitionRing partitions = this.updatePartitionRing(topologyManager);
        for (int i = 0; i < topology.getPartitionsCount(); ++i) {
            int partition = partitions.partitionAtOffset(this.offset.getAndIncrement());
            if (topology.getLeaderForPartition(partition) == NO_LEADER) {
                continue;
            }
            return partition;
        }
        return NO_PARTITION;
    }

    /**
     * Returns the partition ring to use, rebuilding and publishing it first if the cached ring was
     * derived from an older routing state version than the one currently reported.
     *
     * @param topologyManager source of the cluster configuration (and of the partition count used
     *     when no routing state is present)
     * @return the cached ring if it is at least as new as the reported version, otherwise the
     *     freshly built ring
     */
    private PartitionRing updatePartitionRing(BrokerTopologyManager topologyManager) {
        Optional<RoutingState> routingState = topologyManager.getClusterConfiguration().routingState();
        long expectedVersion = routingState.map(RoutingState::version).orElse(VersionedPartitionRing.NO_ROUTING_STATE);
        VersionedPartitionRing currentValue = this.partitionRing.get();
        if (currentValue.version() >= expectedVersion) {
            return currentValue.partitions();
        }
        PartitionRing newPartitionRing = routingState
            .map(RoutingState::activePartitions)
            .map(PartitionRing::of)
            .orElseGet(() -> PartitionRing.all(topologyManager.getTopology().getPartitionsCount()));
        VersionedPartitionRing newValue = new VersionedPartitionRing(expectedVersion, newPartitionRing);
        // compareAndExchange returns the value it found. Keep retrying until the stored ring is at
        // least at the expected version - either because our value was installed or because
        // another caller published an equal or newer one.
        while (currentValue.version() < expectedVersion) {
            currentValue = this.partitionRing.compareAndExchange(currentValue, newValue);
        }
        return newPartitionRing;
    }

    /**
     * A partition ring paired with the version it was built for.
     *
     * @param version routing state version, or one of {@link #NOT_INITIALIZED} /
     *     {@link #NO_ROUTING_STATE}
     * @param partitions the ring; {@code null} only for the {@link #uninitialized()} placeholder
     */
    record VersionedPartitionRing(long version, PartitionRing partitions) {
        /** Version of the initial placeholder; lower than every other version used here. */
        static final long NOT_INITIALIZED = -2L;
        /** Version used when the cluster configuration carries no routing state. */
        static final long NO_ROUTING_STATE = -1L;

        /** Creates the placeholder held before the first ring has been computed. */
        private static VersionedPartitionRing uninitialized() {
            return new VersionedPartitionRing(NOT_INITIALIZED, null);
        }
    }

    /**
     * Fixed, non-empty sequence of partition ids that is indexed cyclically.
     *
     * @param partitions the partition ids in iteration order
     */
    private record PartitionRing(int[] partitions) {
        /**
         * @throws IllegalArgumentException if {@code partitions} is empty
         */
        PartitionRing {
            if (partitions.length == 0) {
                throw new IllegalArgumentException("Partitions must not be empty");
            }
        }

        /** Builds a ring over the partition ids {@code 1..partitionCount}. */
        static PartitionRing all(int partitionCount) {
            int[] partitions = new int[partitionCount];
            for (int i = 0; i < partitionCount; ++i) {
                partitions[i] = i + 1;
            }
            return new PartitionRing(partitions);
        }

        /** Builds a ring over the given partition ids, in ascending order. */
        static PartitionRing of(Set<Integer> partitions) {
            int[] sorted = partitions.stream().sorted().mapToInt(Integer::intValue).toArray();
            return new PartitionRing(sorted);
        }

        /**
         * Returns the partition at the given position, wrapping around the ring.
         *
         * @param offset any int, including negative values produced when the caller's counter
         *     overflows
         * @return the partition id at index {@code floorMod(offset, partitions.length)}
         */
        public int partitionAtOffset(int offset) {
            // floorMod (not %) so that a negative offset still maps to a valid index.
            return this.partitions[Math.floorMod(offset, this.partitions.length)];
        }
    }
}

