/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventHubAsyncClient;
import com.azure.messaging.eventhubs.LoadBalancingStrategy;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.PartitionPumpManager;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/**
 * Decides, once per {@link #loadBalance()} invocation, which Event Hub partitions this event processor
 * should own, and records that decision in the {@link CheckpointStore}.
 *
 * <p>Each run lists the ownership records from the checkpoint store, discards the inactive ones, and then
 * either renews the ownerships this processor already holds or additionally claims one more partition
 * (an unclaimed one if available, otherwise one taken from the owner holding the most partitions).</p>
 *
 * <p>With {@link LoadBalancingStrategy#GREEDY} a run repeats for as long as the previous iteration decided to
 * claim a partition; with {@link LoadBalancingStrategy#BALANCED} at most one partition is claimed per run.</p>
 */
final class PartitionBasedLoadBalancer {
    private static final ClientLogger LOGGER = new ClientLogger(PartitionBasedLoadBalancer.class);

    // Keys used for the structured logging context.
    private static final String OWNER_ID_KEY = "ownerId";
    private static final String PARTITION_ID_KEY = "partitionId";
    private static final String ENTITY_PATH_KEY = "entityPath";

    // Upper bound applied to listing ownerships and to fetching partition ids.
    private static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(1L);

    private final String eventHubName;
    private final String consumerGroupName;
    private final CheckpointStore checkpointStore;
    private final EventHubAsyncClient eventHubAsyncClient;
    private final String ownerId;
    private final long inactiveTimeLimitInSeconds;
    private final PartitionPumpManager partitionPumpManager;
    private final String fullyQualifiedNamespace;
    private final Consumer<ErrorContext> processError;
    // Context handed to processError for failures that are not tied to a single partition.
    private final PartitionContext partitionAgnosticContext;
    // Guards against overlapping runs; set in loadBalance() and cleared when a run finishes or fails.
    private final AtomicBoolean isLoadBalancerRunning = new AtomicBoolean();
    private final LoadBalancingStrategy loadBalancingStrategy;
    // Set when an iteration decided to claim a partition; drives the GREEDY repeat in loadBalance().
    private final AtomicBoolean morePartitionsToClaim = new AtomicBoolean();
    // Partition ids seen by the last successful iteration; empty until the first one completes.
    private final AtomicReference<List<String>> partitionsCache = new AtomicReference<>(new ArrayList<>());

    /**
     * Creates a load balancer for one event processor instance.
     *
     * @param checkpointStore store used to list and claim partition ownerships and to list checkpoints.
     * @param eventHubAsyncClient client used to fetch the partition ids of the Event Hub.
     * @param fullyQualifiedNamespace namespace passed to the checkpoint store and stamped on ownership requests.
     * @param eventHubName name of the Event Hub whose partitions are balanced.
     * @param consumerGroupName consumer group the ownership records belong to.
     * @param ownerId identifier of this event processor in ownership records.
     * @param inactiveTimeLimitInSeconds age, in seconds, at which an ownership record is no longer considered
     *     active.
     * @param partitionPumpManager manager that starts partition pumps and reports which partitions are pumping.
     * @param processError callback invoked with an {@link ErrorContext} when load balancing or claiming fails.
     * @param loadBalancingStrategy strategy controlling whether a run claims one partition or keeps claiming.
     */
    PartitionBasedLoadBalancer(CheckpointStore checkpointStore, EventHubAsyncClient eventHubAsyncClient,
        String fullyQualifiedNamespace, String eventHubName, String consumerGroupName, String ownerId,
        long inactiveTimeLimitInSeconds, PartitionPumpManager partitionPumpManager,
        Consumer<ErrorContext> processError, LoadBalancingStrategy loadBalancingStrategy) {
        this.checkpointStore = checkpointStore;
        this.eventHubAsyncClient = eventHubAsyncClient;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.ownerId = ownerId;
        this.inactiveTimeLimitInSeconds = inactiveTimeLimitInSeconds;
        this.partitionPumpManager = partitionPumpManager;
        this.processError = processError;
        this.partitionAgnosticContext
            = new PartitionContext(fullyQualifiedNamespace, eventHubName, consumerGroupName, "NONE");
        this.loadBalancingStrategy = loadBalancingStrategy;
    }

