/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.actions;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;

@ManageLifecycle
public class SegmentAllocationQueue {
    private static final Logger log = new Logger(SegmentAllocationQueue.class);
    private static final int MAX_QUEUE_SIZE = 2000;
    private static final int MAX_BATCH_SIZE = 500;
    private final long maxWaitTimeMillis;
    private final TaskLockbox taskLockbox;
    private final ScheduledExecutorService executor;
    private final IndexerMetadataStorageCoordinator metadataStorage;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final ServiceEmitter emitter;
    private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap();
    private final BlockingDeque<AllocateRequestKey> processingQueue = new LinkedBlockingDeque<AllocateRequestKey>(2000);

    @Inject
    public SegmentAllocationQueue(TaskLockbox taskLockbox, TaskLockConfig taskLockConfig, IndexerMetadataStorageCoordinator metadataStorage, ServiceEmitter emitter, ScheduledExecutorFactory executorFactory) {
        this.emitter = emitter;
        this.taskLockbox = taskLockbox;
        this.metadataStorage = metadataStorage;
        this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
        this.executor = taskLockConfig.isBatchSegmentAllocation() ? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
    }

    @LifecycleStart
    public void start() {
        if (this.isEnabled()) {
            log.info("Initializing segment allocation queue.", new Object[0]);
            this.scheduleQueuePoll(this.maxWaitTimeMillis);
        }
    }

    @LifecycleStop
    public void stop() {
        if (this.isEnabled()) {
            log.info("Tearing down segment allocation queue.", new Object[0]);
            this.executor.shutdownNow();
        }
    }

    public void becomeLeader() {
        if (!this.isLeader.compareAndSet(false, true)) {
            log.info("Already the leader. Queue processing has started.", new Object[0]);
        } else if (this.isEnabled()) {
            log.info("Elected leader. Starting queue processing.", new Object[0]);
        } else {
            log.info("Elected leader but batched segment allocation is disabled. Segment allocation queue will not be used.", new Object[0]);
        }
    }

    public void stopBeingLeader() {
        if (!this.isLeader.compareAndSet(true, false)) {
            log.info("Already surrendered leadership. Queue processing is stopped.", new Object[0]);
        } else if (this.isEnabled()) {
            log.info("Not leader anymore. Stopping queue processing.", new Object[0]);
        } else {
            log.info("Not leader anymore. Segment allocation queue is already disabled.", new Object[0]);
        }
    }

    public boolean isEnabled() {
        return this.executor != null && !this.executor.isShutdown();
    }

    private void scheduleQueuePoll(long delay) {
        this.executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
    }

    public int size() {
        return this.processingQueue.size();
    }

