/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.PerfectRollupWorkerTask;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.metadata.FingerprintGenerator;
import org.apache.druid.segment.realtime.appenderator.TaskSegmentSchemaUtil;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;

abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollupWorkerTask {
    private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);
    private final PartialSegmentMergeIOConfig ioConfig;
    private final int numAttempts;
    private final String subtaskSpecId;

    PartialSegmentMergeTask(@Nullable String id, String groupId, TaskResource taskResource, String supervisorTaskId, @Nullable String subtaskSpecId, DataSchema dataSchema, PartialSegmentMergeIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig, int numAttempts, Map<String, Object> context) {
        super(id, groupId, taskResource, dataSchema, tuningConfig, context, supervisorTaskId);
        Preconditions.checkArgument((!dataSchema.getGranularitySpec().inputIntervals().isEmpty() ? 1 : 0) != 0, (Object)"Missing intervals in granularitySpec");
        this.subtaskSpecId = subtaskSpecId;
        this.ioConfig = ioConfig;
        this.numAttempts = numAttempts;
    }

    @JsonProperty
    public int getNumAttempts() {
        return this.numAttempts;
    }

    @Override
    @JsonProperty
    public String getSubtaskSpecId() {
        return this.subtaskSpecId;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        HashMap<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBuckets = new HashMap<Interval, Int2ObjectMap<List<PartitionLocation>>>();
        for (PartitionLocation location : this.ioConfig.getPartitionLocations()) {
            ((List)intervalToBuckets.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap()).computeIfAbsent(location.getBucketId(), k -> new ArrayList())).add(location);
        }
        List locks = (List)toolbox.getTaskActionClient().submit(new SurrogateAction(this.getSupervisorTaskId(), new LockListAction()));
        HashMap intervalToVersion = Maps.newHashMapWithExpectedSize((int)locks.size());
        locks.forEach(lock -> {
            if (lock.isRevoked()) {
                throw new ISE("Lock[%s] is revoked", new Object[]{lock});
            }
            String mustBeNull = intervalToVersion.put(lock.getInterval(), lock.getVersion());
            if (mustBeNull != null) {
                throw new ISE("Unexpected state: Two versions([%s], [%s]) for the same interval[%s]", new Object[]{lock.getVersion(), mustBeNull, lock.getInterval()});
            }
        });
        Stopwatch fetchStopwatch = Stopwatch.createStarted();
        Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = this.fetchSegmentFiles(toolbox, intervalToBuckets);
        long fetchTime = fetchStopwatch.elapsed(TimeUnit.SECONDS);
        fetchStopwatch.stop();
        LOG.info("Fetch took [%s] seconds", new Object[]{fetchTime});
        ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(this.getSupervisorTaskId(), this.getTuningConfig().getChatHandlerTimeout(), this.getTuningConfig().getChatHandlerNumRetries());
        File persistDir = toolbox.getPersistDir();
        org.apache.commons.io.FileUtils.deleteQuietly((File)persistDir);
        FileUtils.mkdirp((File)persistDir);
        DataSegmentsWithSchemas dataSegmentsWithSchemas = this.mergeAndPushSegments(toolbox, this.getDataSchema(), this.getTuningConfig(), persistDir, intervalToVersion, intervalToUnzippedFiles);
        taskClient.report(new PushedSegmentsReport(this.getId(), Collections.emptySet(), dataSegmentsWithSchemas.getSegments(), new TaskReport.ReportMap(), dataSegmentsWithSchemas.getSegmentSchemaMapping()));
        return TaskStatus.success((String)this.getId());
    }

    private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(TaskToolbox toolbox, Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBuckets) throws IOException {
        File tempDir = toolbox.getIndexingTmpDir();
        org.apache.commons.io.FileUtils.deleteQuietly((File)tempDir);
        FileUtils.mkdirp((File)tempDir);
        HashMap<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = new HashMap<Interval, Int2ObjectMap<List<File>>>();
        for (Map.Entry<Interval, Int2ObjectMap<List<PartitionLocation>>> entryPerInterval : intervalToBuckets.entrySet()) {
            Interval interval = entryPerInterval.getKey();
            for (Int2ObjectMap.Entry entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
                int bucketId = entryPerBucketId.getIntKey();
                File partitionDir = org.apache.commons.io.FileUtils.getFile((File)tempDir, (String[])new String[]{interval.getStart().toString(), interval.getEnd().toString(), Integer.toString(bucketId)});
                FileUtils.mkdirp((File)partitionDir);
                for (PartitionLocation location : (List)entryPerBucketId.getValue()) {
                    File unzippedDir = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, this.getSupervisorTaskId(), location);
                    ((List)intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap()).computeIfAbsent(bucketId, k -> new ArrayList())).add(unzippedDir);
                }
            }
        }
        return intervalToUnzippedFiles;
    }

    abstract S createShardSpec(TaskToolbox var1, Interval var2, int var3);

    private DataSegmentsWithSchemas mergeAndPushSegments(TaskToolbox toolbox, DataSchema dataSchema, ParallelIndexTuningConfig tuningConfig, File persistDir, Map<Interval, String> intervalToVersion, Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles) throws Exception {
        DataSegmentPusher segmentPusher = toolbox.getSegmentPusher();
        HashSet<DataSegment> pushedSegments = new HashSet<DataSegment>();
        SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(1);
        FingerprintGenerator fingerprintGenerator = new FingerprintGenerator(toolbox.getJsonMapper());
        for (Map.Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval : intervalToUnzippedFiles.entrySet()) {
            Interval interval = entryPerInterval.getKey();
            for (Int2ObjectMap.Entry entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
                long startTime = System.nanoTime();
                int bucketId = entryPerBucketId.getIntKey();
                List segmentFilesToMerge = (List)entryPerBucketId.getValue();
                Pair<File, List<String>> mergedFileAndDimensionNames = PartialSegmentMergeTask.mergeSegmentsInSamePartition(dataSchema, tuningConfig, toolbox.getIndexIO(), toolbox.getIndexMerger(), segmentFilesToMerge, tuningConfig.getMaxNumSegmentsToMerge(), persistDir, 0);
                long mergeFinishTime = System.nanoTime();
                LOG.info("Merged [%d] input segment(s) for interval [%s] in [%,d]ms.", new Object[]{segmentFilesToMerge.size(), interval, (mergeFinishTime - startTime) / 1000000L});
                List metricNames = Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList());
                SegmentId segmentId = SegmentId.of((String)this.getDataSource(), (Interval)interval, (String)((String)Preconditions.checkNotNull((Object)AbstractBatchIndexTask.findVersion(intervalToVersion, interval), (String)"version for interval[%s]", (Object)interval)), (int)0);
                DataSegment segment = segmentPusher.push((File)mergedFileAndDimensionNames.lhs, DataSegment.builder((SegmentId)segmentId).shardSpec(this.createShardSpec(toolbox, interval, bucketId)).dimensions((List)mergedFileAndDimensionNames.rhs).metrics(metricNames).projections(dataSchema.getProjectionNames()).build(), false);
                long pushFinishTime = System.nanoTime();
                pushedSegments.add(segment);
                if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
                    SchemaPayloadPlus schemaPayloadPlus = TaskSegmentSchemaUtil.getSegmentSchema((File)((File)mergedFileAndDimensionNames.lhs), (IndexIO)toolbox.getIndexIO());
                    segmentSchemaMapping.addSchema(segment.getId(), schemaPayloadPlus, fingerprintGenerator.generateFingerprint(schemaPayloadPlus.getSchemaPayload(), this.getDataSource(), 1));
                }
                LOG.info("Built segment [%s] for interval [%s] (from [%d] input segment(s) in [%,d]ms) of size [%d] bytes and pushed ([%,d]ms) to deep storage [%s].", new Object[]{segment.getId(), interval, segmentFilesToMerge.size(), (mergeFinishTime - startTime) / 1000000L, segment.getSize(), (pushFinishTime - mergeFinishTime) / 1000000L, segment.getLoadSpec()});
            }
        }
        if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
            LOG.info("SegmentSchema for the pushed segments is [%s]", new Object[]{segmentSchemaMapping});
        }
        return new DataSegmentsWithSchemas(pushedSegments, (SegmentSchemaMapping)(segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null));
    }

    private static Pair<File, List<String>> mergeSegmentsInSamePartition(DataSchema dataSchema, ParallelIndexTuningConfig tuningConfig, IndexIO indexIO, IndexMerger merger, List<File> indexes, int maxNumSegmentsToMerge, File baseOutDir, int outDirSuffix) throws IOException {
        int suffix = outDirSuffix;
        ArrayList<File> mergedFiles = new ArrayList<File>();
        List dimensionNames = null;
        for (int i = 0; i < indexes.size(); i += maxNumSegmentsToMerge) {
            List<File> filesToMerge = indexes.subList(i, Math.min(i + maxNumSegmentsToMerge, indexes.size()));
            ArrayList<QueryableIndex> indexesToMerge = new ArrayList<QueryableIndex>(filesToMerge.size());
            Closer indexCleaner = Closer.create();
            for (File file : filesToMerge) {
                QueryableIndex queryableIndex = indexIO.loadIndex(file);
                indexesToMerge.add(queryableIndex);
                indexCleaner.register(() -> {
                    queryableIndex.close();
                    file.delete();
                });
            }
            if (maxNumSegmentsToMerge >= indexes.size()) {
                dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge, (DimensionsSpec)dataSchema.getDimensionsSpec());
            }
            File outDir = new File(baseOutDir, StringUtils.format((String)"merged_%d", (Object[])new Object[]{suffix++}));
            mergedFiles.add(merger.mergeQueryableIndex(indexesToMerge, dataSchema.getGranularitySpec().isRollup(), dataSchema.getAggregators(), dataSchema.getDimensionsSpec(), outDir, tuningConfig.getIndexSpec(), tuningConfig.getIndexSpecForIntermediatePersists(), (ProgressIndicator)new BaseProgressIndicator(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.getMaxColumnsToMerge()));
            indexCleaner.close();
        }
        if (mergedFiles.size() == 1) {
            return Pair.of((Object)((File)mergedFiles.get(0)), (Object)((List)Preconditions.checkNotNull(dimensionNames, (Object)"dimensionNames")));
        }
        return PartialSegmentMergeTask.mergeSegmentsInSamePartition(dataSchema, tuningConfig, indexIO, merger, mergedFiles, maxNumSegmentsToMerge, baseOutDir, suffix);
    }
}

