/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.eventProcessor.impl.EventProcessorHelper;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.AutoScaleEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.LoggerFactory;

/**
 * Handles {@link AutoScaleEvent}s by translating them into {@link ScaleOpEvent}s.
 *
 * <p>A scale-up request splits the key range of the reported segment into equal parts. A scale-down
 * request marks the reported segment as cold and, if any of its key-range neighbours are cold too,
 * asks for them to be merged into a single segment. In both cases the resulting {@link ScaleOpEvent}
 * is handed to {@link StreamMetadataTasks#writeEvent}; this class does not perform the scale itself.
 */
public class AutoScaleTask {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(AutoScaleTask.class));

    /**
     * Time window, in milliseconds, added to a request's timestamp. Requests older than this are
     * dropped by {@link #execute}; the same window is added to the timestamp passed to
     * {@code markCold} for non-silent scale-down requests.
     */
    private static final long REQUEST_VALIDITY_PERIOD = Duration.ofMinutes(10L).toMillis();

    /**
     * Direction value that routes a request to {@link #processScaleUp}; every other value is
     * treated as a scale-down.
     * NOTE(review): presumably a compiler-inlined constant declared on AutoScaleEvent - confirm and
     * reference that constant instead of this literal.
     */
    private static final int DIRECTION_SCALE_UP = 0;

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates a new task.
     *
     * @param streamMetadataTasks used to post the resulting scale events; must not be null.
     * @param streamMetadataStore used to read stream configuration and segment metadata; must not be null.
     * @param executor            executor for asynchronous continuations and retries; must not be null.
     * @throws NullPointerException if any argument is null.
     */
    public AutoScaleTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService executor) {
        Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        Preconditions.checkNotNull(streamMetadataTasks, "streamMetadataTasks");
        Preconditions.checkNotNull(executor, "executor");
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = executor;
    }

    /**
     * Processes a single auto-scale request.
     *
     * <p>Requests whose timestamp is older than {@link #REQUEST_VALIDITY_PERIOD} are logged and
     * ignored. Otherwise the stream's scaling policy is fetched and the request is dispatched to the
     * scale-up or scale-down path, wrapped in {@link EventProcessorHelper#withRetries}.
     *
     * @param request the auto-scale request to process.
     * @return a future that completes when the request has been handled (or ignored).
     */
    public CompletableFuture<Void> execute(AutoScaleEvent request) {
        if (request.getTimestamp() + REQUEST_VALIDITY_PERIOD <= System.currentTimeMillis()) {
            // The request sat in the queue for longer than its validity window; acting on stale
            // load information is not useful, so it is only logged.
            log.info(request.getRequestId(), "Scale Request for stream {}/{} expired", new Object[]{request.getScope(), request.getStream()});
            return CompletableFuture.completedFuture(null);
        }

        OperationContext context = this.streamMetadataStore.createContext(request.getScope(), request.getStream());
        return EventProcessorHelper.withRetries(() -> {
            CompletableFuture<ScalingPolicy> policyFuture = this.streamMetadataStore
                    .getConfiguration(request.getScope(), request.getStream(), context, this.executor)
                    .thenApply(StreamConfiguration::getScalingPolicy);
            if (request.getDirection() == DIRECTION_SCALE_UP) {
                return policyFuture.thenComposeAsync(policy -> this.processScaleUp(request, policy, context), this.executor);
            }
            return policyFuture.thenComposeAsync(policy -> this.processScaleDown(request, policy, context), this.executor);
        }, this.executor);
    }

    /**
     * Handles a scale-up request by splitting the reported segment's key range into equal parts.
     *
     * <p>The number of parts is the smaller of the requested split count and the policy's scale
     * factor, each clamped to at least 2. Streams with a fixed number of segments are left alone.
     */
    private CompletableFuture<Void> processScaleUp(AutoScaleEvent request, ScalingPolicy policy, OperationContext context) {
        String qualifiedName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(request.getScope(), request.getStream(), request.getSegmentId());
        log.info(request.getRequestId(), "Scale up request received for stream segment {}", new Object[]{qualifiedName});
        if (policy.getScaleType().equals(ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS)) {
            return CompletableFuture.completedFuture(null);
        }

        return this.streamMetadataStore
                .getSegment(request.getScope(), request.getStream(), request.getSegmentId(), context, this.executor)
                .thenComposeAsync(segment -> {
                    int numOfSplits = Math.min(Math.max(2, request.getNumOfSplits()), Math.max(2, policy.getScaleFactor()));
                    List<Map.Entry<Double, Double>> newRanges = splitKeyRange(segment.getKeyStart(), segment.getKeyEnd(), numOfSplits);
                    return this.postScaleRequest(request, Lists.newArrayList(request.getSegmentId()), newRanges, request.getRequestId());
                }, this.executor);
    }

    /**
     * Divides {@code [keyStart, keyEnd]} into {@code numOfSplits} consecutive ranges of equal width.
     *
     * <p>The last range ends at exactly {@code keyEnd} rather than at a computed value, so that
     * floating-point rounding cannot leave a gap or an overlap at the upper boundary.
     */
    private static List<Map.Entry<Double, Double>> splitKeyRange(double keyStart, double keyEnd, int numOfSplits) {
        double delta = (keyEnd - keyStart) / (double) numOfSplits;
        List<Map.Entry<Double, Double>> ranges = new ArrayList<>(numOfSplits);
        for (int i = 0; i < numOfSplits - 1; i++) {
            ranges.add(new AbstractMap.SimpleEntry<Double, Double>(keyStart + delta * (double) i, keyStart + delta * (double) (i + 1)));
        }
        ranges.add(new AbstractMap.SimpleEntry<Double, Double>(keyStart + delta * (double) (numOfSplits - 1), keyEnd));
        return ranges;
    }

    /**
     * Handles a scale-down request.
     *
     * <p>The reported segment is first marked cold in the metadata store. Then the active segments
     * are inspected for key-range neighbours that are also cold and, if at least two cold segments
     * are found, a request to merge them is posted. Streams with a fixed number of segments are
     * left alone.
     */
    private CompletableFuture<Void> processScaleDown(AutoScaleEvent request, ScalingPolicy policy, OperationContext context) {
        String qualifiedName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(request.getScope(), request.getStream(), request.getSegmentId());
        log.info(request.getRequestId(), "Scale down request received for stream segment {}", new Object[]{qualifiedName});
        if (policy.getScaleType().equals(ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS)) {
            return CompletableFuture.completedFuture(null);
        }

        // Silent requests pass Long.MAX_VALUE; all others pass the end of the request's validity
        // window. NOTE(review): presumably this is the time until which the store reports the
        // segment as cold - confirm against StreamMetadataStore.markCold.
        long markerTimestamp = request.isSilent() ? Long.MAX_VALUE : request.getTimestamp() + REQUEST_VALIDITY_PERIOD;

        return this.streamMetadataStore
                .markCold(request.getScope(), request.getStream(), request.getSegmentId(), markerTimestamp, context, this.executor)
                .thenCompose(marked -> this.streamMetadataStore.getActiveSegments(request.getScope(), request.getStream(), context, this.executor))
                .thenCompose(activeSegments -> this.findColdMergeCandidates(request, policy, activeSegments, context))
                .thenCompose(toMerge -> this.postMergeRequest(request, toMerge));
    }

    /**
     * Determines which active segments should be merged for a scale-down request.
     *
     * <p>Candidates are the reported segment plus the active segments whose key range touches it,
     * ordered by key start. Only candidates that the store reports as cold are kept.
     *
     * @return a future holding the cold candidates in key order; the list is empty when the
     *         reported segment is no longer active, when the stream has no segments to spare above
     *         the policy's minimum, or when the segment has no neighbours.
     */
    private CompletableFuture<List<StreamSegmentRecord>> findColdMergeCandidates(AutoScaleEvent request, ScalingPolicy policy,
                                                                                 List<StreamSegmentRecord> activeSegments,
                                                                                 OperationContext context) {
        assert activeSegments != null;
        Optional<StreamSegmentRecord> current = activeSegments.stream()
                .filter(segment -> segment.segmentId() == request.getSegmentId())
                .findAny();

        // Number of segments the stream can lose without dropping below the policy's minimum.
        // This must be checked with "<= 0" rather than "== 0": if the stream already has fewer
        // segments than the minimum, merging would only make it worse.
        int maxScaleDownFactor = activeSegments.size() - policy.getMinNumSegments();
        if (!current.isPresent() || maxScaleDownFactor <= 0) {
            return nothingToMerge();
        }

        // Neighbours are recognised by an exact match of boundary values.
        // NOTE(review): assumes adjacent segments store bit-identical boundary doubles - confirm.
        double keyStart = current.get().getKeyStart();
        double keyEnd = current.get().getKeyEnd();
        List<StreamSegmentRecord> candidates = activeSegments.stream()
                .filter(segment -> segment.getKeyEnd() == keyStart
                        || segment.getKeyStart() == keyEnd
                        || segment.segmentId() == request.getSegmentId())
                .sorted(Comparator.comparingDouble(StreamSegmentRecord::getKeyStart))
                .collect(Collectors.toList());
        if (candidates.size() <= 1) {
            return nothingToMerge();
        }

        return Futures.filter(candidates, candidate -> this.streamMetadataStore.isCold(request.getScope(), request.getStream(), candidate.segmentId(), context, this.executor))
                .thenApply(coldSegments -> {
                    if (maxScaleDownFactor == 1 && coldSegments.size() == 3) {
                        // Merging three segments into one removes two, but only one can be spared.
                        // The list is sorted by key start, so the first two are adjacent.
                        return new ArrayList<>(coldSegments.subList(0, 2));
                    }
                    return coldSegments;
                });
    }

    /** Returns a completed future holding an empty, mutable candidate list. */
    private static CompletableFuture<List<StreamSegmentRecord>> nothingToMerge() {
        return CompletableFuture.<List<StreamSegmentRecord>>completedFuture(new ArrayList<StreamSegmentRecord>());
    }

    /**
     * Posts a scale request that replaces {@code toMerge} with one segment spanning their combined
     * key range. Does nothing when fewer than two segments are supplied.
     */
    private CompletableFuture<Void> postMergeRequest(AutoScaleEvent request, List<StreamSegmentRecord> toMerge) {
        if (toMerge == null || toMerge.size() < 2) {
            return CompletableFuture.completedFuture(null);
        }

        List<Long> segmentIds = new ArrayList<>(toMerge.size());
        for (StreamSegmentRecord segment : toMerge) {
            String segmentName = StreamSegmentNameUtils.getQualifiedStreamSegmentName(request.getScope(), request.getStream(), segment.segmentId());
            log.debug(request.getRequestId(), "Merging stream segment {} ", new Object[]{segmentName});
            segmentIds.add(segment.segmentId());
        }

        double min = toMerge.stream().mapToDouble(StreamSegmentRecord::getKeyStart).min().getAsDouble();
        double max = toMerge.stream().mapToDouble(StreamSegmentRecord::getKeyEnd).max().getAsDouble();
        List<Map.Entry<Double, Double>> newRanges = new ArrayList<>(1);
        newRanges.add(new AbstractMap.SimpleEntry<Double, Double>(min, max));
        return this.postScaleRequest(request, segmentIds, newRanges, request.getRequestId());
    }

    /**
     * Wraps the given segments and replacement key ranges in a {@link ScaleOpEvent}, stamped with
     * the current time, and writes it through {@link StreamMetadataTasks#writeEvent}.
     */
    private CompletableFuture<Void> postScaleRequest(AutoScaleEvent request, List<Long> segments, List<Map.Entry<Double, Double>> newRanges, long requestId) {
        ScaleOpEvent event = new ScaleOpEvent(request.getScope(), request.getStream(), segments, newRanges, false, System.currentTimeMillis(), requestId);
        return this.streamMetadataTasks.writeEvent(event);
    }
}

