/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableInterval;

public class CompactSegments
implements CoordinatorCustomDuty {
    // Names of the global coordinator stats reported by run() and makeStats().
    static final String COMPACTION_TASK_COUNT = "compactTaskCount";
    static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
    static final String MAX_COMPACTION_TASK_SLOT = "maxCompactionTaskSlot";

    // Names of the per-datasource stats reported by makeStats(): skipped, awaiting and compacted segments,
    // each as a byte size, a segment count and an interval count.
    static final String TOTAL_SIZE_OF_SEGMENTS_SKIPPED = "segmentSizeSkippedCompact";
    static final String TOTAL_COUNT_OF_SEGMENTS_SKIPPED = "segmentCountSkippedCompact";
    static final String TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED = "segmentIntervalSkippedCompact";
    static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING = "segmentSizeWaitCompact";
    static final String TOTAL_COUNT_OF_SEGMENTS_AWAITING = "segmentCountWaitCompact";
    static final String TOTAL_INTERVAL_OF_SEGMENTS_AWAITING = "segmentIntervalWaitCompact";
    static final String TOTAL_SIZE_OF_SEGMENTS_COMPACTED = "segmentSizeCompacted";
    static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted";
    static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted";

    /** Task type string that identifies a compaction task among the active tasks. */
    public static final String COMPACTION_TASK_TYPE = "compact";
    /** Task-context key that is set to {@code true} on every task this duty submits. */
    public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

    private static final Logger LOG = new Logger(CompactSegments.class);

    private final CompactionSegmentSearchPolicy policy;
    private final boolean skipLockedIntervals;
    private final IndexingServiceClient indexingServiceClient;

    // Latest per-datasource snapshots. The whole map is swapped on each run rather than mutated in place.
    private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

    /**
     * Creates the compaction duty.
     *
     * @param config                coordinator config; only {@code getCompactionSkipLockedIntervals()} is read here
     * @param objectMapper          mapper handed to the {@link NewestSegmentFirstPolicy} used to pick segments
     * @param indexingServiceClient client used to inspect, submit and cancel tasks
     */
    @Inject
    @JsonCreator
    public CompactSegments(
        @JacksonInject DruidCoordinatorConfig config,
        @JacksonInject ObjectMapper objectMapper,
        @JacksonInject IndexingServiceClient indexingServiceClient
    ) {
        this.policy = new NewestSegmentFirstPolicy(objectMapper);
        this.indexingServiceClient = indexingServiceClient;
        this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
        // Publish an empty map up front so the snapshot getters never dereference a null map before the first run.
        this.autoCompactionSnapshotPerDataSource.set(new HashMap<>());
        LOG.info("Scheduling compaction with skipLockedIntervals [%s]", this.skipLockedIntervals);
    }

    /**
     * @return the {@code skipLockedIntervals} flag captured from the coordinator config at construction time
     */
    @VisibleForTesting
    public boolean isSkipLockedIntervals() {
        return skipLockedIntervals;
    }

    /**
     * @return the indexing service client this duty was constructed with
     */
    @VisibleForTesting
    IndexingServiceClient getIndexingServiceClient() {
        return indexingServiceClient;
    }

    /**
     * Runs one round of automatic compaction scheduling.
     *
     * <p>When {@code maxCompactionTaskSlots} is not positive, or no datasource has a compaction config, nothing is
     * scheduled and the published snapshots are reset to an empty map. Otherwise the method:
     * <ol>
     *   <li>inspects every active compaction task: a task whose segment granularity differs from the configured
     *       one is cancelled; for all others the input interval is excluded from this run and the task's
     *       estimated slot usage is counted;</li>
     *   <li>adds the locked intervals returned by {@code getLockedIntervalsToSkip} to the excluded intervals;</li>
     *   <li>computes the compaction task capacity and the number of still-available slots;</li>
     *   <li>submits new tasks via {@code doRun} when slots are available, or only refreshes the statistics via
     *       {@code makeStats} when none are.</li>
     * </ol>
     *
     * @param params runtime params of the current coordinator run
     * @return a copy of {@code params} carrying the stats collected by this duty
     * @throws ISE if the overlord returns a null payload for an active task, or the payload is not a compaction task
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        LOG.info("Compact segments");
        final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
        final CoordinatorStats stats = new CoordinatorStats();
        final List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
        if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
            final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
            if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
                final Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
                    .stream()
                    .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
                final List<TaskStatusPlus> compactionTasks = CompactSegments.filterNonCompactionTasks(this.indexingServiceClient.getActiveTasks());
                final Map<String, List<Interval>> intervalsToSkipCompaction = new HashMap<>();
                int numEstimatedNonCompleteCompactionTasks = 0;
                for (TaskStatusPlus status : compactionTasks) {
                    final TaskPayloadResponse response = this.indexingServiceClient.getTaskPayload(status.getId());
                    if (response == null) {
                        throw new ISE("Got a null payload from overlord for task[%s]", status.getId());
                    }
                    if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
                        throw new ISE("task[%s] is not a compactionTask", status.getId());
                    }
                    final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
                    final DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource());
                    if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) {
                        final Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
                        if (configuredSegmentGranularity != null
                            && compactionTaskQuery.getGranularitySpec() != null
                            && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) {
                            // The running task uses a different segment granularity than the config now asks for.
                            // Cancel it and do not count its interval or slots.
                            LOG.info(
                                "Canceled task[%s] as task segmentGranularity is [%s] but compaction config segmentGranularity is [%s]",
                                status.getId(),
                                compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
                                configuredSegmentGranularity
                            );
                            this.indexingServiceClient.cancelTask(status.getId());
                            continue;
                        }
                    }
                    // Do not pick segments of an interval that is already being compacted.
                    final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
                    intervalsToSkipCompaction.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
                    numEstimatedNonCompleteCompactionTasks += CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(compactionTaskQuery.getTuningConfig());
                }
                this.getLockedIntervalsToSkip(compactionConfigList).forEach(
                    (dataSource, intervals) ->
                        intervalsToSkipCompaction.computeIfAbsent(dataSource, ds -> new ArrayList<>()).addAll(intervals)
                );
                final CompactionSegmentIterator iterator = this.policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
                final int compactionTaskCapacity = this.computeCompactionTaskCapacity(dynamicConfig);
                // When no compaction task is counted as running, at least one slot is made available
                // even if the computed capacity is 0.
                final int numAvailableCompactionTaskSlots = numEstimatedNonCompleteCompactionTasks > 0
                    ? Math.max(0, compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks)
                    : Math.max(1, compactionTaskCapacity);
                LOG.info(
                    "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
                    numAvailableCompactionTaskSlots,
                    compactionTaskCapacity
                );
                stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
                stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
                final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
                if (numAvailableCompactionTaskSlots > 0) {
                    stats.accumulate(this.doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator));
                } else {
                    stats.accumulate(this.makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
                }
            } else {
                LOG.info("compactionConfig is empty. Skip.");
                this.autoCompactionSnapshotPerDataSource.set(new HashMap<>());
            }
        } else {
            LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
            this.autoCompactionSnapshotPerDataSource.set(new HashMap<>());
        }
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    /**
     * Computes how many task slots compaction may use in total: the worker capacity multiplied by the configured
     * slot ratio, capped at {@code maxCompactionTaskSlots}. If the auto-scale capacity is requested but cannot be
     * fetched, the current worker capacity is used instead.
     */
    private int computeCompactionTaskCapacity(CoordinatorCompactionConfig dynamicConfig) {
        int totalCapacity;
        if (dynamicConfig.isUseAutoScaleSlots()) {
            try {
                totalCapacity = this.indexingServiceClient.getTotalWorkerCapacityWithAutoScale();
            }
            catch (Exception e) {
                // Best-effort fallback. Keep the cause in the log so the failure can be diagnosed.
                LOG.warn(e, "Failed to get total worker capacity with auto scale slots. Falling back to current capacity count");
                totalCapacity = this.indexingServiceClient.getTotalWorkerCapacity();
            }
        } else {
            totalCapacity = this.indexingServiceClient.getTotalWorkerCapacity();
        }
        return (int) Math.min(
            (double) totalCapacity * dynamicConfig.getCompactionTaskSlotRatio(),
            (double) dynamicConfig.getMaxCompactionTaskSlots()
        );
    }

    /**
     * Looks up the intervals that are currently locked and must therefore be left out of compaction.
     *
     * @param compactionConfigs compaction configs; each datasource's task priority is passed to the lock query
     * @return a fresh mutable map from datasource to locked intervals; empty when {@code skipLockedIntervals}
     *         is disabled
     */
    private Map<String, List<Interval>> getLockedIntervalsToSkip(List<DataSourceCompactionConfig> compactionConfigs) {
        if (skipLockedIntervals) {
            final Map<String, Integer> priorityPerDataSource = compactionConfigs
                .stream()
                .collect(
                    Collectors.toMap(
                        DataSourceCompactionConfig::getDataSource,
                        DataSourceCompactionConfig::getTaskPriority
                    )
                );
            // Copy into our own map so callers are free to modify the result.
            final HashMap<String, List<Interval>> lockedIntervalsPerDataSource =
                new HashMap<String, List<Interval>>(indexingServiceClient.getLockedIntervals(priorityPerDataSource));
            LOG.debug(
                "Skipping the following intervals for Compaction as they are currently locked: %s",
                lockedIntervalsPerDataSource
            );
            return lockedIntervalsPerDataSource;
        }

        LOG.info("Not skipping any locked interval for Compaction");
        return new HashMap<String, List<Interval>>();
    }

    /**
     * Estimates the largest number of task slots a single compaction task can occupy.
     *
     * @param tuningConfig tuning config of the compaction task, may be null
     * @return 1 when the task does not run in parallel mode; otherwise {@code maxNumConcurrentSubTasks + 1}
     */
    @VisibleForTesting
    static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) {
        if (!CompactSegments.isParallelMode(tuningConfig)) {
            return 1;
        }
        final Integer configuredSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
        final int subTaskSlots = configuredSubTasks != null ? configuredSubTasks : 1;
        // NOTE(review): the extra slot presumably accounts for the task coordinating the sub-tasks; confirm.
        return subTaskSlots + 1;
    }

    /**
     * Decides whether a compaction task with the given tuning config counts as running in parallel mode.
     *
     * <p>The task is parallel when {@code maxNumConcurrentSubTasks} is set and reaches a minimum: 1 when range
     * partitioning is used, 2 otherwise.
     *
     * @param tuningConfig tuning config of the compaction task, may be null
     * @return {@code false} for a null config or an unset or too small {@code maxNumConcurrentSubTasks}
     */
    @VisibleForTesting
    static boolean isParallelMode(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) {
        if (tuningConfig == null) {
            return false;
        }
        final int requiredSubTasks = CompactSegments.useRangePartitions(tuningConfig) ? 1 : 2;
        final Integer configuredSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
        if (configuredSubTasks == null) {
            return false;
        }
        return configuredSubTasks >= requiredSubTasks;
    }

    /**
     * @return true when the tuning config's partitions spec is a {@link DimensionRangePartitionsSpec};
     *         false otherwise, including when no partitions spec is set
     */
    private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig) {
        final Object partitionsSpec = tuningConfig.getPartitionsSpec();
        return partitionsSpec instanceof DimensionRangePartitionsSpec;
    }

    /**
     * Drops every task that is known not to be a compaction task.
     *
     * <p>Tasks whose type is null are kept alongside tasks of type {@code "compact"}; {@code run} checks the
     * payload type of each kept task afterwards.
     *
     * @param taskStatuses statuses of the tasks to filter
     * @return a new list with the statuses that passed the filter, in their original order
     */
    private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses) {
        final List<TaskStatusPlus> possibleCompactionTasks = new ArrayList<>();
        for (TaskStatusPlus taskStatus : taskStatuses) {
            final String type = taskStatus.getType();
            if (type == null || COMPACTION_TASK_TYPE.equals(type)) {
                possibleCompactionTasks.add(taskStatus);
            }
        }
        return possibleCompactionTasks;
    }

    /**
     * Submits compaction tasks for the segments returned by {@code iterator} until the iterator is exhausted or
     * the estimated slot usage of the submitted tasks reaches {@code numAvailableCompactionTaskSlots}.
     *
     * <p>For every submitted task the "compacted" counters of the datasource's snapshot builder are incremented.
     * The segments left in the iterator are accounted for by {@code makeStats}, which also publishes the snapshots.
     *
     * @param compactionConfigs                         compaction config per datasource
     * @param currentRunAutoCompactionSnapshotBuilders  snapshot builders of this run, filled in by this method
     * @param numAvailableCompactionTaskSlots           number of task slots that may be used by this run
     * @param iterator                                  iterator over the segment groups to compact
     * @return the stats of this run as produced by {@code makeStats}
     * @throws ISE if the iterator returns an empty list of segments
     */
    private CoordinatorStats doRun(
        Map<String, DataSourceCompactionConfig> compactionConfigs,
        Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
        int numAvailableCompactionTaskSlots,
        CompactionSegmentIterator iterator
    ) {
        int numSubmittedTasks = 0;
        int numCompactionTasksAndSubtasks = 0;

        while (iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots) {
            final List<DataSegment> segmentsToCompact = iterator.next();
            if (segmentsToCompact.isEmpty()) {
                throw new ISE("segmentsToCompact is empty?");
            }

            final String dataSourceName = segmentsToCompact.get(0).getDataSource();
            final AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
                dataSourceName,
                k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
            );
            snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
            snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
            snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

            final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);

            // Use the configured segment granularity when there is one. Otherwise try to derive it from the
            // interval of the segments, and leave it unset (null) when that is not possible.
            Granularity segmentGranularityToUse = null;
            if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
                final Interval interval = segmentsToCompact.get(0).getInterval();
                if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) {
                    try {
                        segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
                    }
                    catch (IAE iae) {
                        // The interval does not map to a known granularity; submit the task without one.
                        LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
                    }
                } else {
                    LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
                }
            } else {
                segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
            }
            final ClientCompactionTaskGranularitySpec granularitySpec = new ClientCompactionTaskGranularitySpec(
                segmentGranularityToUse,
                config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
                config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null
            );

            final ClientCompactionTaskDimensionsSpec dimensionsSpec = config.getDimensionsSpec() != null
                ? new ClientCompactionTaskDimensionsSpec(config.getDimensionsSpec().getDimensions())
                : null;

            ClientCompactionTaskTransformSpec transformSpec = null;
            if (config.getTransformSpec() != null) {
                transformSpec = new ClientCompactionTaskTransformSpec(config.getTransformSpec().getFilter());
            }

            Boolean dropExisting = null;
            if (config.getIoConfig() != null) {
                dropExisting = config.getIoConfig().isDropExisting();
            }
            // When every segment to compact is a tombstone, dropExisting is forced to true regardless of the config.
            if ((dropExisting == null || !dropExisting) && segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
                dropExisting = true;
                LOG.info("Forcing dropExisting to %s since all segments to compact are tombstones", dropExisting);
            }

            final String taskId = this.indexingServiceClient.compactSegments(
                "coordinator-issued",
                segmentsToCompact,
                config.getTaskPriority(),
                ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
                granularitySpec,
                dimensionsSpec,
                config.getMetricsSpec(),
                transformSpec,
                dropExisting,
                this.newAutoCompactionContext(config.getTaskContext())
            );
            LOG.info("Submitted a compactionTask[%s] for %s segments", taskId, segmentsToCompact.size());
            LOG.infoSegments(segmentsToCompact, "Compacting segments");

            numSubmittedTasks++;
            // Count the slots the submitted task may use, so the loop stops once the available slots are used up.
            numCompactionTasksAndSubtasks += CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
        }

        return this.makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator);
    }

    /**
     * Builds the task context for an auto-compaction task.
     *
     * @param configuredContext context taken from the compaction config, may be null; it is never modified
     * @return a new map holding the configured entries plus {@code STORE_COMPACTION_STATE_KEY} set to {@code true}
     */
    private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext) {
        final HashMap<String, Object> taskContext = new HashMap<String, Object>();
        if (configuredContext != null) {
            taskContext.putAll(configuredContext);
        }
        // Put last, so a configured value for the same key is overridden.
        taskContext.put(STORE_COMPACTION_STATE_KEY, true);
        return taskContext;
    }

    /**
     * Finalizes the statistics of a compaction run and publishes the per-datasource snapshots.
     *
     * <p>Every segment group still left in {@code iterator} is counted as awaiting compaction. The iterator's
     * total compacted and skipped statistics are then added to the builders, each builder is turned into a
     * snapshot, and the snapshot values are reported as per-datasource stats. Finally the published snapshot map
     * is replaced by the snapshots of this run.
     *
     * @param currentRunAutoCompactionSnapshotBuilders snapshot builders of this run; missing ones are created
     * @param numCompactionTasks                       number of compaction tasks submitted in this run
     * @param iterator                                 segment iterator of this run; it is drained by this method
     * @return stats holding the task count and the per-datasource awaiting, compacted and skipped figures
     */
    private CoordinatorStats makeStats(
        Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
        int numCompactionTasks,
        CompactionSegmentIterator iterator
    ) {
        // Shared factory for the builder of a datasource that has not been seen yet in this run.
        final Function<String, AutoCompactionSnapshot.Builder> newRunningBuilder =
            ds -> new AutoCompactionSnapshot.Builder(ds, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING);

        final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
        final CoordinatorStats stats = new CoordinatorStats();
        stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);

        // Whatever is still in the iterator was not submitted in this run, so it is awaiting compaction.
        while (iterator.hasNext()) {
            final List<DataSegment> segmentsToCompact = iterator.next();
            if (segmentsToCompact.isEmpty()) {
                continue;
            }
            final String dataSourceName = segmentsToCompact.get(0).getDataSource();
            final AutoCompactionSnapshot.Builder snapshotBuilder =
                currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(dataSourceName, newRunningBuilder);
            snapshotBuilder.incrementBytesAwaitingCompaction(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
            snapshotBuilder.incrementIntervalCountAwaitingCompaction(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
            snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
        }

        // Compacted totals reported by the iterator.
        final Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
        for (Map.Entry<String, CompactionStatistics> compactedEntry : allCompactedStatistics.entrySet()) {
            final CompactionStatistics dataSourceCompactedStatistics = compactedEntry.getValue();
            final AutoCompactionSnapshot.Builder builder =
                currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(compactedEntry.getKey(), newRunningBuilder);
            builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
            builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
            builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
        }

        // Skipped totals reported by the iterator.
        final Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
        for (Map.Entry<String, CompactionStatistics> skippedEntry : allSkippedStatistics.entrySet()) {
            final CompactionStatistics dataSourceSkippedStatistics = skippedEntry.getValue();
            final AutoCompactionSnapshot.Builder builder =
                currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(skippedEntry.getKey(), newRunningBuilder);
            builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
            builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
            builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
        }

        // Build the snapshots and report their values as per-datasource stats.
        for (Map.Entry<String, AutoCompactionSnapshot.Builder> builderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
            final String dataSource = builderEntry.getKey();
            final AutoCompactionSnapshot autoCompactionSnapshot = builderEntry.getValue().build();
            currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);

            stats.addToDataSourceStat(TOTAL_SIZE_OF_SEGMENTS_AWAITING, dataSource, autoCompactionSnapshot.getBytesAwaitingCompaction());
            stats.addToDataSourceStat(TOTAL_COUNT_OF_SEGMENTS_AWAITING, dataSource, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
            stats.addToDataSourceStat(TOTAL_INTERVAL_OF_SEGMENTS_AWAITING, dataSource, autoCompactionSnapshot.getIntervalCountAwaitingCompaction());
            stats.addToDataSourceStat(TOTAL_SIZE_OF_SEGMENTS_COMPACTED, dataSource, autoCompactionSnapshot.getBytesCompacted());
            stats.addToDataSourceStat(TOTAL_COUNT_OF_SEGMENTS_COMPACTED, dataSource, autoCompactionSnapshot.getSegmentCountCompacted());
            stats.addToDataSourceStat(TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED, dataSource, autoCompactionSnapshot.getIntervalCountCompacted());
            stats.addToDataSourceStat(TOTAL_SIZE_OF_SEGMENTS_SKIPPED, dataSource, autoCompactionSnapshot.getBytesSkipped());
            stats.addToDataSourceStat(TOTAL_COUNT_OF_SEGMENTS_SKIPPED, dataSource, autoCompactionSnapshot.getSegmentCountSkipped());
            stats.addToDataSourceStat(TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED, dataSource, autoCompactionSnapshot.getIntervalCountSkipped());
        }

        // Swap in the snapshots of this run as a whole.
        this.autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
        return stats;
    }

    /**
     * @param dataSource name of the datasource
     * @return the bytes awaiting compaction recorded in the datasource's latest snapshot, or null when there is
     *         no snapshot for it
     */
    @Nullable
    public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) {
        final Map<String, AutoCompactionSnapshot> snapshots = autoCompactionSnapshotPerDataSource.get();
        final AutoCompactionSnapshot snapshot = snapshots.get(dataSource);
        return snapshot != null ? snapshot.getBytesAwaitingCompaction() : null;
    }

    /**
     * @param dataSource name of the datasource
     * @return the latest snapshot of the given datasource, or null when there is none
     */
    @Nullable
    public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource) {
        final Map<String, AutoCompactionSnapshot> snapshots = autoCompactionSnapshotPerDataSource.get();
        return snapshots.get(dataSource);
    }

    /**
     * @return the currently published map from datasource to snapshot; the map itself is returned, not a copy
     */
    public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot() {
        final Map<String, AutoCompactionSnapshot> snapshots = autoCompactionSnapshotPerDataSource.get();
        return snapshots;
    }
}