    /**
     * Starts one load balancing run unless a previous run is still marked as running.
     *
     * <p>The run is subscribed to here and not awaited. Failures are logged and reported through
     * {@code processError}; nothing is thrown to the caller.</p>
     */
    void loadBalance() {
        if (!this.isLoadBalancerRunning.compareAndSet(false, true)) {
            LOGGER.info("Load balancer already running");
            return;
        }
        LOGGER.atInfo().addKeyValue(OWNER_ID_KEY, this.ownerId).log("Starting load balancer.");

        // Current ownership records from the checkpoint store, keyed by partition id.
        final Mono<Map<String, PartitionOwnership>> partitionOwnershipMono = this.checkpointStore
            .listOwnership(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName)
            .timeout(REQUEST_TIMEOUT)
            .collectMap(PartitionOwnership::getPartitionId, Function.identity());

        final Mono<List<String>> partitionsMono;
        final List<String> cachedPartitionIds = this.partitionsCache.get();
        if (CoreUtils.isNullOrEmpty(cachedPartitionIds)) {
            LOGGER.atInfo()
                .addKeyValue(ENTITY_PATH_KEY, this.eventHubName)
                .log("Getting partitions from Event Hubs service.");
            partitionsMono = this.eventHubAsyncClient.getPartitionIds().timeout(REQUEST_TIMEOUT).collectList();
        } else {
            // The partition ids are already known, so the client is closed instead of being queried again.
            partitionsMono = Mono.just(cachedPartitionIds);
            this.closeClient();
        }

        Mono.zip(partitionOwnershipMono, partitionsMono)
            .flatMap(this::loadBalance)
            .then()
            // Only the greedy strategy iterates again, and only while the last iteration chose to claim.
            .repeat(() -> LoadBalancingStrategy.GREEDY == this.loadBalancingStrategy
                && this.morePartitionsToClaim.get())
            .subscribe(ignored -> { }, ex -> {
                LOGGER.warning(Messages.LOAD_BALANCING_FAILED, ex);
                ErrorContext errorContext = new ErrorContext(this.partitionAgnosticContext, ex);
                this.processError.accept(errorContext);
                this.isLoadBalancerRunning.set(false);
                this.morePartitionsToClaim.set(false);
            }, () -> LOGGER.info("Load balancing completed successfully"));
    }

    /**
     * Runs a single load balancing iteration over the fetched ownership records and partition ids.
     *
     * @param tuple ownership records keyed by partition id, paired with the partition ids of the Event Hub.
     * @return a {@link Mono} that performs the iteration when subscribed. It fails with an
     *     {@link IllegalStateException} when there are no partitions or when an ownership record is invalid.
     */
    private Mono<Void> loadBalance(Tuple2<Map<String, PartitionOwnership>, List<String>> tuple) {
        return Mono.fromRunnable(() -> {
            LOGGER.info("Starting next iteration of load balancer");
            final Map<String, PartitionOwnership> partitionOwnershipMap = tuple.getT1();
            final List<String> partitionIds = tuple.getT2();

            if (CoreUtils.isNullOrEmpty(partitionIds)) {
                throw LOGGER.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("There are no partitions in Event Hub " + this.eventHubName)));
            }
            this.partitionsCache.set(partitionIds);

            final int numberOfPartitions = partitionIds.size();
            LOGGER.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
                numberOfPartitions);

