/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class KillUnusedSegments
implements CoordinatorDuty {
    public static final String KILL_TASK_TYPE = "kill";
    public static final String TASK_ID_PREFIX = "coordinator-issued";
    public static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK = status -> null != status && KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX);
    private static final Logger log = new Logger(KillUnusedSegments.class);
    private final Duration period;
    private final Duration durationToRetain;
    private final boolean ignoreDurationToRetain;
    private final int maxSegmentsToKill;
    private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
    private DateTime lastKillTime;
    private final long bufferPeriod;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final OverlordClient overlordClient;

    public KillUnusedSegments(SegmentsMetadataManager segmentsMetadataManager, OverlordClient overlordClient, DruidCoordinatorConfig config) {
        if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
            throw InvalidInput.exception((String)"druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]", (Object[])new Object[]{config.getCoordinatorKillPeriod(), config.getCoordinatorIndexingPeriod()});
        }
        if (config.getCoordinatorKillMaxSegments() < 0) {
            throw InvalidInput.exception((String)"druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.", (Object[])new Object[]{config.getCoordinatorKillMaxSegments()});
        }
        this.period = config.getCoordinatorKillPeriod();
        this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
        this.durationToRetain = config.getCoordinatorKillDurationToRetain();
        if (this.ignoreDurationToRetain) {
            log.debug("druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill because druid.coordinator.kill.ignoreDurationToRetain is set to true.", new Object[]{this.durationToRetain});
        }
        this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
        this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
        this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<String, DateTime>();
        log.info("Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]", new Object[]{this.period, this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain, this.bufferPeriod, this.maxSegmentsToKill});
        this.segmentsMetadataManager = segmentsMetadataManager;
        this.overlordClient = overlordClient;
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        if (!this.canDutyRun()) {
            log.debug("Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].", new Object[]{this.period, this.lastKillTime});
            return params;
        }
        return this.runInternal(params);
    }

    @VisibleForTesting
    DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params) {
        TaskStats taskStats = new TaskStats();
        Set<String> dataSourcesToKill = params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
        double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
        int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
        int killTaskCapacity = KillUnusedSegments.getKillTaskCapacity(CoordinatorDutyUtils.getTotalWorkerCapacity(this.overlordClient), killTaskSlotRatio, maxKillTaskSlots);
        int availableKillTaskSlots = this.getAvailableKillTaskSlots(killTaskCapacity, CoordinatorDutyUtils.getNumActiveTaskSlots(this.overlordClient, IS_AUTO_KILL_TASK).size());
        CoordinatorRunStats stats = params.getCoordinatorStats();
        taskStats.availableTaskSlots = availableKillTaskSlots;
        taskStats.maxSlots = killTaskCapacity;
        if (0 < availableKillTaskSlots) {
            if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
                dataSourcesToKill = this.segmentsMetadataManager.retrieveAllDataSourceNames();
            }
            log.debug("Killing unused segments for datasources[%s]", new Object[]{dataSourcesToKill});
            this.lastKillTime = DateTimes.nowUtc();
            taskStats.submittedTasks = this.killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
        }
        this.datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
        this.addStats(taskStats, stats);
        return params;
    }

    private void addStats(TaskStats taskStats, CoordinatorRunStats stats) {
        stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
        stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
        stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
    }

    private int killUnusedSegments(Collection<String> dataSourcesToKill, int availableKillTaskSlots) {
        int submittedTasks = 0;
        if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
            for (String dataSource : dataSourcesToKill) {
                if (submittedTasks >= availableKillTaskSlots) {
                    log.debug(StringUtils.format((String)"Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume on the next coordinator cycle.", (Object[])new Object[]{submittedTasks, availableKillTaskSlots}), new Object[0]);
                    break;
                }
                DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(this.bufferPeriod);
                Interval intervalToKill = this.findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
                if (intervalToKill == null) {
                    this.datasourceToLastKillIntervalEnd.remove(dataSource);
                    continue;
                }
                try {
                    FutureUtils.getUnchecked(this.overlordClient.runKillTask(TASK_ID_PREFIX, dataSource, intervalToKill, this.maxSegmentsToKill, maxUsedStatusLastUpdatedTime), (boolean)true);
                    ++submittedTasks;
                    this.datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
                }
                catch (Exception ex) {
                    log.error((Throwable)ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", new Object[]{dataSource, intervalToKill});
                    if (!Thread.currentThread().isInterrupted()) continue;
                    log.warn("Skipping kill task scheduling because thread is interrupted.", new Object[0]);
                    break;
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Submitted [%d] kill tasks for [%d] datasources.%s", new Object[]{submittedTasks, dataSourcesToKill.size(), availableKillTaskSlots < dataSourcesToKill.size() ? StringUtils.format((String)" Datasources skipped: %s", (Object[])new Object[]{ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, dataSourcesToKill.size())}) : ""});
        }
        return submittedTasks;
    }

    @Nullable
    private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime) {
        DateTime maxEndTime = this.ignoreDurationToRetain ? DateTimes.COMPARE_DATE_AS_STRING_MAX : DateTimes.nowUtc().minus((ReadableDuration)this.durationToRetain);
        List<Interval> unusedSegmentIntervals = this.segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, this.datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, this.maxSegmentsToKill, maxUsedStatusLastUpdatedTime);
        if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
            return null;
        }
        if (unusedSegmentIntervals.size() == 1) {
            return unusedSegmentIntervals.get(0);
        }
        return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
    }

    private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTasks) {
        return Math.max(0, killTaskCapacity - numActiveKillTasks);
    }

    private boolean canDutyRun() {
        return this.lastKillTime == null || !DateTimes.nowUtc().isBefore((ReadableInstant)this.lastKillTime.plus((ReadableDuration)this.period));
    }

    @VisibleForTesting
    static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots) {
        return Math.min((int)((double)totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
    }

    @VisibleForTesting
    Map<String, DateTime> getDatasourceToLastKillIntervalEnd() {
        return this.datasourceToLastKillIntervalEnd;
    }

    static class TaskStats {
        int availableTaskSlots = 0;
        int maxSlots = 0;
        int submittedTasks = 0;

        TaskStats() {
        }
    }
}

