/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.balancer;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

public class TierSegmentBalancer {
    private static final EmittingLogger log = new EmittingLogger(TierSegmentBalancer.class);
    private final String tier;
    private final DruidCoordinatorRuntimeParams params;
    private final StrategicSegmentAssigner segmentAssigner;
    private final SegmentLoadingConfig loadingConfig;
    private final CoordinatorRunStats runStats;
    private final List<ServerHolder> activeServers;
    private final List<ServerHolder> decommissioningServers;
    private final int totalMaxSegmentsToMove;
    private final int movingSegmentCount;

    public TierSegmentBalancer(String tier, Set<ServerHolder> servers, DruidCoordinatorRuntimeParams params) {
        this.tier = tier;
        this.params = params;
        this.segmentAssigner = params.getSegmentAssigner();
        this.loadingConfig = params.getSegmentLoadingConfig();
        this.totalMaxSegmentsToMove = this.loadingConfig.getMaxSegmentsToMove();
        this.runStats = this.segmentAssigner.getStats();
        Map<Boolean, List<ServerHolder>> partitions = servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
        this.decommissioningServers = partitions.get(true);
        this.activeServers = partitions.get(false);
        this.movingSegmentCount = this.activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
    }

    public void run() {
        if (this.activeServers.isEmpty() || this.activeServers.size() <= 1 && this.decommissioningServers.isEmpty()) {
            log.warn("Skipping balance for tier [%s] with [%d] active servers and [%d] decomissioning servers.", new Object[]{this.tier, this.activeServers.size(), this.decommissioningServers.size()});
            return;
        }
        log.info("Moving max [%d] segments in tier [%s] with [%d] active servers and [%d] decommissioning servers. There are [%d] segments already in queue.", new Object[]{this.totalMaxSegmentsToMove, this.tier, this.activeServers.size(), this.decommissioningServers.size(), this.movingSegmentCount});
        int movedDecommSegments = 0;
        if (!this.decommissioningServers.isEmpty()) {
            int maxDecommPercentToMove = this.loadingConfig.getPercentDecommSegmentsToMove();
            int maxDecommSegmentsToMove = (int)Math.ceil((double)this.totalMaxSegmentsToMove * ((double)maxDecommPercentToMove / 100.0));
            log.info("Moved [%d] segments out of max [%d (%d%%)] from decommissioning to active servers in tier [%s].", new Object[]{movedDecommSegments += this.moveSegmentsFromTo(this.decommissioningServers, this.activeServers, maxDecommSegmentsToMove), maxDecommSegmentsToMove, maxDecommPercentToMove, this.tier});
        }
        int maxGeneralSegmentsToMove = this.totalMaxSegmentsToMove - movedDecommSegments;
        int movedGeneralSegments = this.moveSegmentsFromTo(this.activeServers, this.activeServers, maxGeneralSegmentsToMove);
        log.info("Moved [%d] segments out of max [%d] between active servers in tier [%s].", new Object[]{movedGeneralSegments, maxGeneralSegmentsToMove, this.tier});
    }

    private int moveSegmentsFromTo(List<ServerHolder> sourceServers, List<ServerHolder> destServers, int maxSegmentsToMove) {
        if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() || destServers.isEmpty()) {
            return 0;
        }
        Set<String> broadcastDatasources = this.params.getBroadcastDatasources();
        List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(sourceServers, maxSegmentsToMove, ServerHolder::getLoadingSegments, broadcastDatasources);
        int movedCount = this.moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
        if (this.movingSegmentCount <= 0) {
            pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(sourceServers, maxSegmentsToMove -= movedCount, server -> server.getServer().iterateAllSegments(), broadcastDatasources);
            movedCount += this.moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
        }
        return movedCount;
    }

    private int moveSegmentsTo(List<ServerHolder> destinationServers, List<BalancerSegmentHolder> movableSegments, int maxSegmentsToMove) {
        int processed = 0;
        int movedCount = 0;
        Iterator<BalancerSegmentHolder> segmentIterator = movableSegments.iterator();
        while (segmentIterator.hasNext() && processed < maxSegmentsToMove) {
            ++processed;
            BalancerSegmentHolder segmentHolder = segmentIterator.next();
            DataSegment segmentToMove = this.getLoadableSegment(segmentHolder.getSegment());
            if (segmentToMove == null || !this.segmentAssigner.moveSegment(segmentToMove, segmentHolder.getServer(), destinationServers)) continue;
            ++movedCount;
        }
        return movedCount;
    }

    @Nullable
    private DataSegment getLoadableSegment(DataSegment segmentToMove) {
        if (!this.params.getUsedSegments().contains(segmentToMove)) {
            this.markUnmoved("Segment is unused", segmentToMove);
            return null;
        }
        ImmutableDruidDataSource datasource = this.params.getDataSourcesSnapshot().getDataSource(segmentToMove.getDataSource());
        if (datasource == null) {
            this.markUnmoved("Invalid datasource", segmentToMove);
            return null;
        }
        DataSegment loadableSegment = datasource.getSegment(segmentToMove.getId());
        if (loadableSegment == null) {
            this.markUnmoved("Invalid segment ID", segmentToMove);
            return null;
        }
        return loadableSegment;
    }

    private void markUnmoved(String reason, DataSegment segment) {
        RowKey key = RowKey.with(Dimension.TIER, this.tier).with(Dimension.DATASOURCE, segment.getDataSource()).and(Dimension.DESCRIPTION, reason);
        this.runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1L);
    }
}

