/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableInterval;

public class CompactSegments
implements CoordinatorCustomDuty {
    public static final String COMPACTION_TASK_TYPE = "compact";
    public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
    private static final Logger LOG = new Logger(CompactSegments.class);
    private final CompactionSegmentSearchPolicy policy;
    private final boolean skipLockedIntervals;
    private final IndexingServiceClient indexingServiceClient;
    private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference();

    @Inject
    @JsonCreator
    public CompactSegments(@JacksonInject DruidCoordinatorConfig config, @JacksonInject CompactionSegmentSearchPolicy policy, @JacksonInject IndexingServiceClient indexingServiceClient) {
        this.policy = policy;
        this.indexingServiceClient = indexingServiceClient;
        this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
        this.resetCompactionSnapshot();
        LOG.info("Scheduling compaction with skipLockedIntervals [%s]", new Object[]{this.skipLockedIntervals});
    }

    @VisibleForTesting
    public boolean isSkipLockedIntervals() {
        return this.skipLockedIntervals;
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        LOG.info("Running CompactSegments duty", new Object[0]);
        CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
        int maxCompactionTaskSlots = dynamicConfig.getMaxCompactionTaskSlots();
        if (maxCompactionTaskSlots <= 0) {
            LOG.info("Skipping compaction as maxCompactionTaskSlots is [%d].", new Object[]{maxCompactionTaskSlots});
            this.resetCompactionSnapshot();
            return params;
        }
        List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
        if (compactionConfigList == null || compactionConfigList.isEmpty()) {
            LOG.info("Skipping compaction as compaction config list is empty.", new Object[0]);
            this.resetCompactionSnapshot();
            return params;
        }
        Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList.stream().collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
        HashMap<String, List<Interval>> intervalsToSkipCompaction = new HashMap<String, List<Interval>>();
        int busyCompactionTaskSlots = 0;
        List<TaskStatusPlus> compactionTasks = CompactSegments.filterNonCompactionTasks(this.indexingServiceClient.getActiveTasks());
        for (TaskStatusPlus status : compactionTasks) {
            DataSourceCompactionConfig dataSourceCompactionConfig;
            TaskPayloadResponse response = this.indexingServiceClient.getTaskPayload(status.getId());
            if (response == null) {
                throw new ISE("Could not find payload for active compaction task[%s]", new Object[]{status.getId()});
            }
            if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
                throw new ISE("Payload of active compaction task[%s] is of invalid type[%s]", new Object[]{status.getId(), response.getPayload().getType()});
            }
            ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery)response.getPayload();
            if (this.cancelTaskIfGranularityChanged(compactionTaskQuery, dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource()))) continue;
            Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
            intervalsToSkipCompaction.computeIfAbsent(status.getDataSource(), k -> new ArrayList()).add(interval);
            busyCompactionTaskSlots += CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(compactionTaskQuery.getTuningConfig());
        }
        this.getLockedIntervalsToSkip(compactionConfigList).forEach((dataSource, intervals) -> intervalsToSkipCompaction.computeIfAbsent((String)dataSource, ds -> new ArrayList()).addAll(intervals));
        Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
        CompactionSegmentIterator iterator = this.policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
        int compactionTaskCapacity = this.getCompactionTaskCapacity(dynamicConfig);
        int availableCompactionTaskSlots = this.getAvailableCompactionTaskSlots(compactionTaskCapacity, busyCompactionTaskSlots);
        HashMap<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<String, AutoCompactionSnapshot.Builder>();
        int numSubmittedCompactionTasks = this.submitCompactionTasks(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, availableCompactionTaskSlots, iterator);
        CoordinatorRunStats stats = params.getCoordinatorStats();
        stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
        stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
        stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
        this.addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
        return params;
    }

    private void resetCompactionSnapshot() {
        this.autoCompactionSnapshotPerDataSource.set(Collections.emptyMap());
    }

    private boolean cancelTaskIfGranularityChanged(ClientCompactionTaskQuery compactionTaskQuery, DataSourceCompactionConfig dataSourceCompactionConfig) {
        Granularity taskSegmentGranularity;
        if (dataSourceCompactionConfig == null || dataSourceCompactionConfig.getGranularitySpec() == null || compactionTaskQuery.getGranularitySpec() == null) {
            return false;
        }
        Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
        if (configuredSegmentGranularity.equals(taskSegmentGranularity = compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) {
            return false;
        }
        LOG.info("Cancelling task [%s] as task segmentGranularity is [%s] but compaction config segmentGranularity is [%s]", new Object[]{compactionTaskQuery.getId(), taskSegmentGranularity, configuredSegmentGranularity});
        this.indexingServiceClient.cancelTask(compactionTaskQuery.getId());
        return true;
    }

    private Map<String, List<Interval>> getLockedIntervalsToSkip(List<DataSourceCompactionConfig> compactionConfigs) {
        if (!this.skipLockedIntervals) {
            LOG.info("Not skipping any locked interval for Compaction", new Object[0]);
            return new HashMap<String, List<Interval>>();
        }
        Map<String, Integer> minTaskPriority = compactionConfigs.stream().collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, DataSourceCompactionConfig::getTaskPriority));
        HashMap<String, List<Interval>> datasourceToLockedIntervals = new HashMap<String, List<Interval>>(this.indexingServiceClient.getLockedIntervals(minTaskPriority));
        LOG.debug("Skipping the following intervals for Compaction as they are currently locked: %s", new Object[]{datasourceToLockedIntervals});
        return datasourceToLockedIntervals;
    }

    @VisibleForTesting
    static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) {
        if (CompactSegments.isParallelMode(tuningConfig)) {
            Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
            return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1;
        }
        return 1;
    }

    @VisibleForTesting
    static boolean isParallelMode(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) {
        if (null == tuningConfig) {
            return false;
        }
        boolean useRangePartitions = CompactSegments.useRangePartitions(tuningConfig);
        int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
        return tuningConfig.getMaxNumConcurrentSubTasks() != null && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
    }

    private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig) {
        return tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec;
    }

    private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses) {
        return taskStatuses.stream().filter(status -> {
            String taskType = status.getType();
            return taskType == null || COMPACTION_TASK_TYPE.equals(taskType);
        }).collect(Collectors.toList());
    }

    private int getCompactionTaskCapacity(CoordinatorCompactionConfig dynamicConfig) {
        int totalWorkerCapacity;
        try {
            totalWorkerCapacity = dynamicConfig.isUseAutoScaleSlots() ? this.indexingServiceClient.getTotalWorkerCapacityWithAutoScale() : this.indexingServiceClient.getTotalWorkerCapacity();
        }
        catch (Exception e) {
            LOG.warn("Failed to get total worker capacity with auto scale slots. Falling back to current capacity count", new Object[0]);
            totalWorkerCapacity = this.indexingServiceClient.getTotalWorkerCapacity();
        }
        return Math.min((int)((double)totalWorkerCapacity * dynamicConfig.getCompactionTaskSlotRatio()), dynamicConfig.getMaxCompactionTaskSlots());
    }

    private int getAvailableCompactionTaskSlots(int compactionTaskCapacity, int busyCompactionTaskSlots) {
        int availableCompactionTaskSlots = busyCompactionTaskSlots > 0 ? Math.max(0, compactionTaskCapacity - busyCompactionTaskSlots) : Math.max(1, compactionTaskCapacity);
        LOG.info("Found [%d] available task slots for compaction out of max compaction task capacity [%d]", new Object[]{availableCompactionTaskSlots, compactionTaskCapacity});
        return availableCompactionTaskSlots;
    }

    private int submitCompactionTasks(Map<String, DataSourceCompactionConfig> compactionConfigs, Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders, int numAvailableCompactionTaskSlots, CompactionSegmentIterator iterator) {
        DataSourceCompactionConfig config;
        if (numAvailableCompactionTaskSlots <= 0) {
            return 0;
        }
        int numSubmittedTasks = 0;
        for (int numCompactionTasksAndSubtasks = 0; iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots; numCompactionTasksAndSubtasks += CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig())) {
            List segmentsToCompact = (List)iterator.next();
            if (segmentsToCompact.isEmpty()) {
                throw new ISE("segmentsToCompact is empty?", new Object[0]);
            }
            String dataSourceName = ((DataSegment)segmentsToCompact.get(0)).getDataSource();
            AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(dataSourceName, k -> new AutoCompactionSnapshot.Builder((String)k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING));
            snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
            snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
            snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
            config = compactionConfigs.get(dataSourceName);
            Granularity segmentGranularityToUse = null;
            if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
                Interval interval = ((DataSegment)segmentsToCompact.get(0)).getInterval();
                if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps((ReadableInterval)segment.getInterval()))) {
                    try {
                        segmentGranularityToUse = GranularityType.fromPeriod((Period)interval.toPeriod()).getDefaultGranularity();
                    }
                    catch (IllegalArgumentException iae) {
                        LOG.warn("Cannot determine segmentGranularity from interval [%s]", new Object[]{interval});
                    }
                } else {
                    LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task", new Object[0]);
                }
            } else {
                segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
            }
            ClientCompactionTaskGranularitySpec granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null, config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null);
            ClientCompactionTaskDimensionsSpec dimensionsSpec = config.getDimensionsSpec() != null ? new ClientCompactionTaskDimensionsSpec(config.getDimensionsSpec().getDimensions()) : null;
            ClientCompactionTaskTransformSpec transformSpec = null;
            if (config.getTransformSpec() != null) {
                transformSpec = new ClientCompactionTaskTransformSpec(config.getTransformSpec().getFilter());
            }
            Boolean dropExisting = null;
            if (config.getIoConfig() != null) {
                dropExisting = config.getIoConfig().isDropExisting();
            }
            if ((dropExisting == null || !dropExisting.booleanValue()) && segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
                dropExisting = true;
                LOG.info("Forcing dropExisting to true since all segments to compact are tombstones.", new Object[0]);
            }
            String taskId = this.indexingServiceClient.compactSegments("coordinator-issued", segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null), granularitySpec, dimensionsSpec, config.getMetricsSpec(), transformSpec, dropExisting, this.newAutoCompactionContext(config.getTaskContext()));
            LOG.info("Submitted a compactionTask[%s] for [%d] segments", new Object[]{taskId, segmentsToCompact.size()});
            LOG.infoSegments((Collection)segmentsToCompact, "Compacting segments");
            ++numSubmittedTasks;
        }
        return numSubmittedTasks;
    }

    private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext) {
        HashMap<String, Object> newContext = configuredContext == null ? new HashMap<String, Object>() : new HashMap<String, Object>(configuredContext);
        newContext.put(STORE_COMPACTION_STATE_KEY, true);
        return newContext;
    }

    private void addCompactionSnapshotStats(Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders, CompactionSegmentIterator iterator, CoordinatorRunStats stats) {
        AutoCompactionSnapshot.Builder builder;
        while (iterator.hasNext()) {
            List segmentsToCompact = (List)iterator.next();
            if (segmentsToCompact.isEmpty()) continue;
            String dataSourceName = ((DataSegment)segmentsToCompact.get(0)).getDataSource();
            AutoCompactionSnapshot.Builder builder2 = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(dataSourceName, k -> new AutoCompactionSnapshot.Builder((String)k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING));
            builder2.incrementBytesAwaitingCompaction(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
            builder2.incrementIntervalCountAwaitingCompaction(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
            builder2.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
        }
        Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
        for (Map.Entry<String, CompactionStatistics> entry : allCompactedStatistics.entrySet()) {
            String string = entry.getKey();
            CompactionStatistics dataSourceCompactedStatistics = entry.getValue();
            AutoCompactionSnapshot.Builder builder2 = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(string, k -> new AutoCompactionSnapshot.Builder((String)k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING));
            builder2.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
            builder2.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
            builder2.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
        }
        Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
        for (Map.Entry<String, CompactionStatistics> entry : allSkippedStatistics.entrySet()) {
            String dataSource = entry.getKey();
            CompactionStatistics dataSourceSkippedStatistics = entry.getValue();
            builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(dataSource, k -> new AutoCompactionSnapshot.Builder((String)k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING));
            builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
            builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
            builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
        }
        HashMap<String, AutoCompactionSnapshot> hashMap = new HashMap<String, AutoCompactionSnapshot>();
        for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
            String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
            builder = autoCompactionSnapshotBuilderEntry.getValue();
            AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
            hashMap.put(dataSource, autoCompactionSnapshot);
            this.addStatsForDatasource(dataSource, autoCompactionSnapshot, stats);
        }
        this.autoCompactionSnapshotPerDataSource.set(hashMap);
    }

    private void addStatsForDatasource(String dataSource, AutoCompactionSnapshot autoCompactionSnapshot, CoordinatorRunStats stats) {
        RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
        stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
        stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
        stats.add(Stats.Compaction.PENDING_INTERVALS, rowKey, autoCompactionSnapshot.getIntervalCountAwaitingCompaction());
        stats.add(Stats.Compaction.COMPACTED_BYTES, rowKey, autoCompactionSnapshot.getBytesCompacted());
        stats.add(Stats.Compaction.COMPACTED_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountCompacted());
        stats.add(Stats.Compaction.COMPACTED_INTERVALS, rowKey, autoCompactionSnapshot.getIntervalCountCompacted());
        stats.add(Stats.Compaction.SKIPPED_BYTES, rowKey, autoCompactionSnapshot.getBytesSkipped());
        stats.add(Stats.Compaction.SKIPPED_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountSkipped());
        stats.add(Stats.Compaction.SKIPPED_INTERVALS, rowKey, autoCompactionSnapshot.getIntervalCountSkipped());
    }

    @Nullable
    public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) {
        AutoCompactionSnapshot autoCompactionSnapshot = this.autoCompactionSnapshotPerDataSource.get().get(dataSource);
        if (autoCompactionSnapshot == null) {
            return null;
        }
        return autoCompactionSnapshot.getBytesAwaitingCompaction();
    }

    @Nullable
    public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource) {
        return this.autoCompactionSnapshotPerDataSource.get().get(dataSource);
    }

    public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot() {
        return this.autoCompactionSnapshotPerDataSource.get();
    }
}

