/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import java.util.Set;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.joda.time.Duration;

public class BalanceSegments
implements CoordinatorDuty {
    private static final EmittingLogger log = new EmittingLogger(BalanceSegments.class);
    private final Duration coordinatorPeriod;

    public BalanceSegments(Duration coordinatorPeriod) {
        this.coordinatorPeriod = coordinatorPeriod;
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        if (params.getUsedSegments().isEmpty()) {
            log.info("Skipping balance as there are no used segments.", new Object[0]);
            return params;
        }
        DruidCluster cluster = params.getDruidCluster();
        SegmentLoadingConfig loadingConfig = params.getSegmentLoadingConfig();
        int maxSegmentsToMove = this.getMaxSegmentsToMove(params);
        if (maxSegmentsToMove <= 0) {
            log.info("Skipping balance as maxSegmentsToMove is [%d].", new Object[]{maxSegmentsToMove});
            return params;
        }
        log.info("Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and maxLifetime[%d].", new Object[]{cluster.getTierNames(), maxSegmentsToMove, loadingConfig.getMaxLifetimeInLoadQueue()});
        cluster.getHistoricals().forEach((tier, servers) -> new TierSegmentBalancer((String)tier, (Set<ServerHolder>)servers, maxSegmentsToMove, params).run());
        CoordinatorRunStats runStats = params.getCoordinatorStats();
        params.getBalancerStrategy().getStats().forEachStat(runStats::add);
        return params;
    }

    private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params) {
        CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
        if (dynamicConfig.isSmartSegmentLoading()) {
            Pair<Integer, Integer> numHistoricalsAndSegments = this.getNumHistoricalsAndSegments(params.getDruidCluster());
            int totalSegmentsInCluster = (Integer)numHistoricalsAndSegments.rhs;
            int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads();
            int maxSegmentsToMove = SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, numBalancerThreads, this.coordinatorPeriod);
            log.info("Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.", new Object[]{maxSegmentsToMove, totalSegmentsInCluster, numHistoricalsAndSegments.lhs});
            return maxSegmentsToMove;
        }
        return dynamicConfig.getMaxSegmentsToMove();
    }

    private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster cluster) {
        int numHistoricals = 0;
        int numSegments = 0;
        for (Set set : cluster.getHistoricals().values()) {
            for (ServerHolder historical : set) {
                ++numHistoricals;
                numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments();
            }
        }
        return Pair.of((Object)numHistoricals, (Object)numSegments);
    }
}

