/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ModelHelper;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.task.LockFailedException;
import io.pravega.controller.store.task.Resource;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.Stream.TaskStepsRetryHelper;
import io.pravega.controller.task.Task;
import io.pravega.controller.task.TaskBase;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.DeleteStreamEvent;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.LoggerFactory;

public class StreamMetadataTasks
extends TaskBase {
    // Monitor used by initializeStreamWriters(). The "$lock" name and the FindBugs suppression
    // suggest it was generated by an annotation processor — TODO confirm.
    @SuppressFBWarnings(justification="generated code")
    private final Object $lock = new Object[0];
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(StreamMetadataTasks.class));
    // NOTE(review): despite the "_IN_MINUTES" suffix this value is in MILLISECONDS: the configured
    // minute count is converted with Duration.toMillis(). generateStreamCutIfRequired() compares it
    // against a difference of two recording times.
    private static final long RETENTION_FREQUENCY_IN_MINUTES = Duration.ofMinutes(Config.MINIMUM_RETENTION_FREQUENCY_IN_MINUTES).toMillis();
    private final StreamMetadataStore streamMetadataStore;
    private final BucketStore bucketStore;
    private final SegmentHelper segmentHelper;
    // Name of the stream the request event writer targets; assigned in initializeStreamWriters().
    private String requestStreamName;
    // Completed once a request event writer has been installed; writeEvent() chains on it so that
    // events are only written after the writer exists.
    private final CompletableFuture<Void> writerInitFuture = new CompletableFuture();
    // Holds the writer used by writeEvent(); set by initializeStreamWriters() or setRequestEventWriter().
    private final AtomicReference<EventStreamWriter<ControllerEvent>> requestEventWriterRef = new AtomicReference();
    private final GrpcAuthHelper authHelper;
    // Source of the request ids attached to log lines and controller events.
    private final RequestTracker requestTracker;

    /**
     * Creates the task set for the given host.
     *
     * <p>Wraps {@code hostId} in a new {@link TaskBase.Context} and delegates to the private
     * constructor, which stores the collaborators and marks this instance ready.
     *
     * @param streamMetadataStore store used for all stream metadata reads and updates.
     * @param bucketStore         store used to register streams for the retention service.
     * @param taskMetadataStore   task store handed to {@link TaskBase}.
     * @param segmentHelper       segment helper kept for use by this class.
     * @param executor            executor handed to {@link TaskBase} and used for asynchronous steps.
     * @param hostId              identifier of this host; becomes the task context.
     * @param authHelper          authentication helper kept for use by this class.
     * @param requestTracker      tracker used to obtain request ids.
     */
    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService executor, String hostId, GrpcAuthHelper authHelper, RequestTracker requestTracker) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, executor, new TaskBase.Context(hostId), authHelper, requestTracker);
    }

    /**
     * Stores the collaborators, hands the task store, executor and context to {@link TaskBase},
     * and finally marks this instance ready.
     */
    private StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService executor, TaskBase.Context context, GrpcAuthHelper authHelper, RequestTracker requestTracker) {
        super(taskMetadataStore, executor, context);

        // Request plumbing.
        this.requestTracker = requestTracker;
        this.authHelper = authHelper;

        // Metadata stores and segment access.
        this.segmentHelper = segmentHelper;
        this.bucketStore = bucketStore;
        this.streamMetadataStore = streamMetadataStore;

        // Must stay last: every field is assigned before the instance is flagged ready.
        setReady();
    }

    /**
     * Creates the writer used to post controller events and signals that it is available.
     *
     * <p>While holding the instance lock this records the request stream name, creates an event
     * writer for it with a default {@link EventWriterConfig}, publishes the writer and completes
     * the writer-initialization future that {@link #writeEvent(ControllerEvent)} waits on.
     *
     * @param clientFactory factory used to create the event writer.
     * @param streamName    name of the stream that controller events are written to.
     */
    public void initializeStreamWriters(EventStreamClientFactory clientFactory, String streamName) {
        synchronized (this.$lock) {
            this.requestStreamName = streamName;
            EventStreamWriter<ControllerEvent> writer = clientFactory.createEventWriter(
                    this.requestStreamName,
                    ControllerEventProcessors.CONTROLLER_EVENT_SERIALIZER,
                    EventWriterConfig.builder().build());
            this.requestEventWriterRef.set(writer);
            // Release anything queued behind the initialization future.
            this.writerInitFuture.complete(null);
        }
    }

    /**
     * Runs {@link #createStream} and retries it while the failure is a {@link LockFailedException}.
     *
     * <p>Any failure of the retried operation is re-thrown as a {@link CompletionException}. When
     * the retries are exhausted, the {@link RetriesExhaustedException} wrapper is stripped and its
     * cause is propagated instead.
     *
     * @param scope           stream scope.
     * @param stream          stream name.
     * @param config          stream configuration.
     * @param createTimestamp creation timestamp passed through to {@link #createStream}.
     * @param numOfRetries    retry budget handed to the retry helper.
     * @return future holding the create status.
     */
    public CompletableFuture<Controller.CreateStreamStatus.Status> createStreamRetryOnLockFailure(String scope, String stream, StreamConfiguration config, long createTimestamp, int numOfRetries) {
        CompletableFuture<Controller.CreateStreamStatus.Status> attempts = RetryHelper.withRetriesAsync(
                () -> createStream(scope, stream, config, createTimestamp),
                failure -> Exceptions.unwrap(failure) instanceof LockFailedException,
                numOfRetries,
                this.executor);
        return attempts.exceptionally(failure -> {
            Throwable cause = Exceptions.unwrap(failure);
            // Surface the underlying error rather than the "retries exhausted" wrapper.
            throw new CompletionException(cause instanceof RetriesExhaustedException ? cause.getCause() : cause);
        });
    }

    /**
     * Task entry point for stream creation: runs {@link #createStreamBody} through
     * {@code execute} against the {@code scope/stream} resource, passing the call arguments as the
     * serializable task parameters.
     *
     * @param scope           stream scope.
     * @param stream          stream name.
     * @param config          stream configuration.
     * @param createTimestamp creation timestamp.
     * @return future holding the create status.
     */
    @Task(name="createStream", version="1.0", resource="{scope}/{stream}")
    public CompletableFuture<Controller.CreateStreamStatus.Status> createStream(String scope, String stream, StreamConfiguration config, long createTimestamp) {
        Resource resource = new Resource(scope, stream);
        Serializable[] parameters = {scope, stream, config, createTimestamp};
        return execute(resource, parameters, () -> createStreamBody(scope, stream, config, createTimestamp));
    }

    /**
     * Starts a configuration update for a stream and waits until it is observed as applied.
     *
     * <p>If the current configuration record is already flagged as updating, the call returns
     * {@code FAILURE} without submitting anything. Otherwise an {@link UpdateStreamEvent} is
     * submitted together with the metadata change and the result becomes {@code SUCCESS} once
     * {@link #isUpdated} reports true. Any exception is mapped by {@code handleUpdateStreamError}.
     *
     * @param scope      stream scope.
     * @param stream     stream name.
     * @param newConfig  configuration to apply.
     * @param contextOpt operation context, or {@code null} to create a fresh one.
     * @return future holding the update status.
     */
    public CompletableFuture<Controller.UpdateStreamStatus.Status> updateStream(String scope, String stream, StreamConfiguration newConfig, OperationContext contextOpt) {
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);
        final long requestId = requestTracker.getRequestIdFor("updateStream", scope, stream);

        return streamMetadataStore.getConfigurationRecord(scope, stream, context, executor)
                .thenCompose(configProperty -> {
                    if (configProperty.getObject().isUpdating()) {
                        log.warn(requestId, "Another update in progress for {}/{}", scope, stream);
                        return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
                    }
                    UpdateStreamEvent event = new UpdateStreamEvent(scope, stream, requestId);
                    return addIndexAndSubmitTask(event, () -> streamMetadataStore.startUpdateConfiguration(scope, stream, newConfig, context, executor))
                            .thenCompose(x -> checkDone(() -> isUpdated(scope, stream, newConfig, context))
                                    .thenApply(y -> Controller.UpdateStreamStatus.Status.SUCCESS));
                })
                .exceptionally(ex -> {
                    log.warn(requestId, "Exception thrown in trying to update stream configuration {}", ex.getMessage());
                    return handleUpdateStreamError(ex, requestId);
                });
    }

    /**
     * Polls {@code condition} until it yields true, using a delay of 100 between checks
     * (same unit as the {@code delay} argument of the two-argument overload).
     *
     * @param condition supplier of the asynchronous completion check.
     * @return future that completes once the condition has reported true.
     */
    private CompletableFuture<Void> checkDone(Supplier<CompletableFuture<Boolean>> condition) {
        final long defaultDelay = 100L;
        return checkDone(condition, defaultDelay);
    }

    /**
     * Repeatedly evaluates {@code condition}, each time after {@code delay}, until it yields true.
     *
     * @param condition supplier of the asynchronous completion check.
     * @param delay     delay passed to {@code Futures.delayedFuture} before every check
     *                  (presumably milliseconds — TODO confirm against the helper).
     * @return future that completes once the condition has reported true.
     */
    private CompletableFuture<Void> checkDone(Supplier<CompletableFuture<Boolean>> condition, long delay) {
        final AtomicBoolean finished = new AtomicBoolean(false);
        return Futures.loop(
                () -> !finished.get(),
                () -> Futures.delayedFuture(condition, delay, this.executor)
                        .thenAccept(outcome -> finished.set(outcome)),
                this.executor);
    }

    /**
     * Checks whether a configuration update has been applied.
     *
     * <p>Fetches the stream state and the current configuration record, and once both are
     * available hands them, together with {@code newConfig}, to a compiler-generated lambda that
     * computes the boolean result. That lambda's body is not part of this view, so the exact
     * comparison it performs cannot be documented here.
     *
     * @param scope     stream scope.
     * @param stream    stream name.
     * @param newConfig configuration the update was started with.
     * @param context   operation context for the store calls.
     * @return future holding true when the update is considered done.
     */
    @VisibleForTesting
    CompletableFuture<Boolean> isUpdated(String scope, String stream, StreamConfiguration newConfig, OperationContext context) {
        CompletableFuture<State> stateFuture = this.streamMetadataStore.getState(scope, stream, true, context, this.executor);
        CompletionStage configPropertyFuture = this.streamMetadataStore.getConfigurationRecord(scope, stream, context, this.executor).thenApply(VersionedMetadata::getObject);
        // Wait for both lookups before evaluating; the synthetic lambda reads the completed futures.
        return CompletableFuture.allOf(new CompletableFuture[]{stateFuture, configPropertyFuture}).thenApply(arg_0 -> StreamMetadataTasks.lambda$isUpdated$12(stateFuture, (CompletableFuture)configPropertyFuture, newConfig, arg_0));
    }

    /**
     * Performs one retention cycle for a stream.
     *
     * <p>Loads the retention set, generates a new stream cut if one is due relative to the latest
     * recorded cut, truncates according to {@code policy}, and finally reports a retention event
     * to the metrics.
     *
     * @param scope           stream scope.
     * @param stream          stream name.
     * @param policy          retention policy; must not be null.
     * @param recordingTime   time of this retention cycle.
     * @param contextOpt      operation context, or {@code null} to create a fresh one.
     * @param delegationToken token forwarded to stream cut generation.
     * @return future that completes when the cycle has finished.
     */
    public CompletableFuture<Void> retention(String scope, String stream, RetentionPolicy policy, long recordingTime, OperationContext contextOpt, String delegationToken) {
        Preconditions.checkNotNull(policy);
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);
        final long requestId = requestTracker.getRequestIdFor("truncateStream", scope, stream);

        return streamMetadataStore.getRetentionSet(scope, stream, context, executor)
                .thenCompose(retentionSet ->
                        generateStreamCutIfRequired(scope, stream, retentionSet.getLatest(), recordingTime, context, delegationToken)
                                .thenCompose(newRecord -> truncate(scope, stream, policy, context, retentionSet, newRecord, recordingTime, requestId)))
                .thenAccept(ignored -> StreamMetrics.reportRetentionEvent(scope, stream));
    }

    /**
     * Generates and stores a new stream cut when one is due.
     *
     * <p>A cut is due when there is no previous cut, or when more than the minimum retention
     * frequency has elapsed between the previous cut's recording time and {@code recordingTime}.
     * When a cut is due, the previous cut record is loaded (tolerating a missing record), a new
     * cut is generated relative to it and added to the retention set.
     *
     * @param scope           stream scope.
     * @param stream          stream name.
     * @param previous        reference to the latest recorded cut, or {@code null} if none exists.
     * @param recordingTime   time of the current retention cycle.
     * @param context         operation context for the store calls.
     * @param delegationToken token forwarded to stream cut generation.
     * @return future holding the newly generated record, or {@code null} when no cut was due.
     */
    private CompletableFuture<StreamCutRecord> generateStreamCutIfRequired(String scope, String stream, StreamCutReferenceRecord previous, long recordingTime, OperationContext context, String delegationToken) {
        // NOTE: RETENTION_FREQUENCY_IN_MINUTES is expressed in milliseconds despite its name.
        boolean cutIsDue = previous == null || recordingTime - previous.getRecordingTime() > RETENTION_FREQUENCY_IN_MINUTES;
        if (!cutIsDue) {
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<StreamCutRecord> previousLookup = previous == null
                ? CompletableFuture.<StreamCutRecord>completedFuture(null)
                : this.streamMetadataStore.getStreamCutRecord(scope, stream, previous, context, this.executor);

        // The fallback must be a completed future holding null, not a null future: the helper
        // composes on whatever the supplier returns, and a null future would turn the tolerated
        // "previous record not found" case into a failure instead of "no previous record".
        return Futures.exceptionallyComposeExpecting(
                        previousLookup,
                        e -> e instanceof StoreException.DataNotFoundException,
                        () -> CompletableFuture.<StreamCutRecord>completedFuture(null))
                .thenCompose(previousRecord -> this.generateStreamCut(scope, stream, previousRecord, context, delegationToken)
                        .thenCompose(newRecord -> this.streamMetadataStore.addStreamCutToRetentionSet(scope, stream, newRecord, context, this.executor)
                                .thenApply(x -> {
                                    log.debug("New streamCut generated for stream {}/{}", (Object) scope, (Object) stream);
                                    return newRecord;
                                })));
    }

    /**
     * Truncates the stream at the retention record selected by the policy, if there is one.
     *
     * <p>When {@link #findTruncationRecord} selects a record, its stream cut is loaded and a
     * truncation is started at it. If the truncation started, all retention records before the
     * selected one are deleted; if it did not, the future fails with a {@link RuntimeException}.
     * A failure caused by an {@link IllegalArgumentException} is logged and swallowed. When no
     * record qualifies the method completes immediately.
     *
     * @return future that completes when the truncation has been started and old records removed.
     */
    private CompletableFuture<Void> truncate(String scope, String stream, RetentionPolicy policy, OperationContext context, RetentionSet retentionSet, StreamCutRecord newRecord, long recordingTime, long requestId) {
        Optional<StreamCutReferenceRecord> selected = findTruncationRecord(policy, retentionSet, newRecord, recordingTime);
        if (!selected.isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        final StreamCutReferenceRecord truncationPoint = selected.get();

        return streamMetadataStore.getStreamCutRecord(scope, stream, truncationPoint, context, executor)
                .thenCompose(streamCutRecord -> startTruncation(scope, stream, streamCutRecord.getStreamCut(), context, requestId))
                .thenCompose(started -> {
                    if (!started) {
                        throw new RuntimeException("Could not start truncation");
                    }
                    return streamMetadataStore.deleteStreamCutBefore(scope, stream, truncationPoint, context, executor);
                })
                .exceptionally(e -> {
                    if (!(Exceptions.unwrap(e) instanceof IllegalArgumentException)) {
                        throw new CompletionException(e);
                    }
                    log.debug(requestId, "Cannot truncate at given streamCut because it intersects with existing truncation point");
                    return null;
                });
    }

    /**
     * Picks the retention record to truncate at, according to the policy type.
     *
     * <ul>
     *   <li>{@code TIME}: among records whose recording time is strictly earlier than
     *       {@code recordingTime - retentionParam}, the one with the latest recording time.</li>
     *   <li>{@code SIZE}: requires {@code newRecord}; among records whose recording size is more
     *       than {@code retentionParam} below the new record's size, the one with the latest
     *       recording time. Empty when {@code newRecord} is null.</li>
     * </ul>
     *
     * @return the selected record, or empty when none qualifies.
     * @throws NotImplementedException for any other retention type.
     */
    private Optional<StreamCutReferenceRecord> findTruncationRecord(RetentionPolicy policy, RetentionSet retentionSet, StreamCutRecord newRecord, long recordingTime) {
        final Comparator<StreamCutReferenceRecord> byRecordingTime = Comparator.comparingLong(StreamCutReferenceRecord::getRecordingTime);
        switch (policy.getRetentionType()) {
            case TIME: {
                final long cutoff = recordingTime - policy.getRetentionParam();
                return retentionSet.getRetentionRecords().stream()
                        .filter(candidate -> candidate.getRecordingTime() < cutoff)
                        .max(byRecordingTime);
            }
            case SIZE: {
                if (newRecord == null) {
                    // Without a fresh cut there is no current size to measure against.
                    return Optional.empty();
                }
                final long sizeToRetain = policy.getRetentionParam();
                return retentionSet.getRetentionRecords().stream()
                        .filter(candidate -> newRecord.getRecordingSize() - candidate.getRecordingSize() > sizeToRetain)
                        .max(byRecordingTime);
            }
            default: {
                throw new NotImplementedException(policy.getRetentionType().toString());
            }
        }
    }

    /**
     * Builds a stream cut record from the current offsets of all active segments.
     *
     * <p>Looks up the offset of every active segment, assembles a segment-id to offset map, asks
     * the store for the size up to that cut (relative to {@code previous} when given) and wraps
     * everything in a {@link StreamCutRecord} stamped with the current wall-clock time.
     *
     * @param scope           stream scope.
     * @param stream          stream name.
     * @param previous        previous stream cut record, or {@code null}.
     * @param contextOpt      operation context, or {@code null} to create a fresh one.
     * @param delegationToken token forwarded to the segment offset lookups.
     * @return future holding the generated record.
     */
    public CompletableFuture<StreamCutRecord> generateStreamCut(String scope, String stream, StreamCutRecord previous, OperationContext contextOpt, String delegationToken) {
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);

        return streamMetadataStore.getActiveSegments(scope, stream, context, executor)
                .thenCompose(activeSegments -> Futures.allOfWithResults(
                        activeSegments.stream()
                                .parallel()
                                .collect(Collectors.toMap(
                                        segment -> segment,
                                        segment -> getSegmentOffset(scope, stream, segment.segmentId(), delegationToken)))))
                .thenCompose(offsets -> {
                    // Timestamp is taken after the offsets are known, before the size lookup.
                    final long generationTime = System.currentTimeMillis();
                    ImmutableMap.Builder<Long, Long> cutBuilder = ImmutableMap.builder();
                    offsets.forEach((segment, offset) -> cutBuilder.put(segment.segmentId(), offset));
                    final ImmutableMap<Long, Long> streamCut = cutBuilder.build();
                    return streamMetadataStore.getSizeTillStreamCut(scope, stream, streamCut, Optional.ofNullable(previous), context, executor)
                            .thenApply(sizeTill -> new StreamCutRecord(generationTime, sizeTill, streamCut));
                });
    }

    /**
     * Truncates a stream at the given stream cut and waits until the truncation is observed.
     *
     * <p>Returns {@code FAILURE} when the truncation could not be started (another truncation is
     * in progress). Otherwise polls {@link #isTruncated} with a delay of 1000 between checks and
     * returns {@code SUCCESS} once it reports true. Any exception is logged with the scope, the
     * stream and the throwable, then mapped by {@code handleUpdateStreamError}.
     *
     * @param scope      stream scope.
     * @param stream     stream name.
     * @param streamCut  segment id to offset map describing the truncation point.
     * @param contextOpt operation context, or {@code null} to create a fresh one.
     * @return future holding the update status.
     */
    public CompletableFuture<Controller.UpdateStreamStatus.Status> truncateStream(String scope, String stream, Map<Long, Long> streamCut, OperationContext contextOpt) {
        final OperationContext context = contextOpt == null ? this.streamMetadataStore.createContext(scope, stream) : contextOpt;
        final long requestId = this.requestTracker.getRequestIdFor("truncateStream", scope, stream);

        return this.startTruncation(scope, stream, streamCut, context, requestId)
                .thenCompose(truncationStarted -> {
                    if (!truncationStarted) {
                        log.warn(requestId, "Unable to start truncation for {}/{}", scope, stream);
                        return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
                    }
                    return this.checkDone(() -> this.isTruncated(scope, stream, streamCut, context), 1000L)
                            .thenApply(y -> Controller.UpdateStreamStatus.Status.SUCCESS);
                })
                .exceptionally(ex -> {
                    // The message previously claimed a configuration update had failed (copied from
                    // updateStream). The throwable stays the last argument so it is still logged.
                    log.warn(requestId, "Exception thrown in trying to truncate stream {}/{}", scope, stream, ex);
                    return this.handleUpdateStreamError(ex, requestId);
                });
    }

    /**
     * Starts a truncation at {@code streamCut} unless one is already in progress.
     *
     * <p>If the current truncation record is flagged as updating, nothing is submitted and the
     * result is false. Otherwise a {@link TruncateStreamEvent} is submitted together with the
     * metadata change and the result is true.
     *
     * @param contextOpt operation context, or {@code null} to create a fresh one.
     * @param requestId  request id used for logging and carried by the event.
     * @return future holding true when the truncation was started.
     */
    private CompletableFuture<Boolean> startTruncation(String scope, String stream, Map<Long, Long> streamCut, OperationContext contextOpt, long requestId) {
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);

        return streamMetadataStore.getTruncationRecord(scope, stream, context, executor)
                .thenCompose(property -> {
                    if (property.getObject().isUpdating()) {
                        log.warn(requestId, "Another truncation in progress for {}/{}", scope, stream);
                        return CompletableFuture.completedFuture(false);
                    }
                    TruncateStreamEvent event = new TruncateStreamEvent(scope, stream, requestId);
                    return addIndexAndSubmitTask(event, () -> streamMetadataStore.startTruncation(scope, stream, streamCut, context, executor))
                            .thenApply(x -> {
                                log.debug(requestId, "Started truncation request for stream {}/{}", scope, stream);
                                return true;
                            });
                });
    }

    /**
     * Checks whether a truncation at {@code streamCut} has been applied.
     *
     * <p>Fetches the stream state and the current truncation record, and once both are available
     * hands them, together with {@code streamCut}, to a compiler-generated lambda that computes
     * the boolean result. That lambda's body is not part of this view, so the exact comparison it
     * performs cannot be documented here.
     *
     * @param scope     stream scope.
     * @param stream    stream name.
     * @param streamCut segment id to offset map the truncation was started with.
     * @param context   operation context for the store calls.
     * @return future holding true when the truncation is considered done.
     */
    @VisibleForTesting
    CompletableFuture<Boolean> isTruncated(String scope, String stream, Map<Long, Long> streamCut, OperationContext context) {
        CompletableFuture<State> stateFuture = this.streamMetadataStore.getState(scope, stream, true, context, this.executor);
        // Despite the variable name (shared with isUpdated), this future holds the truncation record.
        CompletionStage configPropertyFuture = this.streamMetadataStore.getTruncationRecord(scope, stream, context, this.executor).thenApply(VersionedMetadata::getObject);
        return CompletableFuture.allOf(new CompletableFuture[]{stateFuture, configPropertyFuture}).thenApply(arg_0 -> StreamMetadataTasks.lambda$isTruncated$41(stateFuture, (CompletableFuture)configPropertyFuture, streamCut, arg_0));
    }

    /**
     * Seals a stream and waits until it is observed as sealed.
     *
     * <p>Submits a {@link SealStreamEvent} together with a state change to {@code SEALING}
     * (skipped when the stream is already {@code SEALED}). If the resulting state is
     * {@code SEALED} or {@code SEALING}, polls {@link #isSealed} and returns {@code SUCCESS};
     * otherwise returns {@code FAILURE}. Any exception is mapped by
     * {@code handleUpdateStreamError}.
     *
     * @param scope      stream scope.
     * @param stream     stream name.
     * @param contextOpt operation context, or {@code null} to create a fresh one.
     * @return future holding the update status.
     */
    public CompletableFuture<Controller.UpdateStreamStatus.Status> sealStream(String scope, String stream, OperationContext contextOpt) {
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);
        final long requestId = requestTracker.getRequestIdFor("sealStream", scope, stream);
        final SealStreamEvent event = new SealStreamEvent(scope, stream, requestId);

        return addIndexAndSubmitTask(event, () -> streamMetadataStore.getVersionedState(scope, stream, context, executor)
                        .thenCompose(state -> {
                            if (!state.getObject().equals(State.SEALED)) {
                                return streamMetadataStore.updateVersionedState(scope, stream, State.SEALING, state, context, executor);
                            }
                            // Already sealed: leave the state untouched.
                            return CompletableFuture.completedFuture(state);
                        }))
                .thenCompose(result -> {
                    final State current = result.getObject();
                    final boolean sealedOrSealing = current.equals(State.SEALED) || current.equals(State.SEALING);
                    if (!sealedOrSealing) {
                        return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
                    }
                    return checkDone(() -> isSealed(scope, stream, context))
                            .thenApply(x -> Controller.UpdateStreamStatus.Status.SUCCESS);
                })
                .exceptionally(ex -> {
                    log.warn(requestId, "Exception thrown in trying to notify sealed segments {}", ex.getMessage());
                    return handleUpdateStreamError(ex, requestId);
                });
    }

    /**
     * Reports whether the stream's current state equals {@code SEALED}.
     *
     * @return future holding true when the stream is sealed.
     */
    private CompletableFuture<Boolean> isSealed(String scope, String stream, OperationContext context) {
        CompletableFuture<State> currentState = streamMetadataStore.getState(scope, stream, true, context, executor);
        return currentState.thenApply(state -> state.equals(State.SEALED));
    }

    /**
     * Deletes a stream and waits until it no longer exists.
     *
     * <p>The stream state is read first; a missing stream is treated as {@code UNKNOWN}.
     * <ul>
     *   <li>{@code UNKNOWN}: the stream is deleted from the store directly.</li>
     *   <li>{@code SEALED} or {@code CREATING}: a {@link DeleteStreamEvent} carrying the stream's
     *       creation time is posted.</li>
     *   <li>Any other state: nothing is done and {@code STREAM_NOT_SEALED} is returned.</li>
     * </ul>
     * In the first two cases the method polls {@link #isDeleted} and returns {@code SUCCESS}.
     * Any exception is mapped by {@code handleDeleteStreamError}.
     *
     * @param scope      stream scope.
     * @param stream     stream name.
     * @param contextOpt operation context, or {@code null} to create a fresh one.
     * @return future holding the delete status.
     */
    public CompletableFuture<Controller.DeleteStreamStatus.Status> deleteStream(String scope, String stream, OperationContext contextOpt) {
        final OperationContext context = contextOpt != null ? contextOpt : streamMetadataStore.createContext(scope, stream);
        final long requestId = requestTracker.getRequestIdFor("deleteStream", scope, stream);

        return Futures.exceptionallyExpecting(
                        streamMetadataStore.getState(scope, stream, false, context, executor),
                        e -> Exceptions.unwrap(e) instanceof StoreException.DataNotFoundException,
                        State.UNKNOWN)
                .thenCompose(state -> {
                    if (State.UNKNOWN.equals(state)) {
                        return streamMetadataStore.deleteStream(scope, stream, context, executor)
                                .exceptionally(e -> {
                                    throw new CompletionException(e);
                                })
                                .thenApply(v -> true);
                    }
                    final boolean deletable = State.SEALED.equals(state) || State.CREATING.equals(state);
                    if (!deletable) {
                        return CompletableFuture.completedFuture(false);
                    }
                    return streamMetadataStore.getCreationTime(scope, stream, context, executor)
                            .thenApply(time -> new DeleteStreamEvent(scope, stream, requestId, time))
                            .thenCompose(event -> writeEvent(event))
                            .thenApply(x -> true);
                })
                .thenCompose(result -> {
                    if (!result) {
                        return CompletableFuture.completedFuture(Controller.DeleteStreamStatus.Status.STREAM_NOT_SEALED);
                    }
                    return checkDone(() -> isDeleted(scope, stream))
                            .thenApply(x -> Controller.DeleteStreamStatus.Status.SUCCESS);
                })
                .exceptionally(ex -> {
                    log.warn(requestId, "Exception thrown while deleting stream {}", ex.getMessage());
                    return handleDeleteStreamError(ex, requestId);
                });
    }

    /**
     * Reports whether the stream no longer exists in the metadata store.
     *
     * @return future holding true when {@code checkStreamExists} reports false.
     */
    private CompletableFuture<Boolean> isDeleted(String scope, String stream) {
        CompletableFuture<Boolean> existence = streamMetadataStore.checkStreamExists(scope, stream);
        return existence.thenApply(exists -> !exists);
    }

    /**
     * Submits a manually requested scale operation.
     *
     * <p>A {@link ScaleOpEvent} is submitted together with the scale request in the metadata
     * store. The response status is {@code STARTED} (with the new segments and the active epoch)
     * on success, {@code PRECONDITION_FAILED} when the store rejects the request with a
     * precondition failure, and {@code FAILURE} for any other error. The returned future itself
     * does not fail for those errors.
     *
     * @param scope          stream scope.
     * @param stream         stream name.
     * @param segmentsToSeal ids of the segments to seal.
     * @param newRanges      key ranges of the segments to create.
     * @param scaleTimestamp timestamp of the scale request.
     * @param context        operation context passed to the store as-is.
     * @return future holding the scale response.
     */
    public CompletableFuture<Controller.ScaleResponse> manualScale(String scope, String stream, List<Long> segmentsToSeal, List<Map.Entry<Double, Double>> newRanges, long scaleTimestamp, OperationContext context) {
        final long requestId = requestTracker.getRequestIdFor("scaleStream", scope, stream, String.valueOf(scaleTimestamp));
        final ScaleOpEvent event = new ScaleOpEvent(scope, stream, segmentsToSeal, newRanges, true, scaleTimestamp, requestId);

        return addIndexAndSubmitTask(event, () -> streamMetadataStore.submitScale(scope, stream, segmentsToSeal, new ArrayList<Map.Entry<Double, Double>>(newRanges), scaleTimestamp, null, context, executor))
                .handle((startScaleResponse, e) -> {
                    final Controller.ScaleResponse.Builder response = Controller.ScaleResponse.newBuilder();
                    if (e == null) {
                        log.info(requestId, "scale for stream {}/{} started successfully", scope, stream);
                        response.setStatus(Controller.ScaleResponse.ScaleStreamStatus.STARTED);
                        final EpochTransitionRecord transition = startScaleResponse.getObject();
                        response.addAllSegments(transition.getNewSegmentsWithRange().entrySet().stream()
                                .map(segment -> convert(scope, stream, segment))
                                .collect(Collectors.toList()));
                        response.setEpoch(transition.getActiveEpoch());
                        return response.build();
                    }
                    final Throwable cause = Exceptions.unwrap(e);
                    if (cause instanceof EpochTransitionOperationExceptions.PreConditionFailureException) {
                        response.setStatus(Controller.ScaleResponse.ScaleStreamStatus.PRECONDITION_FAILED);
                    } else {
                        log.warn(requestId, "Scale for stream {}/{} failed with exception {}", scope, stream, cause);
                        response.setStatus(Controller.ScaleResponse.ScaleStreamStatus.FAILURE);
                    }
                    return response.build();
                });
    }

    /**
     * Reports the status of a scale operation for the given epoch.
     *
     * <p>Starts three lookups — active epoch, stream state and epoch transition record — and,
     * once the state and active epoch lookups have finished (successfully or not), hands all
     * three futures plus {@code epoch} to a compiler-generated lambda that builds the response.
     * That lambda's body is not part of this view, so how the status is derived cannot be
     * documented here.
     *
     * @param scope   stream scope.
     * @param stream  stream name.
     * @param epoch   epoch the caller is asking about.
     * @param context operation context for the store calls.
     * @return future holding the scale status response.
     */
    public CompletableFuture<Controller.ScaleStatusResponse> checkScale(String scope, String stream, int epoch, OperationContext context) {
        CompletableFuture<EpochRecord> activeEpochFuture = this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, this.executor);
        CompletableFuture<State> stateFuture = this.streamMetadataStore.getState(scope, stream, true, context, this.executor);
        CompletionStage etrFuture = this.streamMetadataStore.getEpochTransition(scope, stream, context, this.executor).thenApply(VersionedMetadata::getObject);
        // NOTE(review): etrFuture is not part of allOf(), so it may still be pending when the handler
        // runs; if the handler joins it, that join could block — confirm against the lambda body.
        return CompletableFuture.allOf(stateFuture, activeEpochFuture).handle((arg_0, arg_1) -> StreamMetadataTasks.lambda$checkScale$64(activeEpochFuture, stateFuture, (CompletableFuture)etrFuture, epoch, arg_0, arg_1));
    }

    /**
     * Indexes a request, runs the supplied metadata operation and posts the matching event.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>The event is added to this host's request index under a fresh random id.</li>
     *   <li>The supplied operation is run.</li>
     *   <li>If it succeeded, or failed with a store connection error or a write conflict, the
     *       event is posted (retried indefinitely) and the index entry is removed; the original
     *       result is then returned, or the original failure re-thrown.</li>
     *   <li>For any other failure the event is not posted, the index entry is left in place and
     *       the failure is propagated.</li>
     * </ol>
     *
     * @param event          controller event describing the request.
     * @param futureSupplier operation that performs the metadata change.
     * @param <T>            result type of the operation.
     * @return future holding the operation's result.
     */
    @VisibleForTesting
    <T> CompletableFuture<T> addIndexAndSubmitTask(ControllerEvent event, Supplier<CompletableFuture<T>> futureSupplier) {
        final String id = UUID.randomUUID().toString();
        return this.streamMetadataStore.addRequestToIndex(this.context.getHostId(), id, event)
                .thenCompose(v -> Futures.handleCompose(futureSupplier.get(), (result, failure) -> {
                    if (failure != null) {
                        final Throwable cause = Exceptions.unwrap(failure);
                        final boolean postAnyway = cause instanceof StoreException.StoreConnectionException
                                || cause instanceof StoreException.WriteConflictException;
                        if (!postAnyway) {
                            throw new CompletionException(failure);
                        }
                    }
                    return RetryHelper.withIndefiniteRetriesAsync(
                                    () -> this.writeEvent(event),
                                    ex -> log.warn("writing event failed with {}", (Object) ex.getMessage()),
                                    this.executor)
                            .thenCompose(z -> this.streamMetadataStore.removeTaskFromIndex(this.context.getHostId(), id))
                            .thenApply(vd -> {
                                // The event is posted either way; the caller still sees the original outcome.
                                if (failure != null) {
                                    throw new CompletionException(failure);
                                }
                                return result;
                            });
                }));
    }

    /**
     * Posts a controller event to the request stream once the event writer has been initialized.
     *
     * <p>On failure the returned future completes exceptionally:
     * <ul>
     *   <li>with the {@link TaskExceptions.ProcessingDisabledException} itself when that is the
     *       underlying cause;</li>
     *   <li>with a {@link TaskExceptions.PostEventException} wrapping the failure otherwise.</li>
     * </ul>
     *
     * @param event event to post; its key is used as the routing key.
     * @return future that completes when the event has been written.
     */
    public CompletableFuture<Void> writeEvent(ControllerEvent event) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.writerInitFuture
                .thenCompose(v -> this.requestEventWriterRef.get().writeEvent(event.getKey(), event))
                .whenComplete((r, e) -> {
                    if (e == null) {
                        log.info("event posted successfully");
                        result.complete(null);
                        return;
                    }
                    log.warn("exception while posting event {} {}", (Object) e.getClass().getName(), (Object) e.getMessage());
                    // A failure arriving through thenCompose() is wrapped in a CompletionException, so
                    // the type check has to look at the unwrapped cause; checking the wrapper never matches.
                    Throwable cause = Exceptions.unwrap(e);
                    if (cause instanceof TaskExceptions.ProcessingDisabledException) {
                        result.completeExceptionally(cause);
                    } else {
                        result.completeExceptionally(new TaskExceptions.PostEventException("Failed to post event", e));
                    }
                });
        return result;
    }

    /**
     * Installs the given event writer and signals that the writer is available, so that pending
     * and future {@link #writeEvent(ControllerEvent)} calls can proceed.
     *
     * @param requestEventWriter writer to use for posting controller events.
     */
    @VisibleForTesting
    public void setRequestEventWriter(EventStreamWriter<ControllerEvent> requestEventWriter) {
        // Publish the writer before releasing anything that waits on the init future.
        requestEventWriterRef.set(requestEventWriter);
        writerInitFuture.complete(null);
    }

    /**
     * Creates a stream: metadata first, then segments, the mark stream, retention registration
     * and finally activation.
     *
     * <p>After the store's create call, the create status is translated to the API status. Only
     * when the store reports {@code NEW} or {@code EXISTS_CREATING} does the method continue:
     * <ol>
     *   <li>notify the initial segments (as many as the scaling policy's minimum);</li>
     *   <li>create the mark stream;</li>
     *   <li>with retries: register the stream with the retention service bucket store when a
     *       retention policy is configured, then move the state from {@code CREATING} to
     *       {@code ACTIVE} if it is still {@code CREATING}.</li>
     * </ol>
     * Failures are converted to a status rather than propagated: {@code SCOPE_NOT_FOUND} when the
     * cause is a data-not-found error, {@code FAILURE} otherwise.
     *
     * @param scope     stream scope.
     * @param stream    stream name.
     * @param config    stream configuration.
     * @param timestamp creation timestamp.
     * @return future holding the create status.
     */
    @VisibleForTesting
    CompletableFuture<Controller.CreateStreamStatus.Status> createStreamBody(String scope, String stream, StreamConfiguration config, long timestamp) {
        final long requestId = this.requestTracker.getRequestIdFor("createStream", scope, stream);

        return this.streamMetadataStore.createStream(scope, stream, config, timestamp, null, this.executor)
                .thenComposeAsync(response -> {
                    log.info(requestId, "{}/{} created in metadata store", scope, stream);
                    final CreateStreamResponse.CreateStatus createStatus = response.getStatus();
                    final Controller.CreateStreamStatus.Status status = this.translate(createStatus);

                    final boolean needsSetup = createStatus.equals(CreateStreamResponse.CreateStatus.NEW)
                            || createStatus.equals(CreateStreamResponse.CreateStatus.EXISTS_CREATING);
                    if (!needsSetup) {
                        return CompletableFuture.completedFuture(status);
                    }

                    final int firstSegmentNumber = response.getStartingSegmentNumber();
                    final int segmentCount = response.getConfiguration().getScalingPolicy().getMinNumSegments();
                    final List<Long> newSegments = IntStream.range(firstSegmentNumber, firstSegmentNumber + segmentCount)
                            .mapToObj(number -> StreamSegmentNameUtils.computeSegmentId(number, 0))
                            .collect(Collectors.toList());

                    return this.notifyNewSegments(scope, stream, response.getConfiguration(), newSegments, this.retrieveDelegationToken(), requestId)
                            .thenCompose(v -> this.createMarkStream(scope, stream, timestamp, requestId))
                            .thenCompose(y -> {
                                final OperationContext context = this.streamMetadataStore.createContext(scope, stream);
                                return TaskStepsRetryHelper.withRetries(() -> {
                                    final CompletableFuture<Void> bucketRegistration;
                                    if (config.getRetentionPolicy() == null) {
                                        bucketRegistration = CompletableFuture.completedFuture(null);
                                    } else {
                                        bucketRegistration = this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.RetentionService, scope, stream, this.executor);
                                    }
                                    return bucketRegistration.thenCompose(v -> this.streamMetadataStore.getVersionedState(scope, stream, context, this.executor)
                                            .thenCompose(state -> {
                                                if (!state.getObject().equals(State.CREATING)) {
                                                    // Already past CREATING: nothing to activate.
                                                    return CompletableFuture.completedFuture(state);
                                                }
                                                return this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, state, context, this.executor);
                                            }));
                                }, this.executor).thenApply(z -> status);
                            });
                }, this.executor)
                .handle((result, ex) -> {
                    if (ex == null) {
                        return result;
                    }
                    if (Exceptions.unwrap(ex) instanceof StoreException.DataNotFoundException) {
                        return Controller.CreateStreamStatus.Status.SCOPE_NOT_FOUND;
                    }
                    log.warn(requestId, "Create stream failed due to ", ex);
                    return Controller.CreateStreamStatus.Status.FAILURE;
                });
    }

    /**
     * Creates the mark stream that accompanies {@code baseStream}.
     *
     * <p>The mark stream gets a fixed scaling policy with a single segment. After it is created in
     * the store, its first segment is notified and its state is set to {@code ACTIVE}.
     *
     * @param scope      stream scope.
     * @param baseStream name of the stream the mark stream belongs to.
     * @param timestamp  creation timestamp.
     * @param requestId  request id forwarded to the segment notification.
     * @return future that completes once the mark stream is active.
     */
    private CompletableFuture<Void> createMarkStream(String scope, String baseStream, long timestamp, long requestId) {
        final String markStream = NameUtils.getMarkStreamForStream(baseStream);
        final StreamConfiguration markConfig = StreamConfiguration.builder()
                .scalingPolicy(ScalingPolicy.fixed(1))
                .build();

        return streamMetadataStore.createStream(scope, markStream, markConfig, timestamp, null, executor)
                .thenCompose(response -> {
                    final long segmentId = StreamSegmentNameUtils.computeSegmentId(response.getStartingSegmentNumber(), 0);
                    final ScalingPolicy policy = response.getConfiguration().getScalingPolicy();
                    return notifyNewSegment(scope, markStream, segmentId, policy, retrieveDelegationToken(), requestId);
                })
                .thenCompose(v -> {
                    final OperationContext context = streamMetadataStore.createContext(scope, markStream);
                    return streamMetadataStore.getVersionedState(scope, markStream, context, executor)
                            .thenCompose(state -> Futures.toVoid(
                                    streamMetadataStore.updateVersionedState(scope, markStream, State.ACTIVE, state, context, executor)));
                });
    }

    /**
     * Maps a store-level create status onto the controller API create-stream status.
     *
     * @param status status reported by the metadata store
     * @return {@code SUCCESS} for {@code NEW}, {@code STREAM_EXISTS} for {@code EXISTS_ACTIVE} and
     *         {@code EXISTS_CREATING}, and {@code FAILURE} for every other value
     */
    private Controller.CreateStreamStatus.Status translate(CreateStreamResponse.CreateStatus status) {
        switch (status) {
            case NEW:
                return Controller.CreateStreamStatus.Status.SUCCESS;
            case EXISTS_ACTIVE:
            case EXISTS_CREATING:
                return Controller.CreateStreamStatus.Status.STREAM_EXISTS;
            default:
                return Controller.CreateStreamStatus.Status.FAILURE;
        }
    }

    /**
     * Convenience overload that forwards to the request-id variant, supplying {@code 0L} as the
     * request id.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentIds      ids of the segments to create
     * @param context         operation context used when reading the stream configuration
     * @param controllerToken token passed through to the segment creation calls
     * @return a future that completes when the request-id variant completes
     */
    public CompletableFuture<Void> notifyNewSegments(String scope, String stream, List<Long> segmentIds, OperationContext context, String controllerToken) {
        final long defaultRequestId = 0L;
        return notifyNewSegments(scope, stream, segmentIds, context, controllerToken, defaultRequestId);
    }

    /**
     * Reads the stream configuration from the metadata store (through the retry helper) and then
     * creates the given segments using that configuration.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentIds      ids of the segments to create
     * @param context         operation context used for the configuration read
     * @param controllerToken token passed through to the segment creation calls
     * @param requestId       request id passed through to the segment creation calls
     * @return a future that completes when the configuration-based overload completes
     */
    public CompletableFuture<Void> notifyNewSegments(String scope, String stream, List<Long> segmentIds, OperationContext context, String controllerToken, long requestId) {
        CompletableFuture<StreamConfiguration> configurationFuture = TaskStepsRetryHelper.withRetries(
                () -> this.streamMetadataStore.getConfiguration(scope, stream, context, this.executor),
                this.executor);
        return configurationFuture.thenCompose(
                configuration -> this.notifyNewSegments(scope, stream, configuration, segmentIds, controllerToken, requestId));
    }

    /**
     * Issues a create call for every segment id, using the scaling policy of the supplied
     * configuration, and aggregates the resulting futures into one.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param configuration   configuration whose scaling policy is applied to each new segment
     * @param segmentIds      ids of the segments to create
     * @param controllerToken token passed through to each segment creation call
     * @param requestId       request id passed through to each segment creation call
     * @return the aggregate of all per-segment futures, converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyNewSegments(String scope, String stream, StreamConfiguration configuration, List<Long> segmentIds, String controllerToken, long requestId) {
        List<CompletableFuture<Void>> creations = segmentIds.stream()
                .parallel()
                .map(segment -> this.notifyNewSegment(scope, stream, (long) segment, configuration.getScalingPolicy(), controllerToken, requestId))
                .collect(Collectors.toList());
        return Futures.toVoid(Futures.allOfWithResults(creations));
    }

    /**
     * Creates a single segment through the segment helper, wrapped in the retry helper, passing
     * {@code 0L} as the request id.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentId       id of the segment to create
     * @param policy          scaling policy passed to the create call
     * @param controllerToken token passed to the create call
     * @return the retried create call converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyNewSegment(String scope, String stream, long segmentId, ScalingPolicy policy, String controllerToken) {
        CompletableFuture<?> creation = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.createSegment(scope, stream, segmentId, policy, controllerToken, 0L);
        }, this.executor);
        return Futures.toVoid(creation);
    }

    /**
     * Creates a single segment through the segment helper, wrapped in the retry helper.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentId       id of the segment to create
     * @param policy          scaling policy passed to the create call
     * @param controllerToken token passed to the create call
     * @param requestId       request id passed to the create call
     * @return the retried create call converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyNewSegment(String scope, String stream, long segmentId, ScalingPolicy policy, String controllerToken, long requestId) {
        CompletableFuture<?> creation = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.createSegment(scope, stream, segmentId, policy, controllerToken, requestId);
        }, this.executor);
        return Futures.toVoid(creation);
    }

    /**
     * Issues a delete call for every segment in the set and aggregates the resulting futures.
     *
     * @param scope            stream scope
     * @param stream           stream name
     * @param segmentsToDelete ids of the segments to delete
     * @param delegationToken  token passed through to each delete call
     * @param requestId        request id passed through to each delete call
     * @return the aggregate of all per-segment delete futures
     */
    public CompletableFuture<Void> notifyDeleteSegments(String scope, String stream, Set<Long> segmentsToDelete, String delegationToken, long requestId) {
        List<CompletableFuture<Void>> deletions = segmentsToDelete.stream()
                .parallel()
                .map(segment -> this.notifyDeleteSegment(scope, stream, (long) segment, delegationToken, requestId))
                .collect(Collectors.toList());
        return Futures.allOf(deletions);
    }

    /**
     * Deletes a single segment through the segment helper, wrapped in the retry helper.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentId       id of the segment to delete
     * @param delegationToken token passed to the delete call
     * @param requestId       request id passed to the delete call
     * @return the retried delete call converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyDeleteSegment(String scope, String stream, long segmentId, String delegationToken, long requestId) {
        CompletableFuture<?> deletion = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.deleteSegment(scope, stream, segmentId, delegationToken, requestId);
        }, this.executor);
        return Futures.toVoid(deletion);
    }

    /**
     * Truncates a single segment through the segment helper, wrapped in the retry helper.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentCut      entry whose key and value are passed, in that order, as the third and
     *                        fourth arguments of the truncate call
     * @param delegationToken token passed to the truncate call
     * @param requestId       request id passed to the truncate call
     * @return the retried truncate call converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyTruncateSegment(String scope, String stream, Map.Entry<Long, Long> segmentCut, String delegationToken, long requestId) {
        CompletableFuture<?> truncation = TaskStepsRetryHelper.withRetries(() -> {
            // The entry is read inside the supplier, i.e. on every invocation of the supplier.
            return this.segmentHelper.truncateSegment(scope, stream, segmentCut.getKey(), segmentCut.getValue(), delegationToken, requestId);
        }, this.executor);
        return Futures.toVoid(truncation);
    }

    /**
     * Looks up the write offset of every listed segment and collects the results into a map keyed
     * by segment id.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segments        ids of the segments to query
     * @param delegationToken token passed through to each offset lookup
     * @return a future holding segment id to offset, as returned by {@code getSegmentOffset}
     */
    public CompletableFuture<Map<Long, Long>> getSealedSegmentsSize(String scope, String stream, List<Long> segments, String delegationToken) {
        Map<Long, CompletableFuture<Long>> offsetsBySegment = segments.stream()
                .parallel()
                .collect(Collectors.toMap(
                        id -> id,
                        id -> this.getSegmentOffset(scope, stream, (long) id, delegationToken)));
        return Futures.allOfWithResults(offsetsBySegment);
    }

    /**
     * Convenience overload that forwards to the request-id variant, supplying {@code 0L} as the
     * request id.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param sealedSegments  ids of the segments to seal
     * @param delegationToken token passed through to each seal call
     * @return a future that completes when the request-id variant completes
     */
    public CompletableFuture<Void> notifySealedSegments(String scope, String stream, List<Long> sealedSegments, String delegationToken) {
        final long defaultRequestId = 0L;
        return notifySealedSegments(scope, stream, sealedSegments, delegationToken, defaultRequestId);
    }

    /**
     * Issues a seal call for every listed segment and aggregates the resulting futures.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param sealedSegments  ids of the segments to seal
     * @param delegationToken token passed through to each seal call
     * @param requestId       request id passed through to each seal call
     * @return the aggregate of all per-segment seal futures
     */
    public CompletableFuture<Void> notifySealedSegments(String scope, String stream, List<Long> sealedSegments, String delegationToken, long requestId) {
        List<CompletableFuture<Void>> seals = sealedSegments.stream()
                .parallel()
                .map(id -> this.notifySealedSegment(scope, stream, (long) id, delegationToken, requestId))
                .collect(Collectors.toList());
        return Futures.allOf(seals);
    }

    /**
     * Seals a single segment through the segment helper, wrapped in the retry helper.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param sealedSegment   id of the segment to seal
     * @param delegationToken token passed to the seal call
     * @param requestId       request id passed to the seal call
     * @return the retried seal call converted to a {@code Void} future
     */
    private CompletableFuture<Void> notifySealedSegment(String scope, String stream, long sealedSegment, String delegationToken, long requestId) {
        CompletableFuture<?> sealing = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.sealSegment(scope, stream, sealedSegment, delegationToken, requestId);
        }, this.executor);
        return Futures.toVoid(sealing);
    }

    /**
     * Issues a policy-update call for every active segment and aggregates the resulting futures.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param activeSegments  segment records whose {@code segmentId()} identifies each target
     * @param policy          scaling policy passed to each update call
     * @param delegationToken token passed through to each update call
     * @param requestId       request id passed through to each update call
     * @return the aggregate of all per-segment futures, converted to a {@code Void} future
     */
    public CompletableFuture<Void> notifyPolicyUpdates(String scope, String stream, List<StreamSegmentRecord> activeSegments, ScalingPolicy policy, String delegationToken, long requestId) {
        List<CompletableFuture<Void>> updates = activeSegments.stream()
                .parallel()
                .map(segment -> this.notifyPolicyUpdate(scope, stream, policy, segment.segmentId(), delegationToken, requestId))
                .collect(Collectors.toList());
        return Futures.toVoid(Futures.allOfWithResults(updates));
    }

    /**
     * Fetches segment info through the segment helper (wrapped in the retry helper) and extracts
     * its write offset.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param segmentId       id of the segment to query
     * @param delegationToken token passed to the info call
     * @return a future holding the value of {@code getWriteOffset()} on the returned segment info
     */
    private CompletableFuture<Long> getSegmentOffset(String scope, String stream, long segmentId, String delegationToken) {
        CompletableFuture<WireCommands.StreamSegmentInfo> infoFuture = TaskStepsRetryHelper.withRetries(
                () -> this.segmentHelper.getSegmentInfo(scope, stream, segmentId, delegationToken),
                this.executor);
        return infoFuture.thenApply(info -> info.getWriteOffset());
    }

    /**
     * Updates the policy of a single segment through the segment helper, wrapped in the retry
     * helper.
     *
     * @param scope           stream scope
     * @param stream          stream name
     * @param policy          scaling policy passed to the update call
     * @param segmentId       id of the segment to update
     * @param delegationToken token passed to the update call
     * @param requestId       request id passed to the update call
     * @return the future produced by the retry helper
     */
    private CompletableFuture<Void> notifyPolicyUpdate(String scope, String stream, ScalingPolicy policy, long segmentId, String delegationToken, long requestId) {
        CompletableFuture<Void> update = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.updatePolicy(scope, stream, policy, segmentId, delegationToken, requestId);
        }, this.executor);
        return update;
    }

    /**
     * Builds a {@code Controller.SegmentRange} from a map entry.
     *
     * @param scope   stream scope
     * @param stream  stream name
     * @param segment entry whose key is passed as the segment id and whose value supplies the two
     *                doubles (entry key first, entry value second) for the range
     * @return the result of {@code ModelHelper.createSegmentRange}
     */
    private Controller.SegmentRange convert(String scope, String stream, Map.Entry<Long, Map.Entry<Double, Double>> segment) {
        long segmentId = segment.getKey();
        Map.Entry<Double, Double> range = segment.getValue();
        double rangeStart = range.getKey();
        double rangeEnd = range.getValue();
        return ModelHelper.createSegmentRange(scope, stream, segmentId, rangeStart, rangeEnd);
    }

    /**
     * Converts a failure raised during a stream update into an API status.
     *
     * @param ex        the failure, possibly wrapped; it is unwrapped before inspection
     * @param requestId request id attached to the warning log entry
     * @return {@code STREAM_NOT_FOUND} when the unwrapped cause is a
     *         {@code StoreException.DataNotFoundException}; otherwise {@code FAILURE}, after
     *         logging the unwrapped cause at warn level
     */
    private Controller.UpdateStreamStatus.Status handleUpdateStreamError(Throwable ex, long requestId) {
        final Throwable cause = Exceptions.unwrap(ex);
        if (!(cause instanceof StoreException.DataNotFoundException)) {
            log.warn(requestId, "Update stream failed due to ", new Object[]{cause});
            return Controller.UpdateStreamStatus.Status.FAILURE;
        }
        return Controller.UpdateStreamStatus.Status.STREAM_NOT_FOUND;
    }

    /**
     * Converts a failure raised during a stream delete into an API status.
     *
     * @param ex        the failure, possibly wrapped; it is unwrapped before inspection
     * @param requestId request id attached to the warning log entry
     * @return {@code STREAM_NOT_FOUND} when the unwrapped cause is a
     *         {@code StoreException.DataNotFoundException}; otherwise {@code FAILURE}, after
     *         logging the original (not unwrapped) throwable at warn level
     */
    private Controller.DeleteStreamStatus.Status handleDeleteStreamError(Throwable ex, long requestId) {
        final Throwable cause = Exceptions.unwrap(ex);
        if (!(cause instanceof StoreException.DataNotFoundException)) {
            log.warn(requestId, "Delete stream failed.", new Object[]{ex});
            return Controller.DeleteStreamStatus.Status.FAILURE;
        }
        return Controller.DeleteStreamStatus.Status.STREAM_NOT_FOUND;
    }

    /**
     * Issues a transaction-commit call for every listed segment and aggregates the resulting
     * futures.
     *
     * @param scope    stream scope
     * @param stream   stream name
     * @param segments ids of the segments on which to commit the transaction
     * @param txnId    transaction id
     * @return the aggregate of all per-segment commit futures
     */
    public CompletableFuture<Void> notifyTxnCommit(String scope, String stream, List<Long> segments, UUID txnId) {
        List<CompletableFuture<Controller.TxnStatus>> commits = segments.stream()
                .parallel()
                .map(segment -> this.notifyTxnCommit(scope, stream, (long) segment, txnId))
                .collect(Collectors.toList());
        return Futures.allOf(commits);
    }

    /**
     * Commits a transaction on a single segment through the segment helper, wrapped in the retry
     * helper. {@code segmentNumber} is passed for both the third and fourth argument of the commit
     * call, and the delegation token is retrieved inside the supplier.
     *
     * @param scope         stream scope
     * @param stream        stream name
     * @param segmentNumber segment on which to commit
     * @param txnId         transaction id
     * @return the future produced by the retry helper
     */
    private CompletableFuture<Controller.TxnStatus> notifyTxnCommit(String scope, String stream, long segmentNumber, UUID txnId) {
        CompletableFuture<Controller.TxnStatus> commit = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.commitTransaction(scope, stream, segmentNumber, segmentNumber, txnId, this.retrieveDelegationToken());
        }, this.executor);
        return commit;
    }

    /**
     * Issues a transaction-abort call for every listed segment and aggregates the resulting
     * futures.
     *
     * @param scope    stream scope
     * @param stream   stream name
     * @param segments ids of the segments on which to abort the transaction
     * @param txnId    transaction id
     * @return the aggregate of all per-segment abort futures
     */
    public CompletableFuture<Void> notifyTxnAbort(String scope, String stream, List<Long> segments, UUID txnId) {
        List<CompletableFuture<Controller.TxnStatus>> aborts = segments.stream()
                .parallel()
                .map(segment -> this.notifyTxnAbort(scope, stream, (long) segment, txnId))
                .collect(Collectors.toList());
        return Futures.allOf(aborts);
    }

    /**
     * Aborts a transaction on a single segment through the segment helper, wrapped in the retry
     * helper. The delegation token is retrieved inside the supplier.
     *
     * @param scope         stream scope
     * @param stream        stream name
     * @param segmentNumber segment on which to abort
     * @param txnId         transaction id
     * @return the future produced by the retry helper
     */
    private CompletableFuture<Controller.TxnStatus> notifyTxnAbort(String scope, String stream, long segmentNumber, UUID txnId) {
        CompletableFuture<Controller.TxnStatus> abort = TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.abortTransaction(scope, stream, segmentNumber, txnId, this.retrieveDelegationToken());
        }, this.executor);
        return abort;
    }

    /**
     * Looks up the write offset of every listed segment and collects the results into a map keyed
     * by segment id. The delegation token is retrieved separately for each segment.
     *
     * @param scope    stream scope
     * @param stream   stream name
     * @param segments ids of the segments to query
     * @return a future holding segment id to offset, as returned by {@code getSegmentOffset}
     */
    public CompletableFuture<Map<Long, Long>> getCurrentSegmentSizes(String scope, String stream, List<Long> segments) {
        Map<Long, CompletableFuture<Long>> offsetsBySegment = segments.stream()
                .collect(Collectors.toMap(
                        id -> id,
                        id -> this.getSegmentOffset(scope, stream, (long) id, this.retrieveDelegationToken())));
        return Futures.allOfWithResults(offsetsBySegment);
    }

    /**
     * Creates a new {@code StreamMetadataTasks} that reuses this instance's stores, segment helper,
     * executor, auth helper and request tracker, but carries the supplied context.
     *
     * @param context context for the new instance
     * @return the newly constructed instance
     */
    @Override
    public TaskBase copyWithContext(TaskBase.Context context) {
        StreamMetadataTasks copy = new StreamMetadataTasks(
                this.streamMetadataStore,
                this.bucketStore,
                this.taskMetadataStore,
                this.segmentHelper,
                this.executor,
                context,
                this.authHelper,
                this.requestTracker);
        return copy;
    }

    /**
     * Cancels the writer initialisation future if it has not completed yet, then closes the request
     * event writer if one has been set.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void close() throws Exception {
        // Stop a still-pending writer initialisation before touching the writer itself.
        if (!this.writerInitFuture.isDone()) {
            this.writerInitFuture.cancel(true);
        }

        EventStreamWriter<ControllerEvent> writer = this.requestEventWriterRef.get();
        if (writer == null) {
            return;
        }
        writer.close();
    }

    /**
     * Returns the master token obtained from the auth helper.
     *
     * @return the value of {@code authHelper.retrieveMasterToken()}
     */
    public String retrieveDelegationToken() {
        final String masterToken = this.authHelper.retrieveMasterToken();
        return masterToken;
    }

    /**
     * Decompiled lambda body that turns the outcome of three metadata reads into a scale status
     * response for the requested {@code epoch}.
     *
     * <p>On failure the status is {@code INVALID_INPUT} when the unwrapped cause is a
     * {@code StoreException.DataNotFoundException} and {@code INTERNAL_ERROR} otherwise. On success
     * the three futures are joined and the status is:
     * <ul>
     *   <li>{@code INVALID_INPUT} if {@code epoch} is greater than the active epoch;</li>
     *   <li>{@code IN_PROGRESS} if {@code epoch} equals the active epoch or its reference epoch;</li>
     *   <li>{@code IN_PROGRESS} if {@code epoch + 1} equals the active reference epoch, the state is
     *       {@code SCALING}, and the epoch transition record is either empty or targets the active
     *       epoch;</li>
     *   <li>{@code SUCCESS} otherwise.</li>
     * </ul>
     *
     * @param activeEpochFuture future joined as an {@code EpochRecord}
     * @param stateFuture       future joined as a {@code State}
     * @param etrFuture         future joined as an {@code EpochTransitionRecord}
     * @param epoch             epoch being checked
     * @param r                 unused
     * @param ex                failure, or {@code null} on success
     * @return the built response
     */
    private static /* synthetic */ Controller.ScaleStatusResponse lambda$checkScale$64(CompletableFuture activeEpochFuture, CompletableFuture stateFuture, CompletableFuture etrFuture, int epoch, Void r, Throwable ex) {
        Controller.ScaleStatusResponse.Builder builder = Controller.ScaleStatusResponse.newBuilder();

        if (ex != null) {
            boolean notFound = Exceptions.unwrap(ex) instanceof StoreException.DataNotFoundException;
            builder.setStatus(notFound
                    ? Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT
                    : Controller.ScaleStatusResponse.ScaleStatus.INTERNAL_ERROR);
            return builder.build();
        }

        EpochRecord active = (EpochRecord) activeEpochFuture.join();
        State currentState = (State) stateFuture.join();
        EpochTransitionRecord transition = (EpochTransitionRecord) etrFuture.join();

        Controller.ScaleStatusResponse.ScaleStatus status;
        if (epoch > active.getEpoch()) {
            status = Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT;
        } else if ((active.getEpoch() == epoch || active.getReferenceEpoch() == epoch)
                || (epoch + 1 == active.getReferenceEpoch()
                        && currentState.equals(State.SCALING)
                        && (transition.equals(EpochTransitionRecord.EMPTY) || transition.getNewEpoch() == active.getEpoch()))) {
            status = Controller.ScaleStatusResponse.ScaleStatus.IN_PROGRESS;
        } else {
            status = Controller.ScaleStatusResponse.ScaleStatus.SUCCESS;
        }
        builder.setStatus(status);
        return builder.build();
    }

    /**
     * Decompiled lambda body that compares the stored truncation record and stream state against a
     * requested stream cut.
     *
     * <p>If the record is flagged as updating, the result is {@code true} exactly when the
     * record's stream cut differs from {@code streamCut}. Otherwise the result is {@code true}
     * unless the record's stream cut equals {@code streamCut} and the state is {@code TRUNCATING}.
     *
     * @param stateFuture          future joined as a {@code State}
     * @param configPropertyFuture future joined as a {@code StreamTruncationRecord}
     * @param streamCut            stream cut to compare against
     * @param v                    unused
     * @return the comparison result described above
     */
    private static /* synthetic */ Boolean lambda$isTruncated$41(CompletableFuture stateFuture, CompletableFuture configPropertyFuture, Map streamCut, Void v) {
        State currentState = (State) stateFuture.join();
        StreamTruncationRecord record = (StreamTruncationRecord) configPropertyFuture.join();

        boolean updating = record.isUpdating();
        boolean sameCut = record.getStreamCut().equals(streamCut);
        if (updating) {
            return !sameCut;
        }
        return !(sameCut && currentState.equals(State.TRUNCATING));
    }

    /**
     * Decompiled lambda body that compares the stored configuration record and stream state
     * against a requested configuration.
     *
     * <p>If the record is flagged as updating, the result is {@code true} exactly when the
     * record's configuration differs from {@code newConfig}. Otherwise the result is {@code true}
     * unless the record's configuration equals {@code newConfig} and the state is
     * {@code UPDATING}.
     *
     * @param stateFuture          future joined as a {@code State}
     * @param configPropertyFuture future joined as a {@code StreamConfigurationRecord}
     * @param newConfig            configuration to compare against
     * @param v                    unused
     * @return the comparison result described above
     */
    private static /* synthetic */ Boolean lambda$isUpdated$12(CompletableFuture stateFuture, CompletableFuture configPropertyFuture, StreamConfiguration newConfig, Void v) {
        State currentState = (State) stateFuture.join();
        StreamConfigurationRecord record = (StreamConfigurationRecord) configPropertyFuture.join();

        boolean updating = record.isUpdating();
        boolean sameConfig = record.getStreamConfiguration().equals(newConfig);
        if (updating) {
            return !sameConfig;
        }
        return !(sameConfig && currentState.equals(State.UPDATING));
    }
}

