/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

/**
 * Coordinator duty that works on segments which are absent from the used
 * segment set of the current run ({@code params.getUsedSegments()}).
 *
 * <p>For every server in the cluster it first cancels queued load operations of
 * such segments and afterwards queues drops for the ones the server already
 * holds. A datasource on a realtime server is only touched when that
 * datasource is a broadcast datasource.
 */
public class UnloadUnusedSegments implements CoordinatorDuty {
    private static final Logger log = new Logger(UnloadUnusedSegments.class);
    private final SegmentLoadQueueManager loadQueueManager;

    /**
     * @param loadQueueManager manager used to queue segment drops on servers
     */
    public UnloadUnusedSegments(SegmentLoadQueueManager loadQueueManager) {
        this.loadQueueManager = loadQueueManager;
    }

    /**
     * Cancels loads and queues drops of unused segments on all servers of the
     * cluster, logging a summary when anything was done.
     *
     * @param params runtime parameters of the current coordinator run
     * @return the same {@code params} instance that was passed in
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        // Cache of "is this datasource broadcast?", pre-filled with the known
        // broadcast datasources and extended lazily by shouldSkipUnload().
        Map<String, Boolean> broadcastStatusByDatasource = new HashMap<>();
        for (String knownBroadcastDatasource : params.getBroadcastDatasources()) {
            broadcastStatusByDatasource.put(knownBroadcastDatasource, true);
        }

        List<ServerHolder> allServers = params.getDruidCluster().getAllServers();

        // All load cancellations happen before any drop is queued.
        int numCancelledLoads = 0;
        for (ServerHolder holder : allServers) {
            numCancelledLoads += cancelLoadOfUnusedSegments(holder, broadcastStatusByDatasource, params);
        }

        CoordinatorRunStats stats = params.getCoordinatorStats();
        int numQueuedDrops = 0;
        for (ServerHolder holder : allServers) {
            numQueuedDrops += dropUnusedSegments(holder, params, stats, broadcastStatusByDatasource);
        }

        if (numCancelledLoads > 0 || numQueuedDrops > 0) {
            log.info("Cancelled [%d] loads and started [%d] drops of unused segments.", numCancelledLoads, numQueuedDrops);
        }
        return params;
    }

    /**
     * Queues a drop for every segment held by the given server that is not in
     * the used segment set, and records the per-datasource count under
     * {@code Stats.Segments.UNNEEDED}.
     *
     * @param serverHolder                server whose loaded segments are inspected
     * @param params                      runtime parameters of the current run
     * @param stats                       stats collector that receives the unneeded counts
     * @param broadcastStatusByDatasource cache of broadcast status per datasource
     * @return number of drops successfully queued on this server
     */
    private int dropUnusedSegments(ServerHolder serverHolder, DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats, Map<String, Boolean> broadcastStatusByDatasource) {
        TreeSet<DataSegment> usedSegments = params.getUsedSegments();
        ImmutableDruidServer server = serverHolder.getServer();

        int queuedDropsOnServer = 0;
        for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
            String dataSourceName = dataSource.getName();
            if (!shouldSkipUnload(serverHolder, dataSourceName, broadcastStatusByDatasource, params)) {
                int droppedForDatasource = 0;
                for (DataSegment segment : dataSource.getSegments()) {
                    // Only count segments for which the drop was actually queued.
                    boolean unused = !usedSegments.contains(segment);
                    if (unused && loadQueueManager.dropSegment(segment, serverHolder)) {
                        droppedForDatasource++;
                        log.debug("Dropping uneeded segment[%s] from server[%s] in tier[%s].", segment.getId(), server.getName(), server.getTier());
                    }
                }
                if (droppedForDatasource > 0) {
                    stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), dataSource.getName(), droppedForDatasource);
                    queuedDropsOnServer += droppedForDatasource;
                }
            }
        }
        return queuedDropsOnServer;
    }

    /**
     * Cancels queued load operations on the given server for segments that are
     * not in the used segment set.
     *
     * @param server                      server whose queued operations are inspected
     * @param broadcastStatusByDatasource cache of broadcast status per datasource
     * @param params                      runtime parameters of the current run
     * @return number of load operations that were successfully cancelled
     */
    private int cancelLoadOfUnusedSegments(ServerHolder server, Map<String, Boolean> broadcastStatusByDatasource, DruidCoordinatorRuntimeParams params) {
        TreeSet<DataSegment> usedSegments = params.getUsedSegments();
        // Counter is mutated from inside the lambda, hence the atomic holder.
        AtomicInteger cancelledCount = new AtomicInteger(0);
        server.getQueuedSegments().forEach((queuedSegment, queuedAction) -> {
            if (shouldSkipUnload(server, queuedSegment.getDataSource(), broadcastStatusByDatasource, params)) {
                return;
            }
            if (usedSegments.contains(queuedSegment)) {
                return;
            }
            if (!queuedAction.isLoad()) {
                return;
            }
            if (server.cancelOperation(queuedAction, queuedSegment)) {
                cancelledCount.incrementAndGet();
            }
        });
        return cancelledCount.get();
    }

    /**
     * Decides whether segments of a datasource on a server must be left alone.
     *
     * @param server                      server being inspected
     * @param dataSource                  name of the datasource being inspected
     * @param broadcastStatusByDatasource cache of broadcast status per datasource;
     *                                    a missing entry is computed and stored
     * @param params                      runtime parameters of the current run
     * @return {@code true} when the server is a realtime server and the
     *         datasource is not a broadcast datasource
     */
    private boolean shouldSkipUnload(ServerHolder server, String dataSource, Map<String, Boolean> broadcastStatusByDatasource, DruidCoordinatorRuntimeParams params) {
        // The broadcast status is resolved (and cached) before the server type is checked.
        boolean broadcast = broadcastStatusByDatasource.computeIfAbsent(dataSource, name -> isBroadcastDatasource(name, params));
        if (!server.isRealtimeServer()) {
            return false;
        }
        return !broadcast;
    }

    /**
     * @param datasource name of the datasource to check
     * @param params     runtime parameters giving access to the rule manager
     * @return {@code true} when any rule returned by
     *         {@code getRulesWithDefault(datasource)} is a
     *         {@link BroadcastDistributionRule}
     */
    private boolean isBroadcastDatasource(String datasource, DruidCoordinatorRuntimeParams params) {
        return params.getDatabaseRuleManager()
                .getRulesWithDefault(datasource)
                .stream()
                .anyMatch(BroadcastDistributionRule.class::isInstance);
    }
}

