/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.MetadataManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.duty.MarkEternityTombstonesAsUnused;
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Duration;

@ManageLifecycle
public class DruidCoordinator {
    public static final Ordering<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering.from((Comparator)Comparators.intervalsByEndThenStart()).onResultOf(DataSegment::getInterval).compound((Comparator)Ordering.natural()).reverse();
    private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
    private final Object lock = new Object();
    private final DruidCoordinatorConfig config;
    private final MetadataManager metadataManager;
    private final ServerInventoryView serverInventoryView;
    private final ServiceEmitter emitter;
    private final OverlordClient overlordClient;
    private final ScheduledExecutorFactory executorFactory;
    private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new HashMap<String, ScheduledExecutorService>();
    private final LoadQueueTaskMaster taskMaster;
    private final SegmentLoadQueueManager loadQueueManager;
    private final ServiceAnnouncer serviceAnnouncer;
    private final DruidNode self;
    private final CoordinatorCustomDutyGroups customDutyGroups;
    private final BalancerStrategyFactory balancerStrategyFactory;
    private final LookupCoordinatorManager lookupCoordinatorManager;
    private final DruidLeaderSelector coordLeaderSelector;
    private final CompactSegments compactSegments;
    private volatile boolean started = false;
    private volatile SegmentReplicationStatus segmentReplicationStatus = null;
    public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
    private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
    private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
    private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";

    @Inject
    public DruidCoordinator(DruidCoordinatorConfig config, MetadataManager metadataManager, ServerInventoryView serverInventoryView, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, OverlordClient overlordClient, LoadQueueTaskMaster taskMaster, SegmentLoadQueueManager loadQueueManager, ServiceAnnouncer serviceAnnouncer, @Self DruidNode self, CoordinatorCustomDutyGroups customDutyGroups, BalancerStrategyFactory balancerStrategyFactory, LookupCoordinatorManager lookupCoordinatorManager, @Coordinator DruidLeaderSelector coordLeaderSelector, CompactionSegmentSearchPolicy compactionSegmentSearchPolicy) {
        this.config = config;
        this.metadataManager = metadataManager;
        this.serverInventoryView = serverInventoryView;
        this.emitter = emitter;
        this.overlordClient = overlordClient;
        this.taskMaster = taskMaster;
        this.serviceAnnouncer = serviceAnnouncer;
        this.self = self;
        this.customDutyGroups = customDutyGroups;
        this.executorFactory = scheduledExecutorFactory;
        this.balancerStrategyFactory = balancerStrategyFactory;
        this.lookupCoordinatorManager = lookupCoordinatorManager;
        this.coordLeaderSelector = coordLeaderSelector;
        this.compactSegments = this.initializeCompactSegmentsDuty(compactionSegmentSearchPolicy);
        this.loadQueueManager = loadQueueManager;
    }

    public boolean isLeader() {
        return this.coordLeaderSelector.isLeader();
    }

    public Map<String, LoadQueuePeon> getLoadManagementPeons() {
        return this.taskMaster.getAllPeons();
    }

    public Map<String, Object2LongMap<String>> getTierToDatasourceToUnderReplicatedCount(boolean useClusterView) {
        Iterable<DataSegment> dataSegments = this.metadataManager.segments().iterateAllUsedSegments();
        return this.computeUnderReplicated(dataSegments, useClusterView);
    }

    public Map<String, Object2LongMap<String>> getTierToDatasourceToUnderReplicatedCount(Iterable<DataSegment> dataSegments, boolean useClusterView) {
        return this.computeUnderReplicated(dataSegments, useClusterView);
    }

