/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SqlSegmentsMetadataManagerTestBase;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class KillUnusedSegmentsTest {
    /** Reference instant captured once when the class is initialized; every interval below is derived from it. */
    private static final DateTime NOW = DateTimes.nowUtc();

    // One-day-long intervals, each built from a one-day period and an end instant relative to NOW.
    private static final Interval YEAR_OLD = new Interval(Period.days(1), NOW.minusDays(365));
    private static final Interval MONTH_OLD = new Interval(Period.days(1), NOW.minusDays(30));
    private static final Interval FIFTEEN_DAY_OLD = new Interval(Period.days(1), NOW.minusDays(15));
    private static final Interval DAY_OLD = new Interval(Period.days(1), NOW.minusDays(1));
    private static final Interval HOUR_OLD = new Interval(Period.days(1), NOW.minusHours(1));
    private static final Interval NEXT_DAY = new Interval(Period.days(1), NOW.plusDays(1));
    private static final Interval NEXT_MONTH = new Interval(Period.days(1), NOW.plusDays(30));

    // Datasource names used by the tests, and the per-datasource row keys used to look up stats.
    private static final String DS1 = "DS1";
    private static final String DS2 = "DS2";
    private static final String DS3 = "DS3";
    private static final RowKey DS1_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS1);
    private static final RowKey DS2_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS2);
    private static final RowKey DS3_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS3);

    /** Version string passed to every segment created by these tests. */
    private static final String VERSION = "v1";

    // Per-test fixture state; (re)assigned in setup() and tweaked by individual tests.
    private CoordinatorDynamicConfig.Builder dynamicConfigBuilder;
    private TestOverlordClient overlordClient;
    private KillUnusedSegmentsConfig.Builder configBuilder;
    private DruidCoordinatorRuntimeParams.Builder paramsBuilder;
    private KillUnusedSegments killDuty;

    /** JUnit rule supplying the Derby-backed metadata connector and tables config. */
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();

    // Metadata-store handles obtained from the rule in setup().
    private IndexerMetadataStorageCoordinator storageCoordinator;
    private SQLMetadataConnector connector;
    private MetadataStorageTablesConfig config;

    /**
     * Rebuilds the fixture before each test.
     *
     * <p>Obtains the connector and tables config from the Derby rule, creates the segment table,
     * installs a fresh {@code TestOverlordClient}, and resets the builders to the baseline used by
     * most tests: cleanup period 0 s, duration to retain 36 h, at most 10 segments per kill,
     * max interval to kill {@code Period.ZERO}, buffer period 1 s, kill task slot ratio 1.0 and an
     * empty set of used segments.
     */
    @Before
    public void setup() {
        connector = derbyConnectorRule.getConnector();
        storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
            null,
            TestHelper.JSON_MAPPER,
            (MetadataStorageTablesConfig) derbyConnectorRule.metadataTablesConfigSupplier().get(),
            connector,
            null,
            CentralizedDatasourceSchemaConfig.create()
        );
        config = (MetadataStorageTablesConfig) derbyConnectorRule.metadataTablesConfigSupplier().get();
        connector.createSegmentTable();

        overlordClient = new TestOverlordClient();

        configBuilder = KillUnusedSegmentsConfig.builder()
            .withCleanupPeriod(Duration.standardSeconds(0L))
            .withDurationToRetain(Duration.standardHours(36L))
            .withMaxSegmentsToKill(Integer.valueOf(10))
            .withMaxIntervalToKill(Period.ZERO)
            .withBufferPeriod(Duration.standardSeconds(1L));
        dynamicConfigBuilder = CoordinatorDynamicConfig.builder()
            .withKillTaskSlotRatio(Double.valueOf(1.0));
        paramsBuilder = DruidCoordinatorRuntimeParams.builder()
            .withUsedSegments(Collections.emptySet());
    }

    /**
     * Runs the duty with untouched default kill and dynamic config builders.
     *
     * <p>Seven unused DS1 segments (including one spanning {@code Intervals.ETERNITY}) are all
     * marked as last updated 60 days ago. A single slot and a single submitted task are expected,
     * with one eligible segment and a last kill interval equal to {@code Intervals.ETERNITY}.
     */
    @Test
    public void testKillWithDefaultCoordinatorConfig() {
        configBuilder = KillUnusedSegmentsConfig.builder();
        dynamicConfigBuilder = CoordinatorDynamicConfig.builder();

        final DateTime lastUpdated = NOW.minusDays(60);
        final List<Interval> unusedIntervals = Arrays.asList(
            YEAR_OLD, MONTH_OLD, DAY_OLD, HOUR_OLD, NEXT_DAY, NEXT_MONTH, Intervals.ETERNITY
        );
        // Insertion order matches the order of the list above.
        for (Interval unusedInterval : unusedIntervals) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(1L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    /**
     * Same scenario as the default-config test, except that the kill config also sets
     * max interval to kill to {@code Period.ZERO}.
     *
     * <p>With that single override two DS1 segments are expected to be eligible, while the slot
     * counts, submitted task count and last kill interval ({@code Intervals.ETERNITY}) stay the same.
     */
    @Test
    public void testKillWithDefaultCoordinatorConfigPlusZeroMaxIntervalToKill() {
        configBuilder = KillUnusedSegmentsConfig.builder().withMaxIntervalToKill(Period.ZERO);
        dynamicConfigBuilder = CoordinatorDynamicConfig.builder();

        final DateTime lastUpdated = NOW.minusDays(60);
        final List<Interval> unusedIntervals = Arrays.asList(
            YEAR_OLD, MONTH_OLD, DAY_OLD, HOUR_OLD, NEXT_DAY, NEXT_MONTH, Intervals.ETERNITY
        );
        // Insertion order matches the order of the list above.
        for (Interval unusedInterval : unusedIntervals) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(1L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    /**
     * Runs the duty without the test adding any segments and expects 10 available slots,
     * 10 max slots and zero submitted kill tasks.
     */
    @Test
    public void testKillWithNoDatasources() {
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Kills unused segments of two datasources over three consecutive duty runs, at most two
     * segments per kill and ignoring the duration to retain.
     *
     * <p>DS1 starts with five unused segments and DS2 with three. After each run the test checks
     * the stats and the last kill interval recorded per datasource, then resets it.
     * NOTE(review): the expected stat values grow from run to run (10, 20, 30 slots), which
     * suggests the stats accumulate across runs of the same duty — confirm against
     * {@code runDutyAndGetStats}.
     */
    @Test
    public void testKillWithMultipleDatasources() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withMaxSegmentsToKill(Integer.valueOf(2));

        final DateTime lastUpdated = NOW.minusDays(1);
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, MONTH_OLD, DAY_OLD, NEXT_DAY, NEXT_MONTH)) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, DAY_OLD, NEXT_DAY)) {
            createAndAddUnusedSegment(DS2, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();

        // Run 1: the two oldest segments of each datasource.
        final CoordinatorRunStats firstRun = runDutyAndGetStats();
        Assert.assertEquals(10L, firstRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, firstRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));

        // Run 2: next two segments of DS1, last remaining segment of DS2.
        final CoordinatorRunStats secondRun = runDutyAndGetStats();
        Assert.assertEquals(20L, secondRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, secondRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
        validateLastKillStateAndReset(DS2, NEXT_DAY);

        // Run 3: only DS1 has something left; no kill is recorded for DS2.
        final CoordinatorRunStats thirdRun = runDutyAndGetStats();
        Assert.assertEquals(30L, thirdRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(5L, thirdRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(30L, thirdRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, NEXT_MONTH);
        validateLastKillStateAndReset(DS2, null);
    }

    /**
     * Exercises three datasources with only two kill task slots, so that not every datasource
     * can be served in a single run.
     *
     * <p>Each run asserts stats for the pair of datasources expected to be served in that run:
     * DS1/DS2, then DS3/DS1, then DS2/DS3, then DS1/DS2. In the fourth run only one additional
     * task is expected (7 submitted in total against 8 available slots).
     * NOTE(review): expected values grow across runs, which suggests cumulative stats — confirm
     * against {@code runDutyAndGetStats}.
     */
    @Test
    public void testRoundRobinKillMultipleDatasources() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withMaxSegmentsToKill(Integer.valueOf(2));
        dynamicConfigBuilder.withMaxKillTaskSlots(Integer.valueOf(2));

        final DateTime lastUpdated = NOW.minusDays(1);
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, MONTH_OLD, DAY_OLD, NEXT_DAY, NEXT_MONTH)) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, DAY_OLD, NEXT_DAY)) {
            createAndAddUnusedSegment(DS2, unusedInterval, VERSION, lastUpdated);
        }
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, DAY_OLD, NEXT_DAY)) {
            createAndAddUnusedSegment(DS3, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();

        // Run 1: DS1 and DS2.
        final CoordinatorRunStats firstRun = runDutyAndGetStats();
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));

        // Run 2: DS3 and DS1.
        final CoordinatorRunStats secondRun = runDutyAndGetStats();
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        // Run 3: DS2 and DS3.
        final CoordinatorRunStats thirdRun = runDutyAndGetStats();
        Assert.assertEquals(6L, thirdRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(6L, thirdRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(6L, thirdRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));

        // Run 4: DS1 and DS2 again; only one more task is expected to be submitted.
        final CoordinatorRunStats fourthRun = runDutyAndGetStats();
        Assert.assertEquals(8L, fourthRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(7L, fourthRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(8L, fourthRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, fourthRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, fourthRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
    }

    /**
     * Uses a single kill task slot and adds a second datasource after the first duty run.
     *
     * <p>Run 1 sees only DS1. DS2 segments are then added, and the following runs are expected
     * to report DS2, then DS1, then DS2 again, one task per run.
     * NOTE(review): expected values grow across runs, which suggests cumulative stats — confirm
     * against {@code runDutyAndGetStats}.
     */
    @Test
    public void testRoundRobinKillWhenDatasourcesChange() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withMaxSegmentsToKill(Integer.valueOf(2));
        dynamicConfigBuilder.withMaxKillTaskSlots(Integer.valueOf(1));

        final DateTime lastUpdated = NOW.minusDays(1);
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, MONTH_OLD, DAY_OLD)) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();

        // Run 1: only DS1 exists.
        final CoordinatorRunStats firstRun = runDutyAndGetStats();
        Assert.assertEquals(1L, firstRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, firstRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, firstRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));

        // A new datasource shows up between runs.
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, DAY_OLD, NEXT_DAY)) {
            createAndAddUnusedSegment(DS2, unusedInterval, VERSION, lastUpdated);
        }

        // Run 2: DS2.
        final CoordinatorRunStats secondRun = runDutyAndGetStats();
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));

        // Run 3: back to DS1.
        final CoordinatorRunStats thirdRun = runDutyAndGetStats();
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        // Run 4: DS2 again.
        final CoordinatorRunStats fourthRun = runDutyAndGetStats();
        Assert.assertEquals(4L, fourthRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, fourthRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, fourthRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, fourthRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
    }

    /**
     * Runs the duty three times against a single datasource with two kill task slots and
     * at most two segments per kill.
     *
     * <p>DS1 has three unused segments; the submitted-task and eligible-segment expectations
     * stop growing after the second run (2 tasks, 3 segments) while the slot counts keep growing.
     * NOTE(review): this suggests cumulative stats — confirm against {@code runDutyAndGetStats}.
     */
    @Test
    public void testKillSingleDatasourceMultipleRuns() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withMaxSegmentsToKill(Integer.valueOf(2));
        dynamicConfigBuilder.withMaxKillTaskSlots(Integer.valueOf(2));

        final DateTime lastUpdated = NOW.minusDays(1);
        for (Interval unusedInterval : Arrays.asList(YEAR_OLD, MONTH_OLD, DAY_OLD)) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();

        final CoordinatorRunStats firstRun = runDutyAndGetStats();
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, firstRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        final CoordinatorRunStats secondRun = runDutyAndGetStats();
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, secondRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        // Nothing is left to kill, so only the slot counts are expected to change.
        final CoordinatorRunStats thirdRun = runDutyAndGetStats();
        Assert.assertEquals(6L, thirdRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, thirdRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(6L, thirdRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
    }

    /**
     * Mixes segments last updated 10 days ago with segments last updated 2 days ago, under a
     * 3-day buffer period and with the duration to retain ignored.
     *
     * <p>Four of the six segments are expected to be eligible, while the recorded kill interval
     * is expected to span from the start of {@code YEAR_OLD} to the end of {@code NEXT_MONTH}.
     */
    @Test
    public void testKillWithDifferentLastUpdatedTimesInWideInterval() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withBufferPeriod(Duration.standardDays(3L));

        final DateTime tenDaysAgo = NOW.minusDays(10);
        final DateTime twoDaysAgo = NOW.minusDays(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, tenDaysAgo);

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(4L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        final Interval expectedKillInterval = new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd());
        validateLastKillStateAndReset(DS1, expectedKillInterval);
    }

    /**
     * Adds unused segments with older intervals after the duty has already run once.
     *
     * <p>Expected sequence of recorded kill intervals for DS1: {@code DAY_OLD}..{@code NEXT_DAY},
     * then {@code NEXT_MONTH}, then no kill at all, and only in the fourth run the older
     * {@code YEAR_OLD}..{@code MONTH_OLD} range that was added after the first run.
     * NOTE(review): expected values grow across runs, which suggests cumulative stats — confirm
     * against {@code runDutyAndGetStats}.
     */
    @Test
    public void testAddOlderSegmentsAfterInitialRun() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE).withMaxSegmentsToKill(Integer.valueOf(2));

        final DateTime lastUpdated = NOW.minusDays(1);
        for (Interval unusedInterval : Arrays.asList(DAY_OLD, NEXT_DAY, NEXT_MONTH)) {
            createAndAddUnusedSegment(DS1, unusedInterval, VERSION, lastUpdated);
        }

        initDuty();

        // Run 1: the two oldest of the initial segments.
        final CoordinatorRunStats firstRun = runDutyAndGetStats();
        Assert.assertEquals(10L, firstRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, firstRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, firstRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));

        // Older segments arrive only now.
        for (Interval olderInterval : Arrays.asList(YEAR_OLD, MONTH_OLD)) {
            createAndAddUnusedSegment(DS1, olderInterval, VERSION, lastUpdated);
        }

        // Run 2: the remaining initial segment.
        final CoordinatorRunStats secondRun = runDutyAndGetStats();
        Assert.assertEquals(20L, secondRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, secondRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, secondRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, secondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, NEXT_MONTH);

        // Run 3: no kill is expected to be recorded.
        final CoordinatorRunStats thirdRun = runDutyAndGetStats();
        Assert.assertEquals(30L, thirdRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, thirdRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(30L, thirdRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, thirdRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, null);

        // Run 4: the older segments are finally picked up.
        final CoordinatorRunStats fourthRun = runDutyAndGetStats();
        Assert.assertEquals(40L, fourthRun.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(3L, fourthRun.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(40L, fourthRun.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, fourthRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
    }

    /**
     * Restricts killing to DS2 and DS3 through the dynamic config and checks that DS1 is left
     * untouched.
     *
     * <p>Each datasource gets one unused segment. Two tasks are expected, one eligible segment
     * each for DS2 and DS3, zero for DS1, and no kill recorded for DS1.
     */
    @Test
    public void testDatasoucesAllowList() {
        dynamicConfigBuilder.withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of(DS2, DS3));
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());

        final DateTime lastUpdated = NOW.minusDays(1);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, lastUpdated);
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, lastUpdated);
        createAndAddUnusedSegment(DS3, MONTH_OLD, VERSION, lastUpdated);

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));

        validateLastKillStateAndReset(DS1, null);
        validateLastKillStateAndReset(DS2, YEAR_OLD);
        validateLastKillStateAndReset(DS3, MONTH_OLD);
    }

    /**
     * Configures a negative duration to retain (minus 36 hours) and expects five of the six
     * unused DS1 segments to be eligible, with the kill interval running from the start of
     * {@code YEAR_OLD} to the end of {@code NEXT_DAY}.
     */
    @Test
    public void testNegativeDurationToRetain() {
        configBuilder.withDurationToRetain(Duration.standardHours(36L).negated());

        final DateTime tenDaysAgo = NOW.minusDays(10);
        final DateTime twoDaysAgo = NOW.minusDays(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, tenDaysAgo);

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        final Interval expectedKillInterval = new Interval(YEAR_OLD.getStart(), NEXT_DAY.getEnd());
        validateLastKillStateAndReset(DS1, expectedKillInterval);
    }

    /**
     * Enables "ignore duration to retain" and expects all six unused DS1 segments to be
     * eligible, with the kill interval running from the start of {@code YEAR_OLD} to the end of
     * {@code NEXT_MONTH}.
     */
    @Test
    public void testIgnoreDurationToRetain() {
        configBuilder.withIgnoreDurationToRetain(Boolean.TRUE);

        final DateTime tenDaysAgo = NOW.minusDays(10);
        final DateTime twoDaysAgo = NOW.minusDays(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, twoDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, tenDaysAgo);

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(6L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        final Interval expectedKillInterval = new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd());
        validateLastKillStateAndReset(DS1, expectedKillInterval);
    }

    /**
     * Lowers max segments to kill to 1 and expects a single eligible DS1 segment, with
     * {@code YEAR_OLD} recorded as the kill interval.
     */
    @Test
    public void testLowerMaxSegmentsToKill() {
        configBuilder.withMaxSegmentsToKill(Integer.valueOf(1));

        final DateTime tenDaysAgo = NOW.minusDays(10);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    /**
     * Sets max interval to kill to one hour and expects a single eligible DS1 segment, with
     * {@code YEAR_OLD} recorded as the kill interval.
     */
    @Test
    public void testLowerMaxIntervalToKill() {
        configBuilder.withMaxIntervalToKill(Period.hours(1));

        final DateTime tenDaysAgo = NOW.minusDays(10);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    /**
     * Combines a 6-hour duration to retain with a 20-day max interval to kill.
     *
     * <p>The duty is initialized before any segment exists. The first run is expected to kill
     * {@code MONTH_OLD}. After {@code FIFTEEN_DAY_OLD} and {@code DAY_OLD} are added, the second
     * run is expected to record only {@code FIFTEEN_DAY_OLD} as the kill interval.
     */
    @Test
    public void testMaxIntervalToKillOverridesDurationToRetain() {
        configBuilder
            .withDurationToRetain(Period.hours(6).toStandardDuration())
            .withMaxIntervalToKill(Period.days(20));
        initDuty();

        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29));

        final CoordinatorRunStats statsAfterFirstRun = runDutyAndGetStats();
        Assert.assertEquals(1L, statsAfterFirstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, MONTH_OLD);

        createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusHours(2));

        final CoordinatorRunStats statsAfterSecondRun = runDutyAndGetStats();
        Assert.assertEquals(2L, statsAfterSecondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, FIFTEEN_DAY_OLD);
    }

    /**
     * Combines a 20-day duration to retain with a 350-day max interval to kill.
     *
     * <p>The duty is initialized before any segment exists. The first run is expected to kill
     * {@code YEAR_OLD}. After {@code MONTH_OLD} and {@code FIFTEEN_DAY_OLD} are added, the second
     * run is expected to record only {@code MONTH_OLD} as the kill interval.
     */
    @Test
    public void testDurationToRetainOverridesMaxIntervalToKill() {
        configBuilder
            .withDurationToRetain(Period.days(20).toStandardDuration())
            .withMaxIntervalToKill(Period.days(350));
        initDuty();

        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(29));

        final CoordinatorRunStats statsAfterFirstRun = runDutyAndGetStats();
        Assert.assertEquals(1L, statsAfterFirstRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);

        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29));
        createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14));

        final CoordinatorRunStats statsAfterSecondRun = runDutyAndGetStats();
        Assert.assertEquals(2L, statsAfterSecondRun.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, MONTH_OLD);
    }

    /**
     * Raises max interval to kill to 360 days and expects two eligible DS1 segments, with the
     * kill interval equal to the umbrella of {@code YEAR_OLD} and {@code MONTH_OLD}.
     */
    @Test
    public void testHigherMaxIntervalToKill() {
        configBuilder.withMaxIntervalToKill(Period.days(360));

        final DateTime tenDaysAgo = NOW.minusDays(10);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, tenDaysAgo);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        final Interval expectedKillInterval = JodaUtils.umbrellaInterval(Arrays.asList(YEAR_OLD, MONTH_OLD));
        validateLastKillStateAndReset(DS1, expectedKillInterval);
    }

    /**
     * Runs the duty once while the datasource has only a used segment (no kill task
     * expected), then adds unused segments and runs it again with max-segments-to-kill
     * set to 1, expecting a single kill over YEAR_OLD.
     */
    @Test
    public void testKillDatasourceWithNoUnusedSegmentsInInitialRun() {
        configBuilder.withMaxSegmentsToKill(1);
        createAndAddUsedSegment(DS1, YEAR_OLD, VERSION);
        initDuty();

        // Initial run: nothing is unused, so no task is submitted.
        final CoordinatorRunStats initialStats = runDutyAndGetStats();
        Assert.assertEquals(10L, initialStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, initialStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, initialStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(0L, initialStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));

        // Follow-up run. NOTE(review): the slot stats read 20 here, twice the first run's
        // value -- presumably the stats accumulate across runs; confirm against the fixture.
        final CoordinatorRunStats followUpStats = runDutyAndGetStats();
        Assert.assertEquals(20L, followUpStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, followUpStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, followUpStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, followUpStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    /**
     * Uses a one-hour cleanup period and runs the duty twice back to back. The first run
     * is expected to submit a kill spanning YEAR_OLD through MONTH_OLD; after the second
     * run no kill interval is recorded and the asserted stats are unchanged.
     */
    @Test
    public void testLargeKillPeriod() {
        configBuilder.withCleanupPeriod(Duration.standardHours(1L));

        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(10));
        initDuty();

        final CoordinatorRunStats firstRunStats = runDutyAndGetStats();
        Assert.assertEquals(10L, firstRunStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, firstRunStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, firstRunStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, firstRunStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));

        // Immediate re-run: no new kill interval is expected to be recorded.
        final CoordinatorRunStats secondRunStats = runDutyAndGetStats();
        Assert.assertEquals(10L, secondRunStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, secondRunStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, secondRunStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, secondRunStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, null);
    }

    /**
     * Limits kill tasks to two slots (ratio 0.3, max 2) while three datasources have
     * unused segments; expects kills for DS1 and DS2 and none for DS3.
     */
    @Test
    public void testKillTaskSlotAtCapacity() {
        dynamicConfigBuilder.withKillTaskSlotRatio(0.3);
        dynamicConfigBuilder.withMaxKillTaskSlots(2);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());

        // DS1
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        // DS2
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, NEXT_MONTH, VERSION, NOW.minusDays(1));
        // DS3
        createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));

        initDuty();
        final CoordinatorRunStats runStats = runDutyAndGetStats();

        Assert.assertEquals(2L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));

        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        validateLastKillStateAndReset(DS2, YEAR_OLD);
        validateLastKillStateAndReset(DS3, null);
    }

    /**
     * Uses an overlord client built with capacity arguments (1, 5) that already reports
     * one running kill task; expects zero available slots, zero max slots and no
     * submitted task.
     */
    @Test
    public void testKillWithOverlordTaskSlotsFull() {
        dynamicConfigBuilder.withKillTaskSlotRatio(0.1);
        dynamicConfigBuilder.withMaxKillTaskSlots(10);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());

        overlordClient = new TestOverlordClient(1, 5);
        overlordClient.addTask(DS1);
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Uses an overlord client built with capacity arguments (3, 10) that reports two
     * running kill tasks; with max kill slots set to 3, one slot is expected to remain
     * available. No task is submitted because no unused segments were added.
     */
    @Test
    public void testKillWithOverlordTaskSlotAvailable() {
        dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
        dynamicConfigBuilder.withMaxKillTaskSlots(3);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());

        overlordClient = new TestOverlordClient(3, 10);
        overlordClient.addTask(DS1);
        overlordClient.addTask(DS1);
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(3L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Replaces the dynamic config builder with a fresh {@code CoordinatorDynamicConfig.builder()}
     * and expects exactly one available / max kill slot and no submitted task.
     */
    @Test
    public void testDefaultKillTaskSlotStats() {
        dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Slot ratio 1.0 with an effectively unbounded max ({@code Integer.MAX_VALUE}):
     * expects 10 available and 10 max kill slots, with nothing submitted.
     */
    @Test
    public void testKillTaskSlotStats1() {
        dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
        dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Slot ratio 0.0 with max {@code Integer.MAX_VALUE}: expects no kill slots at all.
     */
    @Test
    public void testKillTaskSlotStats2() {
        dynamicConfigBuilder.withKillTaskSlotRatio(0.0);
        dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Slot ratio 1.0 but max kill task slots 0: expects no kill slots at all.
     */
    @Test
    public void testKillTaskSlotStats3() {
        dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
        dynamicConfigBuilder.withMaxKillTaskSlots(0);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Slot ratio 0.1 with max kill task slots 3: expects one available and one max slot.
     */
    @Test
    public void testKillTaskSlotStats4() {
        dynamicConfigBuilder.withKillTaskSlotRatio(0.1);
        dynamicConfigBuilder.withMaxKillTaskSlots(3);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * Slot ratio 0.3 with max kill task slots 2: expects two available and two max slots.
     */
    @Test
    public void testKillTaskSlotStats5() {
        dynamicConfigBuilder.withKillTaskSlotRatio(0.3);
        dynamicConfigBuilder.withMaxKillTaskSlots(2);
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(2L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.MAX_SLOTS));
    }

    /**
     * With duration-to-retain ignored, expects an unused segment spanning
     * {@code DateTimes.MIN} to 2024 to be killed over exactly that interval.
     */
    @Test
    public void testKillFirstHalfEternitySegment() {
        configBuilder.withIgnoreDurationToRetain(true);

        final Interval minToYear2024 = new Interval(DateTimes.MIN, DateTimes.of("2024"));
        createAndAddUnusedSegment(DS1, minToYear2024, VERSION, NOW.minusDays(60));
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, minToYear2024);
    }

    /**
     * With duration-to-retain ignored, expects an unused segment covering
     * {@code Intervals.ETERNITY} to be killed over that same interval.
     */
    @Test
    public void testKillEternitySegment() {
        configBuilder.withIgnoreDurationToRetain(true);
        createAndAddUnusedSegment(DS1, Intervals.ETERNITY, VERSION, NOW.minusDays(60));
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    /**
     * With duration-to-retain ignored, expects an unused segment spanning 1970 to
     * {@code DateTimes.MAX} to be killed over exactly that interval.
     */
    @Test
    public void testKillSecondHalfEternitySegment() {
        configBuilder.withIgnoreDurationToRetain(true);

        final Interval year1970ToMax = new Interval(DateTimes.of("1970"), DateTimes.MAX);
        createAndAddUnusedSegment(DS1, year1970ToMax, VERSION, NOW.minusDays(60));
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, year1970ToMax);
    }

    /**
     * Adds two unused segments with very wide intervals (one reaching year 19940, one
     * starting at year -19940) and expects a single kill spanning from the start of the
     * earlier interval to the end of the later one.
     */
    @Test
    public void testKillLargeIntervalSegments() {
        final Interval farFuture = Intervals.of("1990-01-01T00Z/19940-01-01T00Z");
        final Interval farPast = Intervals.of("-19940-01-01T00Z/1970-01-01T00Z");

        createAndAddUnusedSegment(DS1, farFuture, VERSION, NOW.minusDays(60));
        createAndAddUnusedSegment(DS1, farPast, VERSION, NOW.minusDays(60));
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, new Interval(farPast.getStart(), farFuture.getEnd()));
    }

    /**
     * Adds three unused segments with different versions in the same YEAR_OLD interval
     * and expects one kill task over YEAR_OLD with all three counted as eligible.
     */
    @Test
    public void testKillMultipleSegmentsInSameInterval() {
        configBuilder.withIgnoreDurationToRetain(true);

        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, YEAR_OLD, "v2", NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, YEAR_OLD, "v3", NOW.minusDays(10));
        initDuty();

        final CoordinatorRunStats runStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    /** An empty interval list is expected to stay empty, even with a zero period. */
    @Test
    public void testLimitToPeriod_empty() {
        Assert.assertEquals(
            Collections.emptyList(),
            KillUnusedSegments.limitToPeriod(Collections.emptyList(), Period.ZERO)
        );
    }

    /** With {@code Period.ZERO}, the input list is expected to come back unchanged. */
    @Test
    public void testLimitToPeriod_zeroPeriod() {
        Assert.assertEquals(
            // expected
            ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD),
            // actual
            KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.ZERO)
        );
    }

    /** With a one-second period, only YEAR_OLD is expected to be returned. */
    @Test
    public void testLimitToPeriod_oneSecondPeriod() {
        Assert.assertEquals(
            // expected
            ImmutableList.of(YEAR_OLD),
            // actual
            KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.seconds(1))
        );
    }

    /** With a 360-day period, YEAR_OLD and MONTH_OLD are expected, in that order. */
    @Test
    public void testLimitToPeriod_360DayPeriod() {
        Assert.assertEquals(
            // expected
            ImmutableList.of(YEAR_OLD, MONTH_OLD),
            // actual
            KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.days(360))
        );
    }

    /** With a one-year period, all three intervals are expected back in input order. */
    @Test
    public void testLimitToPeriod_1YearPeriod() {
        Assert.assertEquals(
            // expected
            ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD),
            // actual
            KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.years(1))
        );
    }

    /**
     * Asserts the last kill interval and kill task id recorded by the test overlord
     * client for a datasource, then clears both records.
     *
     * @param dataSource           datasource whose recorded kill state is checked
     * @param expectedKillInterval interval the last kill task should have covered, or
     *                             {@code null} when no kill task is expected (the task id
     *                             is then expected to be {@code null} as well)
     */
    private void validateLastKillStateAndReset(String dataSource, @Nullable Interval expectedKillInterval) {
        // Read both observations up front, before any assertion can fail.
        final Interval actualInterval = overlordClient.getLastKillInterval(dataSource);
        final String actualTaskId = overlordClient.getLastKillTaskId(dataSource);

        Assert.assertEquals(expectedKillInterval, actualInterval);

        final String expectedTaskId = expectedKillInterval == null
            ? null
            : TestOverlordClient.getTaskId("coordinator-issued", dataSource, expectedKillInterval);
        Assert.assertEquals(expectedTaskId, actualTaskId);

        // Reset so the next validation only sees tasks submitted after this point.
        overlordClient.deleteLastKillTaskId(dataSource);
        overlordClient.deleteLastKillInterval(dataSource);
    }

    /**
     * Builds a segment for the given datasource, interval and version and publishes it
     * through {@code SqlSegmentsMetadataManagerTestBase.publishSegment}.
     *
     * @return the segment that was published
     */
    private DataSegment createAndAddUsedSegment(String dataSource, Interval interval, String version) {
        final DataSegment published = createSegment(dataSource, interval, version);
        SqlSegmentsMetadataManagerTestBase.publishSegment(connector, config, TestHelper.makeJsonMapper(), published);
        return published;
    }

    /**
     * Publishes a segment and then marks it as unused with the given last-updated time,
     * asserting that exactly one segment was updated.
     *
     * @param lastUpdatedTime timestamp passed to {@code markSegmentsAsUnused} for the segment
     */
    private void createAndAddUnusedSegment(String dataSource, Interval interval, String version, DateTime lastUpdatedTime) {
        final DataSegment published = createAndAddUsedSegment(dataSource, interval, version);

        final SQLMetadataConnector metadataConnector = derbyConnectorRule.getConnector();
        final MetadataStorageTablesConfig tablesConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
        final int updatedCount = SqlSegmentsMetadataManagerTestBase.markSegmentsAsUnused(
            Set.of(published.getId()),
            metadataConnector,
            tablesConfig,
            TestHelper.JSON_MAPPER,
            lastUpdatedTime
        );

        Assert.assertEquals(1L, updatedCount);
    }

    /**
     * Creates a {@link DataSegment} for the given datasource, interval and version, using
     * an empty map and two empty lists for the collection arguments, a
     * {@code NoneShardSpec}, {@code 1} for the {@code Integer} argument and {@code 0} for
     * the trailing {@code long} argument.
     */
    private DataSegment createSegment(String dataSource, Interval interval, String version) {
        return new DataSegment(
            dataSource,
            interval,
            version,
            new HashMap<>(),
            new ArrayList<>(),
            new ArrayList<>(),
            NoneShardSpec.instance(),
            1,
            0L
        );
    }

    /**
     * (Re)creates the duty under test from the current storage coordinator, overlord
     * client and a config built from {@code configBuilder}. Builder or client changes made
     * after this call are not picked up by the existing duty instance.
     */
    private void initDuty() {
        final KillUnusedSegmentsConfig killConfig = configBuilder.build();
        killDuty = new KillUnusedSegments(storageCoordinator, overlordClient, killConfig);
    }

    /**
     * Applies the current dynamic config to the params builder, runs the duty once and
     * returns the coordinator stats of the resulting params.
     */
    private CoordinatorRunStats runDutyAndGetStats() {
        paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
        return killDuty.run(paramsBuilder.build()).getCoordinatorStats();
    }

    /**
     * In-memory {@link NoopOverlordClient} used by these tests. It reports a fixed worker
     * capacity and a list of fake running kill tasks, and records, per datasource, the
     * interval and task id of the most recently submitted kill task.
     */
    private static class TestOverlordClient extends NoopOverlordClient {
        /** Task statuses returned by {@link #taskStatuses}; populated via {@link #addTask}. */
        private final List<TaskStatusPlus> taskStatuses = new ArrayList<>();
        private final Map<String, Interval> observedDatasourceToLastKillInterval = new HashMap<>();
        private final Map<String, String> observedDatasourceToLastKillTaskId = new HashMap<>();
        private final IndexingTotalWorkerCapacityInfo capacityInfo;
        /** Counter appended to fake task ids so that each added task gets a distinct id. */
        private int taskIdSuffix = 0;

        /** Creates a client with capacity arguments (5, 10). */
        TestOverlordClient() {
            this(5, 10);
        }

        /**
         * @param currentClusterCapacity first argument passed to {@link IndexingTotalWorkerCapacityInfo}
         * @param maxWorkerCapacity      second argument passed to {@link IndexingTotalWorkerCapacityInfo}
         */
        TestOverlordClient(int currentClusterCapacity, int maxWorkerCapacity) {
            this.capacityInfo = new IndexingTotalWorkerCapacityInfo(currentClusterCapacity, maxWorkerCapacity);
        }

        /** Builds the task id this client assigns to a kill task: {@code prefix-dataSource-interval}. */
        static String getTaskId(String idPrefix, String dataSource, Interval interval) {
            return idPrefix + "-" + dataSource + "-" + interval;
        }

        /** Registers a fake RUNNING task of type "kill" for the given datasource. */
        void addTask(String datasource) {
            final String taskId = "coordinator-issued__" + datasource + "__" + taskIdSuffix++;
            taskStatuses.add(
                new TaskStatusPlus(
                    taskId,
                    null,
                    "kill",
                    DateTimes.EPOCH,
                    DateTimes.EPOCH,
                    TaskState.RUNNING,
                    RunnerTaskState.RUNNING,
                    100L,
                    TaskLocation.unknown(),
                    datasource,
                    null
                )
            );
        }

        /** Returns every task added via {@link #addTask}; all three filter arguments are ignored. */
        @Override
        public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(@Nullable String state, @Nullable String dataSource, @Nullable Integer maxCompletedTasks) {
            return Futures.immediateFuture(CloseableIterators.wrap(taskStatuses.iterator(), null));
        }

        /**
         * Records the interval and generated task id as the datasource's last kill and
         * returns the task id. The versions, segment limit and last-updated arguments are ignored.
         */
        @Override
        public ListenableFuture<String> runKillTask(String idPrefix, String dataSource, Interval interval, @Nullable List<String> versions, @Nullable Integer maxSegmentsToKill, @Nullable DateTime maxUsedStatusLastUpdatedTime) {
            final String taskId = getTaskId(idPrefix, dataSource, interval);
            observedDatasourceToLastKillInterval.put(dataSource, interval);
            observedDatasourceToLastKillTaskId.put(dataSource, taskId);
            return Futures.immediateFuture(taskId);
        }

        /** Returns the capacity info this client was constructed with. */
        @Override
        public ListenableFuture<IndexingTotalWorkerCapacityInfo> getTotalWorkerCapacity() {
            return Futures.immediateFuture(capacityInfo);
        }

        /** Returns a single hard-coded worker entry. */
        @Override
        public ListenableFuture<List<IndexingWorkerInfo>> getWorkers() {
            final IndexingWorkerInfo worker = new IndexingWorkerInfo(
                new IndexingWorker("http", "localhost", "1.2.3.4", 3, "2"),
                0,
                Collections.emptySet(),
                Collections.emptyList(),
                DateTimes.of("2000"),
                null
            );
            return Futures.immediateFuture(ImmutableList.of(worker));
        }

        /** Returns the interval of the last kill task for the datasource, or {@code null} if none was recorded. */
        Interval getLastKillInterval(String dataSource) {
            return observedDatasourceToLastKillInterval.get(dataSource);
        }

        void deleteLastKillInterval(String dataSource) {
            observedDatasourceToLastKillInterval.remove(dataSource);
        }

        /**
         * Returns the id of the last kill task for the datasource, or {@code null} if none
         * was recorded. Unlike {@link #getLastKillInterval}, this also removes the recorded
         * id, so a second call returns {@code null} until another kill task is submitted.
         */
        String getLastKillTaskId(String dataSource) {
            // Map.remove returns the previous value, matching a get followed by a remove.
            return observedDatasourceToLastKillTaskId.remove(dataSource);
        }

        void deleteLastKillTaskId(String dataSource) {
            observedDatasourceToLastKillTaskId.remove(dataSource);
        }
    }
}

