/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class SinglePhaseParallelIndexTaskRunner
extends ParallelIndexPhaseRunner<SinglePhaseSubTask, PushedSegmentsReport> {
    public static final String CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY = "useLineageBasedSegmentAllocation";
    @Deprecated
    static final boolean LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = false;
    public static final boolean DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = true;
    private static final String PHASE_NAME = "segment generation";
    private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, List<String>> sequenceToSegmentIds = new ConcurrentHashMap();
    private final ParallelIndexIngestionSpec ingestionSchema;
    private final SplittableInputSource<?> baseInputSource;

    SinglePhaseParallelIndexTaskRunner(TaskToolbox toolbox, String taskId, String groupId, String baseSubtaskSpecName, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context) {
        super(toolbox, taskId, groupId, baseSubtaskSpecName, ingestionSchema.getTuningConfig(), context);
        this.ingestionSchema = ingestionSchema;
        this.baseInputSource = (SplittableInputSource)ingestionSchema.getIOConfig().getNonNullInputSource(ingestionSchema.getDataSchema().getParser());
    }

    @VisibleForTesting
    SinglePhaseParallelIndexTaskRunner(TaskToolbox toolbox, String taskId, String groupId, ParallelIndexIngestionSpec ingestionSchema, Map<String, Object> context) {
        this(toolbox, taskId, groupId, taskId, ingestionSchema, context);
    }

    @Override
    public String getName() {
        return PHASE_NAME;
    }

    @VisibleForTesting
    ParallelIndexIngestionSpec getIngestionSchema() {
        return this.ingestionSchema;
    }

    @Override
    @VisibleForTesting
    Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOException {
        return this.baseInputSource.createSplits(this.ingestionSchema.getIOConfig().getInputFormat(), this.getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator();
    }

    @Override
    int estimateTotalNumSubTasks() throws IOException {
        return this.baseInputSource.estimateNumSplits(this.ingestionSchema.getIOConfig().getInputFormat(), this.getTuningConfig().getSplitHintSpec());
    }

    @VisibleForTesting
    SubTaskSpec<SinglePhaseSubTask> newTaskSpec(InputSplit split) {
        InputSource inputSource;
        FiniteFirehoseFactory firehoseFactory;
        if (this.baseInputSource instanceof FirehoseFactoryToInputSourceAdaptor) {
            firehoseFactory = ((FirehoseFactoryToInputSourceAdaptor)this.baseInputSource).getFirehoseFactory().withSplit(split);
            inputSource = null;
        } else {
            firehoseFactory = null;
            inputSource = this.baseInputSource.withSplit(split);
        }
        HashMap<String, Object> subtaskContext = new HashMap<String, Object>(this.getContext());
        return new SinglePhaseSubTaskSpec(this.getBaseSubtaskSpecName() + "_" + this.getAndIncrementNextSpecId(), this.getGroupId(), this.getTaskId(), new ParallelIndexIngestionSpec(this.ingestionSchema.getDataSchema(), new ParallelIndexIOConfig((FirehoseFactory)firehoseFactory, inputSource, this.ingestionSchema.getIOConfig().getInputFormat(), this.ingestionSchema.getIOConfig().isAppendToExisting(), this.ingestionSchema.getIOConfig().isDropExisting()), this.ingestionSchema.getTuningConfig()), subtaskContext, split);
    }

    @Deprecated
    public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp) throws IOException {
        NonnullPair<Interval, String> intervalAndVersion = this.findIntervalAndVersion(timestamp);
        int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, intervalAndVersion.lhs);
        return new SegmentIdWithShardSpec(dataSource, (Interval)intervalAndVersion.lhs, (String)intervalAndVersion.rhs, (ShardSpec)new BuildingNumberedShardSpec(partitionNum));
    }

    public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp, String sequenceName, @Nullable String prevSegmentId) throws IOException {
        NonnullPair<Interval, String> intervalAndVersion = this.findIntervalAndVersion(timestamp);
        MutableObject segmentIdHolder = new MutableObject();
        this.sequenceToSegmentIds.compute(sequenceName, (k, v) -> {
            SegmentIdWithShardSpec newSegmentId;
            List segmentIds;
            int prevSegmentIdIndex;
            if (prevSegmentId == null) {
                prevSegmentIdIndex = -1;
                segmentIds = v == null ? new ArrayList() : v;
            } else {
                segmentIds = v;
                if (segmentIds == null) {
                    throw new ISE("Can't find previous segmentIds for sequence[%s]", new Object[]{sequenceName});
                }
                prevSegmentIdIndex = segmentIds.indexOf(prevSegmentId);
                if (prevSegmentIdIndex == -1) {
                    throw new ISE("Can't find previously allocated segmentId[%s] for sequence[%s]", new Object[]{prevSegmentId, sequenceName});
                }
            }
            int nextSegmentIdIndex = prevSegmentIdIndex + 1;
            if (nextSegmentIdIndex < segmentIds.size()) {
                SegmentId segmentId = SegmentId.tryParse((String)dataSource, (String)((String)segmentIds.get(nextSegmentIdIndex)));
                if (segmentId == null) {
                    throw new ISE("Illegal segmentId format [%s]", new Object[]{segmentIds.get(nextSegmentIdIndex)});
                }
                newSegmentId = new SegmentIdWithShardSpec(segmentId.getDataSource(), segmentId.getInterval(), segmentId.getVersion(), (ShardSpec)new BuildingNumberedShardSpec(segmentId.getPartitionNum()));
            } else {
                int partitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, intervalAndVersion.lhs);
                newSegmentId = new SegmentIdWithShardSpec(dataSource, (Interval)intervalAndVersion.lhs, (String)intervalAndVersion.rhs, (ShardSpec)new BuildingNumberedShardSpec(partitionNum));
                segmentIds.add(newSegmentId.toString());
            }
            segmentIdHolder.setValue((Object)newSegmentId);
            return segmentIds;
        });
        return (SegmentIdWithShardSpec)segmentIdHolder.getValue();
    }

    private NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp) throws IOException {
        String version;
        Interval interval;
        GranularitySpec granularitySpec = this.getIngestionSchema().getDataSchema().getGranularitySpec();
        TreeSet materializedBucketIntervals = granularitySpec.materializedBucketIntervals();
        List<TaskLock> locks = this.getToolbox().getTaskActionClient().submit(new LockListAction());
        TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
        if (revokedLock != null) {
            throw new ISE("Lock revoked: [%s]", new Object[]{revokedLock});
        }
        Map<Interval, String> versions = locks.stream().collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
        if (!materializedBucketIntervals.isEmpty()) {
            Optional maybeInterval = granularitySpec.bucketInterval(timestamp);
            if (!maybeInterval.isPresent()) {
                throw new IAE("Could not find interval for timestamp [%s]", new Object[]{timestamp});
            }
            interval = (Interval)maybeInterval.get();
            if (!materializedBucketIntervals.contains(interval)) {
                throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", new Object[]{interval, granularitySpec});
            }
            version = ParallelIndexSupervisorTask.findVersion(versions, interval);
            if (version == null) {
                throw new ISE("Cannot find a version for interval[%s]", new Object[]{interval});
            }
        } else {
            interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
            version = ParallelIndexSupervisorTask.findVersion(versions, interval);
            if (version == null) {
                TaskLock lock = (TaskLock)Preconditions.checkNotNull((Object)this.getToolbox().getTaskActionClient().submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)), (String)"Cannot acquire a lock for interval[%s]", (Object[])new Object[]{interval});
                version = lock.getVersion();
            }
        }
        return new NonnullPair((Object)interval, (Object)version);
    }

    @Override
    public Runnable getSubtaskCompletionCallback(TaskMonitor.SubTaskCompleteEvent<?> event) {
        return () -> {
            if (event.getLastState().isSuccess()) {
                this.sequenceToSegmentIds.remove(event.getSpec().getId());
            }
        };
    }
}