    public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request) {
        if (!this.isLeader.get()) {
            throw new ISE("Cannot allocate segment if not leader.", new Object[0]);
        }
        if (!this.isEnabled()) {
            throw new ISE("Batched segment allocation is disabled.", new Object[0]);
        }
        AllocateRequestKey requestKey = this.getKeyForAvailableBatch(request);
        AtomicReference futureReference = new AtomicReference();
        this.keyToBatch.compute(requestKey, (key, existingBatch) -> {
            if (existingBatch == null) {
                AllocateRequestBatch newBatch = new AllocateRequestBatch((AllocateRequestKey)key);
                futureReference.set(newBatch.add(request));
                return this.addBatchToQueue(newBatch) ? newBatch : null;
            }
            futureReference.set(existingBatch.add(request));
            return existingBatch;
        });
        return (Future)futureReference.get();
    }

    private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request) {
        for (int batchIncrementalId = 0; batchIncrementalId < 2000; ++batchIncrementalId) {
            AllocateRequestKey nextKey = new AllocateRequestKey(request, this.maxWaitTimeMillis, batchIncrementalId);
            AllocateRequestBatch nextBatch = this.keyToBatch.get(nextKey);
            if (nextBatch != null && nextBatch.size() >= 500) continue;
            return nextKey;
        }
        throw new ISE("Allocation queue is at capacity, all batches are full.", new Object[0]);
    }

    private boolean addBatchToQueue(AllocateRequestBatch batch) {
        batch.key.resetQueueTime();
        if (!this.isLeader.get()) {
            batch.failPendingRequests("Cannot allocate segment if not leader");
            return false;
        }
        if (this.processingQueue.offer(batch.key)) {
            log.debug("Added a new batch [%s] to queue.", new Object[]{batch.key});
            return true;
        }
        batch.failPendingRequests("Segment allocation queue is full. Check the metric `task/action/batch/runTime` to determine if metadata operations are slow.");
        return false;
    }

    private void requeueBatch(AllocateRequestBatch batch) {
        log.info("Requeueing [%d] failed requests in batch [%s].", new Object[]{batch.size(), batch.key});
        this.keyToBatch.compute(batch.key, (key, existingBatch) -> {
            if (existingBatch == null) {
                return this.addBatchToQueue(batch) ? batch : null;
            }
            existingBatch.transferRequestsFrom(batch);
            return existingBatch;
        });
    }

    private void processBatchesDue() {
        long nextScheduleDelay;
        this.clearQueueIfNotLeader();
        log.debug("Processing batches which are due. Queue size [%d].", new Object[]{this.processingQueue.size()});
        int numProcessedBatches = 0;
        AllocateRequestKey nextKey = (AllocateRequestKey)this.processingQueue.peekFirst();
        while (nextKey != null && nextKey.isDue()) {
            boolean processed;
            this.processingQueue.pollFirst();
            AllocateRequestBatch nextBatch = this.keyToBatch.remove(nextKey);
            try {
                processed = this.processBatch(nextBatch);
            }
            catch (Throwable t) {
                nextBatch.failPendingRequests(t);
                processed = true;
                log.error(t, "Error while processing batch [%s]", new Object[]{nextKey});
            }
            if (processed) {
                ++numProcessedBatches;
            } else {
                this.requeueBatch(nextBatch);
            }
            nextKey = this.processingQueue.peek();
        }
        if (this.processingQueue.isEmpty()) {
            nextScheduleDelay = this.maxWaitTimeMillis;
        } else {
            nextKey = this.processingQueue.peek();
            long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime();
            nextScheduleDelay = Math.max(0L, this.maxWaitTimeMillis - timeElapsed);
        }
        this.scheduleQueuePoll(nextScheduleDelay);
        log.info("Processed [%d] batches, next execution in [%d ms]", new Object[]{numProcessedBatches, nextScheduleDelay});
    }

    private void clearQueueIfNotLeader() {
        int failedBatches = 0;
        AllocateRequestKey nextKey = (AllocateRequestKey)this.processingQueue.peekFirst();
        while (nextKey != null && !this.isLeader.get()) {
            this.processingQueue.pollFirst();
            AllocateRequestBatch nextBatch = this.keyToBatch.remove(nextKey);
            nextBatch.failPendingRequests("Cannot allocate segment if not leader");
            ++failedBatches;
            nextKey = (AllocateRequestKey)this.processingQueue.peekFirst();
        }
        if (failedBatches > 0) {
            log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", new Object[]{failedBatches, this.processingQueue.size()});
        }
    }

    private boolean processBatch(AllocateRequestBatch requestBatch) {
        AllocateRequestKey requestKey = requestBatch.key;
        if (requestBatch.isEmpty()) {
            return true;
        }
        if (!this.isLeader.get()) {
            requestBatch.failPendingRequests("Cannot allocate segment if not leader");
            return true;
        }
        log.debug("Processing [%d] requests for batch [%s], queue time [%s].", new Object[]{requestBatch.size(), requestKey, requestKey.getQueueTime()});
        long startTimeMillis = System.currentTimeMillis();
        int batchSize = requestBatch.size();
        this.emitBatchMetric("task/action/batch/size", batchSize, requestKey);
        this.emitBatchMetric("task/action/batch/queueTime", startTimeMillis - requestKey.getQueueTime(), requestKey);
        Set<DataSegment> usedSegments = this.retrieveUsedSegments(requestKey);
        int successCount = this.allocateSegmentsForBatch(requestBatch, usedSegments);
        this.emitBatchMetric("task/action/batch/attempts", 1L, requestKey);
        this.emitBatchMetric("task/action/batch/runTime", System.currentTimeMillis() - startTimeMillis, requestKey);
        log.info("Successfully processed [%d / %d] requests in batch [%s].", new Object[]{successCount, batchSize, requestKey});
        if (requestBatch.isEmpty()) {
            log.debug("All requests in batch [%s] have been processed.", new Object[]{requestKey});
            return true;
        }
        log.debug("There are [%d] failed requests in batch [%s].", new Object[]{requestBatch.size(), requestKey});
        Set<DataSegment> updatedUsedSegments = this.retrieveUsedSegments(requestKey);
        if (updatedUsedSegments.equals(usedSegments)) {
            requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments.");
            return true;
        }
        log.debug("Used segments have changed. Requeuing failed requests.", new Object[0]);
        return false;
    }

    private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key) {
        return new HashSet<DataSegment>(this.metadataStorage.retrieveUsedSegmentsForInterval(key.dataSource, key.preferredAllocationInterval, Segments.ONLY_VISIBLE));
    }

    private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<DataSegment> usedSegments) {
        int successCount = 0;
        Set<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
        HashSet<SegmentAllocateRequest> requestsWithNoOverlappingSegment = new HashSet<SegmentAllocateRequest>();
        if (usedSegments.isEmpty()) {
            requestsWithNoOverlappingSegment.addAll(allRequests);
        } else {
            Interval[] sortedUsedSegmentIntervals = this.getSortedIntervals(usedSegments);
            HashMap<Interval, List> overlapIntervalToRequests = new HashMap<Interval, List>();
            for (SegmentAllocateRequest segmentAllocateRequest : allRequests) {
                Interval overlappingInterval = Intervals.findOverlappingInterval((Interval)segmentAllocateRequest.getRowInterval(), (Interval[])sortedUsedSegmentIntervals);
                if (overlappingInterval == null) {
                    requestsWithNoOverlappingSegment.add(segmentAllocateRequest);
                    continue;
                }
                if (!overlappingInterval.contains((ReadableInterval)segmentAllocateRequest.getRowInterval())) continue;
                overlapIntervalToRequests.computeIfAbsent(overlappingInterval, i -> new ArrayList()).add(segmentAllocateRequest);
            }
            for (Map.Entry entry : overlapIntervalToRequests.entrySet()) {
                successCount += this.allocateSegmentsForInterval((Interval)entry.getKey(), (List)entry.getValue(), requestBatch);
            }
        }
        HashSet<SegmentAllocateRequest> pendingRequests = new HashSet<SegmentAllocateRequest>(requestsWithNoOverlappingSegment);
        for (Granularity granularity : Granularity.granularitiesFinerThan((Granularity)requestBatch.key.preferredSegmentGranularity)) {
            Map<Interval, List<SegmentAllocateRequest>> map = this.getRequestsByInterval(pendingRequests, granularity);
            for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : map.entrySet()) {
                successCount += this.allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
                pendingRequests.retainAll(requestBatch.getRequests());
            }
        }
        return successCount;
    }

    private Interval[] getSortedIntervals(Set<DataSegment> usedSegments) {
        TreeSet sortedSet = new TreeSet(Comparators.intervalsByStartThenEnd());
        usedSegments.forEach(segment -> sortedSet.add(segment.getInterval()));
        return sortedSet.toArray(new Interval[0]);
    }

    private int allocateSegmentsForInterval(Interval tryInterval, List<SegmentAllocateRequest> requests, AllocateRequestBatch requestBatch) {
        if (requests.isEmpty()) {
            return 0;
        }
        AllocateRequestKey requestKey = requestBatch.key;
        log.debug("Trying allocation for [%d] requests, interval [%s] in batch [%s]", new Object[]{requests.size(), tryInterval, requestKey});
        List<SegmentAllocateResult> results = this.taskLockbox.allocateSegments(requests, requestKey.dataSource, tryInterval, requestKey.skipSegmentLineageCheck, requestKey.lockGranularity);
        int successfulRequests = 0;
        for (int i = 0; i < requests.size(); ++i) {
            SegmentAllocateRequest request = requests.get(i);
            SegmentAllocateResult result = results.get(i);
            if (result.isSuccess()) {
                ++successfulRequests;
            }
            requestBatch.handleResult(result, request);
        }
        return successfulRequests;
    }

    private Map<Interval, List<SegmentAllocateRequest>> getRequestsByInterval(Set<SegmentAllocateRequest> requests, Granularity tryGranularity) {
        HashMap<Interval, List<SegmentAllocateRequest>> tryIntervalToRequests = new HashMap<Interval, List<SegmentAllocateRequest>>();
        for (SegmentAllocateRequest request : requests) {
            Interval tryInterval = tryGranularity.bucket(request.getAction().getTimestamp());
            if (!tryInterval.contains((ReadableInterval)request.getRowInterval())) continue;
            tryIntervalToRequests.computeIfAbsent(tryInterval, i -> new ArrayList()).add(request);
        }
        return tryIntervalToRequests;
    }

    private void emitTaskMetric(String metric, long value, SegmentAllocateRequest request) {
        ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
        IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask());
        metricBuilder.setDimension("taskActionType", (Object)"segmentAllocate");
        this.emitter.emit(metricBuilder.build(metric, (Number)value));
    }

    private void emitBatchMetric(String metric, long value, AllocateRequestKey key) {
        ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
        metricBuilder.setDimension("taskActionType", (Object)"segmentAllocate");
        metricBuilder.setDimension("dataSource", (Object)key.dataSource);
        metricBuilder.setDimension("interval", (Object)key.preferredAllocationInterval.toString());
        this.emitter.emit(metricBuilder.build(metric, (Number)value));
    }

    private static class AllocateRequestKey {
        private final int batchIncrementalId;
        private long queueTimeMillis;
        private final long maxWaitTimeMillis;
        private final String dataSource;
        private final String groupId;
        private final Interval preferredAllocationInterval;
        private final Granularity preferredSegmentGranularity;
        private final boolean skipSegmentLineageCheck;
        private final LockGranularity lockGranularity;
        private final boolean useNonRootGenPartitionSpace;
        private final int hash;

        AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId) {
            SegmentAllocateAction action = request.getAction();
            Task task = request.getTask();
            this.batchIncrementalId = batchIncrementalId;
            this.dataSource = action.getDataSource();
            this.groupId = task.getGroupId();
            this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
            this.lockGranularity = action.getLockGranularity();
            this.useNonRootGenPartitionSpace = action.getPartialShardSpec().useNonRootGenerationPartitionSpace();
            this.preferredSegmentGranularity = action.getPreferredSegmentGranularity();
            this.preferredAllocationInterval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
            this.hash = Objects.hash(new Object[]{this.dataSource, this.groupId, batchIncrementalId, this.skipSegmentLineageCheck, this.useNonRootGenPartitionSpace, this.preferredAllocationInterval, this.lockGranularity});
            this.maxWaitTimeMillis = maxWaitTimeMillis;
        }

        void resetQueueTime() {
            this.queueTimeMillis = System.currentTimeMillis();
        }

        long getQueueTime() {
            return this.queueTimeMillis;
        }

        boolean isDue() {
            return System.currentTimeMillis() - this.queueTimeMillis >= this.maxWaitTimeMillis;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AllocateRequestKey that = (AllocateRequestKey)o;
            return this.dataSource.equals(that.dataSource) && this.groupId.equals(that.groupId) && this.batchIncrementalId == that.batchIncrementalId && this.skipSegmentLineageCheck == that.skipSegmentLineageCheck && this.useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace && this.preferredAllocationInterval.equals((Object)that.preferredAllocationInterval) && this.lockGranularity == that.lockGranularity;
        }

        public int hashCode() {
            return this.hash;
        }

        public String toString() {
            return "{ds='" + this.dataSource + '\'' + ", gr='" + this.groupId + '\'' + ", incId=" + this.batchIncrementalId + ", lock=" + (Object)((Object)this.lockGranularity) + ", invl=" + this.preferredAllocationInterval + ", slc=" + this.skipSegmentLineageCheck + '}';
        }
    }

    private class AllocateRequestBatch {
        private final AllocateRequestKey key;
        private final Map<SegmentAllocateRequest, CompletableFuture<SegmentIdWithShardSpec>> requestToFuture = new HashMap<SegmentAllocateRequest, CompletableFuture<SegmentIdWithShardSpec>>();

        AllocateRequestBatch(AllocateRequestKey key) {
            this.key = key;
        }

        synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request) {
            log.debug("Adding request to batch [%s]: %s", new Object[]{this.key, request.getAction()});
            return this.requestToFuture.computeIfAbsent(request, req -> new CompletableFuture());
        }

        synchronized void transferRequestsFrom(AllocateRequestBatch batch) {
            this.requestToFuture.putAll(batch.requestToFuture);
            batch.requestToFuture.clear();
        }

        synchronized Set<SegmentAllocateRequest> getRequests() {
            return new HashSet<SegmentAllocateRequest>(this.requestToFuture.keySet());
        }

        synchronized void failPendingRequests(String reason) {
            this.failPendingRequests((Throwable)new ISE(reason, new Object[0]));
        }

        synchronized void failPendingRequests(Throwable cause) {
            if (!this.requestToFuture.isEmpty()) {
                log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", new Object[]{this.size(), cause.getMessage(), this.key});
                this.requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
                this.requestToFuture.keySet().forEach(request -> SegmentAllocationQueue.this.emitTaskMetric("task/action/failed/count", 1L, request));
                this.requestToFuture.clear();
            }
        }

        synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) {
            request.incrementAttempts();
            if (result.isSuccess()) {
                SegmentAllocationQueue.this.emitTaskMetric("task/action/success/count", 1L, request);
                this.requestToFuture.remove(request).complete(result.getSegmentId());
            } else if (request.canRetry()) {
                log.info("Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s", new Object[]{request.getAttempts(), result.getErrorMessage(), request.getAction()});
            } else {
                SegmentAllocationQueue.this.emitTaskMetric("task/action/failed/count", 1L, request);
                log.error("Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s", new Object[]{request.getAttempts(), result.getErrorMessage(), request.getAction()});
                this.requestToFuture.remove(request).completeExceptionally((Throwable)new ISE(result.getErrorMessage(), new Object[0]));
            }
        }

        synchronized boolean isEmpty() {
            return this.requestToFuture.isEmpty();
        }

        synchronized int size() {
            return this.requestToFuture.size();
        }
    }
}