            if (!this.isValid(partitionOwnershipMap)) {
                throw LOGGER.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("Invalid partitionOwnership data from CheckpointStore")));
            }

            final Map<String, PartitionOwnership> activePartitionOwnershipMap
                = this.removeInactivePartitionOwnerships(partitionOwnershipMap);
            LOGGER.info("Number of active ownership records {}", activePartitionOwnershipMap.size());

            // Active ownerships grouped by owner id; this processor always gets an entry, even if empty.
            final Map<String, List<PartitionOwnership>> ownerPartitionMap = activePartitionOwnershipMap.values()
                .stream()
                .collect(Collectors.groupingBy(PartitionOwnership::getOwnerId,
                    Collectors.mapping(Function.identity(), Collectors.toList())));
            ownerPartitionMap.putIfAbsent(this.ownerId, new ArrayList<>());

            if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                LOGGER.verbose("Current partition distribution {}", format(ownerPartitionMap));
            }

            if (CoreUtils.isNullOrEmpty(activePartitionOwnershipMap)) {
                // Nobody actively owns anything: claim a randomly chosen partition.
                this.claimOwnership(partitionOwnershipMap,
                    partitionIds.get(ThreadLocalRandom.current().nextInt(numberOfPartitions)));
                return;
            }

            final int numberOfActiveEventProcessors = ownerPartitionMap.size();
            LOGGER.info("Number of active event processors {}", ownerPartitionMap.size());

            final int minPartitionsPerEventProcessor = numberOfPartitions / numberOfActiveEventProcessors;
            final int numberOfEventProcessorsWithAdditionalPartition
                = numberOfPartitions % numberOfActiveEventProcessors;
            LOGGER.info("Expected min partitions per event processor = {}, expected number of event processors "
                + "with additional partition = {}", minPartitionsPerEventProcessor,
                numberOfEventProcessorsWithAdditionalPartition);

            if (this.isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition,
                ownerPartitionMap)) {
                LOGGER.info("Load is balanced with this event processor owning {} partitions",
                    ownerPartitionMap.get(this.ownerId).size());
                this.renewOwnership(partitionOwnershipMap);
                return;
            }

            if (!this.shouldOwnMorePartitions(minPartitionsPerEventProcessor, ownerPartitionMap)) {
                LOGGER.info("This event processor owns {} partitions and shouldn't own more",
                    ownerPartitionMap.get(this.ownerId).size());
                this.renewOwnership(partitionOwnershipMap);
                return;
            }

            LOGGER.info(
                "Load is unbalanced and this event processor owns {} partitions and should own more partitions",
                ownerPartitionMap.get(this.ownerId).size());

            // Prefer any partition without an active owner; steal one only when every partition is owned.
            final String partitionToClaim = partitionIds.parallelStream()
                .filter(partitionId -> !activePartitionOwnershipMap.containsKey(partitionId))
                .findAny()
                .orElseGet(() -> {
                    LOGGER.info("No unclaimed partitions, stealing from another event processor");
                    return this.findPartitionToSteal(ownerPartitionMap);
                });
            this.claimOwnership(partitionOwnershipMap, partitionToClaim);
        });
    }

    /**
     * Closes the Event Hub client on a best-effort basis; a failure to close is logged and otherwise ignored.
     */
    private void closeClient() {
        try {
            this.eventHubAsyncClient.close();
        } catch (Exception ex) {
            LOGGER.warning("Failed to close the client", ex);
        }
    }

    /**
     * Re-claims the partitions this processor is pumping and is recorded as owning, without taking new ones.
     *
     * <p>Clears {@code morePartitionsToClaim}, and clears the running flag once the claim completes or fails.</p>
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     */
    private void renewOwnership(Map<String, PartitionOwnership> partitionOwnershipMap) {
        this.morePartitionsToClaim.set(false);
        this.checkpointStore.claimOwnership(this.createRequestsForOwnedPartitions(partitionOwnershipMap))
            .subscribe(this.partitionPumpManager::verifyPartitionConnection, ex -> {
                LOGGER.error("Error renewing partition ownership", ex);
                this.isLoadBalancerRunning.set(false);
            }, () -> this.isLoadBalancerRunning.set(false));
    }

    /**
     * Builds ownership requests for every partition that has a running pump and whose ownership record
     * names this processor as the owner.
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     * @return the ownership requests; empty when no such partition exists.
     */
    private List<PartitionOwnership> createRequestsForOwnedPartitions(
        Map<String, PartitionOwnership> partitionOwnershipMap) {
        return this.partitionPumpManager.getPartitionPumps()
            .keySet()
            .stream()
            .filter(partitionId -> this.isOwnedByThisProcessor(partitionOwnershipMap.get(partitionId)))
            .map(partitionId -> this.createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
            .collect(Collectors.toList());
    }

    /**
     * Tells whether an ownership record names this processor as its owner.
     *
     * @param ownership the record to inspect; may be {@code null} when the partition has no record.
     * @return {@code true} only when the record exists and its owner id equals this processor's owner id.
     *     A record with a {@code null} owner id is treated as not owned rather than causing an exception.
     */
    private boolean isOwnedByThisProcessor(PartitionOwnership ownership) {
        if (ownership == null) {
            return false;
        }
        final String recordedOwnerId = ownership.getOwnerId();
        return recordedOwnerId != null && recordedOwnerId.equals(this.ownerId);
    }

    /**
     * Renders the partition distribution as {@code owner=[p1,p2];owner=[p3]} for logging.
     *
     * @param ownerPartitionMap ownerships grouped by owner id.
     * @return the formatted distribution.
     */
    private static String format(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        return ownerPartitionMap.entrySet()
            .stream()
            .map(entry -> entry.getValue()
                .stream()
                .map(PartitionOwnership::getPartitionId)
                .collect(Collectors.joining(",", entry.getKey() + "=[", "]")))
            .collect(Collectors.joining(";"));
    }

    /**
     * Checks that every ownership record belongs to this Event Hub and consumer group and carries a
     * partition id, a last-modified time and an ETag.
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     * @return {@code true} when all records pass (including when there are none), {@code false} otherwise.
     */
    private boolean isValid(Map<String, PartitionOwnership> partitionOwnershipMap) {
        return partitionOwnershipMap.values()
            .stream()
            .noneMatch(partitionOwnership -> partitionOwnership.getEventHubName() == null
                || !partitionOwnership.getEventHubName().equals(this.eventHubName)
                || partitionOwnership.getConsumerGroup() == null
                || !partitionOwnership.getConsumerGroup().equals(this.consumerGroupName)
                || partitionOwnership.getPartitionId() == null
                || partitionOwnership.getLastModifiedTime() == null
                || partitionOwnership.getETag() == null);
    }

    /**
     * Picks a random partition from the owner that currently holds the most partitions.
     *
     * @param ownerPartitionMap active ownerships grouped by owner id; the caller always supplies at least
     *     this processor's own entry.
     * @return the id of the partition to steal.
     */
    private String findPartitionToSteal(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        final Map.Entry<String, List<PartitionOwnership>> ownerWithMaxPartitions = ownerPartitionMap.entrySet()
            .stream()
            .max(Comparator.comparingInt(entry -> entry.getValue().size()))
            .get();
        final List<PartitionOwnership> candidates = ownerWithMaxPartitions.getValue();
        final int numberOfPartitions = candidates.size();
        LOGGER.atInfo()
            .addKeyValue(OWNER_ID_KEY, ownerWithMaxPartitions.getKey())
            .log("Owner owns {} partitions, stealing a partition from it.", numberOfPartitions);
        return candidates.get(ThreadLocalRandom.current().nextInt(numberOfPartitions)).getPartitionId();
    }

    /**
     * Tells whether the partitions are evenly spread: every owner holds either the minimum or the minimum
     * plus one, and exactly the expected number of owners hold the extra partition.
     *
     * @param minPartitionsPerEventProcessor partitions each processor must hold at least.
     * @param numberOfEventProcessorsWithAdditionalPartition how many processors are expected to hold one more.
     * @param ownerPartitionMap active ownerships grouped by owner id.
     * @return {@code true} when the distribution is balanced.
     */
    private boolean isLoadBalanced(int minPartitionsPerEventProcessor,
        int numberOfEventProcessorsWithAdditionalPartition,
        Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        int ownersWithAdditionalPartition = 0;
        for (List<PartitionOwnership> ownedPartitions : ownerPartitionMap.values()) {
            final int numberOfPartitions = ownedPartitions.size();
            if (numberOfPartitions < minPartitionsPerEventProcessor
                || numberOfPartitions > minPartitionsPerEventProcessor + 1) {
                return false;
            }
            if (numberOfPartitions == minPartitionsPerEventProcessor + 1) {
                ownersWithAdditionalPartition++;
            }
        }
        return ownersWithAdditionalPartition == numberOfEventProcessorsWithAdditionalPartition;
    }

    /**
     * Tells whether this processor should claim another partition: it holds fewer than the minimum, or
     * no other owner holds fewer partitions than it does.
     *
     * @param minPartitionsPerEventProcessor partitions each processor must hold at least.
     * @param ownerPartitionMap active ownerships grouped by owner id; must contain this processor's entry.
     * @return {@code true} when this processor should claim one more partition.
     */
    private boolean shouldOwnMorePartitions(int minPartitionsPerEventProcessor,
        Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        final int numberOfPartitionsOwned = ownerPartitionMap.get(this.ownerId).size();
        final int leastPartitionsOwnedByAnyEventProcessor
            = ownerPartitionMap.values().stream().mapToInt(List::size).min().getAsInt();
        return numberOfPartitionsOwned < minPartitionsPerEventProcessor
            || numberOfPartitionsOwned == leastPartitionsOwnedByAnyEventProcessor;
    }

    /**
     * Keeps only the ownership records that were modified within the inactivity limit and have a
     * non-empty owner id.
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     * @return a new map holding the active records.
     */
    private Map<String, PartitionOwnership> removeInactivePartitionOwnerships(
        Map<String, PartitionOwnership> partitionOwnershipMap) {
        final long inactiveTimeLimitInMillis = TimeUnit.SECONDS.toMillis(this.inactiveTimeLimitInSeconds);
        return partitionOwnershipMap.entrySet()
            .stream()
            .filter(entry -> {
                final PartitionOwnership ownership = entry.getValue();
                return System.currentTimeMillis() - ownership.getLastModifiedTime() < inactiveTimeLimitInMillis
                    && !CoreUtils.isNullOrEmpty(ownership.getOwnerId());
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Claims the given partition together with the partitions this processor already owns and is pumping,
     * then starts a partition pump for every ownership the checkpoint store returns.
     *
     * <p>Sets {@code morePartitionsToClaim}. The running flag is cleared here only for the
     * {@link LoadBalancingStrategy#BALANCED} strategy.</p>
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     * @param partitionIdToClaim id of the additional partition to claim.
     */
    private void claimOwnership(Map<String, PartitionOwnership> partitionOwnershipMap, String partitionIdToClaim) {
        LOGGER.atInfo()
            .addKeyValue(PARTITION_ID_KEY, partitionIdToClaim)
            .log("Attempting to claim ownership of partition.");

        final PartitionOwnership ownershipRequest
            = this.createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim);
        final List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
        partitionsToClaim.add(ownershipRequest);
        partitionsToClaim.addAll(this.createRequestsForOwnedPartitions(partitionOwnershipMap));

        this.morePartitionsToClaim.set(true);
        this.checkpointStore.claimOwnership(partitionsToClaim)
            .doOnNext(partitionOwnership -> LOGGER.atInfo()
                .addKeyValue(PARTITION_ID_KEY, partitionOwnership.getPartitionId())
                .log("Successfully claimed ownership."))
            .doOnError(ex -> LOGGER.atWarning()
                .addKeyValue(PARTITION_ID_KEY, ownershipRequest.getPartitionId())
                .log(Messages.FAILED_TO_CLAIM_OWNERSHIP, ex))
            .collectList()
            // Checkpoints are listed only after the claim, then matched to ownerships by partition id.
            .zipWhen(ownershipList -> this.checkpointStore
                .listCheckpoints(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName)
                .collectMap(Checkpoint::getPartitionId, Function.identity()))
            .subscribe(ownedPartitionCheckpointsTuple -> {
                final Map<String, Checkpoint> checkpoints = ownedPartitionCheckpointsTuple.getT2();
                ownedPartitionCheckpointsTuple.getT1()
                    .forEach(po -> this.partitionPumpManager.startPartitionPump(po,
                        checkpoints.get(po.getPartitionId())));
            }, ex -> {
                LOGGER.warning("Error while claiming checkpoints", ex);
                ErrorContext errorContext = new ErrorContext(this.partitionAgnosticContext, ex);
                this.processError.accept(errorContext);
                if (this.loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                    this.isLoadBalancerRunning.set(false);
                }
                throw LOGGER.logExceptionAsError(new IllegalStateException("Error while claiming checkpoints", ex));
            }, () -> {
                if (this.loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                    this.isLoadBalancerRunning.set(false);
                }
            });
    }

    /**
     * Builds an ownership request naming this processor as the owner of the given partition.
     *
     * @param partitionOwnershipMap ownership records from the checkpoint store, keyed by partition id.
     * @param partitionIdToClaim id of the partition the request is for.
     * @return the request, carrying the ETag of the existing record or {@code null} when there is none.
     */
    private PartitionOwnership createPartitionOwnershipRequest(Map<String, PartitionOwnership> partitionOwnershipMap,
        String partitionIdToClaim) {
        final PartitionOwnership previousPartitionOwnership = partitionOwnershipMap.get(partitionIdToClaim);
        return new PartitionOwnership()
            .setFullyQualifiedNamespace(this.fullyQualifiedNamespace)
            .setOwnerId(this.ownerId)
            .setPartitionId(partitionIdToClaim)
            .setConsumerGroup(this.consumerGroupName)
            .setEventHubName(this.eventHubName)
            .setETag(previousPartitionOwnership == null ? null : previousPartitionOwnership.getETag());
    }
}

