/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

/**
 * Coordinator duty that prepares the state required by later duties in the
 * same run. In a single invocation of {@link #run} it:
 * <ul>
 * <li>takes a snapshot of the servers in the inventory view that are segment
 * replication or broadcast targets,</li>
 * <li>asks the {@link LoadQueueTaskMaster} to reset peons for that snapshot,</li>
 * <li>builds the {@link SegmentLoadingConfig} and the {@link DruidCluster},</li>
 * <li>cancels queued load operations on decommissioning servers,</li>
 * <li>records historical, used-segment and debug stats, and</li>
 * <li>creates the {@link BalancerStrategy} to be used for the run.</li>
 * </ul>
 */
public class PrepareBalancerAndLoadQueues
implements CoordinatorDuty {
    private static final Logger log = new Logger(PrepareBalancerAndLoadQueues.class);
    private final LoadQueueTaskMaster taskMaster;
    private final SegmentLoadQueueManager loadQueueManager;
    private final ServerInventoryView serverInventoryView;
    private final BalancerStrategyFactory balancerStrategyFactory;

    /**
     * Creates the duty.
     *
     * @param taskMaster              provides the load queue peon of each server
     * @param loadQueueManager        handed to the params builder as the basis of the segment assigner
     * @param balancerStrategyFactory creates the balancer strategy on every run
     * @param serverInventoryView     source of the currently known servers
     */
    public PrepareBalancerAndLoadQueues(LoadQueueTaskMaster taskMaster, SegmentLoadQueueManager loadQueueManager, BalancerStrategyFactory balancerStrategyFactory, ServerInventoryView serverInventoryView) {
        this.taskMaster = taskMaster;
        this.loadQueueManager = loadQueueManager;
        this.balancerStrategyFactory = balancerStrategyFactory;
        this.serverInventoryView = serverInventoryView;
    }

    /**
     * Builds the cluster view, loading config and balancer strategy for this
     * coordinator run and collects the associated stats.
     *
     * @param params runtime params of the current run
     * @return a copy of {@code params} with the cluster, balancer strategy,
     *         segment loading config and segment assigner set
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        List<ImmutableDruidServer> currentServers = this.prepareCurrentServers();
        // Peons are reset before the cluster is built since prepareCluster() looks
        // up the peon of every server. NOTE(review): presumably this is what creates
        // peons for newly discovered servers - confirm against LoadQueueTaskMaster.
        this.taskMaster.resetPeonsForNewServers(currentServers);

        CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
        SegmentLoadingConfig segmentLoadingConfig = SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount());
        DruidCluster cluster = this.prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
        this.cancelLoadsOnDecommissioningServers(cluster);

        CoordinatorRunStats stats = params.getCoordinatorStats();
        this.collectHistoricalStats(cluster, stats);
        this.collectUsedSegmentStats(params, stats);
        this.collectDebugStats(segmentLoadingConfig, stats);

        int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads();
        BalancerStrategy balancerStrategy = this.balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
        log.debug("Using balancer strategy[%s] with [%d] threads.", balancerStrategy.getClass().getSimpleName(), numBalancerThreads);

        return params.buildFromExisting()
            .withDruidCluster(cluster)
            .withBalancerStrategy(balancerStrategy)
            .withSegmentLoadingConfig(segmentLoadingConfig)
            .withSegmentAssignerUsing(this.loadQueueManager)
            .build();
    }

    /**
     * Cancels every queued load-type operation on the decommissioning servers of
     * the cluster and logs how many operations were successfully cancelled.
     *
     * @param cluster cluster whose decommissioning servers are inspected
     */
    private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) {
        // A plain int cannot be incremented from inside the lambda below.
        AtomicInteger cancelledCount = new AtomicInteger(0);
        List<ServerHolder> decommissioningServers = cluster.getAllServers()
            .stream()
            .filter(ServerHolder::isDecommissioning)
            .collect(Collectors.toList());

        for (ServerHolder server : decommissioningServers) {
            server.getQueuedSegments().forEach((segment, action) -> {
                // Only load-type actions are cancelled; count only successful cancellations.
                if (action.isLoad() && server.cancelOperation(action, segment)) {
                    cancelledCount.incrementAndGet();
                }
            });
        }

        if (cancelledCount.get() > 0) {
            log.info("Cancelled [%d] load operations on [%d] decommissioning servers.", cancelledCount.get(), decommissioningServers.size());
        }
    }

    /**
     * Returns immutable snapshots of all servers in the inventory view that are
     * segment replication or broadcast targets.
     */
    private List<ImmutableDruidServer> prepareCurrentServers() {
        return this.serverInventoryView.getInventory()
            .stream()
            .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
            .map(DruidServer::toImmutableDruidServer)
            .collect(Collectors.toList());
    }

    /**
     * Builds the {@link DruidCluster} for this run by wrapping each server in a
     * {@link ServerHolder}.
     *
     * @param dynamicConfig        supplies the set of decommissioning nodes
     * @param segmentLoadingConfig supplies the load queue size and lifetime limits
     * @param currentServers       servers to add to the cluster
     * @return cluster containing one holder per given server
     */
    private DruidCluster prepareCluster(CoordinatorDynamicConfig dynamicConfig, SegmentLoadingConfig segmentLoadingConfig, List<ImmutableDruidServer> currentServers) {
        Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
        DruidCluster.Builder cluster = DruidCluster.builder();
        for (ImmutableDruidServer server : currentServers) {
            // A server is decommissioning if its host is listed in the dynamic config.
            boolean isDecommissioning = decommissioningServers.contains(server.getHost());
            cluster.add(new ServerHolder(server, this.taskMaster.getPeonForServer(server), isDecommissioning, segmentLoadingConfig.getMaxSegmentsInLoadQueue(), segmentLoadingConfig.getMaxLifetimeInLoadQueue()));
        }
        return cluster.build();
    }

    /**
     * Records, for every tier, the number of historicals and the sum of their
     * maximum sizes.
     */
    private void collectHistoricalStats(DruidCluster cluster, CoordinatorRunStats stats) {
        cluster.getHistoricals().forEach((tier, historicals) -> {
            RowKey rowKey = RowKey.of(Dimension.TIER, tier);
            stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
            long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum();
            stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
        });
    }

    /**
     * Records, for every datasource, the total size and the number of objects in
     * its used-segment timeline.
     */
    private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats) {
        params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource, timeline) -> {
            long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream().mapToLong(DataSegment::getSize).sum();
            RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
            stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments);
            stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
        });
    }

    /**
     * Records the balancer compute thread count and the replication throttle
     * limit taken from the given loading config.
     */
    private void collectDebugStats(SegmentLoadingConfig config, CoordinatorRunStats stats) {
        stats.add(Stats.Balancer.COMPUTE_THREADS, config.getBalancerComputeThreads());
        stats.add(Stats.Segments.REPLICATION_THROTTLE_LIMIT, config.getReplicationThrottleLimit());
    }
}

