/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class SinglePhaseParallelIndexTaskRunner
extends ParallelIndexPhaseRunner<SinglePhaseSubTask, PushedSegmentsReport> {
    public static final String CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY = "useLineageBasedSegmentAllocation";
    @Deprecated
    static final boolean LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = false;
    public static final boolean DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = true;
    private static final String PHASE_NAME = "segment generation";
    private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, List<String>> sequenceToSegmentIds = new ConcurrentHashMap();
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final SplittableInputSource<?> baseInputSource;
    private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

    SinglePhaseParallelIndexTaskRunner(TaskToolbox toolbox, String taskId, String groupId, String baseSubtaskSpecName, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig) {
        super(toolbox, taskId, groupId, baseSubtaskSpecName, ingestionSchema.getTuningConfig(), context);
        this.ingestionSchema = ingestionSchema;
        this.baseInputSource = (SplittableInputSource)ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
        this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
    }

    @VisibleForTesting
    SinglePhaseParallelIndexTaskRunner(TaskToolbox toolbox, String taskId, String groupId, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig) {
        this(toolbox, taskId, groupId, taskId, ingestionSchema, context, centralizedDatasourceSchemaConfig);
    }

    @Override
    public String getName() {
        return PHASE_NAME;
    }

    @VisibleForTesting
    ParallelIndexIngestionSpec getIngestionSchema() {
        return this.ingestionSchema;
    }

    @Override
    @VisibleForTesting
    Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOException {
        return this.baseInputSource.createSplits(this.ingestionSchema.getIOConfig().getInputFormat(), this.getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator();
    }

    @Override
    int estimateTotalNumSubTasks() throws IOException {
        return this.baseInputSource.estimateNumSplits(this.ingestionSchema.getIOConfig().getInputFormat(), this.getTuningConfig().getSplitHintSpec());
    }

    @VisibleForTesting
    SubTaskSpec<SinglePhaseSubTask> newTaskSpec(InputSplit split) {
        InputSource inputSource = this.baseInputSource.withSplit(split);
        HashMap<String, Object> subtaskContext = new HashMap<String, Object>(this.getContext());
        return new SinglePhaseSubTaskSpec(this.getBaseSubtaskSpecName() + "_" + this.getAndIncrementNextSpecId(), this.getGroupId(), this.getTaskId(), new ParallelIndexIngestionSpec(this.ingestionSchema.getDataSchema(), new ParallelIndexIOConfig(inputSource, this.ingestionSchema.getIOConfig().getInputFormat(), this.ingestionSchema.getIOConfig().isAppendToExisting(), this.ingestionSchema.getIOConfig().isDropExisting()), this.ingestionSchema.getTuningConfig()), subtaskContext, split);
    }

    @Deprecated
    public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp) throws IOException {
        NonnullPair<Interval, String> intervalAndVersion = this.findIntervalAndVersion(timestamp);
        int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, intervalAndVersion.lhs);
        return new SegmentIdWithShardSpec(dataSource, (Interval)intervalAndVersion.lhs, (String)intervalAndVersion.rhs, (ShardSpec)new BuildingNumberedShardSpec(partitionNum));
    }

    public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp, String sequenceName, @Nullable String prevSegmentId) throws IOException {
        NonnullPair<Interval, String> intervalAndVersion = this.findIntervalAndVersion(timestamp);
        MutableObject segmentIdHolder = new MutableObject();
        this.sequenceToSegmentIds.compute(sequenceName, (k, v) -> {
            SegmentIdWithShardSpec newSegmentId;
            List segmentIds;
            int prevSegmentIdIndex;
            if (prevSegmentId == null) {
                prevSegmentIdIndex = -1;
                segmentIds = v == null ? new ArrayList() : v;
            } else {
                segmentIds = v;
                if (segmentIds == null) {
                    throw new ISE("Can't find previous segmentIds for sequence[%s]", new Object[]{sequenceName});
                }
                prevSegmentIdIndex = segmentIds.indexOf(prevSegmentId);
                if (prevSegmentIdIndex == -1) {
                    throw new ISE("Can't find previously allocated segmentId[%s] for sequence[%s]", new Object[]{prevSegmentId, sequenceName});
                }
            }
            int nextSegmentIdIndex = prevSegmentIdIndex + 1;
            if (nextSegmentIdIndex < segmentIds.size()) {
                SegmentId segmentId = SegmentId.tryParse((String)dataSource, (String)((String)segmentIds.get(nextSegmentIdIndex)));
                if (segmentId == null) {
                    throw new ISE("Illegal segmentId format [%s]", new Object[]{segmentIds.get(nextSegmentIdIndex)});
                }
                newSegmentId = new SegmentIdWithShardSpec(segmentId.getDataSource(), segmentId.getInterval(), segmentId.getVersion(), (ShardSpec)new BuildingNumberedShardSpec(segmentId.getPartitionNum()));
            } else {
                int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, intervalAndVersion.lhs);
                newSegmentId = new SegmentIdWithShardSpec(dataSource, (Interval)intervalAndVersion.lhs, (String)intervalAndVersion.rhs, (ShardSpec)new BuildingNumberedShardSpec(partitionNum));
                segmentIds.add(newSegmentId.toString());
            }
            segmentIdHolder.setValue((Object)newSegmentId);
            return segmentIds;
        });
        return (SegmentIdWithShardSpec)segmentIdHolder.getValue();
    }

    NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp) throws IOException {
        TaskLockType taskLockType = TaskLocks.determineLockTypeForAppend(this.getContext());
        return AbstractBatchIndexTask.findIntervalAndVersion(this.getToolbox(), this.ingestionSchema, timestamp, taskLockType);
    }

    @Override
    public Runnable getSubtaskCompletionCallback(TaskMonitor.SubTaskCompleteEvent<?> event) {
        return () -> {
            if (event.getLastState().isSuccess()) {
                this.sequenceToSegmentIds.remove(event.getSpec().getId());
            }
        };
    }
}

