/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link KillStalePendingSegments}.
 *
 * <p>Each test registers tasks (and one pending segment per task) on a
 * {@link TestOverlordClient}, runs the duty once and then verifies the interval
 * that the duty asked the Overlord to kill pending segments in.
 */
public class KillStalePendingSegmentsTest {
    private TestOverlordClient overlordClient;
    private KillStalePendingSegments killDuty;

    /** Creates a fresh Overlord test double and a duty wired to it before every test. */
    @Before
    public void setup() {
        overlordClient = new TestOverlordClient();
        killDuty = new KillStalePendingSegments(overlordClient);
    }

    /**
     * With no tasks registered, the kill interval must start at {@code DateTimes.MIN}
     * and end about one day before the current time.
     */
    @Test
    public void testRetentionStarts1DayBeforeNowWhenNoKnownTask() {
        final DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
        killDuty.run(params);

        final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
        Assert.assertNotNull("No kill request observed for " + DS.WIKI, observedKillInterval);
        Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());

        // "now" is sampled again here, after the duty has run, so allow a small
        // tolerance (100 ms) between the expected and the observed cutoff.
        final DateTime expectedCutoffTime = DateTimes.nowUtc().minusDays(1);
        Assert.assertTrue(
            "Observed cutoff is more than 100 ms before the expected cutoff",
            expectedCutoffTime.getMillis() - observedKillInterval.getEnd().getMillis() <= 100L
        );
    }

    /**
     * With only running tasks registered, the kill interval must end one day before
     * the created time of the earliest of those tasks.
     */
    @Test
    public void testRetentionStarts1DayBeforeEarliestActiveTask() {
        final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
        overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask, TaskState.RUNNING);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING);

        final DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
        killDuty.run(params);

        final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
        Assert.assertNotNull("No kill request observed for " + DS.WIKI, observedKillInterval);
        Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
        Assert.assertEquals(startOfEarliestActiveTask.minusDays(1), observedKillInterval.getEnd());
    }

    /**
     * With only completed tasks registered, the kill interval must end one day before
     * the created time of the latest of those tasks, and the number of pending
     * segments reported as killed must be recorded in the run stats.
     */
    @Test
    public void testRetentionStarts1DayBeforeLatestCompletedTask() {
        final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(2), TaskState.FAILED);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);

        final DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
        killDuty.run(params);

        final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
        Assert.assertNotNull("No kill request observed for " + DS.WIKI, observedKillInterval);
        Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
        Assert.assertEquals(startOfLatestCompletedTask.minusDays(1), observedKillInterval.getEnd());

        // Only the pending segments of the tasks created 2 and 3 days before the
        // latest task fall before the cutoff, hence 2 killed pending segments.
        final CoordinatorRunStats stats = params.getCoordinatorStats();
        Assert.assertEquals(
            2L,
            stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))
        );
    }

    /**
     * With a completed task in one datasource and an earlier running task in another,
     * both datasources must get the same kill interval, ending one day before the
     * earlier of the two task created times.
     */
    @Test
    public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask() {
        final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01");
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);

        final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
        overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, TaskState.RUNNING);

        final DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA).build();
        killDuty.run(params);

        final DateTime earliestEligibleTask =
            DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask);

        final Interval wikiKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
        Assert.assertNotNull("No kill request observed for " + DS.WIKI, wikiKillInterval);
        Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart());
        Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd());

        // Verify the koala interval itself; previously the wiki interval was
        // asserted a second time here, leaving the koala end unchecked.
        final Interval koalaKillInterval = overlordClient.observedKillIntervals.get(DS.KOALA);
        Assert.assertNotNull("No kill request observed for " + DS.KOALA, koalaKillInterval);
        Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart());
        Assert.assertEquals(earliestEligibleTask.minusDays(1), koalaKillInterval.getEnd());
    }

    /**
     * A datasource listed in the dynamic config as "do not kill pending segments in"
     * must receive no kill request and report zero killed pending segments, while
     * other datasources are still processed.
     */
    @Test
    public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted() {
        final DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA)
            .withDynamicConfigs(
                CoordinatorDynamicConfig
                    .builder()
                    .withDatasourcesToNotKillPendingSegmentsIn(Collections.singleton(DS.KOALA))
                    .build()
            )
            .build();

        final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask, TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
        overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);

        killDuty.run(params);

        final CoordinatorRunStats stats = params.getCoordinatorStats();

        // wiki is processed normally.
        Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI));
        Assert.assertEquals(
            2L,
            stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))
        );

        // koala is disallowed, so no kill request may have reached the Overlord.
        Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA));
        Assert.assertEquals(
            0L,
            stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.KOALA))
        );
    }

    /**
     * Creates a runtime params builder whose used segments contain exactly one
     * segment (eternity interval, version "v1") for each of the given datasources.
     *
     * @param datasources names of the datasources to create a used segment for
     * @return builder pre-populated with the used segments
     */
    private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources) {
        final DruidCoordinatorRuntimeParams.Builder builder =
            DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());

        final HashSet<DataSegment> usedSegments = new HashSet<>();
        for (String datasource : datasources) {
            usedSegments.add(
                DataSegment.builder()
                           .dataSource(datasource)
                           .interval(Intervals.ETERNITY)
                           .version("v1")
                           .shardSpec(new NumberedShardSpec(0, 1))
                           .size(100L)
                           .build()
            );
        }
        return builder.withUsedSegments(usedSegments);
    }

    /**
     * Overlord test double that serves a fixed list of task statuses, tracks the
     * created times of pending segments per datasource and records the interval of
     * the most recent kill request received for each datasource.
     */
    private static class TestOverlordClient extends NoopOverlordClient {
        private final List<TaskStatusPlus> taskStatuses = new ArrayList<>();
        private final Map<String, List<DateTime>> datasourceToPendingSegments = new HashMap<>();
        private final Map<String, Interval> observedKillIntervals = new HashMap<>();
        private int taskIdSuffix = 0;

        private TestOverlordClient() {
        }

        /**
         * Registers a task with the given created time and state, along with one
         * pending segment created 5 minutes after the task.
         *
         * @param datasource  datasource of the task and its pending segment
         * @param createdTime created time (also used as queue insertion time) of the task
         * @param state       state of the task; complete states get runner state
         *                    {@code NONE}, all others {@code RUNNING}
         */
        void addTaskAndSegment(String datasource, DateTime createdTime, TaskState state) {
            final RunnerTaskState runnerState =
                state.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING;
            taskStatuses.add(
                new TaskStatusPlus(
                    datasource + "__" + taskIdSuffix++,
                    null,
                    null,
                    createdTime,
                    createdTime,
                    state,
                    runnerState,
                    100L,
                    TaskLocation.unknown(),
                    datasource,
                    null
                )
            );

            datasourceToPendingSegments
                .computeIfAbsent(datasource, ds -> new ArrayList<>())
                .add(createdTime.plusMinutes(5));
        }

        /**
         * Returns all registered task statuses; the filter arguments are ignored.
         */
        @Override
        public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
            @Nullable String state,
            @Nullable String dataSource,
            @Nullable Integer maxCompletedTasks
        ) {
            return Futures.immediateFuture(CloseableIterators.wrap(taskStatuses.iterator(), null));
        }

        /**
         * Records the requested interval and removes every pending segment of the
         * datasource that was created before the end of that interval.
         *
         * @return future holding the number of pending segments removed
         */
        @Override
        public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval) {
            observedKillIntervals.put(dataSource, interval);

            final List<DateTime> pendingSegments = datasourceToPendingSegments.remove(dataSource);
            if (pendingSegments == null || pendingSegments.isEmpty()) {
                return Futures.immediateFuture(0);
            }

            final List<DateTime> remainingPendingSegments = new ArrayList<>();
            int numDeletedPendingSegments = 0;
            for (DateTime createdTime : pendingSegments) {
                if (createdTime.isBefore(interval.getEnd())) {
                    ++numDeletedPendingSegments;
                } else {
                    remainingPendingSegments.add(createdTime);
                }
            }

            // Only re-register the datasource if some pending segments survived.
            if (!remainingPendingSegments.isEmpty()) {
                datasourceToPendingSegments.put(dataSource, remainingPendingSegments);
            }
            return Futures.immediateFuture(numDeletedPendingSegments);
        }
    }

    /** Names of the datasources used by the tests. */
    private static class DS {
        static final String WIKI = "wiki";
        static final String KOALA = "koala";

        private DS() {
        }
    }
}

