/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskLockHelper;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TaskResourceCleaner;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableInterval;

public abstract class AbstractBatchIndexTask
extends AbstractTask {
    // Logger shared by all instances of this class.
    private static final Logger log = new Logger(AbstractBatchIndexTask.class);
    // Starts as false and is never written inside this class.
    // NOTE(review): presumably flipped by subclasses after waitForSegmentAvailability() — confirm against subclasses.
    protected boolean segmentAvailabilityConfirmationCompleted = false;
    // Collects cleanup callbacks; callbacks are registered in run() and
    // registerResourceCloserOnAbnormalExit(), and clean() is invoked from stopGracefully().
    @GuardedBy(value="this")
    private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
    // Set to true by stopGracefully(); run() refuses to start the task once it is set.
    @GuardedBy(value="this")
    private boolean stopped = false;
    // Remains null until one of the determineLockGranularityAndTryLock* methods assigns it.
    private TaskLockHelper taskLockHelper;

    /**
     * Creates a task from its id, dataSource and context; all three are forwarded unchanged to
     * the {@link AbstractTask} constructor.
     */
    protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context) {
        super(id, dataSource, context);
    }

    /**
     * Creates a task that additionally carries a group id and a task resource; every argument is
     * forwarded unchanged to the {@link AbstractTask} constructor. {@code groupId},
     * {@code taskResource} and {@code context} may be null.
     */
    protected AbstractBatchIndexTask(String id, @Nullable String groupId, @Nullable TaskResource taskResource, String dataSource, @Nullable Map<String, Object> context) {
        super(id, groupId, taskResource, dataSource, context);
    }

    /**
     * Entry point of the task: checks readiness and the stopped flag, then delegates to
     * {@link #runTask(TaskToolbox)}.
     *
     * <p>While holding this object's monitor, a cleaner that interrupts the calling thread is
     * registered with the abnormal-exit resource cleaner before {@code runTask} is invoked.
     *
     * @param toolbox toolbox whose task action client is used for the readiness check and which is
     *                passed on to {@code runTask}
     * @return a failure status when the task has already been stopped, otherwise the result of
     *         {@code runTask}
     * @throws ISE if no lock helper has been set up yet and {@code isReady} returns false
     * @throws Exception anything thrown by {@code isReady} or {@code runTask}
     */
    @Override
    public TaskStatus run(TaskToolbox toolbox) throws Exception {
        // isReady() is only consulted when no lock helper exists yet.
        final boolean lockHelperMissing = taskLockHelper == null;
        if (lockHelperMissing && !isReady(toolbox.getTaskActionClient())) {
            throw new ISE("Cannot start; not ready!");
        }
        synchronized (this) {
            if (stopped) {
                return TaskStatus.failure(
                    getId(),
                    "Attempting to run a task that has been stopped. See overlord & task logs for more details."
                );
            }
            // Capture the thread now: the cleaner may be invoked from a different thread.
            final Thread runner = Thread.currentThread();
            resourceCloserOnAbnormalExit.register(config -> runner.interrupt());
        }
        return runTask(toolbox);
    }

    /**
     * Flags this task as stopped and calls {@code clean} on the abnormal-exit resource cleaner.
     * Both steps happen while holding this object's monitor, the same monitor {@code run} takes
     * before checking the flag.
     *
     * @param taskConfig configuration handed to the resource cleaner
     */
    @Override
    public void stopGracefully(TaskConfig taskConfig) {
        synchronized (this) {
            stopped = true;
            resourceCloserOnAbnormalExit.clean(taskConfig);
        }
    }

    /**
     * Builds a filtering row iterator over the given input source.
     *
     * <p>A reader is created from the input source using the row schema derived from
     * {@code dataSchema}, decorated by the schema's transform spec, and its rows are wrapped in a
     * {@link FilteringCloseableInputRowIterator}.
     *
     * @param tmpDir                directory passed to the input source when creating its reader
     * @param dataSchema            schema supplying the row schema and the transform spec
     * @param inputSource           source to read from
     * @param inputFormat           input format passed to the input source; may be null
     * @param rowFilter             predicate handed to the filtering iterator
     * @param ingestionMeters       meters handed to the filtering iterator
     * @param parseExceptionHandler handler handed to the filtering iterator
     * @return the filtering iterator over the rows read
     * @throws IOException if opening the reader fails
     */
    public static FilteringCloseableInputRowIterator inputSourceReader(File tmpDir, DataSchema dataSchema, InputSource inputSource, @Nullable InputFormat inputFormat, Predicate<InputRow> rowFilter, RowIngestionMeters ingestionMeters, ParseExceptionHandler parseExceptionHandler) throws IOException {
        final InputSourceReader transformingReader = dataSchema
            .getTransformSpec()
            .decorate(
                inputSource.reader(InputRowSchemas.fromDataSchema(dataSchema), inputFormat, tmpDir)
            );
        final CloseableIterator<InputRow> rows = transformingReader.read();
        return new FilteringCloseableInputRowIterator(rows, rowFilter, ingestionMeters, parseExceptionHandler);
    }

    /**
     * Returns a predicate that accepts a row only when it is non-null and the granularity spec
     * yields a bucket interval for the row's timestamp.
     *
     * @param granularitySpec spec used to look up the bucket interval of each row
     * @return the row filter
     */
    protected static Predicate<InputRow> defaultRowFilter(GranularitySpec granularitySpec) {
        return inputRow ->
            inputRow != null && granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent();
    }

    /**
     * Adds a cleanup callback to the abnormal-exit resource cleaner while holding this object's
     * monitor.
     *
     * @param cleaner callback to register; it receives a {@link TaskConfig}
     */
    protected void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner) {
        synchronized (this) {
            resourceCloserOnAbnormalExit.register(cleaner);
        }
    }

    /**
     * Performs the actual work of the task. Invoked by {@code run} after the readiness and
     * stopped checks have passed.
     */
    public abstract TaskStatus runTask(TaskToolbox var1) throws Exception;

    /**
     * Tells the lock-granularity logic whether existing segments have to be locked. When this
     * returns false, the determineSegmentGranularity methods choose a segment lock over an empty
     * segment list.
     */
    public abstract boolean requireLockExistingSegments();

    /**
     * Returns the segments to consider for locking. In this class it is called with the task
     * action client and the non-empty interval list when existing segments must be locked and
     * rollup is not perfect.
     */
    public abstract List<DataSegment> findSegmentsToLock(TaskActionClient var1, List<Interval> var2) throws IOException;

    /**
     * Whether the task uses perfect rollup. When true (and existing segments must be locked), the
     * interval-based lock determination picks a time-chunk lock.
     */
    public abstract boolean isPerfectRollup();

    /**
     * Segment granularity of the task, or null. A null value makes {@code tryTimeChunkLock}
     * condense the given intervals directly, and makes the segment-based lock determination skip
     * the comparison against the spec granularity.
     */
    @Nullable
    public abstract Granularity getSegmentGranularity();

    /**
     * Returns the task priority: the {@code "priority"} entry of the task context, falling back
     * to 50 as the default passed to {@code getContextValue}.
     */
    @Override
    public int getPriority() {
        return getContextValue("priority", 50);
    }

    /**
     * Returns the lock helper created by one of the determineLockGranularityAndTryLock* methods.
     *
     * @return the non-null lock helper
     * @throws NullPointerException if the helper has not been assigned yet
     */
    public TaskLockHelper getTaskLockHelper() {
        return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet");
    }

    /**
     * Chooses a lock granularity for the given intervals and tries to acquire the locks.
     *
     * <p>The {@code "forceTimeChunkLock"} context value (default {@code true}) takes precedence:
     * when set, a time-chunk lock helper is installed and time-chunk locks are attempted for
     * non-empty intervals. Otherwise the granularity is derived from the intervals.
     *
     * @param client    action client used to submit lock requests
     * @param intervals intervals to lock
     * @return true if nothing had to be locked or all locks were obtained, false otherwise
     * @throws IOException if a task action fails
     */
    public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException {
        final boolean forceTimeChunkLock = getContextValue("forceTimeChunkLock", true);
        if (forceTimeChunkLock) {
            log.info("[%s] is set to true in task context. Use timeChunk lock", "forceTimeChunkLock");
            taskLockHelper = new TaskLockHelper(false);
            return intervals.isEmpty() || tryTimeChunkLock(client, intervals);
        }
        if (intervals.isEmpty()) {
            // Note: on this path taskLockHelper is left unassigned.
            return true;
        }
        final LockGranularityDetermineResult decision = determineSegmentGranularity(client, intervals);
        taskLockHelper = new TaskLockHelper(decision.lockGranularity == LockGranularity.SEGMENT);
        return tryLockWithDetermineResult(client, decision);
    }

    /**
     * Segment-based variant of the lock determination: chooses a lock granularity for the given
     * segments, reports the choice to {@code segmentCheckFunction}, and tries to acquire the locks.
     *
     * <p>As in the interval-based variant, the {@code "forceTimeChunkLock"} context value (default
     * {@code true}) forces time-chunk locks, here over the distinct intervals of the segments.
     *
     * @param client               action client used to submit lock requests
     * @param segments             segments to lock
     * @param segmentCheckFunction callback receiving the chosen granularity and the segments
     *                             before any lock is attempted
     * @return true if the locks were obtained, false otherwise
     * @throws IOException if a task action fails
     */
    boolean determineLockGranularityAndTryLockWithSegments(TaskActionClient client, List<DataSegment> segments, BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction) throws IOException {
        final boolean forceTimeChunkLock = getContextValue("forceTimeChunkLock", true);
        if (!forceTimeChunkLock) {
            final LockGranularityDetermineResult decision = determineSegmentGranularity(segments);
            taskLockHelper = new TaskLockHelper(decision.lockGranularity == LockGranularity.SEGMENT);
            segmentCheckFunction.accept(decision.lockGranularity, segments);
            return tryLockWithDetermineResult(client, decision);
        }

        log.info("[%s] is set to true in task context. Use timeChunk lock", "forceTimeChunkLock");
        taskLockHelper = new TaskLockHelper(false);
        segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
        // Collapse identical segment intervals before asking for locks.
        final Set<Interval> distinctIntervals = segments
            .stream()
            .map(DataSegment::getInterval)
            .collect(Collectors.toSet());
        return tryTimeChunkLock(client, new ArrayList<>(distinctIntervals));
    }

    /**
     * Decides the lock granularity for the given intervals.
     *
     * <ul>
     *   <li>No need to lock existing segments: segment lock over an empty segment list.</li>
     *   <li>Perfect rollup: time-chunk lock over the given intervals.</li>
     *   <li>Empty intervals: segment lock over an empty segment list.</li>
     *   <li>Otherwise: the decision is delegated to the segment-based overload, using the
     *       segments returned by {@code findSegmentsToLock}.</li>
     * </ul>
     *
     * @param client    action client passed to {@code findSegmentsToLock}
     * @param intervals intervals the task wants to lock
     * @return the chosen granularity together with the intervals or segments to lock
     * @throws IOException if looking up the segments fails
     */
    private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List<Interval> intervals) throws IOException {
        if (!requireLockExistingSegments()) {
            log.info("Using segment lock since we don't have to lock existing segments");
            return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
        }
        if (isPerfectRollup()) {
            log.info("Using timeChunk lock for perfect rollup");
            return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null);
        }
        if (intervals.isEmpty()) {
            log.info("Using segment lock for empty intervals");
            return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
        }
        return determineSegmentGranularity(findSegmentsToLock(client, intervals));
    }

    /**
     * Acquires locks according to a previously computed decision: time-chunk locks over the
     * decision's intervals, or segment locks (via the lock helper) over its segments.
     *
     * @param client action client used to submit lock requests
     * @param result decision produced by one of the determineSegmentGranularity overloads
     * @return true if the locks were obtained, false otherwise
     * @throws NullPointerException if the list required by the chosen granularity is null
     * @throws IOException if a task action fails
     */
    private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranularityDetermineResult result) throws IOException {
        if (result.lockGranularity != LockGranularity.TIME_CHUNK) {
            final List<DataSegment> segmentsToLock = Preconditions.checkNotNull(result.segments, "segments");
            return taskLockHelper.verifyAndLockExistingSegments(client, segmentsToLock);
        }
        final List<Interval> intervalsToLock = Preconditions.checkNotNull(result.intervals, "intervals");
        return tryTimeChunkLock(client, intervalsToLock);
    }

    /**
     * Tries to acquire an exclusive time-chunk lock for each condensed interval.
     *
     * <p>When a segment granularity is available, the intervals are first expanded to that
     * granularity through {@link IntervalsByGranularity} and then condensed; otherwise they are
     * condensed directly. An interval equal to the one immediately before it is skipped.
     *
     * @param client    action client used to submit the lock requests
     * @param intervals intervals to lock
     * @return false as soon as one lock request returns null, true if every request succeeded
     * @throws IOException if submitting a lock request fails
     */
    protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException {
        final Granularity segmentGranularity = getSegmentGranularity();
        final Iterator<Interval> lockIntervals;
        if (segmentGranularity != null) {
            final IntervalsByGranularity byGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
            lockIntervals = JodaUtils.condensedIntervalsIterator(byGranularity.granularityIntervalsIterator());
        } else {
            lockIntervals = JodaUtils.condenseIntervals(intervals).iterator();
        }

        Interval previous = null;
        while (lockIntervals.hasNext()) {
            final Interval current = lockIntervals.next();
            final boolean sameAsPrevious = previous != null && current.equals(previous);
            if (!sameAsPrevious) {
                previous = current;
                final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, current));
                if (lock == null) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Decides the lock granularity for the given existing segments.
     *
     * <ul>
     *   <li>No segments, or no need to lock existing segments: segment lock over an empty list.</li>
     *   <li>The segments share no single granularity, or a spec granularity is set and it either
     *       differs from the segments' granularity or is not aligned with some segment interval:
     *       time-chunk lock over the segment intervals.</li>
     *   <li>Otherwise: segment lock over the non-overshadowed segments, restricted to complete
     *       partitions, within the umbrella interval of all segments.</li>
     * </ul>
     *
     * @param segments existing segments the task may need to lock
     * @return the chosen granularity together with the intervals or segments to lock
     */
    private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegment> segments) {
        if (segments.isEmpty()) {
            log.info("Using segment lock for empty segments");
            return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
        }
        if (!requireLockExistingSegments()) {
            log.info("Using segment lock since we don't have to lock existing segments");
            return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
        }

        final Granularity granularityFromSegments = AbstractBatchIndexTask.findGranularityFromSegments(segments);
        final Granularity segmentGranularityFromSpec = getSegmentGranularity();
        final List<Interval> intervals = segments
            .stream()
            .map(DataSegment::getInterval)
            .collect(Collectors.toList());

        // Evaluated in the same short-circuit order as a single compound condition would be.
        final boolean granularityChanged;
        if (granularityFromSegments == null) {
            granularityChanged = true;
        } else if (segmentGranularityFromSpec == null) {
            granularityChanged = false;
        } else if (!granularityFromSegments.equals(segmentGranularityFromSpec)) {
            granularityChanged = true;
        } else {
            granularityChanged = !segments
                .stream()
                .allMatch(segment -> segmentGranularityFromSpec.isAligned(segment.getInterval()));
        }

        if (granularityChanged) {
            log.info("Detected segmentGranularity change. Using timeChunk lock");
            return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null);
        }

        final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments);
        final Set<DataSegment> segmentsToLock = timeline.findNonOvershadowedObjectsInInterval(
            JodaUtils.umbrellaInterval(intervals),
            Partitions.ONLY_COMPLETE
        );
        log.info("No segmentGranularity change detected and it's not perfect rollup. Using segment lock");
        return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, new ArrayList<>(segmentsToLock));
    }

    /**
     * Reports whether guaranteed (perfect) rollup is forced by the tuning config.
     *
     * @param ioConfig     IO config, consulted for append-to-existing mode
     * @param tuningConfig tuning config, consulted for the force-guaranteed-rollup flag
     * @return the value of the force-guaranteed-rollup flag
     * @throws IllegalArgumentException if guaranteed rollup is forced while appending to an
     *                                  existing dataSource
     */
    public static boolean isGuaranteedRollup(IndexTask.IndexIOConfig ioConfig, IndexTask.IndexTuningConfig tuningConfig) {
        Preconditions.checkArgument(
            !(tuningConfig.isForceGuaranteedRollup() && ioConfig.isAppendToExisting()),
            "Perfect rollup cannot be guaranteed when appending to existing dataSources"
        );
        return tuningConfig.isForceGuaranteedRollup();
    }

    /**
     * Builds the function applied to a set of segments to attach compaction state.
     *
     * <p>When {@code storeCompactionState} is false the identity function is returned. Otherwise
     * a {@link CompactionState} is built once from the partitions spec, the index spec map and
     * the granularity spec map, and the returned function produces a new set in which every
     * segment carries that state.
     *
     * @param storeCompactionState whether segments should be annotated at all
     * @param toolbox              supplies the JSON mapper used to convert the specs to maps
     * @param tuningConfig         supplies the partitions spec and the index spec
     * @param granularitySpec      granularity spec stored in the compaction state
     * @return the annotating function, or identity
     */
    public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(boolean storeCompactionState, TaskToolbox toolbox, IndexTask.IndexTuningConfig tuningConfig, GranularitySpec granularitySpec) {
        if (!storeCompactionState) {
            return Function.identity();
        }
        final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
        final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
        final CompactionState compactionState = new CompactionState(
            tuningConfig.getPartitionsSpec(),
            indexSpecMap,
            granularitySpecMap
        );
        return segments -> segments
            .stream()
            .map(segment -> segment.withLastCompactionState(compactionState))
            .collect(Collectors.toSet());
    }

    /**
     * Finds the visible used segments of a dataSource whose interval lies completely inside one
     * of the given intervals (after condensing them).
     *
     * <p>Segments that merely overlap an interval are not returned; only full containment counts.
     *
     * @param toolbox    supplies the task action client used to retrieve used segments
     * @param dataSource dataSource to query
     * @param intervals  intervals of interest; an empty list yields an empty result without
     *                   submitting any action
     * @return a new mutable set of the contained segments, never null
     * @throws IOException if retrieving the used segments fails
     */
    public static Set<DataSegment> getUsedSegmentsWithinInterval(TaskToolbox toolbox, String dataSource, List<Interval> intervals) throws IOException {
        final Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
        if (intervals.isEmpty()) {
            // Nothing to look up; skip condensing and the metadata call.
            return segmentsFoundForDrop;
        }

        // Typed list: a raw List cannot be iterated as Interval elements.
        final List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
        final Collection<DataSegment> usedSegments = toolbox.getTaskActionClient().submit(
            new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE)
        );
        for (DataSegment segment : usedSegments) {
            final Interval segmentInterval = segment.getInterval();
            final boolean fullyContained = condensedIntervals
                .stream()
                .anyMatch(interval -> interval.contains(segmentInterval));
            if (fullyContained) {
                segmentsFoundForDrop.add(segment);
            }
        }
        return segmentsFoundForDrop;
    }

    /**
     * Derives a common granularity from the given segments.
     *
     * <p>The period of the first segment's interval is the reference; if any segment's interval
     * has a different period, no common granularity exists.
     *
     * @param segments segments to inspect
     * @return the default granularity of the granularity type matching the shared period, or
     *         null when the list is empty or the periods differ
     */
    @Nullable
    static Granularity findGranularityFromSegments(List<DataSegment> segments) {
        if (segments.isEmpty()) {
            return null;
        }
        final Period referencePeriod = segments.get(0).getInterval().toPeriod();
        for (DataSegment segment : segments) {
            if (!referencePeriod.equals(segment.getInterval().toPeriod())) {
                return null;
            }
        }
        return GranularityType.fromPeriod(referencePeriod).getDefaultGranularity();
    }

    /**
     * Looks up the visible used segments a task will read as input.
     *
     * <ul>
     *   <li>For any firehose factory other than {@link IngestSegmentFirehoseFactory}, the
     *       segments in {@code intervalsToRead} are returned.</li>
     *   <li>For an {@code IngestSegmentFirehoseFactory} without an explicit segment list, the
     *       segments in the factory's interval are returned.</li>
     *   <li>For an {@code IngestSegmentFirehoseFactory} with an explicit segment list, the
     *       segments in the listed intervals are retrieved and only those whose id string appears
     *       in the list are kept.</li>
     * </ul>
     *
     * @param dataSource      dataSource to query
     * @param actionClient    client used to submit the retrieval action
     * @param intervalsToRead intervals used when the firehose factory does not name its input
     * @param firehoseFactory firehose factory of the task
     * @return the input segments
     * @throws NullPointerException if an {@code IngestSegmentFirehoseFactory} has neither
     *                              segments nor an interval
     * @throws IOException if retrieving the used segments fails
     */
    protected static List<DataSegment> findInputSegments(String dataSource, TaskActionClient actionClient, List<Interval> intervalsToRead, FirehoseFactory firehoseFactory) throws IOException {
        if (!(firehoseFactory instanceof IngestSegmentFirehoseFactory)) {
            return ImmutableList.copyOf(
                actionClient.submit(
                    new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE)
                )
            );
        }

        final IngestSegmentFirehoseFactory ingestFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
        final List<WindowedSegmentId> inputSegments = ingestFactory.getSegments();
        if (inputSegments == null) {
            final Interval inputInterval = Preconditions.checkNotNull(ingestFactory.getInterval(), "input interval");
            return ImmutableList.copyOf(
                actionClient.submit(
                    new RetrieveUsedSegmentsAction(dataSource, inputInterval, null, Segments.ONLY_VISIBLE)
                )
            );
        }

        // A hash set keeps the per-segment membership test O(1) instead of scanning a list.
        final Set<String> inputSegmentIds = inputSegments
            .stream()
            .map(WindowedSegmentId::getSegmentId)
            .collect(Collectors.toSet());
        final Set<Interval> inputIntervals = inputSegments
            .stream()
            .flatMap(windowedSegmentId -> windowedSegmentId.getIntervals().stream())
            .collect(Collectors.toSet());
        final Collection<DataSegment> dataSegmentsInIntervals = actionClient.submit(
            new RetrieveUsedSegmentsAction(dataSource, null, inputIntervals, Segments.ONLY_VISIBLE)
        );
        return dataSegmentsInIntervals
            .stream()
            .filter(segment -> inputSegmentIds.contains(segment.getId().toString()))
            .collect(Collectors.toList());
    }

    /**
     * Blocks until every given segment has been reported as handed off, or the timeout elapses.
     *
     * <p>A handoff notifier is created for the dataSource of the first segment, one callback per
     * segment is registered on a direct executor, and a latch counts the callbacks down. The
     * notifier is closed when the wait ends.
     *
     * @param toolbox           supplies the segment handoff notifier factory
     * @param segmentsToWaitFor segments whose availability should be confirmed
     * @param waitTimeout       maximum time to wait, in milliseconds
     * @return true if the list is empty or all callbacks fired within the timeout; false if the
     *         timeout is negative, the wait timed out, or the thread was interrupted (in which
     *         case the interrupt flag is restored)
     */
    protected boolean waitForSegmentAvailability(TaskToolbox toolbox, List<DataSegment> segmentsToWaitFor, long waitTimeout) {
        if (segmentsToWaitFor.isEmpty()) {
            log.info("Asked to wait for segments to be available, but I wasn't provided with any segments.");
            return true;
        }
        if (waitTimeout < 0L) {
            log.warn("Asked to wait for availability for < 0 seconds?! Requested waitTimeout: [%s]", waitTimeout);
            return false;
        }

        final int segmentCount = segmentsToWaitFor.size();
        log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentCount);

        final String dataSource = segmentsToWaitFor.get(0).getDataSource();
        try (SegmentHandoffNotifier notifier =
                 toolbox.getSegmentHandoffNotifierFactory().createSegmentHandoffNotifier(dataSource)) {
            final Executor callbackExecutor = Execs.directExecutor();
            final CountDownLatch remaining = new CountDownLatch(segmentCount);

            notifier.start();
            for (DataSegment segment : segmentsToWaitFor) {
                final SegmentDescriptor descriptor = new SegmentDescriptor(
                    segment.getInterval(),
                    segment.getVersion(),
                    segment.getShardSpec().getPartitionNum()
                );
                notifier.registerSegmentHandoffCallback(descriptor, callbackExecutor, () -> {
                    log.debug("Confirmed availability for [%s]. Removing from list of segments to wait for", segment.getId());
                    remaining.countDown();
                });
            }
            return remaining.await(waitTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!");
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Immutable outcome of a lock-granularity decision: the chosen granularity plus either the
     * intervals (for time-chunk locks) or the segments (for segment locks) to lock. The list
     * that does not apply to the chosen granularity is passed as null by the code in this class.
     */
    private static final class LockGranularityDetermineResult {
        /** Granularity the locks should be taken at. */
        private final LockGranularity lockGranularity;

        /** Intervals to lock; null when the decision carries segments instead. */
        @Nullable
        private final List<Interval> intervals;

        /** Segments to lock; null when the decision carries intervals instead. */
        @Nullable
        private final List<DataSegment> segments;

        private LockGranularityDetermineResult(
            LockGranularity lockGranularity,
            @Nullable List<Interval> intervals,
            @Nullable List<DataSegment> segments
        ) {
            this.lockGranularity = lockGranularity;
            this.intervals = intervals;
            this.segments = segments;
        }
    }
}

