/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Duration;

@ManageLifecycle
public class DruidCoordinator {
    public static final Ordering<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering.from((Comparator)Comparators.intervalsByEndThenStart()).onResultOf(DataSegment::getInterval).compound((Comparator)Ordering.natural()).reverse();
    private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
    private final Object lock = new Object();
    private final DruidCoordinatorConfig config;
    private final JacksonConfigManager configManager;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final ServerInventoryView serverInventoryView;
    private final MetadataRuleManager metadataRuleManager;
    private final ServiceEmitter emitter;
    private final IndexingServiceClient indexingServiceClient;
    private final ScheduledExecutorService exec;
    private final LoadQueueTaskMaster taskMaster;
    private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = new ConcurrentHashMap();
    private final SegmentLoadQueueManager loadQueueManager;
    private final ServiceAnnouncer serviceAnnouncer;
    private final DruidNode self;
    private final Set<CoordinatorDuty> indexingServiceDuties;
    private final Set<CoordinatorDuty> metadataStoreManagementDuties;
    private final CoordinatorCustomDutyGroups customDutyGroups;
    private final BalancerStrategyFactory balancerStrategyFactory;
    private final LookupCoordinatorManager lookupCoordinatorManager;
    private final DruidLeaderSelector coordLeaderSelector;
    private final CompactSegments compactSegments;
    private volatile boolean started = false;
    private volatile SegmentReplicationStatus segmentReplicationStatus = null;
    private int cachedBalancerThreadNumber;
    private ListeningExecutorService balancerExec;
    public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
    private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
    private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
    private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";

    @Inject
    public DruidCoordinator(DruidCoordinatorConfig config, JacksonConfigManager configManager, SegmentsMetadataManager segmentsMetadataManager, ServerInventoryView serverInventoryView, MetadataRuleManager metadataRuleManager, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, SegmentLoadQueueManager loadQueueManager, ServiceAnnouncer serviceAnnouncer, @Self DruidNode self, @CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties, @CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties, CoordinatorCustomDutyGroups customDutyGroups, BalancerStrategyFactory balancerStrategyFactory, LookupCoordinatorManager lookupCoordinatorManager, @Coordinator DruidLeaderSelector coordLeaderSelector, CompactionSegmentSearchPolicy compactionSegmentSearchPolicy) {
        this.config = config;
        this.configManager = configManager;
        this.segmentsMetadataManager = segmentsMetadataManager;
        this.serverInventoryView = serverInventoryView;
        this.metadataRuleManager = metadataRuleManager;
        this.emitter = emitter;
        this.indexingServiceClient = indexingServiceClient;
        this.taskMaster = taskMaster;
        this.serviceAnnouncer = serviceAnnouncer;
        this.self = self;
        this.indexingServiceDuties = indexingServiceDuties;
        this.metadataStoreManagementDuties = metadataStoreManagementDuties;
        this.customDutyGroups = customDutyGroups;
        this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
        this.balancerStrategyFactory = balancerStrategyFactory;
        this.lookupCoordinatorManager = lookupCoordinatorManager;
        this.coordLeaderSelector = coordLeaderSelector;
        this.compactSegments = this.initializeCompactSegmentsDuty(compactionSegmentSearchPolicy);
        this.loadQueueManager = loadQueueManager;
    }

    public boolean isLeader() {
        return this.coordLeaderSelector.isLeader();
    }

    public Map<String, LoadQueuePeon> getLoadManagementPeons() {
        return this.loadManagementPeons;
    }

    public Map<String, Object2LongMap<String>> getTierToDatasourceToUnderReplicatedCount(boolean useClusterView) {
        Iterable<DataSegment> dataSegments = this.segmentsMetadataManager.iterateAllUsedSegments();
        return this.computeUnderReplicated(dataSegments, useClusterView);
    }

    public Map<String, Object2LongMap<String>> getTierToDatasourceToUnderReplicatedCount(Iterable<DataSegment> dataSegments, boolean useClusterView) {
        return this.computeUnderReplicated(dataSegments, useClusterView);
    }

