/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

/**
 * Coordinator duty that walks every server in the cluster and requests a drop
 * for each segment the server holds that is not contained in the used-segment
 * set of the current run. The number of successfully requested drops is
 * recorded per tier and datasource under {@code Stats.Segments.UNNEEDED}.
 */
public class UnloadUnusedSegments
implements CoordinatorDuty {
    private static final Logger log = new Logger(UnloadUnusedSegments.class);
    private final SegmentLoadQueueManager loadQueueManager;

    /**
     * @param loadQueueManager manager through which segment drops are requested
     */
    public UnloadUnusedSegments(SegmentLoadQueueManager loadQueueManager) {
        this.loadQueueManager = loadQueueManager;
    }

    /**
     * Requests drops of unused segments on all servers of the cluster.
     *
     * @param params runtime parameters of the current coordinator run
     * @return the same {@code params} instance that was passed in
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        // Cache of "is this datasource broadcast?" answers, pre-seeded with the
        // datasources already known to be broadcast and filled lazily for the rest.
        Map<String, Boolean> broadcastStatusByDatasource = new HashMap<>();
        params.getBroadcastDatasources().forEach(name -> broadcastStatusByDatasource.put(name, true));

        CoordinatorRunStats stats = params.getCoordinatorStats();
        for (ServerHolder holder : params.getDruidCluster().getAllServers()) {
            this.handleUnusedSegmentsForServer(holder, params, stats, broadcastStatusByDatasource);
        }
        return params;
    }

    /**
     * Processes every datasource served by one server, dropping its unused
     * segments and recording how many drops were requested.
     *
     * @param serverHolder                server to inspect
     * @param params                      runtime parameters of the current run
     * @param stats                       stats collector updated with the drop counts
     * @param broadcastStatusByDatasource cache of broadcast status keyed by datasource
     *                                    name; missing entries are computed and stored
     */
    private void handleUnusedSegmentsForServer(ServerHolder serverHolder, DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats, Map<String, Boolean> broadcastStatusByDatasource) {
        ImmutableDruidServer server = serverHolder.getServer();
        for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
            boolean broadcast = broadcastStatusByDatasource.computeIfAbsent(
                dataSource.getName(),
                name -> this.isBroadcastDatasource(name, params)
            );

            // On realtime servers only broadcast datasources are cleaned up.
            // NOTE(review): presumably because segments held by realtime servers are
            // not part of the used-segment set yet and would be dropped by mistake - confirm.
            if (broadcast || !serverHolder.isRealtimeServer()) {
                int droppedCount = this.dropUnusedSegments(serverHolder, server, dataSource, params.getUsedSegments());
                if (droppedCount > 0) {
                    stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), dataSource.getName(), droppedCount);
                }
            }
        }
    }

    /**
     * Requests a drop for each segment of {@code dataSource} that is absent from
     * {@code usedSegments}.
     *
     * @return number of segments for which the drop request was accepted
     */
    private int dropUnusedSegments(ServerHolder serverHolder, ImmutableDruidServer server, ImmutableDruidDataSource dataSource, TreeSet<DataSegment> usedSegments) {
        int droppedCount = 0;
        for (DataSegment segment : dataSource.getSegments()) {
            // The drop is attempted only for segments that are not used; it counts
            // (and is logged) only when the load queue manager accepts it.
            if (!usedSegments.contains(segment) && this.loadQueueManager.dropSegment(segment, serverHolder)) {
                droppedCount++;
                log.info("Dropping uneeded segment [%s] from server [%s] in tier [%s]", segment.getId(), server.getName(), server.getTier());
            }
        }
        return droppedCount;
    }

    /**
     * Tells whether any rule configured for the datasource (defaults included)
     * is a {@link BroadcastDistributionRule}.
     *
     * @param datasource name of the datasource to look up
     * @param params     runtime parameters giving access to the rule manager
     * @return {@code true} if at least one broadcast rule applies
     */
    private boolean isBroadcastDatasource(String datasource, DruidCoordinatorRuntimeParams params) {
        return params.getDatabaseRuleManager()
            .getRulesWithDefault(datasource)
            .stream()
            .anyMatch(BroadcastDistributionRule.class::isInstance);
    }
}

