/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.AbstractFixedIntervalTask;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class KillUnusedSegmentsTask
extends AbstractFixedIntervalTask {
    public static final String TYPE = "kill";
    private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
    private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
    @Deprecated
    private final boolean markAsUnused;
    private final int batchSize;
    @Nullable
    private final Integer limit;
    @Nullable
    private final DateTime maxUsedStatusLastUpdatedTime;

    @JsonCreator
    public KillUnusedSegmentsTask(@JsonProperty(value="id") String id, @JsonProperty(value="dataSource") String dataSource, @JsonProperty(value="interval") Interval interval, @JsonProperty(value="context") Map<String, Object> context, @JsonProperty(value="markAsUnused") @Deprecated Boolean markAsUnused, @JsonProperty(value="batchSize") Integer batchSize, @JsonProperty(value="limit") @Nullable Integer limit, @JsonProperty(value="maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime) {
        super(KillUnusedSegmentsTask.getOrMakeId(id, TYPE, dataSource, interval), dataSource, interval, context);
        this.markAsUnused = markAsUnused != null && markAsUnused != false;
        int n = this.batchSize = batchSize != null ? batchSize : 100;
        if (this.batchSize <= 0) {
            throw InvalidInput.exception((String)"batchSize[%d] must be a positive integer.", (Object[])new Object[]{batchSize});
        }
        if (limit != null && limit <= 0) {
            throw InvalidInput.exception((String)"limit[%d] must be a positive integer.", (Object[])new Object[]{limit});
        }
        if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
            throw InvalidInput.exception((String)"limit[%d] cannot be provided when markAsUnused is enabled.", (Object[])new Object[]{limit});
        }
        this.limit = limit;
        this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
    }

    @Deprecated
    @JsonProperty
    @JsonInclude(value=JsonInclude.Include.NON_DEFAULT)
    public boolean isMarkAsUnused() {
        return this.markAsUnused;
    }

    @JsonProperty
    @JsonInclude(value=JsonInclude.Include.NON_DEFAULT)
    public int getBatchSize() {
        return this.batchSize;
    }

    @Nullable
    @JsonProperty
    public Integer getLimit() {
        return this.limit;
    }

    @Nullable
    @JsonProperty
    public DateTime getMaxUsedStatusLastUpdatedTime() {
        return this.maxUsedStatusLastUpdatedTime;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    @Nonnull
    @JsonIgnore
    public Set<ResourceAction> getInputSourceResources() {
        return ImmutableSet.of();
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        int numSegmentsMarkedAsUnused;
        int numSegmentsKilled = 0;
        int numBatchesProcessed = 0;
        if (this.markAsUnused) {
            numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(new MarkSegmentsAsUnusedAction(this.getDataSource(), this.getInterval()));
            LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.", new Object[]{numSegmentsMarkedAsUnused, this.getDataSource(), this.getInterval()});
        } else {
            numSegmentsMarkedAsUnused = 0;
        }
        int nextBatchSize = this.computeNextBatchSize(numSegmentsKilled);
        Integer numTotalBatches = this.getNumTotalBatches();
        LOG.info("Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments before maxUsedStatusLastUpdatedTime[%s] will be deleted%s", new Object[]{this.getDataSource(), this.getInterval(), this.batchSize, this.limit, this.maxUsedStatusLastUpdatedTime, numTotalBatches != null ? StringUtils.format((String)" in [%d] batches.", (Object[])new Object[]{numTotalBatches}) : "."});
        RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(this.getDataSource(), null, (Collection<Interval>)ImmutableList.of((Object)this.getInterval()), Segments.INCLUDING_OVERSHADOWED);
        HashSet usedSegmentLoadSpecs = new HashSet(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction).stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()));
        while (nextBatchSize > 0) {
            List<DataSegment> unusedSegments = toolbox.getTaskActionClient().submit(new RetrieveUnusedSegmentsAction(this.getDataSource(), this.getInterval(), nextBatchSize, this.maxUsedStatusLastUpdatedTime));
            NavigableMap<DateTime, List<TaskLock>> taskLockMap = this.getNonRevokedTaskLockMap(toolbox.getTaskActionClient());
            if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
                throw new ISE("Locks[%s] for task[%s] can't cover segments[%s]", new Object[]{taskLockMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), this.getId(), unusedSegments});
            }
            toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<DataSegment>(unusedSegments)));
            List segmentsToBeKilled = unusedSegments.stream().filter(unusedSegment -> unusedSegment.getLoadSpec() == null || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec())).collect(Collectors.toList());
            toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
            LOG.info("Processed [%d] batches for kill task[%s].", new Object[]{++numBatchesProcessed, this.getId()});
            nextBatchSize = this.computeNextBatchSize(numSegmentsKilled += unusedSegments.size());
            if (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)) continue;
        }
        String taskId = this.getId();
        LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments in [%d] batches.", new Object[]{taskId, this.getDataSource(), this.getInterval(), numSegmentsKilled, numBatchesProcessed});
        KillTaskReport.Stats stats = new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
        toolbox.getTaskReportFileWriter().write(taskId, TaskReport.buildTaskReports(new KillTaskReport(taskId, stats)));
        return TaskStatus.success((String)taskId);
    }

    @JsonIgnore
    @Nullable
    @VisibleForTesting
    Integer getNumTotalBatches() {
        return null != this.limit ? Integer.valueOf((int)Math.ceil((double)this.limit.intValue() / (double)this.batchSize)) : null;
    }

    @JsonIgnore
    @VisibleForTesting
    int computeNextBatchSize(int numSegmentsKilled) {
        return null != this.limit ? Math.min(this.limit - numSegmentsKilled, this.batchSize) : this.batchSize;
    }

    private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException {
        TreeMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<DateTime, List<TaskLock>>();
        KillUnusedSegmentsTask.getTaskLocks(client).forEach(taskLock -> {
            if (!taskLock.isRevoked()) {
                taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList()).add(taskLock);
            }
        });
        return taskLockMap;
    }
}