    public Object2IntMap<String> getDatasourceToUnavailableSegmentCount() {
        if (this.segmentReplicationStatus == null) {
            return Object2IntMaps.emptyMap();
        }
        Object2IntOpenHashMap datasourceToUnavailableSegments = new Object2IntOpenHashMap();
        Iterable<DataSegment> dataSegments = this.segmentsMetadataManager.iterateAllUsedSegments();
        for (DataSegment segment : dataSegments) {
            SegmentReplicaCount replicaCount = this.segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
            if (replicaCount != null && replicaCount.totalLoaded() > 0) {
                datasourceToUnavailableSegments.addTo((Object)segment.getDataSource(), 0);
                continue;
            }
            datasourceToUnavailableSegments.addTo((Object)segment.getDataSource(), 1);
        }
        return datasourceToUnavailableSegments;
    }

    public Map<String, Double> getDatasourceToLoadStatus() {
        HashMap<String, Double> loadStatus = new HashMap<String, Double>();
        Collection<ImmutableDruidDataSource> dataSources = this.segmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
        for (ImmutableDruidDataSource dataSource : dataSources) {
            HashSet segments = Sets.newHashSet(dataSource.getSegments());
            int numPublishedSegments = segments.size();
            for (DruidServer druidServer : this.serverInventoryView.getInventory()) {
                DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName());
                if (loadedView == null) continue;
                for (DataSegment serverSegment : loadedView.getSegments()) {
                    segments.remove(serverSegment);
                }
            }
            int numUnavailableSegments = segments.size();
            loadStatus.put(dataSource.getName(), (double)(numPublishedSegments - numUnavailableSegments) * 100.0 / (double)numPublishedSegments);
        }
        return loadStatus;
    }

    @Nullable
    public Integer getReplicationFactor(SegmentId segmentId) {
        if (this.segmentReplicationStatus == null) {
            return null;
        }
        SegmentReplicaCount replicaCountsInCluster = this.segmentReplicationStatus.getReplicaCountsInCluster(segmentId);
        return replicaCountsInCluster == null ? null : Integer.valueOf(replicaCountsInCluster.required());
    }

    @Nullable
    public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) {
        return this.compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
    }

    @Nullable
    public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource) {
        return this.compactSegments.getAutoCompactionSnapshot(dataSource);
    }

    public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot() {
        return this.compactSegments.getAutoCompactionSnapshot();
    }

    private CoordinatorDynamicConfig getDynamicConfigs() {
        return CoordinatorDynamicConfig.current(this.configManager);
    }

    private CoordinatorCompactionConfig getCompactionConfig() {
        return CoordinatorCompactionConfig.current(this.configManager);
    }

    public void markSegmentsAsUnused(String datasource, Set<SegmentId> segmentIds) {
        log.debug("Marking [%d] segments of datasource [%s] as unused.", new Object[]{segmentIds.size(), datasource});
        int updatedCount = this.segmentsMetadataManager.markSegmentsAsUnused(segmentIds);
        log.info("Successfully marked [%d] segments of datasource [%s] as unused.", new Object[]{updatedCount, datasource});
    }

    public String getCurrentLeader() {
        return this.coordLeaderSelector.getCurrentLeader();
    }

    @VisibleForTesting
    public int getCachedBalancerThreadNumber() {
        return this.cachedBalancerThreadNumber;
    }

    @VisibleForTesting
    public ListeningExecutorService getBalancerExec() {
        return this.balancerExec;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStart
    public void start() {
        Object object = this.lock;
        synchronized (object) {
            if (this.started) {
                return;
            }
            this.started = true;
            this.coordLeaderSelector.registerListener(new DruidLeaderSelector.Listener(){

                @Override
                public void becomeLeader() {
                    DruidCoordinator.this.becomeLeader();
                }

                @Override
                public void stopBeingLeader() {
                    DruidCoordinator.this.stopBeingLeader();
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStop
    public void stop() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.started) {
                return;
            }
            this.coordLeaderSelector.unregisterListener();
            this.started = false;
            this.exec.shutdownNow();
            if (this.balancerExec != null) {
                this.balancerExec.shutdownNow();
            }
        }
    }

    public void runCompactSegmentsDuty() {
        int startingLeaderCounter = this.coordLeaderSelector.localTerm();
        DutiesRunnable compactSegmentsDuty = new DutiesRunnable(this.makeCompactSegmentsDuty(), startingLeaderCounter, COMPACT_SEGMENTS_DUTIES_DUTY_GROUP, null);
        compactSegmentsDuty.run();
    }

    private Map<String, Object2LongMap<String>> computeUnderReplicated(Iterable<DataSegment> dataSegments, boolean computeUsingClusterView) {
        if (this.segmentReplicationStatus == null) {
            return Collections.emptyMap();
        }
        return this.segmentReplicationStatus.getTierToDatasourceToUnderReplicated(dataSegments, !computeUsingClusterView);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void becomeLeader() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.started) {
                return;
            }
            log.info("I am the leader of the coordinators, all must bow! Starting coordination in [%s].", new Object[]{this.config.getCoordinatorStartDelay()});
            this.segmentsMetadataManager.startPollingDatabasePeriodically();
            this.metadataRuleManager.start();
            this.lookupCoordinatorManager.start();
            this.serviceAnnouncer.announce(this.self);
            int startingLeaderCounter = this.coordLeaderSelector.localTerm();
            ArrayList<DutiesRunnable> dutiesRunnables = new ArrayList<DutiesRunnable>();
            dutiesRunnables.add(new DutiesRunnable(this.makeHistoricalManagementDuties(), startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, this.config.getCoordinatorPeriod()));
            if (this.indexingServiceClient != null) {
                dutiesRunnables.add(new DutiesRunnable(this.makeIndexingServiceDuties(), startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP, this.config.getCoordinatorIndexingPeriod()));
            }
            dutiesRunnables.add(new DutiesRunnable(this.makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP, this.config.getCoordinatorMetadataStoreManagementPeriod()));
            for (CoordinatorCustomDutyGroup customDutyGroup : this.customDutyGroups.getCoordinatorCustomDutyGroups()) {
                dutiesRunnables.add(new DutiesRunnable(customDutyGroup.getCustomDutyList(), startingLeaderCounter, customDutyGroup.getName(), customDutyGroup.getPeriod()));
                log.info("Done making custom coordinator duties [%s] for group [%s]", new Object[]{customDutyGroup.getCustomDutyList().stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()), customDutyGroup.getName()});
            }
            for (DutiesRunnable dutiesRunnable : dutiesRunnables) {
                ScheduledExecutors.scheduleAtFixedRate((ScheduledExecutorService)this.exec, (Duration)this.config.getCoordinatorStartDelay(), (Duration)dutiesRunnable.getPeriod(), () -> {
                    if (this.coordLeaderSelector.isLeader() && startingLeaderCounter == this.coordLeaderSelector.localTerm()) {
                        dutiesRunnable.run();
                    }
                    if (this.coordLeaderSelector.isLeader() && startingLeaderCounter == this.coordLeaderSelector.localTerm()) {
                        return ScheduledExecutors.Signal.REPEAT;
                    }
                    return ScheduledExecutors.Signal.STOP;
                });
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopBeingLeader() {
        Object object = this.lock;
        synchronized (object) {
            log.info("I am no longer the leader...", new Object[0]);
            for (String server : this.loadManagementPeons.keySet()) {
                LoadQueuePeon peon = this.loadManagementPeons.remove(server);
                peon.stop();
            }
            this.loadManagementPeons.clear();
            this.serviceAnnouncer.unannounce(this.self);
            this.lookupCoordinatorManager.stop();
            this.metadataRuleManager.stop();
            this.segmentsMetadataManager.stopPollingDatabasePeriodically();
            if (this.balancerExec != null) {
                this.balancerExec.shutdownNow();
                this.balancerExec = null;
            }
        }
    }

    @VisibleForTesting
    protected void initBalancerExecutor() {
        int currentNumber = this.getDynamicConfigs().getBalancerComputeThreads();
        if (this.balancerExec == null) {
            this.balancerExec = this.createNewBalancerExecutor(currentNumber);
        } else if (this.cachedBalancerThreadNumber != currentNumber) {
            log.info("balancerComputeThreads has changed from [%d] to [%d], recreating the thread pool.", new Object[]{this.cachedBalancerThreadNumber, currentNumber});
            this.balancerExec.shutdownNow();
            this.balancerExec = this.createNewBalancerExecutor(currentNumber);
        }
    }

    private ListeningExecutorService createNewBalancerExecutor(int numThreads) {
        this.cachedBalancerThreadNumber = numThreads;
        return MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)numThreads, (String)"coordinator-cost-balancer-%s"));
    }

    private List<CoordinatorDuty> makeHistoricalManagementDuties() {
        return ImmutableList.of((Object)new UpdateCoordinatorStateAndPrepareCluster(), (Object)new RunRules(), (Object)new UpdateReplicationStatus(), (Object)new UnloadUnusedSegments(this.loadQueueManager), (Object)new MarkOvershadowedSegmentsAsUnused(this), (Object)new BalanceSegments(), (Object)new CollectSegmentAndServerStats(this));
    }

    @VisibleForTesting
    List<CoordinatorDuty> makeIndexingServiceDuties() {
        ArrayList<CoordinatorDuty> duties = new ArrayList<CoordinatorDuty>(this.indexingServiceDuties);
        if (this.getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
            duties.addAll(this.makeCompactSegmentsDuty());
        }
        log.debug("Initialized indexing service duties [%s].", new Object[]{duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())});
        return ImmutableList.copyOf(duties);
    }

    private List<CoordinatorDuty> makeMetadataStoreManagementDuties() {
        ImmutableList duties = ImmutableList.copyOf(this.metadataStoreManagementDuties);
        log.debug("Initialized metadata store management duties [%s].", new Object[]{duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())});
        return duties;
    }

    @VisibleForTesting
    CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy compactionSegmentSearchPolicy) {
        List<CompactSegments> compactSegmentsDutyFromCustomGroups = this.getCompactSegmentsDutyFromCustomGroups();
        if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
            return new CompactSegments(this.config, compactionSegmentSearchPolicy, this.indexingServiceClient);
        }
        if (compactSegmentsDutyFromCustomGroups.size() > 1) {
            log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.", new Object[0]);
        }
        return compactSegmentsDutyFromCustomGroups.get(0);
    }

    @VisibleForTesting
    List<CompactSegments> getCompactSegmentsDutyFromCustomGroups() {
        return this.customDutyGroups.getCoordinatorCustomDutyGroups().stream().flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream()).filter(duty -> duty instanceof CompactSegments).map(duty -> (CompactSegments)duty).collect(Collectors.toList());
    }

    private List<CoordinatorDuty> makeCompactSegmentsDuty() {
        return ImmutableList.of((Object)this.compactSegments);
    }

    private class UpdateReplicationStatus
    implements CoordinatorDuty {
        private UpdateReplicationStatus() {
        }

        @Override
        public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
            DruidCoordinator.this.segmentReplicationStatus = params.getSegmentReplicationStatus();
            return params;
        }
    }

    private class UpdateCoordinatorStateAndPrepareCluster
    implements CoordinatorDuty {
        private UpdateCoordinatorStateAndPrepareCluster() {
        }

        @Override
        @Nullable
        public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
            List<ImmutableDruidServer> currentServers = this.prepareCurrentServers();
            this.startPeonsForNewServers(currentServers);
            this.stopPeonsForDisappearedServers(currentServers);
            CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
            SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig();
            DruidCluster cluster = this.prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
            this.cancelLoadsOnDecommissioningServers(cluster);
            DruidCoordinator.this.initBalancerExecutor();
            BalancerStrategy balancerStrategy = DruidCoordinator.this.balancerStrategyFactory.createBalancerStrategy(DruidCoordinator.this.balancerExec);
            log.info("Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].", new Object[]{balancerStrategy.getClass().getSimpleName(), segmentLoadingConfig.isUseRoundRobinSegmentAssignment(), dynamicConfig.getDebugDimensions()});
            return params.buildFromExisting().withDruidCluster(cluster).withBalancerStrategy(balancerStrategy).withSegmentAssignerUsing(DruidCoordinator.this.loadQueueManager).build();
        }

        private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) {
            AtomicInteger cancelledCount = new AtomicInteger(0);
            List decommissioningServers = cluster.getAllServers().stream().filter(ServerHolder::isDecommissioning).collect(Collectors.toList());
            for (ServerHolder server : decommissioningServers) {
                server.getQueuedSegments().forEach((segment, action) -> {
                    if (action.isLoad() && server.cancelOperation((SegmentAction)((Object)action), (DataSegment)segment)) {
                        cancelledCount.incrementAndGet();
                    }
                });
            }
            if (cancelledCount.get() > 0) {
                log.info("Cancelled [%d] load/move operations on [%d] decommissioning servers.", new Object[]{cancelledCount.get(), decommissioningServers.size()});
            }
        }

        List<ImmutableDruidServer> prepareCurrentServers() {
            List<ImmutableDruidServer> currentServers = DruidCoordinator.this.serverInventoryView.getInventory().stream().filter(DruidServer::isSegmentReplicationOrBroadcastTarget).map(DruidServer::toImmutableDruidServer).collect(Collectors.toList());
            if (log.isDebugEnabled()) {
                log.debug("Servers", new Object[0]);
                for (ImmutableDruidServer druidServer : currentServers) {
                    log.debug("  %s", new Object[]{druidServer});
                    log.debug("    -- DataSources", new Object[0]);
                    for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
                        log.debug("    %s", new Object[]{druidDataSource});
                    }
                }
            }
            return currentServers;
        }

        void startPeonsForNewServers(List<ImmutableDruidServer> currentServers) {
            for (ImmutableDruidServer server : currentServers) {
                DruidCoordinator.this.loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
                    LoadQueuePeon loadQueuePeon = DruidCoordinator.this.taskMaster.giveMePeon(server);
                    loadQueuePeon.start();
                    log.debug("Created LoadQueuePeon for server[%s].", new Object[]{server.getName()});
                    return loadQueuePeon;
                });
            }
        }

        DruidCluster prepareCluster(CoordinatorDynamicConfig dynamicConfig, SegmentLoadingConfig segmentLoadingConfig, List<ImmutableDruidServer> currentServers) {
            Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
            DruidCluster.Builder cluster = DruidCluster.builder();
            for (ImmutableDruidServer server : currentServers) {
                cluster.add(new ServerHolder(server, (LoadQueuePeon)DruidCoordinator.this.loadManagementPeons.get(server.getName()), decommissioningServers.contains(server.getHost()), segmentLoadingConfig.getMaxSegmentsInLoadQueue(), segmentLoadingConfig.getMaxLifetimeInLoadQueue()));
            }
            return cluster.build();
        }

        void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers) {
            HashSet disappeared = Sets.newHashSet((Iterable)DruidCoordinator.this.loadManagementPeons.keySet());
            for (ImmutableDruidServer server : servers) {
                disappeared.remove(server.getName());
            }
            for (String name : disappeared) {
                log.debug("Removing listener for server[%s] which is no longer there.", new Object[]{name});
                LoadQueuePeon peon = (LoadQueuePeon)DruidCoordinator.this.loadManagementPeons.remove(name);
                peon.stop();
            }
        }
    }

    private class DutiesRunnable
    implements Runnable {
        private final DateTime coordinatorStartTime = DateTimes.nowUtc();
        private final List<? extends CoordinatorDuty> duties;
        private final int startingLeaderCounter;
        private final String dutyGroupName;
        private final Duration period;

        DutiesRunnable(List<? extends CoordinatorDuty> duties, int startingLeaderCounter, String alias, Duration period) {
            this.duties = duties;
            this.startingLeaderCounter = startingLeaderCounter;
            this.dutyGroupName = alias;
            this.period = period;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                log.info("Starting coordinator run for group [%s]", new Object[]{this.dutyGroupName});
                Stopwatch groupRunTime = Stopwatch.createStarted();
                Object object = DruidCoordinator.this.lock;
                synchronized (object) {
                    if (!DruidCoordinator.this.coordLeaderSelector.isLeader()) {
                        log.info("LEGGO MY EGGO. [%s] is leader.", new Object[]{DruidCoordinator.this.coordLeaderSelector.getCurrentLeader()});
                        DruidCoordinator.this.stopBeingLeader();
                        return;
                    }
                }
                List<Boolean> allStarted = Arrays.asList(DruidCoordinator.this.segmentsMetadataManager.isPollingDatabasePeriodically(), DruidCoordinator.this.serverInventoryView.isStarted());
                for (Boolean aBoolean : allStarted) {
                    if (aBoolean.booleanValue()) continue;
                    log.error("InventoryManagers not started[%s]", new Object[]{allStarted});
                    DruidCoordinator.this.stopBeingLeader();
                    return;
                }
                DataSourcesSnapshot dataSourcesSnapshot = DruidCoordinator.this.segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments();
                DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder(this.coordinatorStartTime).withDatabaseRuleManager(DruidCoordinator.this.metadataRuleManager).withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot).withDynamicConfigs(DruidCoordinator.this.getDynamicConfigs()).withCompactionConfig(DruidCoordinator.this.getCompactionConfig()).withEmitter(DruidCoordinator.this.emitter).build();
                log.info("Initialized run params for group [%s] with [%,d] used segments in [%d] datasources.", new Object[]{this.dutyGroupName, params.getUsedSegments().size(), dataSourcesSnapshot.getDataSourcesMap().size()});
                boolean coordinationPaused = DruidCoordinator.this.getDynamicConfigs().getPauseCoordination();
                if (coordinationPaused && DruidCoordinator.this.coordLeaderSelector.isLeader() && this.startingLeaderCounter == DruidCoordinator.this.coordLeaderSelector.localTerm()) {
                    log.info("Coordination has been paused. Duties will not run until coordination is resumed.", new Object[0]);
                }
                Stopwatch dutyRunTime = Stopwatch.createUnstarted();
                for (CoordinatorDuty coordinatorDuty : this.duties) {
                    if (coordinationPaused || !DruidCoordinator.this.coordLeaderSelector.isLeader() || this.startingLeaderCounter != DruidCoordinator.this.coordLeaderSelector.localTerm()) continue;
                    dutyRunTime.reset().start();
                    params = coordinatorDuty.run(params);
                    dutyRunTime.stop();
                    String dutyName = coordinatorDuty.getClass().getName();
                    if (params == null) {
                        log.info("Stopping run for group [%s] on request of duty [%s].", new Object[]{this.dutyGroupName, dutyName});
                        return;
                    }
                    RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
                    long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
                    params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis);
                }
                CoordinatorRunStats allStats = params.getCoordinatorStats();
                if (allStats.rowCount() > 0) {
                    AtomicInteger atomicInteger = new AtomicInteger();
                    allStats.forEachStat((stat, dimensions, value) -> {
                        if (stat.shouldEmit()) {
                            this.emitStat(stat, dimensions.getValues(), value);
                            emittedCount.incrementAndGet();
                        }
                    });
                    log.info("Emitted [%d] stats for group [%s]. All collected stats:%s\n", new Object[]{atomicInteger.get(), this.dutyGroupName, allStats.buildStatsTable()});
                }
                long l = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
                this.emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), l);
                log.info("Finished coordinator run for group [%s] in [%d] ms", new Object[]{this.dutyGroupName, l});
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Caught exception, ignoring so that schedule keeps going.", new Object[0]).emit();
            }
        }

        private void emitStat(CoordinatorStat stat, Map<Dimension, String> dimensionValues, long value) {
            ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder().setDimension(Dimension.DUTY_GROUP.reportedName(), (Object)this.dutyGroupName);
            dimensionValues.forEach((dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue));
            DruidCoordinator.this.emitter.emit(eventBuilder.build(stat.getMetricName(), (Number)value));
        }

        Duration getPeriod() {
            return this.period;
        }

        public String toString() {
            return "DutiesRunnable{group='" + this.dutyGroupName + '\'' + '}';
        }
    }
}