    public Object2IntMap<String> getDatasourceToUnavailableSegmentCount() {
        if (this.segmentReplicationStatus == null) {
            return Object2IntMaps.emptyMap();
        }
        Object2IntOpenHashMap datasourceToUnavailableSegments = new Object2IntOpenHashMap();
        Iterable<DataSegment> dataSegments = this.metadataManager.segments().iterateAllUsedSegments();
        for (DataSegment segment : dataSegments) {
            SegmentReplicaCount replicaCount = this.segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
            if (replicaCount != null && replicaCount.totalLoaded() > 0) {
                datasourceToUnavailableSegments.addTo((Object)segment.getDataSource(), 0);
                continue;
            }
            datasourceToUnavailableSegments.addTo((Object)segment.getDataSource(), 1);
        }
        return datasourceToUnavailableSegments;
    }

    public Map<String, Double> getDatasourceToLoadStatus() {
        HashMap<String, Double> loadStatus = new HashMap<String, Double>();
        Collection<ImmutableDruidDataSource> dataSources = this.metadataManager.segments().getImmutableDataSourcesWithAllUsedSegments();
        for (ImmutableDruidDataSource dataSource : dataSources) {
            HashSet segments = Sets.newHashSet(dataSource.getSegments());
            int numPublishedSegments = segments.size();
            for (DruidServer druidServer : this.serverInventoryView.getInventory()) {
                DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName());
                if (loadedView == null) continue;
                for (DataSegment serverSegment : loadedView.getSegments()) {
                    segments.remove(serverSegment);
                }
            }
            int numUnavailableSegments = segments.size();
            loadStatus.put(dataSource.getName(), (double)(numPublishedSegments - numUnavailableSegments) * 100.0 / (double)numPublishedSegments);
        }
        return loadStatus;
    }

    @Nullable
    public Integer getReplicationFactor(SegmentId segmentId) {
        if (this.segmentReplicationStatus == null) {
            return null;
        }
        SegmentReplicaCount replicaCountsInCluster = this.segmentReplicationStatus.getReplicaCountsInCluster(segmentId);
        return replicaCountsInCluster == null ? null : Integer.valueOf(replicaCountsInCluster.required());
    }

    @Nullable
    public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) {
        return this.compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
    }

    @Nullable
    public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource) {
        return this.compactSegments.getAutoCompactionSnapshot(dataSource);
    }

    public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot() {
        return this.compactSegments.getAutoCompactionSnapshot();
    }

    public String getCurrentLeader() {
        return this.coordLeaderSelector.getCurrentLeader();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStart
    public void start() {
        Object object = this.lock;
        synchronized (object) {
            if (this.started) {
                return;
            }
            this.started = true;
            this.coordLeaderSelector.registerListener(new DruidLeaderSelector.Listener(){

                @Override
                public void becomeLeader() {
                    DruidCoordinator.this.becomeLeader();
                }

                @Override
                public void stopBeingLeader() {
                    DruidCoordinator.this.stopBeingLeader();
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @LifecycleStop
    public void stop() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.started) {
                return;
            }
            this.coordLeaderSelector.unregisterListener();
            this.started = false;
            this.stopAllDutyGroupExecutors();
            this.balancerStrategyFactory.stopExecutor();
        }
    }

    public void runCompactSegmentsDuty() {
        int startingLeaderCounter = this.coordLeaderSelector.localTerm();
        DutiesRunnable compactSegmentsDuty = new DutiesRunnable((List<? extends CoordinatorDuty>)ImmutableList.of((Object)this.compactSegments), startingLeaderCounter, COMPACT_SEGMENTS_DUTIES_DUTY_GROUP, null);
        compactSegmentsDuty.run();
    }

    private Map<String, Object2LongMap<String>> computeUnderReplicated(Iterable<DataSegment> dataSegments, boolean computeUsingClusterView) {
        if (this.segmentReplicationStatus == null) {
            return Collections.emptyMap();
        }
        return this.segmentReplicationStatus.getTierToDatasourceToUnderReplicated(dataSegments, !computeUsingClusterView);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void becomeLeader() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.started) {
                return;
            }
            log.info("I am the leader of the coordinators, all must bow! Starting coordination in [%s].", new Object[]{this.config.getCoordinatorStartDelay()});
            this.metadataManager.onLeaderStart();
            this.taskMaster.onLeaderStart();
            this.lookupCoordinatorManager.start();
            this.serviceAnnouncer.announce(this.self);
            int startingLeaderCounter = this.coordLeaderSelector.localTerm();
            ArrayList<DutiesRunnable> dutiesRunnables = new ArrayList<DutiesRunnable>();
            dutiesRunnables.add(new DutiesRunnable(this.makeHistoricalManagementDuties(), startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, this.config.getCoordinatorPeriod()));
            if (this.overlordClient != null) {
                dutiesRunnables.add(new DutiesRunnable(this.makeIndexingServiceDuties(), startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP, this.config.getCoordinatorIndexingPeriod()));
            }
            dutiesRunnables.add(new DutiesRunnable(this.makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP, this.config.getCoordinatorMetadataStoreManagementPeriod()));
            for (CoordinatorCustomDutyGroup customDutyGroup : this.customDutyGroups.getCoordinatorCustomDutyGroups()) {
                dutiesRunnables.add(new DutiesRunnable(customDutyGroup.getCustomDutyList(), startingLeaderCounter, customDutyGroup.getName(), customDutyGroup.getPeriod()));
                log.info("Done making custom coordinator duties [%s] for group [%s]", new Object[]{customDutyGroup.getCustomDutyList().stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()), customDutyGroup.getName()});
            }
            for (DutiesRunnable dutiesRunnable : dutiesRunnables) {
                ScheduledExecutors.scheduleAtFixedRate((ScheduledExecutorService)this.getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName), (Duration)this.config.getCoordinatorStartDelay(), (Duration)dutiesRunnable.getPeriod(), () -> {
                    if (this.coordLeaderSelector.isLeader() && startingLeaderCounter == this.coordLeaderSelector.localTerm()) {
                        dutiesRunnable.run();
                    }
                    if (this.coordLeaderSelector.isLeader() && startingLeaderCounter == this.coordLeaderSelector.localTerm()) {
                        return ScheduledExecutors.Signal.REPEAT;
                    }
                    return ScheduledExecutors.Signal.STOP;
                });
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopBeingLeader() {
        Object object = this.lock;
        synchronized (object) {
            log.info("I am no longer the leader...", new Object[0]);
            this.taskMaster.onLeaderStop();
            this.serviceAnnouncer.unannounce(this.self);
            this.lookupCoordinatorManager.stop();
            this.metadataManager.onLeaderStop();
            this.balancerStrategyFactory.stopExecutor();
        }
    }

    @GuardedBy(value="lock")
    private ScheduledExecutorService getOrCreateDutyGroupExecutor(String dutyGroup) {
        return this.dutyGroupExecutors.computeIfAbsent(dutyGroup, group -> this.executorFactory.create(1, "Coordinator-Exec-" + dutyGroup + "-%d"));
    }

    @GuardedBy(value="lock")
    private void stopAllDutyGroupExecutors() {
        this.dutyGroupExecutors.values().forEach(ExecutorService::shutdownNow);
        this.dutyGroupExecutors.clear();
    }

    private List<CoordinatorDuty> makeHistoricalManagementDuties() {
        return ImmutableList.of((Object)new PrepareBalancerAndLoadQueues(this.taskMaster, this.loadQueueManager, this.balancerStrategyFactory, this.serverInventoryView), (Object)new RunRules(segments -> this.metadataManager.segments().markSegmentsAsUnused(segments)), (Object)new UpdateReplicationStatus(), (Object)new UnloadUnusedSegments(this.loadQueueManager), (Object)new MarkOvershadowedSegmentsAsUnused(segments -> this.metadataManager.segments().markSegmentsAsUnused(segments)), (Object)new MarkEternityTombstonesAsUnused(segments -> this.metadataManager.segments().markSegmentsAsUnused(segments)), (Object)new BalanceSegments(this.config.getCoordinatorPeriod()), (Object)new CollectSegmentAndServerStats(this.taskMaster));
    }

    @VisibleForTesting
    List<CoordinatorDuty> makeIndexingServiceDuties() {
        ArrayList<CoordinatorDuty> duties = new ArrayList<CoordinatorDuty>();
        if (this.config.isKillUnusedSegmentsEnabled()) {
            duties.add(new KillUnusedSegments(this.metadataManager.segments(), this.overlordClient, this.config));
        }
        if (this.config.isKillPendingSegmentsEnabled()) {
            duties.add(new KillStalePendingSegments(this.overlordClient));
        }
        if (this.getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
            duties.add(this.compactSegments);
        }
        log.debug("Initialized indexing service duties [%s].", new Object[]{duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())});
        return ImmutableList.copyOf(duties);
    }

    private List<CoordinatorDuty> makeMetadataStoreManagementDuties() {
        return Arrays.asList(new KillSupervisors(this.config, this.metadataManager.supervisors()), new KillAuditLog(this.config, this.metadataManager.audit()), new KillRules(this.config, this.metadataManager.rules()), new KillDatasourceMetadata(this.config, this.metadataManager.indexer(), this.metadataManager.supervisors()), new KillCompactionConfig(this.config, this.metadataManager.segments(), this.metadataManager.configs()));
    }

    @VisibleForTesting
    CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy compactionSegmentSearchPolicy) {
        List<CompactSegments> compactSegmentsDutyFromCustomGroups = this.getCompactSegmentsDutyFromCustomGroups();
        if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
            return new CompactSegments(compactionSegmentSearchPolicy, this.overlordClient);
        }
        if (compactSegmentsDutyFromCustomGroups.size() > 1) {
            log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.", new Object[0]);
        }
        return compactSegmentsDutyFromCustomGroups.get(0);
    }

    @VisibleForTesting
    List<CompactSegments> getCompactSegmentsDutyFromCustomGroups() {
        return this.customDutyGroups.getCoordinatorCustomDutyGroups().stream().flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream()).filter(duty -> duty instanceof CompactSegments).map(duty -> (CompactSegments)duty).collect(Collectors.toList());
    }

    private class UpdateReplicationStatus
    implements CoordinatorDuty {
        private UpdateReplicationStatus() {
        }

        @Override
        public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
            DruidCoordinator.this.segmentReplicationStatus = params.getSegmentReplicationStatus();
            CoordinatorRunStats stats = params.getCoordinatorStats();
            DruidCoordinator.this.getDatasourceToUnavailableSegmentCount().forEach((dataSource, numUnavailable) -> stats.add(Stats.Segments.UNAVAILABLE, RowKey.of(Dimension.DATASOURCE, dataSource), numUnavailable.intValue()));
            DruidCoordinator.this.getTierToDatasourceToUnderReplicatedCount(false).forEach((tier, countsPerDatasource) -> countsPerDatasource.forEach((dataSource, underReplicatedCount) -> stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, (String)tier, (String)dataSource, (long)underReplicatedCount)));
            return params;
        }
    }

    private class DutiesRunnable
    implements Runnable {
        private final DateTime coordinatorStartTime = DateTimes.nowUtc();
        private final List<? extends CoordinatorDuty> duties;
        private final int startingLeaderCounter;
        private final String dutyGroupName;
        private final Duration period;

        DutiesRunnable(List<? extends CoordinatorDuty> duties, int startingLeaderCounter, String alias, Duration period) {
            this.duties = duties;
            this.startingLeaderCounter = startingLeaderCounter;
            this.dutyGroupName = alias;
            this.period = period;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                log.info("Starting coordinator run for group [%s]", new Object[]{this.dutyGroupName});
                Stopwatch groupRunTime = Stopwatch.createStarted();
                Object object = DruidCoordinator.this.lock;
                synchronized (object) {
                    if (!DruidCoordinator.this.coordLeaderSelector.isLeader()) {
                        log.info("LEGGO MY EGGO. [%s] is leader.", new Object[]{DruidCoordinator.this.coordLeaderSelector.getCurrentLeader()});
                        DruidCoordinator.this.stopBeingLeader();
                        return;
                    }
                }
                List<Boolean> allStarted = Arrays.asList(DruidCoordinator.this.metadataManager.isStarted(), DruidCoordinator.this.serverInventoryView.isStarted());
                for (Boolean aBoolean : allStarted) {
                    if (aBoolean.booleanValue()) continue;
                    log.error("InventoryManagers not started[%s]", new Object[]{allStarted});
                    DruidCoordinator.this.stopBeingLeader();
                    return;
                }
                DataSourcesSnapshot dataSourcesSnapshot = DruidCoordinator.this.metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments();
                CoordinatorDynamicConfig dynamicConfig = DruidCoordinator.this.metadataManager.configs().getCurrentDynamicConfig();
                CoordinatorCompactionConfig compactionConfig = DruidCoordinator.this.metadataManager.configs().getCurrentCompactionConfig();
                DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder(this.coordinatorStartTime).withDatabaseRuleManager(DruidCoordinator.this.metadataManager.rules()).withDataSourcesSnapshot(dataSourcesSnapshot).withDynamicConfigs(dynamicConfig).withCompactionConfig(compactionConfig).build();
                log.info("Initialized run params for group [%s] with [%,d] used segments in [%d] datasources.", new Object[]{this.dutyGroupName, params.getUsedSegments().size(), dataSourcesSnapshot.getDataSourcesMap().size()});
                boolean coordinationPaused = dynamicConfig.getPauseCoordination();
                if (coordinationPaused && DruidCoordinator.this.coordLeaderSelector.isLeader() && this.startingLeaderCounter == DruidCoordinator.this.coordLeaderSelector.localTerm()) {
                    log.info("Coordination has been paused. Duties will not run until coordination is resumed.", new Object[0]);
                }
                Stopwatch dutyRunTime = Stopwatch.createUnstarted();
                for (CoordinatorDuty coordinatorDuty : this.duties) {
                    if (coordinationPaused || !DruidCoordinator.this.coordLeaderSelector.isLeader() || this.startingLeaderCounter != DruidCoordinator.this.coordLeaderSelector.localTerm()) continue;
                    dutyRunTime.restart();
                    params = coordinatorDuty.run(params);
                    dutyRunTime.stop();
                    String dutyName = coordinatorDuty.getClass().getName();
                    if (params == null) {
                        log.info("Stopping run for group [%s] on request of duty [%s].", new Object[]{this.dutyGroupName, dutyName});
                        return;
                    }
                    RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
                    long dutyRunMillis = dutyRunTime.millisElapsed();
                    params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis);
                }
                CoordinatorRunStats allStats = params.getCoordinatorStats();
                if (allStats.rowCount() > 0) {
                    AtomicInteger atomicInteger = new AtomicInteger();
                    allStats.forEachStat((stat, dimensions, value) -> {
                        if (stat.shouldEmit()) {
                            this.emitStat(stat, dimensions.getValues(), value);
                            emittedCount.incrementAndGet();
                        }
                    });
                    log.info("Emitted [%d] stats for group [%s]. All collected stats:%s", new Object[]{atomicInteger.get(), this.dutyGroupName, allStats.buildStatsTable()});
                }
                groupRunTime.stop();
                long l = groupRunTime.millisElapsed();
                this.emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), l);
                log.info("Finished coordinator run for group [%s] in [%d] ms.%n", new Object[]{this.dutyGroupName, l});
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Caught exception, ignoring so that schedule keeps going.", new Object[0]).emit();
            }
        }

        private void emitStat(CoordinatorStat stat, Map<Dimension, String> dimensionValues, long value) {
            ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder().setDimension(Dimension.DUTY_GROUP.reportedName(), (Object)this.dutyGroupName);
            dimensionValues.forEach((dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue));
            DruidCoordinator.this.emitter.emit((ServiceEventBuilder)eventBuilder.setMetric(stat.getMetricName(), (Number)value));
        }

        Duration getPeriod() {
            return this.period;
        }

        public String toString() {
            return "DutiesRunnable{group='" + this.dutyGroupName + '\'' + '}';
        }
    }
}

