/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.CollectionHelpers;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.ScaleMetadata;
import io.pravega.controller.store.stream.Segment;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Stream;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.VersionedTransactionData;
import io.pravega.controller.store.stream.WriterTimestampResponse;
import io.pravega.controller.store.stream.ZKStreamMetadataStore;
import io.pravega.controller.store.stream.ZkOrderedStore;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.HistoryTimeSeriesRecord;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PersistentStreamBase
implements Stream {
    // Class-wide SLF4J logger. The FindBugs suppression carries the justification "generated code",
    // which suggests this field was originally emitted by an annotation processor — NOTE(review): confirm.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(PersistentStreamBase.class);
    // Scope this stream belongs to; returned by getScope() and getScopeName().
    private final String scope;
    // Name of the stream; returned by getName().
    private final String name;
    // Initialised from the constructor argument of the same name.
    // Presumably the number of records held per history time-series chunk — TODO confirm against usages.
    private final AtomicInteger historyChunkSize;
    // Initialised from the constructor argument of the same name.
    // Presumably the number of segments covered by one sealed-segment-size shard — TODO confirm against usages.
    private final AtomicInteger shardSize;

    /**
     * Initialises the identity and sizing parameters of this stream.
     *
     * @param scope            scope the stream belongs to
     * @param name             name of the stream
     * @param historyChunkSize initial value for the history chunk size holder
     * @param shardSize        initial value for the shard size holder
     */
    PersistentStreamBase(String scope, String name, int historyChunkSize, int shardSize) {
        // The four assignments are independent of each other.
        this.shardSize = new AtomicInteger(shardSize);
        this.historyChunkSize = new AtomicInteger(historyChunkSize);
        this.name = name;
        this.scope = scope;
    }

    /**
     * Returns the scope this stream was constructed with.
     *
     * @return the scope
     */
    @Override
    public String getScope() {
        return scope;
    }

    /**
     * Returns the name this stream was constructed with.
     *
     * @return the stream name
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Returns the scope of this stream; this is the same value as {@link #getScope()}.
     *
     * @return the scope
     */
    @Override
    public String getScopeName() {
        return scope;
    }

    /**
     * Creates the stream's metadata records one after another.
     *
     * <p>After {@code checkStreamExists} yields a response, the steps run strictly in sequence: stream
     * metadata, creation time, configuration, epoch transition, truncation data, committing-transactions
     * record, state ({@link State#CREATING}) and finally the history records. The future completes with
     * the response produced by {@code checkStreamExists}.
     *
     * @param configuration         configuration of the stream being created
     * @param createTimestamp       creation timestamp passed to {@code checkStreamExists}
     * @param startingSegmentNumber number assigned to the first segment
     * @return future holding the response obtained from {@code checkStreamExists}
     */
    @Override
    public CompletableFuture<CreateStreamResponse> create(StreamConfiguration configuration, long createTimestamp, int startingSegmentNumber) {
        return this.checkStreamExists(configuration, createTimestamp, startingSegmentNumber).thenCompose(response -> {
            // Every step is held as a raw CompletableFuture with the same cast the single-expression
            // form applied, so the typing of each stage is unchanged.
            CompletableFuture creationTimeStored = (CompletableFuture) this.createStreamMetadata()
                    .thenCompose(ignored -> this.storeCreationTimeIfAbsent(response.getTimestamp()));
            CompletableFuture configurationStored = (CompletableFuture) creationTimeStored
                    .thenCompose(ignored -> this.createConfigurationIfAbsent(StreamConfigurationRecord.complete(this.scope, this.name, response.getConfiguration())));
            CompletableFuture epochTransitionStored = (CompletableFuture) configurationStored
                    .thenCompose(ignored -> this.createEpochTransitionIfAbsent(EpochTransitionRecord.EMPTY));
            CompletableFuture truncationStored = (CompletableFuture) epochTransitionStored
                    .thenCompose(ignored -> this.createTruncationDataIfAbsent(StreamTruncationRecord.EMPTY));
            CompletableFuture commitRecordStored = (CompletableFuture) truncationStored
                    .thenCompose(ignored -> this.createCommitTxnRecordIfAbsent(CommittingTransactionsRecord.EMPTY));
            CompletableFuture stateStored = (CompletableFuture) commitRecordStored
                    .thenCompose(ignored -> this.createStateIfAbsent(StateRecord.builder().state(State.CREATING).build()));
            CompletableFuture historyStored = (CompletableFuture) stateStored
                    .thenCompose(ignored -> this.createHistoryRecords(startingSegmentNumber, (CreateStreamResponse) response));
            return historyStored.thenApply(ignored -> response);
        });
    }

    /**
     * Builds epoch 0 from the configured minimum number of segments and stores the records derived from it.
     *
     * <p>The key space [0, 1] is divided evenly: segment {@code i} covers
     * {@code [i / numSegments, (i + 1) / numSegments]}. The epoch record, the first history chunk,
     * sealed-segment-size shard 0, an empty retention set and the current-epoch record are then created
     * in that order.
     *
     * @param startingSegmentNumber number given to the first segment; later segments count up from it
     * @param createStreamResponse  response supplying the configuration and creation timestamp
     * @return stage that completes once the last record has been created
     */
    private CompletionStage<Void> createHistoryRecords(int startingSegmentNumber, CreateStreamResponse createStreamResponse) {
        long creationTime = createStreamResponse.getTimestamp();
        int numSegments = createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments();
        double keyRangeChunk = 1.0 / (double) numSegments;

        // Raw builder kept as-is: the return type of newSegmentRecord is declared outside this block.
        ImmutableList.Builder segmentRecords = ImmutableList.builder();
        for (int index = 0; index < numSegments; index++) {
            double keyStart = (double) index * keyRangeChunk;
            double keyEnd = (double) (index + 1) * keyRangeChunk;
            segmentRecords.add((Object) this.newSegmentRecord(0, startingSegmentNumber + index, creationTime, keyStart, keyEnd));
        }
        EpochRecord epoch0 = new EpochRecord(0, 0, (ImmutableList<StreamSegmentRecord>) segmentRecords.build(), creationTime);

        CompletableFuture historyChunkCreated = (CompletableFuture) this.createEpochRecord(epoch0)
                .thenCompose(ignored -> this.createHistoryChunk(epoch0));
        CompletableFuture sizeShardCreated = (CompletableFuture) historyChunkCreated
                .thenCompose(ignored -> this.createSealedSegmentSizeMapShardIfAbsent(0));
        CompletableFuture retentionSetCreated = (CompletableFuture) sizeShardCreated
                .thenCompose(ignored -> this.createRetentionSetDataIfAbsent(new RetentionSet(ImmutableList.<StreamCutReferenceRecord>of())));
        return retentionSetCreated.thenCompose(ignored -> this.createCurrentEpochRecordDataIfAbsent(epoch0));
    }

    /**
     * Creates history time-series chunk 0 with a single record derived from the first epoch.
     *
     * @param epoch0 the first epoch record; its segments and creation time populate the history record
     * @return stage that completes once chunk 0 has been created
     */
    private CompletionStage<Void> createHistoryChunk(EpochRecord epoch0) {
        // The first list argument is empty; presumably it is the "segments sealed" list — TODO confirm
        // against the HistoryTimeSeriesRecord constructor.
        ImmutableList<StreamSegmentRecord> emptySegmentList = ImmutableList.of();
        HistoryTimeSeriesRecord firstRecord = new HistoryTimeSeriesRecord(0, 0, emptySegmentList, epoch0.getSegments(), epoch0.getCreationTime());
        return this.createHistoryTimeSeriesChunk(0, firstRecord);
    }

    /**
     * Creates, if absent, the history time-series chunk {@code chunkNumber} holding exactly one record.
     *
     * @param chunkNumber index of the chunk to create
     * @param epoch       the only record placed in the new chunk
     * @return future that completes once the chunk data has been created
     */
    private CompletableFuture<Void> createHistoryTimeSeriesChunk(int chunkNumber, HistoryTimeSeriesRecord epoch) {
        HistoryTimeSeries singleRecordSeries = new HistoryTimeSeries(ImmutableList.of(epoch));
        return this.createHistoryTimeSeriesChunkDataIfAbsent(chunkNumber, singleRecordSeries);
    }

    /**
     * Deletes this stream by delegating to {@code deleteStream()}.
     *
     * @return the future returned by {@code deleteStream()}
     */
    @Override
    public CompletableFuture<Void> delete() {
        return deleteStream();
    }

    /**
     * Records the intent to truncate the stream at {@code streamCut}.
     *
     * <p>The current truncation record must not already be updating, the stream cut must pass
     * {@link #isStreamCutValid(Map)}, and it must not be behind the previous truncation point. When all
     * checks pass, a new truncation record is computed and written using the version of the record that
     * was read.
     *
     * @param streamCut segment id to offset map describing the truncation point
     * @return future that completes once the new truncation record is stored; completes exceptionally
     *         when one of the argument checks fails
     */
    @Override
    public CompletableFuture<Void> startTruncation(Map<Long, Long> streamCut) {
        return this.getTruncationRecord().thenCompose(existing -> {
            Preconditions.checkNotNull(existing);
            StreamTruncationRecord previous = existing.getObject();
            Preconditions.checkArgument(!previous.isUpdating());
            return this.isStreamCutValid(streamCut).thenCompose(isValid -> {
                Exceptions.checkArgument(isValid, "streamCut", "invalid stream cut");
                return this.computeStreamCutSpan(streamCut).thenCompose(span -> {
                    boolean notBehindPrevious = this.greaterThan(streamCut, span, previous.getStreamCut(), previous.getSpan());
                    Exceptions.checkArgument(notBehindPrevious, "StreamCut", "Supplied streamcut is behind previous truncation point");
                    return this.computeTruncationRecord(previous, streamCut, span)
                            .thenCompose(updated -> Futures.toVoid(this.setTruncationData(new VersionedMetadata<StreamTruncationRecord>(updated, existing.getVersion()))));
                });
            });
        });
    }

    /**
     * Tells whether stream cut 1 is nowhere behind stream cut 2.
     *
     * <p>Cut 1 is rejected as soon as some pair of span entries shows it is behind: either both entries
     * refer to the same segment and cut 1 has a smaller offset for it, or the two segments overlap and
     * the entry from span 1 carries a smaller epoch value than the entry from span 2.
     *
     * @param cut1  segment id to offset map of the first stream cut
     * @param span1 segment to epoch map of the first stream cut
     * @param cut2  segment id to offset map of the second stream cut
     * @param span2 segment to epoch map of the second stream cut
     * @return {@code true} when no pair of entries places cut 1 behind cut 2
     */
    private boolean greaterThan(Map<Long, Long> cut1, Map<StreamSegmentRecord, Integer> span1, Map<Long, Long> cut2, Map<StreamSegmentRecord, Integer> span2) {
        for (Map.Entry<StreamSegmentRecord, Integer> candidate : span1.entrySet()) {
            StreamSegmentRecord candidateSegment = candidate.getKey();
            for (Map.Entry<StreamSegmentRecord, Integer> reference : span2.entrySet()) {
                StreamSegmentRecord referenceSegment = reference.getKey();
                // Same segment in both cuts: compare offsets.
                if (referenceSegment.segmentId() == candidateSegment.segmentId()
                        && cut1.get(candidateSegment.segmentId()) < cut2.get(referenceSegment.segmentId())) {
                    return false;
                }
                // Overlapping key ranges: compare the epochs recorded in the spans.
                if (referenceSegment.overlaps(candidateSegment) && candidate.getValue() < reference.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Computes the truncation record that moves the stream from {@code previous} to {@code streamCut}.
     *
     * <p>The starting span is the previous record's span or, when that is empty, the span of epoch 0.
     * Segments lying between the two spans that are not part of the new stream cut become the
     * "to delete" set, and the size between the two cuts is added to the previous record's size.
     *
     * @param previous  the truncation record currently stored
     * @param streamCut segment id to offset map of the new truncation point
     * @param span      span (segment to epoch) of {@code streamCut}
     * @return future holding the new truncation record, created with its last constructor flag set to {@code true}
     */
    private CompletableFuture<StreamTruncationRecord> computeTruncationRecord(StreamTruncationRecord previous, Map<Long, Long> streamCut, ImmutableMap<StreamSegmentRecord, Integer> span) {
        log.debug("computing truncation for stream {}/{}", (Object)this.scope, (Object)this.name);
        CompletableFuture<ImmutableMap<StreamSegmentRecord, Integer>> previousSpanFuture;
        if (previous.getSpan().isEmpty()) {
            previousSpanFuture = this.getEpochRecord(0).thenApply(epoch -> this.convertToSpan((EpochRecord) epoch));
        } else {
            previousSpanFuture = CompletableFuture.completedFuture(previous.getSpan());
        }
        return previousSpanFuture
                .thenCompose(spanFrom -> this.segmentsBetweenStreamCutSpans(spanFrom, span))
                .thenCompose(segmentsBetween -> this.sizeBetweenStreamCuts(previous.getStreamCut(), streamCut, segmentsBetween)
                        .thenApply(sizeBetween -> {
                            // Segments between the cuts that the new cut does not reference.
                            ImmutableSet.Builder<Long> toDelete = ImmutableSet.builder();
                            for (StreamSegmentRecord segment : segmentsBetween) {
                                long id = segment.segmentId();
                                if (!streamCut.containsKey(id)) {
                                    toDelete.add(id);
                                }
                            }
                            return new StreamTruncationRecord(ImmutableMap.copyOf(streamCut), span, previous.getDeletedSegments(), toDelete.build(), previous.getSizeTill() + sizeBetween, true);
                        }));
    }

    /**
     * Marks an in-progress truncation as complete.
     *
     * <p>The supplied record must be in the updating state; otherwise the precondition check throws
     * {@link IllegalArgumentException} synchronously. That check already guarantees the record is
     * updating, so no second check or "nothing to do" fallback follows it.
     *
     * @param record versioned truncation record, expected to be updating
     * @return future that completes once the completed record has been written with the record's version
     * @throws NullPointerException     if {@code record} is null
     * @throws IllegalArgumentException if the record is not updating
     */
    @Override
    public CompletableFuture<Void> completeTruncation(VersionedMetadata<StreamTruncationRecord> record) {
        Preconditions.checkNotNull(record);
        StreamTruncationRecord current = record.getObject();
        Preconditions.checkArgument(current.isUpdating());
        StreamTruncationRecord completedProp = StreamTruncationRecord.complete(current);
        return Futures.toVoid(this.setTruncationData(new VersionedMetadata<StreamTruncationRecord>(completedProp, record.getVersion())));
    }

    /**
     * Reads the truncation data and returns the record together with its version.
     *
     * @return future holding the versioned truncation record
     */
    @Override
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord() {
        // The literal 'true' is presumably an "ignore cached" flag — TODO confirm against getTruncationData.
        return this.getTruncationData(true).thenApply(data ->
                new VersionedMetadata<StreamTruncationRecord>((StreamTruncationRecord) data.getObject(), data.getVersion()));
    }

    /**
     * Records the intent to change the stream configuration.
     *
     * <p>Fails with {@link IllegalArgumentException} (inside the returned future) when the stored
     * configuration record is already updating. Otherwise an "update" record for
     * {@code newConfiguration} is written using the version that was read.
     *
     * @param newConfiguration the configuration to move to
     * @return future that completes once the update record has been stored
     */
    @Override
    public CompletableFuture<Void> startUpdateConfiguration(StreamConfiguration newConfiguration) {
        return this.getVersionedConfigurationRecord().thenCompose(configRecord -> {
            boolean updateInProgress = configRecord.getObject().isUpdating();
            Preconditions.checkArgument(!updateInProgress);
            StreamConfigurationRecord pendingUpdate = StreamConfigurationRecord.update(this.scope, this.name, newConfiguration);
            VersionedMetadata<StreamConfigurationRecord> toStore =
                    new VersionedMetadata<StreamConfigurationRecord>(pendingUpdate, configRecord.getVersion());
            return Futures.toVoid(this.setConfigurationData(toStore));
        });
    }

    /**
     * Completes a pending configuration update.
     *
     * <p>When the supplied record is not updating there is nothing to do and a completed future is
     * returned. Otherwise a "complete" record carrying the same configuration is written with the
     * supplied version.
     *
     * @param existing versioned configuration record to complete
     * @return future that completes once the record is stored, or immediately when no update is pending
     */
    @Override
    public CompletableFuture<Void> completeUpdateConfiguration(VersionedMetadata<StreamConfigurationRecord> existing) {
        StreamConfigurationRecord current = existing.getObject();
        Preconditions.checkNotNull((Object)current);
        if (!current.isUpdating()) {
            // No update in flight.
            return CompletableFuture.completedFuture(null);
        }
        StreamConfigurationRecord completed = StreamConfigurationRecord.complete(this.scope, this.name, current.getStreamConfiguration());
        log.debug("Completing update configuration for stream {}/{}", (Object)this.scope, (Object)this.name);
        VersionedMetadata<StreamConfigurationRecord> toStore =
                new VersionedMetadata<StreamConfigurationRecord>(completed, existing.getVersion());
        return Futures.toVoid(this.setConfigurationData(toStore));
    }

    /**
     * Returns the stream configuration held in the configuration record.
     *
     * @return future holding the stream configuration
     */
    @Override
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        // The literal 'false' is presumably an "ignore cached" flag — TODO confirm against getConfigurationData.
        return this.getConfigurationData(false).thenApply(data -> {
            StreamConfigurationRecord configurationRecord = (StreamConfigurationRecord) data.getObject();
            return configurationRecord.getStreamConfiguration();
        });
    }

    /**
     * Reads the configuration data and returns the record together with its version.
     *
     * @return future holding the versioned configuration record
     */
    @Override
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getVersionedConfigurationRecord() {
        // The literal 'true' is presumably an "ignore cached" flag — TODO confirm against getConfigurationData.
        return this.getConfigurationData(true).thenApply(data ->
                new VersionedMetadata<StreamConfigurationRecord>((StreamConfigurationRecord) data.getObject(), data.getVersion()));
    }

    /**
     * Moves the stream to {@code state}, starting from the state currently stored.
     *
     * <p>The transition is validated and written by {@link #updateVersionedState(VersionedMetadata, State)}.
     *
     * @param state the state to move to
     * @return future that completes once the new state is stored
     */
    @Override
    public CompletableFuture<Void> updateState(State state) {
        return this.getStateData(true).thenCompose(stored -> {
            State storedState = ((StateRecord) stored.getObject()).getState();
            VersionedMetadata<State> versionedStoredState = new VersionedMetadata<State>(storedState, stored.getVersion());
            return Futures.toVoid(this.updateVersionedState(versionedStoredState, state));
        });
    }

    /**
     * Reads the state data and returns the state together with the version of its record.
     *
     * @return future holding the versioned state
     */
    @Override
    public CompletableFuture<VersionedMetadata<State>> getVersionedState() {
        return this.getStateData(true).thenApply(stored -> {
            State storedState = ((StateRecord) stored.getObject()).getState();
            return new VersionedMetadata<State>(storedState, stored.getVersion());
        });
    }

    /**
     * Writes {@code newState} if {@link State#isTransitionAllowed} permits the move from {@code previous}.
     *
     * @param previous the state (with version) the caller believes is current
     * @param newState the state to move to
     * @return future holding the new state with its updated version, or a future failed with an
     *         {@code OPERATION_NOT_ALLOWED} store exception when the transition is not permitted
     */
    @Override
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(VersionedMetadata<State> previous, State newState) {
        if (!State.isTransitionAllowed(previous.getObject(), newState)) {
            String message = "Stream: " + this.getName() + " State: " + newState.name() + " current state = " + (Object)previous.getObject();
            return Futures.failedFuture((Throwable)StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, message));
        }
        VersionedMetadata<StateRecord> toStore =
                new VersionedMetadata<StateRecord>(StateRecord.builder().state(newState).build(), previous.getVersion());
        return this.setStateData(toStore)
                .thenApply(updatedVersion -> new VersionedMetadata<State>(newState, (Version)updatedVersion));
    }

    /**
     * Returns the state held in the state record.
     *
     * @param ignoreCached passed through unchanged to {@code getStateData}
     * @return future holding the state
     */
    @Override
    public CompletableFuture<State> getState(boolean ignoreCached) {
        return this.getStateData(ignoreCached).thenApply(stored -> {
            StateRecord stateRecord = (StateRecord) stored.getObject();
            return stateRecord.getState();
        });
    }

    /**
     * Looks up a segment record by id.
     *
     * <p>The epoch is derived from the id via {@code StreamSegmentNameUtils.getEpoch}, and the segment
     * is searched for in that epoch's record.
     *
     * @param segmentId id of the segment to find
     * @return future holding the segment record, or failing with a {@code DATA_NOT_FOUND} store
     *         exception when the epoch record has no segment with that id
     */
    @Override
    public CompletableFuture<StreamSegmentRecord> getSegment(long segmentId) {
        int epoch = StreamSegmentNameUtils.getEpoch((long)segmentId);
        return this.getEpochRecord(epoch).thenApply(epochRecord -> {
            for (StreamSegmentRecord candidate : epochRecord.getSegments()) {
                if (candidate.segmentId() == segmentId) {
                    return candidate;
                }
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "segment not found in epoch");
        });
    }

    /**
     * Returns scale metadata for the epochs spanning the time range {@code [from, to]}.
     *
     * <p>The epochs at both timestamps are resolved with {@code findEpochAtTime}, every epoch between
     * them is fetched, and the result is converted by {@link #mapToScaleMetadata(List)}.
     *
     * @param from start timestamp
     * @param to   end timestamp
     * @return future holding one {@link ScaleMetadata} per fetched epoch
     */
    @Override
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(long from, long to) {
        CompletableFuture<Integer> fromEpoch = this.findEpochAtTime(from, false);
        CompletableFuture<Integer> toEpoch = this.findEpochAtTime(to, false);
        // join() cannot block here: allOf has completed, so both lookups are done.
        return CompletableFuture.allOf(fromEpoch, toEpoch)
                .thenCompose(ignored -> this.fetchEpochs(fromEpoch.join(), toEpoch.join(), false))
                .thenApply(this::mapToScaleMetadata);
    }

    /**
     * Converts epoch records into scale metadata, counting splits and merges against the preceding epoch.
     *
     * <p>The first record (and any record following one with a null segment list) reports zero splits
     * and merges because there is nothing to compare against.
     *
     * @param epochRecords epoch records in the order they should be compared
     * @return one {@link ScaleMetadata} per epoch record, in the same order
     */
    private List<ScaleMetadata> mapToScaleMetadata(List<EpochRecord> epochRecords) {
        List<ScaleMetadata> scaleMetadata = new ArrayList<ScaleMetadata>();
        List<StreamSegmentRecord> previousSegments = null;
        for (EpochRecord record : epochRecords) {
            List<StreamSegmentRecord> segments = record.getSegments();
            long splits = 0L;
            long merges = 0L;
            if (previousSegments != null) {
                // A split: one earlier segment overlaps several current ones; a merge is the reverse.
                splits = this.findSegmentSplitsMerges(previousSegments, segments);
                merges = this.findSegmentSplitsMerges(segments, previousSegments);
            }
            previousSegments = segments;
            scaleMetadata.add(new ScaleMetadata(record.getCreationTime(), this.transform(segments), splits, merges));
        }
        return scaleMetadata;
    }

    /**
     * Counts the reference segments that overlap more than one target segment.
     *
     * @param referenceSegmentsList segments being examined
     * @param targetSegmentsList    segments each reference segment is compared against
     * @return number of reference segments overlapping at least two target segments
     */
    private long findSegmentSplitsMerges(List<StreamSegmentRecord> referenceSegmentsList, List<StreamSegmentRecord> targetSegmentsList) {
        long matches = 0L;
        for (StreamSegmentRecord reference : referenceSegmentsList) {
            long overlapping = 0L;
            for (StreamSegmentRecord target : targetSegmentsList) {
                if (target.overlaps(reference)) {
                    overlapping++;
                }
            }
            if (overlapping > 1L) {
                matches++;
            }
        }
        return matches;
    }

    /**
     * Returns the value stored in the segment's sealed record, or {@code -1} when no such record exists.
     *
     * @param segmentId id of the segment
     * @return future holding the stored epoch, or {@code -1} when the lookup failed with a
     *         data-not-found store exception; any other failure is rethrown wrapped in a
     *         {@link CompletionException}
     */
    private CompletableFuture<Integer> getSegmentSealedEpoch(long segmentId) {
        return this.getSegmentSealedRecordData(segmentId).handle((data, error) -> {
            if (error == null) {
                return (Integer)data.getObject();
            }
            if (Exceptions.unwrap((Throwable)error) instanceof StoreException.DataNotFoundException) {
                return -1;
            }
            throw new CompletionException((Throwable)error);
        });
    }

    /**
     * Computes the ids of segments between the stream's head and its active epoch.
     *
     * <p>The "from" span is the span of epoch 0 when the truncation record equals
     * {@code StreamTruncationRecord.EMPTY}, otherwise the span stored in the truncation record. The "to"
     * span is the span of the active epoch. Once both are available they are handed to a synthetic
     * lambda method.
     *
     * <p>NOTE(review): {@code lambda$getAllSegmentIds$43} is a decompiler artifact whose body is not
     * part of this source; the original lambda body must be restored before this method can compile.
     * Presumably it collects the ids of all segments between the two spans — confirm against the
     * original source.
     *
     * @return future holding the set of segment ids
     */
    @Override
    public CompletableFuture<Set<Long>> getAllSegmentIds() {
        // Start of the range: epoch 0 if the stream was never truncated, else the truncation span.
        CompletionStage fromSpanFuture = this.getTruncationRecord().thenCompose(truncationRecord -> {
            if (((StreamTruncationRecord)truncationRecord.getObject()).equals(StreamTruncationRecord.EMPTY)) {
                return this.getEpochRecord(0).thenApply(this::convertToSpan);
            }
            return CompletableFuture.completedFuture(((StreamTruncationRecord)truncationRecord.getObject()).getSpan());
        });
        // End of the range: span of the active epoch.
        CompletionStage toSpanFuture = this.getActiveEpoch(true).thenApply(this::convertToSpan);
        return CompletableFuture.allOf(new CompletableFuture[]{fromSpanFuture, toSpanFuture}).thenCompose(arg_0 -> this.lambda$getAllSegmentIds$43((CompletableFuture)fromSpanFuture, (CompletableFuture)toSpanFuture, arg_0));
    }

    /**
     * Finds the successors of a segment together with each successor's predecessors.
     *
     * <p>If {@link #getSegmentSealedEpoch(long)} yields a negative value, the active epoch is fetched
     * and an empty map is returned. Otherwise the epoch records at the sealed epoch and at the epoch
     * before it are loaded. Successors are the segments of the sealed epoch that overlap the given
     * segment; each one is mapped to the ids of the previous-epoch segments that overlap it.
     *
     * @param segmentId id of the segment whose successors are requested
     * @return future holding a map from successor segment to the ids of its predecessors
     */
    @Override
    public CompletableFuture<Map<StreamSegmentRecord, List<Long>>> getSuccessorsWithPredecessors(long segmentId) {
        return this.getSegmentSealedEpoch(segmentId).thenCompose(sealedEpoch -> {
            if (sealedEpoch < 0) {
                // Negative value: no successors are reported; the active epoch is still fetched first.
                return this.getActiveEpoch(true).thenApply(activeSegments -> Collections.emptyMap());
            }
            CompletableFuture<EpochRecord> sealedEpochFuture = this.getEpochRecord((int)sealedEpoch);
            CompletableFuture<EpochRecord> previousEpochFuture = this.getEpochRecord(sealedEpoch - 1);
            return CompletableFuture.allOf(sealedEpochFuture, previousEpochFuture).thenApply(x -> {
                // Both futures are complete here, so join() does not block.
                EpochRecord sealedEpochRecord = (EpochRecord)sealedEpochFuture.join();
                EpochRecord previousEpochRecord = (EpochRecord)previousEpochFuture.join();
                Optional<StreamSegmentRecord> segmentOpt = previousEpochRecord.getSegments().stream().filter(r -> r.segmentId() == segmentId).findAny();
                // Only enforced when assertions are enabled; otherwise get() below throws
                // NoSuchElementException if the segment is missing from the previous epoch.
                assert (segmentOpt.isPresent());
                StreamSegmentRecord segment = segmentOpt.get();
                List successors = sealedEpochRecord.getSegments().stream().filter(r -> r.overlaps(segment)).collect(Collectors.toList());
                // Key: successor segment. Value: ids of previous-epoch segments overlapping that successor.
                return successors.stream().collect(Collectors.toMap(record -> record, z -> previousEpochRecord.getSegments().stream().filter(predecessor -> predecessor.overlaps((StreamSegmentRecord)z)).map(StreamSegmentRecord::segmentId).collect(Collectors.toList())));
            });
        });
    }

    /**
     * Returns the current epoch record without its version.
     *
     * @param ignoreCached passed through unchanged to {@code getCurrentEpochRecordData}
     * @return future holding the current epoch record
     */
    private CompletableFuture<EpochRecord> getActiveEpochRecord(boolean ignoreCached) {
        return this.getCurrentEpochRecordData(ignoreCached).thenApply(versioned -> versioned.getObject());
    }

    /**
     * Returns the segments of the current epoch after {@code verifyLegalState()} has completed.
     *
     * @return future holding the segments of the active epoch
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getActiveSegments() {
        return this.verifyLegalState().thenCompose(ignored -> {
            CompletableFuture<EpochRecord> activeEpoch = this.getActiveEpochRecord(true);
            return activeEpoch.thenApply(epochRecord -> epochRecord.getSegments());
        });
    }

    /**
     * Returns the segments at the head of the stream with the offset each one starts at.
     *
     * <p>When the truncation record equals {@code StreamTruncationRecord.EMPTY}, the head is every
     * segment of epoch 0 at offset 0. Otherwise the head is the truncation stream cut: each segment id
     * in the cut is resolved to its segment record through the truncation span and mapped to the
     * offset stored in the cut.
     *
     * @return future holding a map from head segment to its starting offset
     */
    @Override
    public CompletableFuture<Map<StreamSegmentRecord, Long>> getSegmentsAtHead() {
        return this.getTruncationRecord().thenCompose(truncationRecord -> {
            if (((StreamTruncationRecord)truncationRecord.getObject()).equals(StreamTruncationRecord.EMPTY)) {
                return this.getSegmentsInEpoch(0).thenApply(segments -> segments.stream().collect(Collectors.toMap(x -> x, x -> 0L)));
            }
            // findFirst().get() throws NoSuchElementException if a segment id of the stream cut has no
            // matching key in the span.
            return CompletableFuture.completedFuture(((StreamTruncationRecord)truncationRecord.getObject()).getStreamCut().entrySet().stream().collect(Collectors.toMap(x -> ((StreamTruncationRecord)truncationRecord.getObject()).getSpan().keySet().stream().filter(y -> y.segmentId() == ((Long)x.getKey()).longValue()).findFirst().get(), Map.Entry::getValue)));
        });
    }

    /**
     * Returns the segments recorded for the given epoch.
     *
     * @param epoch epoch number to look up
     * @return future holding the segments of that epoch's record
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsInEpoch(int epoch) {
        CompletableFuture<EpochRecord> epochRecordFuture = this.getEpochRecord(epoch);
        return epochRecordFuture.thenApply(record -> record.getSegments());
    }

    /**
     * Returns the segments lying between two stream cuts as a new mutable list.
     *
     * @param from starting stream cut (segment id to offset)
     * @param to   ending stream cut (segment id to offset)
     * @return future holding a list copy of the segments between the two cuts
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsBetweenStreamCuts(Map<Long, Long> from, Map<Long, Long> to) {
        return this.segmentsBetweenStreamCuts(from, to)
                .thenApply(segments -> new ArrayList<StreamSegmentRecord>(segments));
    }

    /**
     * Resolves both stream cuts to spans and returns the segments between them.
     *
     * <p>An empty {@code from} stands for the span of epoch 0 and an empty {@code to} for the span of
     * the active epoch. When both cuts are supplied, {@code RecordHelper.streamCutComparator} must
     * accept {@code to} relative to {@code from}; otherwise the returned future fails with
     * {@link IllegalArgumentException}.
     *
     * @param from starting stream cut, possibly empty
     * @param to   ending stream cut, possibly empty
     * @return future holding the segments between the two cuts
     */
    private CompletableFuture<ImmutableSet<StreamSegmentRecord>> segmentsBetweenStreamCuts(Map<Long, Long> from, Map<Long, Long> to) {
        CompletableFuture<ImmutableMap<StreamSegmentRecord, Integer>> spanFromFuture;
        if (from.isEmpty()) {
            spanFromFuture = this.getEpochRecord(0).thenApply(this::convertToSpan);
        } else {
            spanFromFuture = this.computeStreamCutSpan(from);
        }
        CompletableFuture<ImmutableMap<StreamSegmentRecord, Integer>> spanToFuture;
        if (to.isEmpty()) {
            spanToFuture = this.getActiveEpochRecord(true).thenApply(this::convertToSpan);
        } else {
            spanToFuture = this.computeStreamCutSpan(to);
        }
        return CompletableFuture.allOf(spanFromFuture, spanToFuture).thenCompose(ignored -> {
            // Both futures have completed successfully at this point, so join() returns immediately.
            ImmutableMap<StreamSegmentRecord, Integer> spanFrom = spanFromFuture.join();
            ImmutableMap<StreamSegmentRecord, Integer> spanTo = spanToFuture.join();
            boolean bothCutsSupplied = !(from.isEmpty() || to.isEmpty());
            if (bothCutsSupplied) {
                Preconditions.checkArgument(RecordHelper.streamCutComparator(to, spanTo, from, spanFrom));
            }
            return this.segmentsBetweenStreamCutSpans(spanFrom, spanTo);
        });
    }

    /**
     * Collects the segments that lie between two stream-cut spans.
     *
     * <p>Epochs from the lowest epoch of {@code spanFrom} to the highest epoch of {@code spanTo} are
     * fetched. Epochs fully inside both spans (at or after the highest "from" epoch and at or before
     * the lowest "to" epoch) contribute all their segments. In the remaining epochs a segment is
     * included only when every overlapping "from" segment has an id no greater than its own and every
     * overlapping "to" segment has an id no smaller than its own.
     *
     * @param spanFrom span (segment to epoch) of the starting stream cut; must not be empty
     * @param spanTo   span (segment to epoch) of the ending stream cut; must not be empty
     * @return future holding the set of segments between the two spans
     */
    @VisibleForTesting
    CompletableFuture<ImmutableSet<StreamSegmentRecord>> segmentsBetweenStreamCutSpans(Map<StreamSegmentRecord, Integer> spanFrom, Map<StreamSegmentRecord, Integer> spanTo) {
        int toLow = Collections.min(spanTo.values());
        int toHigh = Collections.max(spanTo.values());
        int fromLow = Collections.min(spanFrom.values());
        int fromHigh = Collections.max(spanFrom.values());
        Set<StreamSegmentRecord> collected = new HashSet<StreamSegmentRecord>();
        return this.fetchEpochs(fromLow, toHigh, true).thenApply(epochs -> {
            for (EpochRecord epoch : epochs) {
                boolean insideBothSpans = epoch.getEpoch() >= fromHigh && epoch.getEpoch() <= toLow;
                if (insideBothSpans) {
                    collected.addAll(epoch.getSegments());
                    continue;
                }
                for (StreamSegmentRecord segment : epoch.getSegments()) {
                    if (collected.contains(segment)) {
                        continue;
                    }
                    boolean greaterThanFrom = spanFrom.keySet().stream()
                            .filter(fromSegment -> fromSegment.overlaps(segment))
                            .allMatch(fromSegment -> fromSegment.segmentId() <= segment.segmentId());
                    boolean lessThanTo = spanTo.keySet().stream()
                            .filter(toSegment -> toSegment.overlaps(segment))
                            .allMatch(toSegment -> segment.segmentId() <= toSegment.segmentId());
                    if (greaterThanFrom && lessThanTo) {
                        collected.add(segment);
                    }
                }
            }
            return ImmutableSet.copyOf(collected);
        });
    }

    /**
     * Computes the number of bytes between two stream cuts.
     *
     * <p>The segments are grouped by shard number, each shard's sealed-segment size map is loaded, and
     * every segment is paired with its recorded size ({@code Long.MIN_VALUE} when the shard has no size
     * for it). Each segment then contributes:
     * <ul>
     *   <li>in both cuts: the "to" offset minus the "from" offset;</li>
     *   <li>only in the "to" cut: the "to" offset;</li>
     *   <li>only in the "from" cut: its recorded size minus the "from" offset;</li>
     *   <li>in neither cut: its recorded size.</li>
     * </ul>
     *
     * @param streamCutFrom     starting stream cut (segment id to offset)
     * @param streamCutTo       ending stream cut (segment id to offset)
     * @param segmentsInBetween segments lying between the two cuts
     * @return future holding the summed size
     */
    @VisibleForTesting
    CompletableFuture<Long> sizeBetweenStreamCuts(Map<Long, Long> streamCutFrom, Map<Long, Long> streamCutTo, Set<StreamSegmentRecord> segmentsInBetween) {
        // Group the segments so each shard's size map is loaded once.
        Map<Integer, List<StreamSegmentRecord>> shards = segmentsInBetween.stream().collect(Collectors.groupingBy(x -> this.getShardNumber(x.segmentId())));
        return ((CompletableFuture)Futures.allOfWithResults(shards.entrySet().stream().map(entry -> this.getSealedSegmentSizeMapShard((Integer)entry.getKey()).thenApply(shardMap -> ((List)entry.getValue()).stream().collect(Collectors.toMap(x -> x, x -> {
            // Long.MIN_VALUE marks "no size recorded"; the assertions below guard the branches that
            // actually need the recorded size.
            if (shardMap.getSize(x.segmentId()) == null) {
                return Long.MIN_VALUE;
            }
            return shardMap.getSize(x.segmentId());
        })))).collect(Collectors.toList())).thenApply(listOfMap -> listOfMap.stream().flatMap(s -> s.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))).thenApply(sizes -> {
            AtomicLong sizeTill = new AtomicLong(0L);
            sizes.forEach((segment, value) -> {
                if (streamCutTo.containsKey(segment.segmentId()) && streamCutFrom.containsKey(segment.segmentId())) {
                    // Segment appears in both cuts: only the bytes between the two offsets count.
                    sizeTill.addAndGet((Long)streamCutTo.get(segment.segmentId()) - (Long)streamCutFrom.get(segment.segmentId()));
                } else if (streamCutTo.containsKey(segment.segmentId())) {
                    // Segment only in the "to" cut: everything up to the "to" offset counts.
                    sizeTill.addAndGet((Long)streamCutTo.get(segment.segmentId()));
                } else if (streamCutFrom.containsKey(segment.segmentId())) {
                    // Segment only in the "from" cut: the remainder after the "from" offset counts.
                    // The assert is only checked when assertions are enabled.
                    assert (value >= 0L);
                    sizeTill.addAndGet(value - (Long)streamCutFrom.get(segment.segmentId()));
                } else {
                    // Segment in neither cut: its whole recorded size counts.
                    assert (value >= 0L);
                    sizeTill.addAndGet((long)value);
                }
            });
            return sizeTill.get();
        });
    }

    /**
     * Computes the span of a stream cut: each segment record in the cut mapped to an epoch containing it.
     *
     * <p>The epochs between the one encoded in the smallest segment id and the one encoded in the
     * largest segment id are fetched and walked from the highest list index down to 0. Each segment id
     * is assigned to the first epoch record, in that walk, whose segment ids contain it.
     *
     * <p>NOTE(review): the index arithmetic assumes {@code fetchEpochs} returns exactly one record per
     * epoch in ascending order — confirm against {@code fetchEpochs}.
     *
     * @param streamCut segment id to offset map; must not be empty, since {@code max}/{@code min} are
     *                  unwrapped with {@code get()}
     * @return future holding the span as an immutable map from segment record to epoch number
     */
    @VisibleForTesting
    CompletableFuture<ImmutableMap<StreamSegmentRecord, Integer>> computeStreamCutSpan(Map<Long, Long> streamCut) {
        long mostRecent = (Long)streamCut.keySet().stream().max(Comparator.naturalOrder()).get();
        long oldest = (Long)streamCut.keySet().stream().min(Comparator.naturalOrder()).get();
        int epochLow = StreamSegmentNameUtils.getEpoch((long)oldest);
        int epochHigh = StreamSegmentNameUtils.getEpoch((long)mostRecent);
        return this.fetchEpochs(epochLow, epochHigh, true).thenApply(epochs -> {
            // Segment ids not yet attributed to an epoch.
            ArrayList toFind = new ArrayList(streamCut.keySet());
            ImmutableMap.Builder resultSet = ImmutableMap.builder();
            for (int i = epochHigh - epochLow; i >= 0 && !toFind.isEmpty(); --i) {
                EpochRecord epochRecord = (EpochRecord)epochs.get(i);
                Set<Long> epochSegments = epochRecord.getSegmentIds();
                List found = toFind.stream().filter(epochSegments::contains).collect(Collectors.toList());
                resultSet.putAll(found.stream().collect(Collectors.toMap(x -> epochRecord.getSegments().stream().filter(z -> z.segmentId() == x.longValue()).findFirst().get(), x -> epochRecord.getEpoch())));
                // Remove what was found so earlier list entries do not claim the same segments.
                toFind.removeAll(epochSegments);
            }
            return resultSet.build();
        });
    }

    /**
     * Validates a stream cut by checking the key ranges of its segments.
     *
     * <p>The segment ids are grouped by the epoch encoded in the id, each epoch record is fetched, and
     * every segment is turned into a {@code (keyStart, keyEnd)} entry. The flattened list of ranges is
     * handed to {@code RecordHelper.validateStreamCut}.
     *
     * @param streamCut segment id to offset map to validate
     * @return future holding {@code true} when validation passes and {@code false} when it fails with
     *         an {@link IllegalArgumentException}; any other failure is logged with its cause and
     *         rethrown
     */
    @Override
    public CompletableFuture<Boolean> isStreamCutValid(Map<Long, Long> streamCut) {
        Map<Integer, List<Long>> groupByEpoch = streamCut.keySet().stream().collect(Collectors.groupingBy(StreamSegmentNameUtils::getEpoch));
        // One lookup per epoch; each yields the key ranges of that epoch's segments in the cut.
        CompletableFuture segmentRangesByEpoch = Futures.allOfWithResults(groupByEpoch.entrySet().stream().map(epochGroup -> this.getEpochRecord((Integer)epochGroup.getKey()).thenApply(epochRecord -> ((List)epochGroup.getValue()).stream().map(segmentId -> {
            StreamSegmentRecord segment = epochRecord.getSegment((long)segmentId);
            return new AbstractMap.SimpleEntry<Double, Double>(segment.getKeyStart(), segment.getKeyEnd());
        }).collect(Collectors.toList()))).collect(Collectors.toList()));
        CompletionStage segmentRangesFlattened = segmentRangesByEpoch.thenApply(listOfList -> listOfList.stream().flatMap(Collection::stream).collect(Collectors.toList()));
        return ((CompletableFuture)((CompletableFuture)segmentRangesFlattened).thenAccept(x -> RecordHelper.validateStreamCut(new ArrayList<Map.Entry<Double, Double>>((Collection<Map.Entry<Double, Double>>)x)))).handle((r, e) -> {
            if (e != null) {
                // An IllegalArgumentException is the validator's way of saying "invalid stream cut".
                if (Exceptions.unwrap((Throwable)e) instanceof IllegalArgumentException) {
                    return false;
                }
                // The throwable is passed as the trailing argument so the cause is logged too.
                log.warn("Exception while trying to validate a stream cut for stream {}/{}", this.scope, this.name, e);
                throw Exceptions.sneakyThrow((Throwable)e);
            }
            return true;
        });
    }

    /**
     * Submits a scale request by recording an epoch transition.
     *
     * <p>The stream must be neither sealing nor sealed. The epoch transition record is taken from
     * {@code existing} when supplied and read from the store otherwise. The outcome then depends on
     * that record and the active epoch:
     * <ul>
     *   <li>A non-empty record that matches the input is returned unchanged; one that does not match
     *       fails with {@code ConflictException}.</li>
     *   <li>With an empty record, if the segments cannot be scaled in the current epoch, the empty
     *       record is rewritten and the future fails with {@code PreConditionFailureException}.</li>
     *   <li>With an empty record, invalid input ranges fail with {@code InputInvalidException}.</li>
     *   <li>Otherwise a new epoch transition is computed, stored and returned with its new version.</li>
     * </ul>
     *
     * @param segmentsToSeal ids of the segments to seal
     * @param newRanges      key ranges of the segments to create
     * @param scaleTimestamp timestamp passed to the epoch transition computation
     * @param existing       previously read epoch transition record, or {@code null} to read it here
     * @return future holding the versioned epoch transition record for this scale request
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(List<Long> segmentsToSeal, List<Map.Entry<Double, Double>> newRanges, long scaleTimestamp, VersionedMetadata<EpochTransitionRecord> existing) {
        return ((CompletableFuture)this.verifyNotSealed().thenCompose(v -> {
            // Use the caller's record when supplied; otherwise read it.
            if (existing == null) {
                return this.getEpochTransition();
            }
            return CompletableFuture.completedFuture(existing);
        })).thenCompose(record -> this.getActiveEpochRecord(true).thenCompose(currentEpoch -> {
            if (!((EpochTransitionRecord)record.getObject()).equals(EpochTransitionRecord.EMPTY)) {
                // A scale is already recorded: accept only a resubmission of the same request.
                if (!RecordHelper.verifyRecordMatchesInput(segmentsToSeal, newRanges, false, (EpochTransitionRecord)record.getObject())) {
                    log.debug("scale conflict, another scale operation is ongoing");
                    throw new EpochTransitionOperationExceptions.ConflictException();
                }
                return CompletableFuture.completedFuture(record);
            }
            if (!RecordHelper.canScaleFor(segmentsToSeal, currentEpoch)) {
                // The empty record is written back with the version that was read before failing.
                return this.updateEpochTransitionNode(new VersionedMetadata<EpochTransitionRecord>(EpochTransitionRecord.EMPTY, record.getVersion())).thenApply(x -> {
                    log.warn("scale precondition failed {}", (Object)segmentsToSeal);
                    throw new EpochTransitionOperationExceptions.PreConditionFailureException();
                });
            }
            if (!RecordHelper.validateInputRange(segmentsToSeal, newRanges, currentEpoch)) {
                log.error("scale input invalid {} {}", (Object)segmentsToSeal, (Object)newRanges);
                throw new EpochTransitionOperationExceptions.InputInvalidException();
            }
            EpochTransitionRecord epochTransition = RecordHelper.computeEpochTransition(currentEpoch, segmentsToSeal, newRanges, scaleTimestamp);
            return this.updateEpochTransitionNode(new VersionedMetadata<EpochTransitionRecord>(epochTransition, record.getVersion())).thenApply(version -> {
                log.info("scale for stream {}/{} accepted. Segments to seal = {}", new Object[]{this.scope, this.name, epochTransition.getSegmentsToSeal()});
                return new VersionedMetadata<EpochTransitionRecord>(epochTransition, (Version)version);
            });
        }));
    }

    /**
     * Rejects the operation when the stream is being sealed or has already been sealed.
     *
     * @return a future that completes normally when the state read through {@code getState(false)}
     *         is neither {@code SEALING} nor {@code SEALED}; otherwise it fails with an
     *         {@code ILLEGAL_STATE} store exception naming the stream and its state.
     */
    private CompletableFuture<Void> verifyNotSealed() {
        return this.getState(false).thenAccept(state -> {
            boolean sealingOrSealed = state.equals(State.SEALING) || state.equals(State.SEALED);
            if (sealingOrSealed) {
                String message = "Stream: " + this.getName() + " State: " + state.name();
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, message);
            }
        });
    }

    /**
     * Begins a scale workflow for a stream whose state is already {@code SCALING}.
     *
     * @param isManualScale when true the record is handled by {@code migrateManualScaleToNewEpoch},
     *                      otherwise by {@code discardInconsistentEpochTransition}.
     * @param record        the versioned epoch transition record to start from.
     * @param state         the versioned stream state; must hold {@code State.SCALING}.
     * @return a future with the epoch transition record to continue scaling with.
     * @throws IllegalArgumentException if {@code state} is not {@code SCALING}.
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(boolean isManualScale, VersionedMetadata<EpochTransitionRecord> record, VersionedMetadata<State> state) {
        Preconditions.checkArgument(state.getObject().equals(State.SCALING));
        return this.getCurrentEpochRecordData(true).thenCompose(versionedEpoch -> {
            EpochRecord activeEpoch = versionedEpoch.getObject();
            if (!isManualScale) {
                return this.discardInconsistentEpochTransition(record, state, activeEpoch);
            }
            return this.migrateManualScaleToNewEpoch(record, state, activeEpoch);
        });
    }

    /**
     * Validates that the epoch transition targets an epoch beyond the current one.
     *
     * <p>When the transition's new epoch is not greater than the current epoch, the transition node
     * is overwritten with {@code EpochTransitionRecord.EMPTY}, the state is moved to
     * {@code ACTIVE}, and the returned future fails with {@link IllegalStateException}.
     *
     * @param epochTransition the versioned transition record under inspection.
     * @param state           the versioned stream state to reset when the record is discarded.
     * @param currentEpoch    the current epoch record.
     * @return a future holding {@code epochTransition} unchanged when it is consistent.
     */
    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> discardInconsistentEpochTransition(VersionedMetadata<EpochTransitionRecord> epochTransition, VersionedMetadata<State> state, EpochRecord currentEpoch) {
        boolean stale = epochTransition.getObject().getNewEpoch() <= currentEpoch.getEpoch();
        if (!stale) {
            return CompletableFuture.completedFuture(epochTransition);
        }
        VersionedMetadata<EpochTransitionRecord> cleared =
                new VersionedMetadata<EpochTransitionRecord>(EpochTransitionRecord.EMPTY, epochTransition.getVersion());
        return this.updateEpochTransitionNode(cleared)
                .thenCompose(ignored -> this.updateVersionedState(state, State.ACTIVE))
                .thenApply(ignored -> {
                    log.warn("Scale epoch transition record is inconsistent with VersionedMetadata in the table. {}", (Object) epochTransition.getObject().getNewEpoch());
                    throw new IllegalStateException("Epoch transition record is inconsistent.");
                });
    }

    /**
     * Creates the metadata for the epoch described by the supplied transition record.
     *
     * <p>If the active epoch is already at or beyond the transition's new epoch nothing is written.
     * Otherwise this builds the new segment records, writes the new epoch record, appends a history
     * time series record (sealed segments plus new segments) and records the epoch in which each
     * segment to seal was sealed.
     *
     * @param versionedMetadata the versioned epoch transition record driving the scale.
     * @return a future that yields the same {@code versionedMetadata} once all writes are done.
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return this.getActiveEpochRecord(true).thenCompose(activeEpoch -> {
            EpochTransitionRecord transition = versionedMetadata.getObject();
            if (activeEpoch.getEpoch() >= transition.getNewEpoch()) {
                // The active epoch is not behind the target epoch, so there is nothing to create.
                return CompletableFuture.completedFuture(versionedMetadata);
            }

            // The new epoch must be stamped strictly after the active epoch's creation time.
            long creationTime = Math.max(transition.getTime(), activeEpoch.getCreationTime() + 1L);

            ImmutableList.Builder<StreamSegmentRecord> createdBuilder = ImmutableList.builder();
            transition.getNewSegmentsWithRange().forEach((segmentId, range) ->
                    createdBuilder.add(this.newSegmentRecord(segmentId, creationTime, range.getKey(), range.getValue())));

            ImmutableList.Builder<StreamSegmentRecord> sealedBuilder = ImmutableList.builder();
            transition.getSegmentsToSeal().forEach(segmentId -> sealedBuilder.add(activeEpoch.getSegment(segmentId)));

            // Segments of the new epoch: the survivors of the active epoch followed by the new ones.
            ImmutableList.Builder<StreamSegmentRecord> epochSegments = ImmutableList.builder();
            activeEpoch.getSegments().stream()
                    .filter(segment -> !transition.getSegmentsToSeal().contains(segment.segmentId()))
                    .forEach(segment -> epochSegments.add(segment));
            ImmutableList<StreamSegmentRecord> created = createdBuilder.build();
            epochSegments.addAll(created);

            EpochRecord newEpochRecord = new EpochRecord(transition.getNewEpoch(), transition.getNewEpoch(), epochSegments.build(), creationTime);
            HistoryTimeSeriesRecord historyRecord = new HistoryTimeSeriesRecord(transition.getNewEpoch(), transition.getNewEpoch(), sealedBuilder.build(), created, newEpochRecord.getCreationTime());

            return this.createEpochRecord(newEpochRecord)
                    .thenCompose(ignored -> this.updateHistoryTimeSeries(historyRecord))
                    .thenCompose(ignored -> this.createSegmentSealedEpochRecords(transition.getSegmentsToSeal(), transition.getNewEpoch()))
                    .thenApply(ignored -> versionedMetadata);
        });
    }

    /**
     * Stores a history time series record in the chunk that its epoch maps to.
     *
     * <p>The chunk number is {@code epoch / historyChunkSize}. An epoch that is an exact multiple
     * of the chunk size creates a new chunk; any other epoch is appended to the existing chunk,
     * but only when the chunk's latest record has a smaller epoch.
     *
     * @param record the history record to store.
     * @return a future that completes when the chunk has been created, updated or left as is.
     */
    private CompletableFuture<Void> updateHistoryTimeSeries(HistoryTimeSeriesRecord record) {
        int chunkNumber = record.getEpoch() / this.historyChunkSize.get();
        if (record.getEpoch() % this.historyChunkSize.get() == 0) {
            return this.createHistoryTimeSeriesChunk(chunkNumber, record);
        }
        return this.getHistoryTimeSeriesChunkData(chunkNumber, true).thenCompose(chunkData -> {
            HistoryTimeSeries series = chunkData.getObject();
            boolean alreadyRecorded = series.getLatestRecord().getEpoch() >= record.getEpoch();
            if (alreadyRecorded) {
                return CompletableFuture.completedFuture(null);
            }
            HistoryTimeSeries appended = HistoryTimeSeries.addHistoryRecord(series, record);
            VersionedMetadata<HistoryTimeSeries> versionedUpdate =
                    new VersionedMetadata<HistoryTimeSeries>(appended, chunkData.getVersion());
            return Futures.toVoid(this.updateHistoryTimeSeriesChunkData(chunkNumber, versionedUpdate));
        });
    }

    /**
     * Reconciles a manual scale's epoch transition record with the current epoch.
     *
     * <ul>
     *   <li>If the record's active epoch equals the current epoch, the record is returned as is.</li>
     *   <li>If the current epoch is newer but shares the reference epoch of the record's active
     *       epoch, the segments to seal are re-addressed to the current epoch, the transition is
     *       recomputed and written back.</li>
     *   <li>Otherwise the transition node is reset to {@code EMPTY}, the state is set to
     *       {@code ACTIVE} and the future fails with {@link IllegalStateException}.</li>
     * </ul>
     *
     * @param versionedMetadata the versioned epoch transition record.
     * @param versionedState    the versioned stream state, used when the record must be discarded.
     * @param currentEpoch      the current epoch record.
     * @return a future with the (possibly migrated) versioned epoch transition record.
     */
    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> migrateManualScaleToNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedState, EpochRecord currentEpoch) {
        EpochTransitionRecord transition = versionedMetadata.getObject();
        return this.getEpochRecord(transition.getActiveEpoch()).thenCompose(recordedActiveEpoch -> {
            if (transition.getActiveEpoch() == currentEpoch.getEpoch()) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }

            boolean sameLineage = currentEpoch.getEpoch() > transition.getActiveEpoch()
                    && currentEpoch.getReferenceEpoch() == recordedActiveEpoch.getReferenceEpoch();
            if (!sameLineage) {
                VersionedMetadata<EpochTransitionRecord> cleared =
                        new VersionedMetadata<EpochTransitionRecord>(EpochTransitionRecord.EMPTY, versionedMetadata.getVersion());
                return this.updateEpochTransitionNode(cleared)
                        .thenCompose(ignored -> this.updateVersionedState(versionedState, State.ACTIVE))
                        .thenApply(ignored -> {
                            log.warn("Scale epoch transition record is inconsistent with VersionedMetadata in the table. {}", (Object) transition.getNewEpoch());
                            throw new IllegalStateException("Epoch transition record is inconsistent.");
                        });
            }

            // Same segment numbers, re-addressed with the current epoch.
            List<Long> segmentsInCurrentEpoch = transition.getSegmentsToSeal().stream()
                    .map(segmentId -> StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(segmentId), currentEpoch.getEpoch()))
                    .collect(Collectors.toList());
            EpochTransitionRecord migrated = RecordHelper.computeEpochTransition(currentEpoch, segmentsInCurrentEpoch, Lists.newArrayList(transition.getNewSegmentsWithRange().values()), transition.getTime());
            return this.updateEpochTransitionNode(new VersionedMetadata<EpochTransitionRecord>(migrated, versionedMetadata.getVersion()))
                    .thenApply(newVersion -> new VersionedMetadata<EpochTransitionRecord>(migrated, newVersion));
        });
    }

    /**
     * Reads the epoch transition node.
     *
     * @return a future with a fresh {@code VersionedMetadata} carrying the node's object and version.
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition() {
        return this.getEpochTransitionNode()
                .thenApply(node -> new VersionedMetadata<>(node.getObject(), node.getVersion()));
    }

    /**
     * Removes the cold marker of every given segment.
     *
     * @param segments ids of the segments whose markers are removed (via a parallel stream).
     * @return a future that completes once every {@code removeColdMarker} call has completed.
     */
    private CompletableFuture<Void> clearMarkers(Set<Long> segments) {
        List<CompletableFuture<Void>> removals = segments.stream()
                .parallel()
                .map(this::removeColdMarker)
                .collect(Collectors.toList());
        return Futures.toVoid(Futures.allOfWithResults(removals));
    }

    /**
     * Finishes the sealing step of a scale: clears the markers of the sealed segments, records
     * their sizes and then switches the current epoch to the transition's new epoch.
     *
     * @param sealedSegmentSizes sizes of the sealed segments, keyed by segment id.
     * @param record             the versioned epoch transition record of the ongoing scale.
     * @return a future that completes after the three steps ran in that order.
     */
    @Override
    public CompletableFuture<Void> scaleOldSegmentsSealed(Map<Long, Long> sealedSegmentSizes, VersionedMetadata<EpochTransitionRecord> record) {
        EpochTransitionRecord transition = record.getObject();
        CompletableFuture<Void> markersCleared = this.clearMarkers(transition.getSegmentsToSeal());
        return Futures.toVoid(markersCleared
                .thenCompose(ignored -> this.updateSealedSegmentSizes(sealedSegmentSizes))
                .thenCompose(ignored -> this.updateCurrentEpochRecord(transition.getNewEpoch())));
    }

    /**
     * Completes a scale by resetting the epoch transition node to {@code EpochTransitionRecord.EMPTY}.
     *
     * @param record the versioned transition record being completed; must not be null or EMPTY.
     * @return a future that completes once the node has been overwritten.
     * @throws NullPointerException     if {@code record} is null.
     * @throws IllegalArgumentException if {@code record} already holds the EMPTY record.
     */
    @Override
    public CompletableFuture<Void> completeScale(VersionedMetadata<EpochTransitionRecord> record) {
        Preconditions.checkNotNull(record);
        boolean hasTransition = !record.getObject().equals(EpochTransitionRecord.EMPTY);
        Preconditions.checkArgument(hasTransition);
        VersionedMetadata<EpochTransitionRecord> cleared =
                new VersionedMetadata<EpochTransitionRecord>(EpochTransitionRecord.EMPTY, record.getVersion());
        return Futures.toVoid(this.updateEpochTransitionNode(cleared));
    }

    /**
     * Marks the committing-transactions record as a rolling transaction record.
     *
     * @param activeEpoch the active epoch passed to {@code createRollingTxnRecord}.
     * @param existing    the current versioned committing-transactions record.
     * @return {@code existing} when it already is a rolling transaction record; otherwise the
     *         newly written rolling record paired with the version returned by the update.
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(int activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing) {
        CommittingTransactionsRecord current = existing.getObject();
        if (!current.isRollingTxnRecord()) {
            CommittingTransactionsRecord rolling = current.createRollingTxnRecord(activeEpoch);
            VersionedMetadata<CommittingTransactionsRecord> toWrite =
                    new VersionedMetadata<CommittingTransactionsRecord>(rolling, existing.getVersion());
            return this.updateCommittingTxnRecord(toWrite)
                    .thenApply(newVersion -> new VersionedMetadata<CommittingTransactionsRecord>(rolling, newVersion));
        }
        return CompletableFuture.completedFuture(existing);
    }

    /**
     * Creates the two duplicate epochs needed by a rolling transaction: a duplicate of the
     * transaction's epoch followed by a duplicate of the active epoch.
     *
     * <p>The step is skipped when the active epoch is already greater than the record's current
     * epoch. In every non-failing case {@code updateSealedSegmentSizes(sealedTxnEpochSegments)}
     * runs afterwards.
     *
     * @param sealedTxnEpochSegments sizes of the sealed transaction-epoch segments, by segment id.
     * @param time                   candidate creation time; the value used is the larger of this
     *                               and the active epoch's creation time plus one.
     * @param record                 versioned committing-transactions record; must be a rolling
     *                               transaction record.
     * @return a future that completes when all records have been written.
     * @throws IllegalArgumentException if {@code record} is not a rolling transaction record.
     */
    @Override
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(Map<Long, Long> sealedTxnEpochSegments, long time, VersionedMetadata<CommittingTransactionsRecord> record) {
        Preconditions.checkArgument((boolean)record.getObject().isRollingTxnRecord());
        CommittingTransactionsRecord committingTxnRecord = record.getObject();
        return this.getActiveEpoch(true).thenCompose(activeEpochRecord -> ((CompletableFuture)this.getEpochRecord(committingTxnRecord.getEpoch()).thenCompose(transactionEpochRecord -> {
            // Active epoch already moved past the one recorded in the committing record: nothing to create.
            if (activeEpochRecord.getEpoch() > committingTxnRecord.getCurrentEpoch()) {
                log.debug("Duplicate Epochs {} already created. Ignore.", (Object)committingTxnRecord.getNewActiveEpoch());
                return CompletableFuture.completedFuture(null);
            }
            // Duplicate txn epoch is stamped at timeStamp, duplicate active epoch at timeStamp + 1.
            long timeStamp = Math.max(activeEpochRecord.getCreationTime() + 1L, time);
            // Copies of the transaction epoch's segments, re-addressed with the new txn epoch number.
            ImmutableList.Builder duplicateTxnSegmentsBuilder = ImmutableList.builder();
            transactionEpochRecord.getSegments().stream().forEach(x -> duplicateTxnSegmentsBuilder.add((Object)this.newSegmentRecord(StreamSegmentNameUtils.computeSegmentId((int)StreamSegmentNameUtils.getSegmentNumber((long)x.segmentId()), (int)committingTxnRecord.getNewTxnEpoch()), timeStamp, x.getKeyStart(), x.getKeyEnd())));
            // Copies of the active epoch's segments, re-addressed with the new active epoch number.
            ImmutableList.Builder duplicateActiveSegmentsBuilder = ImmutableList.builder();
            activeEpochRecord.getSegments().stream().forEach(x -> duplicateActiveSegmentsBuilder.add((Object)this.newSegmentRecord(StreamSegmentNameUtils.computeSegmentId((int)StreamSegmentNameUtils.getSegmentNumber((long)x.segmentId()), (int)committingTxnRecord.getNewActiveEpoch()), timeStamp + 1L, x.getKeyStart(), x.getKeyEnd())));
            // Each duplicate keeps the reference epoch of the epoch it was copied from.
            EpochRecord duplicateTxnEpoch = new EpochRecord(committingTxnRecord.getNewTxnEpoch(), transactionEpochRecord.getReferenceEpoch(), (ImmutableList<StreamSegmentRecord>)duplicateTxnSegmentsBuilder.build(), timeStamp);
            EpochRecord duplicateActiveEpoch = new EpochRecord(committingTxnRecord.getNewActiveEpoch(), activeEpochRecord.getReferenceEpoch(), (ImmutableList<StreamSegmentRecord>)duplicateActiveSegmentsBuilder.build(), timeStamp + 1L);
            // History records for the duplicates carry empty sealed-segment and new-segment lists.
            HistoryTimeSeriesRecord timeSeriesRecordTxnEpoch = new HistoryTimeSeriesRecord(duplicateTxnEpoch.getEpoch(), duplicateTxnEpoch.getReferenceEpoch(), (ImmutableList<StreamSegmentRecord>)ImmutableList.of(), (ImmutableList<StreamSegmentRecord>)ImmutableList.of(), timeStamp);
            HistoryTimeSeriesRecord timeSeriesRecordActiveEpoch = new HistoryTimeSeriesRecord(duplicateActiveEpoch.getEpoch(), duplicateActiveEpoch.getReferenceEpoch(), (ImmutableList<StreamSegmentRecord>)ImmutableList.of(), (ImmutableList<StreamSegmentRecord>)ImmutableList.of(), timeStamp + 1L);
            // Write order: txn epoch record, its history record, active epoch record, its history record,
            // then sealed-epoch records: active-epoch segments -> duplicate txn epoch,
            // duplicate-txn-epoch segments -> duplicate active epoch.
            return ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)this.createEpochRecord(duplicateTxnEpoch).thenCompose(x -> this.updateHistoryTimeSeries(timeSeriesRecordTxnEpoch))).thenCompose(x -> this.createEpochRecord(duplicateActiveEpoch))).thenCompose(x -> this.updateHistoryTimeSeries(timeSeriesRecordActiveEpoch))).thenCompose(x -> this.createSegmentSealedEpochRecords(activeEpochRecord.getSegments().stream().map(StreamSegmentRecord::segmentId).collect(Collectors.toList()), duplicateTxnEpoch.getEpoch()))).thenCompose(x -> this.createSegmentSealedEpochRecords(duplicateTxnEpoch.getSegments().stream().map(StreamSegmentRecord::segmentId).collect(Collectors.toList()), duplicateActiveEpoch.getEpoch()));
        })).thenCompose(r -> this.updateSealedSegmentSizes(sealedTxnEpochSegments)));
    }

    /**
     * Completes a rolling transaction, provided the active epoch still equals the current epoch
     * stored in the committing-transactions record.
     *
     * <p>In that case the sealed segment sizes are recorded, the markers of those segments are
     * cleared and the current epoch is switched to the record's new active epoch. Otherwise
     * nothing is done.
     *
     * @param sealedActiveEpochSegments sizes of the sealed active-epoch segments, by segment id.
     * @param versionedMetadata         the versioned committing-transactions record.
     * @return a future that completes when the work (if any) is done.
     */
    @Override
    public CompletableFuture<Void> completeRollingTxn(Map<Long, Long> sealedActiveEpochSegments, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        return this.getActiveEpoch(true).thenCompose(activeEpochRecord -> {
            CommittingTransactionsRecord committing = versionedMetadata.getObject();
            int expectedActiveEpoch = committing.getCurrentEpoch();
            if (activeEpochRecord.getEpoch() != expectedActiveEpoch) {
                return CompletableFuture.completedFuture(null);
            }
            return this.updateSealedSegmentSizes(sealedActiveEpochSegments)
                    .thenCompose(ignored -> this.clearMarkers(sealedActiveEpochSegments.keySet()))
                    .thenCompose(ignored -> this.updateCurrentEpochRecord(committing.getNewActiveEpoch()));
        });
    }

    /**
     * Generates a transaction id from the active epoch's reference epoch and the supplied bits.
     *
     * @param msb32Bit 32-bit value forwarded to {@code RecordHelper.generateTxnId}.
     * @param lsb64Bit 64-bit value forwarded to {@code RecordHelper.generateTxnId}.
     * @return a future with the generated id.
     */
    @Override
    public CompletableFuture<UUID> generateNewTxnId(int msb32Bit, long lsb64Bit) {
        return this.getActiveEpochRecord(true).thenApply(activeEpoch -> {
            int referenceEpoch = activeEpoch.getReferenceEpoch();
            return RecordHelper.generateTxnId(referenceEpoch, msb32Bit, lsb64Bit);
        });
    }

    /**
     * Creates an {@code OPEN} transaction record, after verifying the stream is not sealing/sealed.
     *
     * @param txnId            id of the transaction; its epoch is derived from the id.
     * @param lease            lease duration, added to the current time to get the lease expiry.
     * @param maxExecutionTime maximum execution duration, added to the current time likewise.
     * @return a future with the data of the created transaction (empty writer id, commit time and
     *         position set to {@code Long.MIN_VALUE}, no commit offsets).
     */
    @Override
    public CompletableFuture<VersionedTransactionData> createTransaction(UUID txnId, long lease, long maxExecutionTime) {
        long now = System.currentTimeMillis();
        long leaseExpiry = now + lease;
        long maxExecutionExpiry = now + maxExecutionTime;
        int epoch = RecordHelper.getTransactionEpoch(txnId);
        ActiveTxnRecord openRecord = ActiveTxnRecord.builder()
                .txnStatus(TxnStatus.OPEN)
                .leaseExpiryTime(leaseExpiry)
                .txCreationTimestamp(now)
                .maxExecutionExpiryTime(maxExecutionExpiry)
                .writerId(Optional.empty())
                .commitTime(Optional.empty())
                .commitOrder(Optional.empty())
                .build();
        return this.verifyNotSealed()
                .thenCompose(ignored -> this.createNewTransaction(epoch, txnId, openRecord))
                .thenApply(version -> new VersionedTransactionData(epoch, txnId, version, TxnStatus.OPEN, now, maxExecutionExpiry, "", Long.MIN_VALUE, Long.MIN_VALUE, ImmutableMap.of()));
    }

    /**
     * Renews a transaction's lease: rewrites its active record with a lease expiry of
     * "now + lease", leaving every other field as given in {@code txnData}.
     *
     * @param txnData current transaction data, including the version used for the update.
     * @param lease   lease duration added to the current time.
     * @return a future with the transaction data carrying the version returned by the update.
     */
    @Override
    public CompletableFuture<VersionedTransactionData> pingTransaction(VersionedTransactionData txnData, long lease) {
        final int epoch = txnData.getEpoch();
        final UUID txnId = txnData.getId();
        final long createdAt = txnData.getCreationTime();
        final long maxExecutionExpiry = txnData.getMaxExecutionExpiryTime();
        final TxnStatus status = txnData.getStatus();
        final String writerId = txnData.getWriterId();
        final long commitTime = txnData.getCommitTime();
        final long position = txnData.getPosition();
        final ImmutableMap<Long, Long> commitOffsets = txnData.getCommitOffsets();

        long renewedLeaseExpiry = System.currentTimeMillis() + lease;
        ActiveTxnRecord renewed = new ActiveTxnRecord(createdAt, renewedLeaseExpiry, maxExecutionExpiry, status, writerId, commitTime, position, commitOffsets);
        VersionedMetadata<ActiveTxnRecord> toWrite = new VersionedMetadata<ActiveTxnRecord>(renewed, txnData.getVersion());
        return this.updateActiveTx(epoch, txnId, toWrite)
                .thenApply(newVersion -> new VersionedTransactionData(epoch, txnId, newVersion, status, createdAt, maxExecutionExpiry, writerId, commitTime, position, commitOffsets));
    }

    /**
     * Loads the active record of a transaction and exposes it as {@code VersionedTransactionData}.
     *
     * @param txId id of the transaction; its epoch is derived from the id.
     * @return a future with the transaction's data and the version of the stored record.
     */
    @Override
    public CompletableFuture<VersionedTransactionData> getTransactionData(UUID txId) {
        int epoch = RecordHelper.getTransactionEpoch(txId);
        return this.getActiveTx(epoch, txId).thenApply(versioned -> {
            ActiveTxnRecord txn = versioned.getObject();
            return new VersionedTransactionData(
                    epoch,
                    txId,
                    versioned.getVersion(),
                    txn.getTxnStatus(),
                    txn.getTxCreationTimestamp(),
                    txn.getMaxExecutionExpiryTime(),
                    txn.getWriterId(),
                    txn.getCommitTime(),
                    txn.getCommitOrder(),
                    txn.getCommitOffsets());
        });
    }

    /**
     * Determines the status of a transaction.
     *
     * <p>The active transaction record is consulted first. If it is missing (data-not-found), the
     * completed transaction record is consulted instead. Any other failure is propagated.
     *
     * @param txId id of the transaction.
     * @return a future with the transaction status.
     */
    @Override
    public CompletableFuture<TxnStatus> checkTransactionStatus(UUID txId) {
        int epoch = RecordHelper.getTransactionEpoch(txId);
        CompletableFuture<TxnStatus> activeStatus = this.getActiveTx(epoch, txId).handle((activeTxn, failure) -> {
            if (failure == null) {
                return activeTxn.getObject().getTxnStatus();
            }
            if (Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException) {
                return TxnStatus.UNKNOWN;
            }
            throw new CompletionException(failure);
        });
        return activeStatus.thenCompose(status -> {
            if (!status.equals(TxnStatus.UNKNOWN)) {
                return CompletableFuture.completedFuture(status);
            }
            // No active record: fall back to the completed transactions.
            return this.getCompletedTxnStatus(txId);
        });
    }

    /**
     * Reads the completion status of a transaction from its completed-transaction record.
     *
     * @param txId id of the transaction.
     * @return a future with the recorded completion status, or {@code TxnStatus.UNKNOWN} when the
     *         lookup fails with a data-not-found exception; other failures are propagated.
     */
    private CompletableFuture<TxnStatus> getCompletedTxnStatus(UUID txId) {
        return this.getCompletedTx(txId).handle((completedTxn, failure) -> {
            if (failure == null) {
                return completedTxn.getObject().getCompletionStatus();
            }
            boolean notFound = Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException;
            if (notFound) {
                return TxnStatus.UNKNOWN;
            }
            throw new CompletionException(failure);
        });
    }

    /**
     * Seals a transaction for commit or abort.
     *
     * <p>The active transaction is sealed first. When no active record exists (data-not-found),
     * the completed-transaction record is validated against the requested outcome instead; in
     * that case the returned entry has a {@code null} epoch.
     *
     * @param txId      id of the transaction; its epoch is derived from the id.
     * @param commit    true to seal for commit, false to seal for abort.
     * @param version   optional version to use for the conditional update.
     * @param writerId  writer id recorded when committing.
     * @param timestamp commit timestamp recorded when committing.
     * @return a future with the resulting status and the transaction's epoch (null when the
     *         status came from the completed-transaction record).
     */
    @Override
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(UUID txId, boolean commit, Optional<Version> version, String writerId, long timestamp) {
        int epoch = RecordHelper.getTransactionEpoch(txId);
        // The entries are typed <TxnStatus, Integer> to match the declared result type; the raw
        // CompletableFuture cast and the <TxnStatus, Object> entries were not type-correct.
        return this.sealActiveTxn(epoch, txId, commit, version, writerId, timestamp)
                .exceptionally(ex -> new AbstractMap.SimpleEntry<TxnStatus, Integer>(this.handleDataNotFoundException(ex), null))
                .thenCompose(pair -> {
                    if (pair.getKey() == TxnStatus.UNKNOWN) {
                        return this.validateCompletedTxn(txId, commit)
                                .thenApply(status -> new AbstractMap.SimpleEntry<TxnStatus, Integer>(status, null));
                    }
                    return CompletableFuture.completedFuture(pair);
                });
    }

    /**
     * Seals an active transaction according to its present status.
     *
     * <ul>
     *   <li>{@code OPEN}: the record is moved to {@code COMMITTING} or {@code ABORTING}.</li>
     *   <li>{@code COMMITTING}/{@code COMMITTED}: returned as is for a commit, {@code ILLEGAL_STATE}
     *       for an abort.</li>
     *   <li>{@code ABORTING}/{@code ABORTED}: returned as is for an abort, {@code ILLEGAL_STATE}
     *       for a commit.</li>
     *   <li>any other status: {@code DATA_NOT_FOUND}.</li>
     * </ul>
     *
     * @param epoch     epoch of the transaction.
     * @param txId      id of the transaction.
     * @param commit    true to commit, false to abort.
     * @param version   version for the conditional update; the stored record's version if absent.
     * @param writerId  writer id recorded when committing.
     * @param timestamp commit timestamp recorded when committing.
     * @return a future with the resulting status and {@code epoch}.
     */
    private CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealActiveTxn(int epoch, UUID txId, boolean commit, Optional<Version> version, String writerId, long timestamp) {
        return this.getActiveTx(epoch, txId).thenCompose(versioned -> {
            ActiveTxnRecord stored = versioned.getObject();
            Version updateVersion = version.orElseGet(versioned::getVersion);
            TxnStatus status = stored.getTxnStatus();
            switch (status) {
                case OPEN:
                    return this.sealActiveTx(epoch, txId, commit, stored, updateVersion, writerId, timestamp)
                            .thenApply(ignored -> new AbstractMap.SimpleEntry<TxnStatus, Integer>(commit ? TxnStatus.COMMITTING : TxnStatus.ABORTING, epoch));
                case COMMITTING:
                case COMMITTED:
                    if (!commit) {
                        throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + this.getName() + " Transaction: " + txId.toString() + " State: " + status.name());
                    }
                    return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<TxnStatus, Integer>(status, epoch));
                case ABORTING:
                case ABORTED:
                    if (!commit) {
                        return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<TxnStatus, Integer>(status, epoch));
                    }
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + this.getName() + " Transaction: " + txId.toString() + " State: " + status.name());
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Transaction: " + txId.toString());
            }
        });
    }

    /**
     * Writes the sealed form of an active transaction record.
     *
     * <ul>
     *   <li>Abort: a record with status {@code ABORTING} built from the previous timestamps.</li>
     *   <li>Commit of a record already {@code COMMITTING}: the previous record unchanged.</li>
     *   <li>Commit otherwise: the transaction is added to the commit order and a
     *       {@code COMMITTING} record with writer id, commit timestamp and position is built.</li>
     * </ul>
     *
     * @param epoch     epoch of the transaction.
     * @param txId      id of the transaction.
     * @param commit    true to commit, false to abort.
     * @param previous  the record currently stored.
     * @param version   version used for the conditional update.
     * @param writerId  writer id stored with a committing record.
     * @param timestamp commit timestamp stored with a committing record.
     * @return a future with the version returned by {@code updateActiveTx}.
     */
    private CompletableFuture<Version> sealActiveTx(int epoch, UUID txId, boolean commit, ActiveTxnRecord previous, Version version, String writerId, long timestamp) {
        // Typed as CompletableFuture<ActiveTxnRecord>: a CompletionStage<Object> local accepts none
        // of these branches and its thenCompose result is not a CompletableFuture<Version>.
        final CompletableFuture<ActiveTxnRecord> sealedRecord;
        if (!commit) {
            sealedRecord = CompletableFuture.completedFuture(new ActiveTxnRecord(previous.getTxCreationTimestamp(), previous.getLeaseExpiryTime(), previous.getMaxExecutionExpiryTime(), TxnStatus.ABORTING));
        } else if (previous.getTxnStatus().equals(TxnStatus.COMMITTING)) {
            sealedRecord = CompletableFuture.completedFuture(previous);
        } else {
            sealedRecord = this.addTxnToCommitOrder(txId).thenApply(position -> new ActiveTxnRecord(previous.getTxCreationTimestamp(), previous.getLeaseExpiryTime(), previous.getMaxExecutionExpiryTime(), TxnStatus.COMMITTING, writerId, timestamp, (long) position));
        }
        return sealedRecord.thenCompose(updated -> {
            VersionedMetadata<ActiveTxnRecord> data = new VersionedMetadata<ActiveTxnRecord>(updated, version);
            return this.updateActiveTx(epoch, txId, data);
        });
    }

    /**
     * Stores commit offsets on a {@code COMMITTING} transaction record, if none are stored yet.
     *
     * <p>The whole operation is passed to {@code Futures.exceptionallyExpecting} with
     * {@code DATA_NOT_FOUND_PREDICATE} and a {@code null} fallback value.
     *
     * @param txnId         id of the transaction; its epoch is derived from the id.
     * @param commitOffsets offsets to record, keyed by segment id.
     * @return a future that completes when the record has been updated or left untouched; it
     *         fails with {@link IllegalArgumentException} if the record is not {@code COMMITTING}.
     */
    @Override
    public CompletableFuture<Void> recordCommitOffsets(UUID txnId, Map<Long, Long> commitOffsets) {
        int epoch = RecordHelper.getTransactionEpoch(txnId);
        CompletableFuture<Void> recorded = this.getActiveTx(epoch, txnId).thenCompose(versioned -> {
            ActiveTxnRecord stored = versioned.getObject();
            Preconditions.checkArgument(stored.getTxnStatus().equals(TxnStatus.COMMITTING));
            if (!stored.getCommitOffsets().isEmpty()) {
                // Offsets were recorded earlier; keep them.
                return CompletableFuture.completedFuture(null);
            }
            ActiveTxnRecord withOffsets = new ActiveTxnRecord(stored.getTxCreationTimestamp(), stored.getLeaseExpiryTime(), stored.getMaxExecutionExpiryTime(), TxnStatus.COMMITTING, stored.getWriterId(), stored.getCommitTime(), stored.getCommitOrder(), ImmutableMap.copyOf(commitOffsets));
            return Futures.toVoid(this.updateActiveTx(epoch, txnId, new VersionedMetadata<ActiveTxnRecord>(withOffsets, versioned.getVersion())));
        });
        return Futures.exceptionallyExpecting(recorded, AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, null);
    }

    /**
     * Notes a writer mark for every transaction to commit that carries the needed information.
     *
     * <p>A mark is noted only when the active record exists, has a non-empty writer id, a
     * non-negative commit time and non-empty commit offsets. Each per-transaction future is
     * passed to {@code Futures.exceptionallyExpecting} with {@code DATA_NOT_FOUND_PREDICATE}.
     *
     * @param committingTransactionsRecord record listing the transactions to commit.
     * @return a future that completes when all per-transaction futures have completed.
     */
    CompletableFuture<Void> generateMarksForTransactions(CommittingTransactionsRecord committingTransactionsRecord) {
        List<CompletableFuture<Void>> marks = committingTransactionsRecord.getTransactionsToCommit().stream().map(txId -> {
            int epoch = RecordHelper.getTransactionEpoch(txId);
            CompletableFuture<Void> mark = this.getActiveTx(epoch, txId).thenCompose(versioned -> {
                if (versioned == null) {
                    return CompletableFuture.completedFuture(null);
                }
                ActiveTxnRecord txn = versioned.getObject();
                boolean markable = !Strings.isNullOrEmpty(txn.getWriterId())
                        && txn.getCommitTime() >= 0L
                        && !txn.getCommitOffsets().isEmpty();
                if (!markable) {
                    return CompletableFuture.completedFuture(null);
                }
                return Futures.toVoid(this.noteWriterMark(txn.getWriterId(), txn.getCommitTime(), txn.getCommitOffsets()));
            });
            return Futures.exceptionallyExpecting(mark, AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, null);
        }).collect(Collectors.toList());
        return Futures.allOf(marks);
    }

    /**
     * Commits a transaction that is {@code COMMITTING} or already {@code COMMITTED}.
     *
     * <p>For a {@code COMMITTING} transaction a completed-transaction entry with status
     * {@code COMMITTED} is created. In both cases the active transaction entry is then removed.
     * {@code OPEN}, {@code ABORTING} and {@code ABORTED} fail with {@code ILLEGAL_STATE}; any
     * other status fails with {@code DATA_NOT_FOUND}.
     *
     * @param txId id of the transaction; its epoch is derived from the id.
     * @return a future with {@code TxnStatus.COMMITTED}.
     */
    @VisibleForTesting
    public CompletableFuture<TxnStatus> commitTransaction(UUID txId) {
        int epoch = RecordHelper.getTransactionEpoch(txId);
        CompletableFuture<TxnStatus> committable = this.checkTransactionStatus(txId).thenApply(status -> {
            switch (status) {
                case COMMITTING:
                case COMMITTED:
                    return status;
                case OPEN:
                case ABORTING:
                case ABORTED:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + this.getName() + " Transaction: " + txId.toString() + " State: " + status.toString());
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Transaction: " + txId.toString());
            }
        });
        return committable
                .thenCompose(status -> {
                    if (!status.equals(TxnStatus.COMMITTING)) {
                        return CompletableFuture.completedFuture(null);
                    }
                    return this.createCompletedTxEntry(txId, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.COMMITTED));
                })
                .thenCompose(ignored -> this.removeActiveTxEntry(epoch, txId))
                .thenApply(ignored -> TxnStatus.COMMITTED);
    }

    /**
     * Aborts a transaction that is {@code ABORTING} or already {@code ABORTED}.
     *
     * <p>For an {@code ABORTING} transaction a completed-transaction entry with status
     * {@code ABORTED} is created. In both cases the active transaction entry is then removed.
     * {@code COMMITTING}, {@code OPEN} and {@code COMMITTED} fail with {@code ILLEGAL_STATE}; any
     * other status fails with {@code DATA_NOT_FOUND}.
     *
     * @param txId id of the transaction; its epoch is derived from the id.
     * @return a future with {@code TxnStatus.ABORTED}.
     */
    @Override
    public CompletableFuture<TxnStatus> abortTransaction(UUID txId) {
        int epoch = RecordHelper.getTransactionEpoch(txId);
        CompletableFuture<TxnStatus> abortable = this.checkTransactionStatus(txId).thenApply(status -> {
            switch (status) {
                case ABORTING:
                case ABORTED:
                    return status;
                case COMMITTING:
                case OPEN:
                case COMMITTED:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + this.getName() + " Transaction: " + txId.toString() + " State: " + status.name());
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Transaction: " + txId.toString());
            }
        });
        return abortable
                .thenCompose(status -> {
                    if (!status.equals(TxnStatus.ABORTING)) {
                        return CompletableFuture.completedFuture(null);
                    }
                    return this.createCompletedTxEntry(txId, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.ABORTED));
                })
                .thenCompose(ignored -> this.removeActiveTxEntry(epoch, txId))
                .thenApply(ignored -> TxnStatus.ABORTED);
    }

    /**
     * Maps a data-not-found failure to {@code TxnStatus.UNKNOWN} and rethrows everything else.
     *
     * <p>A bare {@code throw ex} of a {@link Throwable} is not legal in a method without a
     * {@code throws} clause, so unchecked throwables are rethrown unchanged and any checked
     * throwable is wrapped in a {@link CompletionException}.
     *
     * @param ex the failure to inspect.
     * @return {@code TxnStatus.UNKNOWN} when the unwrapped failure is a
     *         {@code StoreException.DataNotFoundException}.
     * @throws RuntimeException    the original failure, if it is unchecked.
     * @throws Error               the original failure, if it is an error.
     * @throws CompletionException wrapping the original failure otherwise.
     */
    private TxnStatus handleDataNotFoundException(Throwable ex) {
        if (Exceptions.unwrap(ex) instanceof StoreException.DataNotFoundException) {
            return TxnStatus.UNKNOWN;
        }
        if (ex instanceof RuntimeException) {
            throw (RuntimeException) ex;
        }
        if (ex instanceof Error) {
            throw (Error) ex;
        }
        throw new CompletionException(ex);
    }

    /**
     * Checks that a completed transaction ended with the requested outcome.
     *
     * @param txId   id of the transaction.
     * @param commit true when {@code COMMITTED} is expected, false when {@code ABORTED} is.
     * @return a future with the completion status when it matches the expectation; it fails with
     *         {@code DATA_NOT_FOUND} when the status is {@code UNKNOWN} and with
     *         {@code ILLEGAL_STATE} for any other mismatch.
     */
    private CompletableFuture<TxnStatus> validateCompletedTxn(UUID txId, boolean commit) {
        TxnStatus expected = commit ? TxnStatus.COMMITTED : TxnStatus.ABORTED;
        return this.getCompletedTxnStatus(txId).thenApply(status -> {
            if (status == expected) {
                return status;
            }
            String prefix = "Stream: " + this.getName() + " Transaction: " + txId.toString();
            if (status == TxnStatus.UNKNOWN) {
                throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, prefix);
            }
            throw StoreException.create(StoreException.Type.ILLEGAL_STATE, prefix + " State: " + status.name());
        });
    }

    /**
     * Returns the current epoch record without its version.
     *
     * @param ignoreCached forwarded unchanged to {@code getCurrentEpochRecordData}.
     * @return a future with the current epoch record.
     */
    @Override
    public CompletableFuture<EpochRecord> getActiveEpoch(boolean ignoreCached) {
        return this.getCurrentEpochRecordData(ignoreCached)
                .thenApply(versionedEpoch -> versionedEpoch.getObject());
    }

    /**
     * Returns the record of the given epoch without its version.
     *
     * @param epoch the epoch number to look up.
     * @return a future with the epoch record.
     */
    @Override
    public CompletableFuture<EpochRecord> getEpochRecord(int epoch) {
        return this.getEpochRecordData(epoch)
                .thenApply(versionedEpoch -> versionedEpoch.getObject());
    }

    /**
     * Sets the cold marker of a segment, creating it when no marker data exists yet.
     *
     * @param segmentId id of the segment.
     * @param timestamp value to store as the marker.
     * @return a future that completes when the marker has been created or updated.
     */
    @Override
    public CompletableFuture<Void> setColdMarker(long segmentId, long timestamp) {
        return this.getMarkerData(segmentId).thenCompose(existing -> {
            if (existing == null) {
                return this.createMarkerData(segmentId, timestamp);
            }
            VersionedMetadata<Long> replacement = new VersionedMetadata<Long>(timestamp, existing.getVersion());
            return Futures.toVoid(this.updateMarkerData(segmentId, replacement));
        });
    }

    /**
     * Reads the cold marker of a segment.
     *
     * @param segmentId id of the segment.
     * @return a future with the stored marker value, or {@code 0} when no marker data exists.
     */
    @Override
    public CompletableFuture<Long> getColdMarker(long segmentId) {
        return this.getMarkerData(segmentId).thenApply(marker -> {
            if (marker == null) {
                return Long.valueOf(0L);
            }
            return (Long) marker.getObject();
        });
    }

    /**
     * Removes the cold marker of a segment by delegating to {@code removeMarkerData}.
     *
     * @param segmentId id of the segment whose marker is removed.
     * @return the future returned by {@code removeMarkerData}.
     */
    @Override
    public CompletableFuture<Void> removeColdMarker(long segmentId) {
        return removeMarkerData(segmentId);
    }

    /**
     * Computes the size up to a stream cut, optionally relative to a reference stream cut record.
     *
     * <p>The result is the size between the reference stream cut (empty when no reference is
     * given) and {@code streamCut}, plus the reference's recording size (0 when absent).
     *
     * @param streamCut the stream cut to measure up to, as segment id to offset.
     * @param reference optional reference stream cut record.
     * @return a future with the computed size.
     */
    @Override
    public CompletableFuture<Long> getSizeTillStreamCut(Map<Long, Long> streamCut, Optional<StreamCutRecord> reference) {
        Map<Long, Long> referenceStreamCut = reference.map(StreamCutRecord::getStreamCut).orElse(Collections.emptyMap());
        return this.segmentsBetweenStreamCuts(referenceStreamCut, streamCut)
                .thenCompose(between -> this.sizeBetweenStreamCuts(referenceStreamCut, streamCut, between))
                .thenApply(sizeBetween -> sizeBetween + reference.map(StreamCutRecord::getRecordingSize).orElse(0L));
    }

    /**
     * Stores a stream cut record and then updates the retention set to reference it.
     *
     * <p>The updated set is computed with {@code RetentionSet.addReferenceToStreamCutIfLatest}
     * before the record data is created; the retention set is written afterwards.
     *
     * @param record the stream cut record to add.
     * @return a future that completes when both writes are done.
     */
    @Override
    public CompletableFuture<Void> addStreamCutToRetentionSet(StreamCutRecord record) {
        return this.getRetentionSetData().thenCompose(versionedSet -> {
            RetentionSet current = versionedSet.getObject();
            RetentionSet withReference = RetentionSet.addReferenceToStreamCutIfLatest(current, record);
            VersionedMetadata<RetentionSet> toWrite =
                    new VersionedMetadata<RetentionSet>(withReference, versionedSet.getVersion());
            return this.createStreamCutRecordData(record.getRecordingTime(), record)
                    .thenCompose(ignored -> Futures.toVoid(this.updateRetentionSetData(toWrite)));
        });
    }

    /**
     * Returns the retention set without its version.
     *
     * @return a future with the retention set.
     */
    @Override
    public CompletableFuture<RetentionSet> getRetentionSet() {
        return this.getRetentionSetData()
                .thenApply(versionedSet -> versionedSet.getObject());
    }

    /**
     * Loads the stream cut record a reference points to, looked up by its recording time.
     *
     * @param record the reference whose recording time identifies the stream cut record.
     * @return a future with the stream cut record.
     */
    @Override
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(StreamCutReferenceRecord record) {
        long recordingTime = record.getRecordingTime();
        return this.getStreamCutRecordData(recordingTime)
                .thenApply(versionedCut -> versionedCut.getObject());
    }

    /**
     * Deletes the stream cut records that precede the given reference and updates the retention
     * set accordingly.
     *
     * <p>The record data of every entry returned by {@code retentionRecordsBefore} is deleted
     * first; the retention set computed by {@code RetentionSet.removeStreamCutBefore} is written
     * once all deletions have completed.
     *
     * @param record the reference record marking the cut-off.
     * @return a future that completes when the retention set has been updated.
     */
    @Override
    public CompletableFuture<Void> deleteStreamCutBefore(StreamCutReferenceRecord record) {
        return this.getRetentionSetData().thenCompose(versionedSet -> {
            RetentionSet current = versionedSet.getObject();
            RetentionSet trimmed = RetentionSet.removeStreamCutBefore(current, record);
            List<StreamCutReferenceRecord> obsolete = current.retentionRecordsBefore(record);
            VersionedMetadata<RetentionSet> toWrite =
                    new VersionedMetadata<RetentionSet>(trimmed, versionedSet.getVersion());
            return Futures.allOf(obsolete.stream()
                            .map(reference -> this.deleteStreamCutRecordData(reference.getRecordingTime()))
                            .collect(Collectors.toList()))
                    .thenCompose(ignored -> Futures.toVoid(this.updateRetentionSetData(toWrite)));
        });
    }

    /**
     * Starts committing transactions, unless a committing-transactions record is already set.
     *
     * <p>When the stored record equals {@code CommittingTransactionsRecord.EMPTY}, the ordered
     * committing transactions of the lowest epoch are fetched. If there are any, a new record is
     * written with the epoch derived from the first transaction id and all transaction ids, and
     * the transactions' positions are then removed from the commit order.
     *
     * @return a future with the existing record (when it is not EMPTY or nothing is committing)
     *         or the newly written record paired with its new version.
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startCommittingTransactions() {
        return this.getVersionedCommitTransactionsRecord().thenCompose(versioned -> {
            if (!versioned.getObject().equals(CommittingTransactionsRecord.EMPTY)) {
                return CompletableFuture.completedFuture(versioned);
            }
            return this.getOrderedCommittingTxnInLowestEpoch().thenCompose(committing -> {
                if (committing.isEmpty()) {
                    return CompletableFuture.completedFuture(versioned);
                }
                ImmutableList.Builder<UUID> txnIds = ImmutableList.builder();
                committing.forEach(entry -> txnIds.add(entry.getKey()));
                List<Long> positions = committing.stream()
                        .map(entry -> entry.getValue().getCommitOrder())
                        .collect(Collectors.toList());
                int epoch = RecordHelper.getTransactionEpoch(committing.get(0).getKey());
                CommittingTransactionsRecord toCommit = new CommittingTransactionsRecord(epoch, txnIds.build());
                return this.updateCommittingTxnRecord(new VersionedMetadata<CommittingTransactionsRecord>(toCommit, versioned.getVersion()))
                        .thenCompose(newVersion -> this.removeTxnsFromCommitOrder(positions)
                                .thenApply(ignored -> new VersionedMetadata<CommittingTransactionsRecord>(toCommit, newVersion)));
            });
        });
    }

    /**
     * Reads the committing-transactions record and re-wraps its object and version in a fresh
     * {@link VersionedMetadata}.
     *
     * @return future holding the versioned committing-transactions record.
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommitTransactionsRecord() {
        return this.getCommitTxnRecord().thenApply(stored -> {
            return new VersionedMetadata<CommittingTransactionsRecord>(stored.getObject(), stored.getVersion());
        });
    }

    /**
     * Commits every transaction listed in the given record, one after the other, and then resets
     * the committing-transactions record to {@code EMPTY}.
     *
     * <p>Marks are generated first; each commit is chained onto the previous one so that commits run
     * in list order. After the last commit the number of ongoing transactions is reported to
     * {@link TransactionMetrics}, and finally the record is overwritten using the supplied version.
     *
     * @param record versioned record naming the transactions to commit.
     * @return future that completes when the record has been reset.
     */
    @Override
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> record) {
        CommittingTransactionsRecord committing = record.getObject();
        CompletionStage<Void> chain = this.generateMarksForTransactions(committing);
        for (UUID txnId : committing.getTransactionsToCommit()) {
            log.debug("Committing transaction {} on stream {}/{}", txnId, this.scope, this.name);
            chain = chain.thenCompose(previous -> this.commitTransaction(txnId)
                    .thenAccept(done -> log.debug("transaction {} on stream {}/{} committed successfully", txnId, this.scope, this.name)));
        }
        CompletionStage<Void> metricsReported = chain.thenCompose(previous -> this.getNumberOfOngoingTransactions()
                .thenAccept(count -> TransactionMetrics.reportOpenTransactions(this.getScope(), this.getName(), count)));
        return ((CompletableFuture)metricsReported).thenCompose(previous -> {
            VersionedMetadata<CommittingTransactionsRecord> reset = new VersionedMetadata<CommittingTransactionsRecord>(CommittingTransactionsRecord.EMPTY, record.getVersion());
            return Futures.toVoid(this.updateCommittingTxnRecord(reset));
        });
    }

    /**
     * Delegates to {@code createWaitingRequestNodeIfAbsent} with the given processor name.
     *
     * @param processorName name to store in the waiting-request node.
     * @return future returned by the storage primitive.
     */
    @Override
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String processorName) {
        CompletableFuture<Void> created = this.createWaitingRequestNodeIfAbsent(processorName);
        return created;
    }

    /**
     * Reads the waiting-request node.
     *
     * @return future holding the stored value, or {@code null} when the read failed with a
     *         {@code DataNotFoundException}; any other failure is rethrown wrapped in a
     *         {@link CompletionException}.
     */
    @Override
    public CompletableFuture<String> getWaitingRequestProcessor() {
        return this.getWaitingRequestNode().handle((processor, failure) -> {
            if (failure == null) {
                return processor;
            }
            if (!(Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException)) {
                throw new CompletionException(failure);
            }
            // A missing node is reported as "no waiting processor".
            return null;
        });
    }

    /**
     * Deletes the waiting-request node only when it currently names the given processor.
     *
     * @param processorName processor expected to own the waiting request.
     * @return future that completes after the deletion, or immediately when the node is absent or
     *         names a different processor.
     */
    @Override
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String processorName) {
        return this.getWaitingRequestProcessor().thenCompose(owner -> {
            boolean ownedByCaller = owner != null && owner.equals(processorName);
            if (!ownedByCaller) {
                return CompletableFuture.completedFuture(null);
            }
            return this.deleteWaitingRequestNode();
        });
    }

    /**
     * Records a writer's reported time and position.
     *
     * <p>When no mark exists for the writer a new one is created; a {@code DataExistsException}
     * from that creation is converted into a {@code WRITE_CONFLICT} store exception. When a mark
     * exists, the report is rejected with {@code INVALID_TIME} if the stored timestamp is greater
     * than the supplied one, and with {@code INVALID_POSITION} if
     * {@link #compareWriterPositions(Map, Map)} rejects the new position. Otherwise the stored mark
     * is updated against the version that was read.
     *
     * @param writer    writer identifier.
     * @param timestamp time reported by the writer.
     * @param position  reported position, keyed by segment id.
     * @return future holding the outcome of the report.
     */
    @Override
    public CompletableFuture<WriterTimestampResponse> noteWriterMark(String writer, long timestamp, Map<Long, Long> position) {
        ImmutableMap<Long, Long> newPosition = ImmutableMap.copyOf(position);
        return Futures.exceptionallyExpecting(this.getWriterMarkRecord(writer), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, null).thenCompose(existing -> {
            if (existing != null) {
                WriterMark stored = existing.getObject();
                if (stored.getTimestamp() > timestamp) {
                    return CompletableFuture.completedFuture(WriterTimestampResponse.INVALID_TIME);
                }
                boolean positionAccepted = this.compareWriterPositions(stored.getPosition(), newPosition);
                if (!positionAccepted) {
                    return CompletableFuture.completedFuture(WriterTimestampResponse.INVALID_POSITION);
                }
                return this.updateWriterMarkRecord(writer, timestamp, newPosition, true, existing.getVersion())
                        .thenApply(ignored -> WriterTimestampResponse.SUCCESS);
            }
            // First report from this writer: create the mark, mapping a concurrent creation to a write conflict.
            return this.createWriterMarkRecord(writer, timestamp, newPosition).exceptionally(failure -> {
                if (Exceptions.unwrap(failure) instanceof StoreException.DataExistsException) {
                    throw StoreException.create(StoreException.Type.WRITE_CONFLICT, "writer mark exists");
                }
                throw new CompletionException(failure);
            }).thenApply(ignored -> WriterTimestampResponse.SUCCESS);
        });
    }

    /**
     * Rewrites the writer's mark with its current timestamp and position, passing {@code false}
     * for the boolean argument of {@code updateWriterMarkRecord}.
     *
     * @param writer writer identifier.
     * @return future that completes when the mark has been rewritten.
     */
    @Override
    public CompletableFuture<Void> shutdownWriter(String writer) {
        return this.getWriterMarkRecord(writer).thenCompose(existing -> {
            WriterMark mark = existing.getObject();
            return this.updateWriterMarkRecord(writer, mark.getTimestamp(), mark.getPosition(), false, existing.getVersion());
        });
    }

    /**
     * Removes the writer's record, provided the supplied mark equals the stored one.
     *
     * @param writer     writer identifier.
     * @param writerMark mark the caller believes is stored.
     * @return future that completes immediately when no record exists, completes after removal when
     *         the marks match, and fails with a {@code WRITE_CONFLICT} store exception otherwise.
     */
    @Override
    public CompletableFuture<Void> removeWriter(String writer, WriterMark writerMark) {
        return Futures.exceptionallyExpecting(this.getWriterMarkRecord(writer), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, null).thenCompose(existing -> {
            if (existing == null) {
                return CompletableFuture.completedFuture(null);
            }
            if (!writerMark.equals(existing.getObject())) {
                throw StoreException.create(StoreException.Type.WRITE_CONFLICT, "Writer mark supplied for removal doesn't match stored writer mark");
            }
            return this.removeWriterRecord(writer, existing.getVersion());
        });
    }

    /**
     * Checks whether {@code position2} is at or ahead of {@code position1}.
     *
     * <p>Two conditions must hold:
     * <ol>
     *   <li>the largest segment id in {@code position2} is not smaller than the largest segment id
     *       in {@code position1}; an empty map counts as {@link Long#MIN_VALUE};</li>
     *   <li>for every segment present in both maps, the offset in {@code position2} is not smaller
     *       than the offset in {@code position1}.</li>
     * </ol>
     *
     * <p>The maxima are taken over each map's full key set. Restricting both to the shared keys
     * would make them identical by construction and turn the first condition into a no-op, so a
     * position holding only older segments would be accepted.
     *
     * @param position1 reference position, keyed by segment id.
     * @param position2 candidate position, keyed by segment id.
     * @return {@code true} if {@code position2} is at or ahead of {@code position1}.
     */
    @VisibleForTesting
    boolean compareWriterPositions(Map<Long, Long> position1, Map<Long, Long> position2) {
        long maxInPos1 = position1.keySet().stream().max(Long::compare).orElse(Long.MIN_VALUE);
        long maxInPos2 = position2.keySet().stream().max(Long::compare).orElse(Long.MIN_VALUE);
        if (maxInPos2 < maxInPos1) {
            // The candidate does not reach the highest segment of the reference position.
            return false;
        }
        // Offsets on shared segments must not move backwards.
        return position2.entrySet().stream()
                .filter(entry -> position1.containsKey(entry.getKey()))
                .allMatch(entry -> entry.getValue() >= position1.get(entry.getKey()));
    }

    /**
     * Reads the writer's mark record and strips the version wrapper.
     *
     * @param writer writer identifier.
     * @return future holding the stored {@link WriterMark}.
     */
    @Override
    public CompletableFuture<WriterMark> getWriterMark(String writer) {
        CompletableFuture<VersionedMetadata<WriterMark>> versioned = this.getWriterMarkRecord(writer);
        return versioned.thenApply(data -> data.getObject());
    }

    /**
     * Collects the committing transactions of the first epoch (in ascending epoch order) that has
     * any, sorted by commit order.
     *
     * <p>All (position, transaction id) entries for this stream are read from the ordered store; a
     * data-not-found failure is treated as an empty map. Entries are grouped by the epoch encoded
     * in the transaction id and the groups are visited from the smallest epoch upwards, one at a
     * time, until a group contributes at least one entry to the result map. Positions that
     * {@code processTransactionsInEpoch} flags for purging are removed from the ordered store
     * before the result is produced.
     *
     * @param txnCommitOrderer ordered store holding the commit-order entries.
     * @param executor         executor on which the epoch loop runs.
     * @return future holding the selected transactions as (id, record) entries sorted by commit order.
     */
    protected CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpochHelper(ZkOrderedStore txnCommitOrderer, Executor executor) {
        return Futures.exceptionallyExpecting(txnCommitOrderer.getEntitiesWithPosition(this.getScope(), this.getName()), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, Collections.emptyMap()).thenCompose(allTxns -> {
            // Group the (position -> txn id string) entries by the epoch derived from the txn id.
            Map<Integer, List<Map.Entry>> groupByEpoch = allTxns.entrySet().stream().collect(Collectors.groupingBy(x -> RecordHelper.getTransactionEpoch(UUID.fromString((String)x.getValue()))));
            // Visit epochs in ascending order.
            Iterator iterator = groupByEpoch.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)).iterator();
            // Both collections are filled by processTransactionsInEpoch from future callbacks.
            ConcurrentSkipListSet toPurge = new ConcurrentSkipListSet();
            ConcurrentHashMap transactionsMap = new ConcurrentHashMap();
            // Keep processing epochs until one yields committing transactions, then purge flagged positions and sort the result.
            return ((CompletableFuture)Futures.loop(() -> iterator.hasNext() && transactionsMap.isEmpty(), () -> this.processTransactionsInEpoch((Map.Entry)iterator.next(), toPurge, transactionsMap), (Executor)executor).thenCompose(v -> txnCommitOrderer.removeEntities(this.getScope(), this.getName(), toPurge))).thenApply(v -> transactionsMap.entrySet().stream().sorted(Comparator.comparing(x -> ((ActiveTxnRecord)x.getValue()).getCommitOrder())).collect(Collectors.toList()));
        });
    }

    /**
     * Reads every commit-order entry for this stream and parses the stored id strings into UUIDs.
     *
     * <p>A data-not-found failure from the ordered store is treated as an empty map.
     *
     * @param txnCommitOrderer ordered store holding the commit-order entries.
     * @return future holding a map from position to transaction id.
     */
    protected CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxnsHelper(ZkOrderedStore txnCommitOrderer) {
        return Futures.exceptionallyExpecting(txnCommitOrderer.getEntitiesWithPosition(this.getScope(), this.getName()), (Predicate)ZKStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, Collections.emptyMap()).thenApply(positions -> {
            return positions.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> UUID.fromString((String)entry.getValue())));
        });
    }

    /**
     * Classifies every commit-order entry of one epoch.
     *
     * <p>For each (position, transaction id) entry the active transaction record is fetched; a
     * data-not-found failure is replaced by {@code ActiveTxnRecord.EMPTY}. Depending on the
     * record's status:
     * <ul>
     *   <li>{@code COMMITTING} with a commit order equal to the entry's position: the transaction is
     *       added to {@code transactionsMap};</li>
     *   <li>{@code COMMITTING} with a different commit order: the entry is a duplicate and its
     *       position is added to {@code toPurge};</li>
     *   <li>{@code OPEN}: the entry is left alone;</li>
     *   <li>{@code COMMITTED}, {@code ABORTING}, {@code ABORTED}, {@code UNKNOWN}: the entry is
     *       stale and its position is added to {@code toPurge}.</li>
     * </ul>
     *
     * @param nextEpoch       epoch number mapped to its (position, transaction id string) entries.
     * @param toPurge         receives positions that should be removed from the commit order.
     * @param transactionsMap receives transactions that are ready to be committed.
     * @return future that completes when every entry of the epoch has been classified.
     */
    private CompletableFuture<Void> processTransactionsInEpoch(Map.Entry<Integer, List<Map.Entry<Long, String>>> nextEpoch, ConcurrentSkipListSet<Long> toPurge, ConcurrentHashMap<UUID, ActiveTxnRecord> transactionsMap) {
        int epoch = nextEpoch.getKey();
        List<Map.Entry<Long, String>> txnIds = nextEpoch.getValue();
        return Futures.allOf((Collection)txnIds.stream().map(txnIdOrder -> {
            UUID txnId = UUID.fromString((String)txnIdOrder.getValue());
            long order = (Long)txnIdOrder.getKey();
            return Futures.exceptionallyExpecting((CompletableFuture)this.getActiveTx(epoch, txnId).thenApply(VersionedMetadata::getObject), (Predicate)ZKStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object)ActiveTxnRecord.EMPTY).thenAccept(txnRecord -> {
                switch (txnRecord.getTxnStatus()) {
                    case COMMITTING: {
                        if (txnRecord.getCommitOrder() == order) {
                            transactionsMap.put(txnId, (ActiveTxnRecord)txnRecord);
                            break;
                        }
                        // Same transaction registered under another position: this entry is a leftover.
                        log.debug("duplicate txn {} at position {}. removing {}", new Object[]{txnId, txnRecord.getCommitOrder(), order});
                        toPurge.add(order);
                        break;
                    }
                    case OPEN: {
                        break;
                    }
                    case COMMITTED: 
                    case ABORTING: 
                    case ABORTED: 
                    case UNKNOWN: {
                        // Three placeholders for three arguments, so both the status and the purged position are logged.
                        log.debug("stale txn {} with status {}. removing {}", new Object[]{txnId, txnRecord.getTxnStatus(), order});
                        toPurge.add(order);
                    }
                }
            });
        }).collect(Collectors.toList()));
    }

    /**
     * Verifies that the stream is in a state in which it may be operated on.
     *
     * @return future that completes normally when the state is usable and fails with an
     *         {@code ILLEGAL_STATE} store exception when the state is {@code null},
     *         {@code UNKNOWN} or {@code CREATING}.
     */
    private CompletableFuture<Void> verifyLegalState() {
        return this.getState(false).thenApply(state -> {
            if (state == null || state.equals((Object)State.UNKNOWN) || state.equals((Object)State.CREATING)) {
                // state may be null here; calling name() on it would replace the intended
                // ILLEGAL_STATE failure with a NullPointerException.
                String stateName = state == null ? "null" : state.name();
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + this.getName() + " State: " + stateName);
            }
            return null;
        });
    }

    /**
     * Stores the given epoch record under its own epoch number if no record exists there yet.
     *
     * @param epoch epoch record to store.
     * @return future returned by the storage primitive.
     */
    private CompletableFuture<Void> createEpochRecord(EpochRecord epoch) {
        int epochNumber = epoch.getEpoch();
        return this.createEpochRecordDataIfAbsent(epochNumber, epoch);
    }

    /**
     * Advances the current-epoch record to the given epoch, but never moves it backwards.
     *
     * <p>The record for {@code newActiveEpoch} is fetched, then the current-epoch record is read
     * with {@code true} passed to {@code getCurrentEpochRecordData}. The write only happens when
     * the stored epoch is smaller than {@code newActiveEpoch}.
     *
     * @param newActiveEpoch epoch that should become the current epoch.
     * @return future that completes after the update, or immediately when no update is needed.
     */
    private CompletableFuture<Void> updateCurrentEpochRecord(int newActiveEpoch) {
        return this.getEpochRecord(newActiveEpoch).thenCompose(target -> this.getCurrentEpochRecordData(true).thenCompose(current -> {
            EpochRecord stored = (EpochRecord)current.getObject();
            if (stored.getEpoch() >= newActiveEpoch) {
                // Already at or beyond the requested epoch.
                return CompletableFuture.completedFuture(null);
            }
            VersionedMetadata<EpochRecord> replacement = new VersionedMetadata<EpochRecord>((EpochRecord)target, current.getVersion());
            return Futures.toVoid(this.updateCurrentEpochRecordData(replacement));
        }));
    }

    /**
     * Creates an empty sealed-segment size shard with the given number if none exists yet.
     *
     * @param shardNumber number of the shard to create.
     * @return future returned by the storage primitive.
     */
    private CompletableFuture<Void> createSealedSegmentSizeMapShardIfAbsent(int shardNumber) {
        return this.createSealedSegmentSizesMapShardDataIfAbsent(
                shardNumber,
                SealedSegmentsMapShard.builder().shardNumber(shardNumber).sealedSegmentsSizeMap(Collections.emptyMap()).build());
    }

    /**
     * Reads a sealed-segment size shard.
     *
     * @param shard number of the shard to read.
     * @return future holding the stored shard, or a freshly built empty shard with the same number
     *         when the read failed with a {@code DataNotFoundException}; any other failure is
     *         rethrown wrapped in a {@link CompletionException}.
     */
    @VisibleForTesting
    CompletableFuture<SealedSegmentsMapShard> getSealedSegmentSizeMapShard(int shard) {
        return this.getSealedSegmentSizesMapShardData(shard).handle((stored, failure) -> {
            if (failure == null) {
                return (SealedSegmentsMapShard)stored.getObject();
            }
            if (!(Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException)) {
                throw new CompletionException(failure);
            }
            return SealedSegmentsMapShard.builder().shardNumber(shard).sealedSegmentsSizeMap(Collections.emptyMap()).build();
        });
    }

    /**
     * Records the sizes of sealed segments in their respective shards.
     *
     * <p>Segment ids are grouped by {@link #getShardNumber(long)}. For each shard the stored shard
     * data is read (and created first when the read reports data-not-found), the sizes of that
     * shard's segments are added, and the shard is written back against the version that was read.
     *
     * @param sealedSegmentSizes size of each sealed segment, keyed by segment id.
     * @return future that completes when every affected shard has been written.
     */
    private CompletableFuture<Void> updateSealedSegmentSizes(Map<Long, Long> sealedSegmentSizes) {
        Map<Integer, List<Long>> segmentsByShard = sealedSegmentSizes.keySet().stream().collect(Collectors.groupingBy(this::getShardNumber));
        List<CompletableFuture<Version>> shardUpdates = new ArrayList<CompletableFuture<Version>>();
        for (Map.Entry<Integer, List<Long>> shardEntry : segmentsByShard.entrySet()) {
            int shard = shardEntry.getKey();
            List<Long> segments = shardEntry.getValue();
            CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> shardData = Futures.exceptionallyComposeExpecting(
                    this.getSealedSegmentSizesMapShardData(shard),
                    AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                    () -> this.createSealedSegmentSizeMapShardIfAbsent(shard).thenCompose(created -> this.getSealedSegmentSizesMapShardData(shard)));
            shardUpdates.add(shardData.thenCompose(versioned -> {
                SealedSegmentsMapShard mapShard = versioned.getObject();
                for (Long segmentId : segments) {
                    mapShard.addSealedSegmentSize((long)segmentId, (Long)sealedSegmentSizes.get(segmentId));
                }
                return this.updateSealedSegmentSizesMapShardData(shard, new VersionedMetadata<SealedSegmentsMapShard>(mapShard, versioned.getVersion()));
            }));
        }
        return Futures.allOf(shardUpdates);
    }

    /**
     * Maps a segment id to its sealed-segment size shard: the segment's epoch divided (integer
     * division) by the configured shard size.
     *
     * @param segmentId segment id.
     * @return shard number for the segment.
     */
    private int getShardNumber(long segmentId) {
        int segmentEpoch = StreamSegmentNameUtils.getEpoch((long)segmentId);
        return segmentEpoch / this.shardSize.get();
    }

    /**
     * Builds a map that associates every segment of the epoch record with that record's epoch.
     *
     * @param epochRecord epoch record to convert.
     * @return immutable map from segment to the epoch of {@code epochRecord}.
     */
    private ImmutableMap<StreamSegmentRecord, Integer> convertToSpan(EpochRecord epochRecord) {
        ImmutableMap.Builder<StreamSegmentRecord, Integer> span = ImmutableMap.builder();
        for (StreamSegmentRecord segment : epochRecord.getSegments()) {
            span.put(segment, epochRecord.getEpoch());
        }
        return span.build();
    }

    /**
     * Converts a segment record into a {@link Segment} carrying the same id, creation time and
     * key range bounds.
     *
     * @param segmentRecord record to convert.
     * @return the corresponding {@link Segment}.
     */
    private Segment transform(StreamSegmentRecord segmentRecord) {
        return new Segment(
                segmentRecord.segmentId(),
                segmentRecord.getCreationTime(),
                segmentRecord.getKeyStart(),
                segmentRecord.getKeyEnd());
    }

    /**
     * Converts each segment record of the list into a {@link Segment}, preserving order.
     *
     * @param segmentRecords records to convert.
     * @return list of converted segments.
     */
    private List<Segment> transform(List<StreamSegmentRecord> segmentRecords) {
        return segmentRecords.stream()
                .map(segmentRecord -> this.transform(segmentRecord))
                .collect(Collectors.toList());
    }

    /**
     * Fetches the epoch records in the range {@code fromEpoch..toEpoch} by walking every history
     * chunk that overlaps the range.
     *
     * <p>For each chunk the first epoch requested is the larger of the chunk's first epoch and
     * {@code fromEpoch}. Chunks at or beyond the chunk that holds the active epoch are read with
     * the cache bypassed.
     *
     * @param fromEpoch   first epoch to fetch.
     * @param toEpoch     last epoch to fetch.
     * @param ignoreCache passed to {@code getActiveEpochRecord} when looking up the active epoch.
     * @return future holding the per-chunk results concatenated in chunk order.
     */
    @VisibleForTesting
    CompletableFuture<List<EpochRecord>> fetchEpochs(int fromEpoch, int toEpoch, boolean ignoreCache) {
        return ((CompletableFuture)((CompletableFuture)this.getActiveEpochRecord(ignoreCache).thenApply(currentEpoch -> currentEpoch.getEpoch() / this.historyChunkSize.get())).thenCompose(latestChunkNumber -> {
            List<CompletableFuture<List<EpochRecord>>> perChunk = IntStream.range(fromEpoch / this.historyChunkSize.get(), toEpoch / this.historyChunkSize.get() + 1).mapToObj(chunk -> {
                int firstEpoch;
                if (chunk * this.historyChunkSize.get() > fromEpoch) {
                    firstEpoch = chunk * this.historyChunkSize.get();
                } else {
                    firstEpoch = fromEpoch;
                }
                // The latest chunk can still grow, so it is not served from cache.
                boolean ignoreCached = chunk >= latestChunkNumber;
                return this.getEpochsFromHistoryChunk(chunk, firstEpoch, toEpoch, ignoreCached);
            }).collect(Collectors.toList());
            return Futures.allOfWithResults(perChunk);
        })).thenApply(chunks -> chunks.stream().flatMap(Collection::stream).collect(Collectors.toList()));
    }

    /**
     * Reconstructs the epoch records {@code firstEpoch..toEpoch} that are covered by one history
     * chunk.
     *
     * <p>The record for {@code firstEpoch} is fetched directly and seeds the result. Each history
     * record of the chunk whose epoch lies in {@code (firstEpoch, toEpoch]} is then turned into an
     * epoch record by {@code newEpochRecord}, which receives the future of the previously produced
     * record.
     *
     * @param chunk        history chunk number.
     * @param firstEpoch   first epoch to return from this chunk.
     * @param toEpoch      last epoch wanted overall.
     * @param ignoreCached passed through to {@code getHistoryTimeSeriesChunk}.
     * @return future holding the epoch records in the order in which they were produced.
     */
    private CompletableFuture<List<EpochRecord>> getEpochsFromHistoryChunk(int chunk, int firstEpoch, int toEpoch, boolean ignoreCached) {
        return this.getEpochRecord(firstEpoch).thenCompose(first -> this.getHistoryTimeSeriesChunk(chunk, ignoreCached).thenCompose(x -> {
            // Seed the accumulator with the already-fetched first epoch record.
            ArrayList<CompletableFuture<EpochRecord>> identity = new ArrayList<CompletableFuture<EpochRecord>>();
            identity.add(CompletableFuture.completedFuture(first));
            return Futures.allOfWithResults((List)x.getHistoryRecords().stream().filter(r -> r.getEpoch() > firstEpoch && r.getEpoch() <= toEpoch).reduce(identity, (r, s) -> {
                // Accumulator: derive the next record from the last future in the list and append it to a copy.
                CompletableFuture<EpochRecord> next = this.newEpochRecord((CompletableFuture)r.get(r.size() - 1), s.getEpoch(), s.getReferenceEpoch(), (Collection<StreamSegmentRecord>)s.getSegmentsCreated(), s.getSegmentsSealed().stream().map(StreamSegmentRecord::segmentId).collect(Collectors.toList()), s.getScaleTime());
                ArrayList<CompletableFuture<EpochRecord>> list = new ArrayList<CompletableFuture<EpochRecord>>((Collection<CompletableFuture<EpochRecord>>)r);
                list.add(next);
                return list;
            }, (r, s) -> {
                // Combiner: concatenates two partial lists.
                ArrayList list = new ArrayList(r);
                list.addAll(s);
                return list;
            }));
        }));
    }

    /**
     * Produces the epoch record for {@code epoch}.
     *
     * <p>When {@code epoch} differs from {@code referenceEpoch} the record is fetched via
     * {@code getEpochRecord}. Otherwise it is computed from the previous record: the previous
     * segments minus the sealed ones, followed by the created segments.
     *
     * @param lastRecordFuture future of the record for {@code epoch - 1}.
     * @param epoch            epoch of the record to produce.
     * @param referenceEpoch   reference epoch of the record to produce.
     * @param createdSegments  segments created in this epoch.
     * @param sealedSegments   ids of segments sealed in this epoch.
     * @param time             creation time for the computed record.
     * @return future holding the epoch record.
     */
    private CompletableFuture<EpochRecord> newEpochRecord(CompletableFuture<EpochRecord> lastRecordFuture, int epoch, int referenceEpoch, Collection<StreamSegmentRecord> createdSegments, Collection<Long> sealedSegments, long time) {
        if (epoch != referenceEpoch) {
            return this.getEpochRecord(epoch);
        }
        return lastRecordFuture.thenApply(previous -> {
            assert (previous.getEpoch() == epoch - 1);
            ImmutableList.Builder<StreamSegmentRecord> segments = ImmutableList.builder();
            for (StreamSegmentRecord segment : previous.getSegments()) {
                boolean sealed = sealedSegments.contains(segment.segmentId());
                if (!sealed) {
                    segments.add(segment);
                }
            }
            segments.addAll(createdSegments);
            return new EpochRecord(epoch, referenceEpoch, segments.build(), time);
        });
    }

    /**
     * Builds a segment record from a segment id by splitting the id into its epoch and segment
     * number.
     *
     * @param segmentId segment id.
     * @param time      creation time.
     * @param low       start of the key range.
     * @param high      end of the key range.
     * @return the new segment record.
     */
    private StreamSegmentRecord newSegmentRecord(long segmentId, long time, Double low, Double high) {
        int creationEpoch = StreamSegmentNameUtils.getEpoch((long)segmentId);
        int segmentNumber = StreamSegmentNameUtils.getSegmentNumber((long)segmentId);
        return this.newSegmentRecord(creationEpoch, segmentNumber, time, low, high);
    }

    /**
     * Builds a segment record from its individual fields.
     *
     * @param epoch         creation epoch.
     * @param segmentNumber segment number.
     * @param time          creation time.
     * @param low           start of the key range.
     * @param high          end of the key range.
     * @return the new segment record.
     */
    private StreamSegmentRecord newSegmentRecord(int epoch, int segmentNumber, long time, Double low, Double high) {
        return StreamSegmentRecord.builder()
                .creationEpoch(epoch)
                .segmentNumber(segmentNumber)
                .creationTime(time)
                .keyStart(low)
                .keyEnd(high)
                .build();
    }

    /**
     * Finds the epoch that was in effect at the given time.
     *
     * <p>The history chunks from 0 up to the chunk holding the active epoch are searched; only that
     * last chunk is read with the cache bypassed. When the search finds nothing ({@code -1}), the
     * active epoch is returned if {@code timestamp} is later than its creation time, and epoch 0
     * otherwise.
     *
     * @param timestamp    time to look up.
     * @param ignoreCached passed to {@code getActiveEpoch}.
     * @return future holding the epoch number.
     */
    @VisibleForTesting
    CompletableFuture<Integer> findEpochAtTime(long timestamp, boolean ignoreCached) {
        return this.getActiveEpoch(ignoreCached).thenCompose(activeEpoch -> {
            Predicate<Integer> isLatestChunk = chunkNumber -> chunkNumber == activeEpoch.getEpoch() / this.historyChunkSize.get();
            return this.searchEpochAtTime(0, activeEpoch.getEpoch() / this.historyChunkSize.get(), isLatestChunk, timestamp).thenApply(found -> {
                if (found != -1) {
                    return found;
                }
                if (timestamp > activeEpoch.getCreationTime()) {
                    return activeEpoch.getEpoch();
                }
                return 0;
            });
        });
    }

    /**
     * Binary search over history chunks {@code lowest..highest} for the epoch in effect at
     * {@code timestamp}.
     *
     * <p>The middle chunk is loaded and its first and last scale times are compared with
     * {@code timestamp}. If the timestamp falls inside that range, the record with the greatest
     * scale time not after the timestamp supplies the answer; otherwise the search continues in the
     * lower or upper half.
     *
     * @param lowest       lowest chunk number still in play.
     * @param highest      highest chunk number still in play.
     * @param ignoreCached decides, per chunk number, whether the chunk is read bypassing the cache.
     * @param timestamp    time to look up.
     * @return future holding the epoch, or {@code -1} when the range is exhausted.
     */
    private CompletableFuture<Integer> searchEpochAtTime(int lowest, int highest, Predicate<Integer> ignoreCached, long timestamp) {
        if (lowest > highest) {
            return CompletableFuture.completedFuture(-1);
        }
        int middle = (lowest + highest) / 2;
        return this.getHistoryTimeSeriesChunk(middle, ignoreCached.test(middle)).thenCompose(chunk -> {
            ImmutableList<HistoryTimeSeriesRecord> records = chunk.getHistoryRecords();
            long earliest = ((HistoryTimeSeriesRecord)records.get(0)).getScaleTime();
            long latest = ((HistoryTimeSeriesRecord)records.get(records.size() - 1)).getScaleTime();
            boolean withinChunk = timestamp >= earliest && timestamp <= latest;
            if (!withinChunk) {
                if (timestamp < earliest) {
                    return this.searchEpochAtTime(lowest, middle - 1, ignoreCached, timestamp);
                }
                return this.searchEpochAtTime(middle + 1, highest, ignoreCached, timestamp);
            }
            int index = CollectionHelpers.findGreatestLowerBound(records, candidate -> Long.compare(timestamp, candidate.getScaleTime()));
            assert (index >= 0);
            return CompletableFuture.completedFuture(((HistoryTimeSeriesRecord)records.get(index)).getEpoch());
        });
    }

    /**
     * Reads a history time series chunk.
     *
     * <p>When the cache was allowed and the chunk obtained holds fewer records than the configured
     * chunk size, the chunk is read once more with the cache bypassed.
     *
     * @param chunkNumber  number of the chunk to read.
     * @param ignoreCached whether to bypass the cache on the first read.
     * @return future holding the chunk.
     */
    private CompletableFuture<HistoryTimeSeries> getHistoryTimeSeriesChunk(int chunkNumber, boolean ignoreCached) {
        return this.getHistoryTimeSeriesChunkData(chunkNumber, ignoreCached).thenCompose(versioned -> {
            HistoryTimeSeries series = (HistoryTimeSeries)versioned.getObject();
            if (ignoreCached || series.getHistoryRecords().size() >= this.historyChunkSize.get()) {
                return CompletableFuture.completedFuture(series);
            }
            // A partially filled chunk may have gained records since it was cached.
            return this.getHistoryTimeSeriesChunk(chunkNumber, true);
        });
    }

    // ---- Stream-level storage primitives, implemented by subclasses. ----
    // NOTE(review): none of these four is called in the visible part of this class, and the decompiler
    // dropped the parameter names; the notes below restate the signatures only — confirm the exact
    // semantics against a concrete implementation.

    // Takes a stream configuration, a long and an int; yields a CreateStreamResponse.
    abstract CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration var1, long var2, int var4);

    // Presumably creates the stream's root metadata — TODO confirm.
    abstract CompletableFuture<Void> createStreamMetadata();

    // Presumably var1 is the creation time — TODO confirm.
    abstract CompletableFuture<Void> storeCreationTimeIfAbsent(long var1);

    // Presumably removes the stream's metadata — TODO confirm.
    abstract CompletableFuture<Void> deleteStream();

    // ---- Configuration, truncation and state records. ----
    // Each record type has a create-if-absent, a versioned set (yielding a Version) and a versioned get.
    // NOTE(review): the boolean on the getters is not exercised in the visible code; it presumably
    // means "ignore cached value" as on the history/epoch getters — confirm.

    abstract CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord var1);

    abstract CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> var1);

    abstract CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean var1);

    abstract CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord var1);

    abstract CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> var1);

    abstract CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean var1);

    abstract CompletableFuture<Void> createStateIfAbsent(StateRecord var1);

    abstract CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> var1);

    abstract CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean var1);

    // ---- Retention set and stream cut records. ----

    abstract CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet var1);

    // var1 is the record's recording time (addStreamCutToRetentionSet passes record.getRecordingTime()).
    abstract CompletableFuture<Void> createStreamCutRecordData(long var1, StreamCutRecord var3);

    // var1 is the recording time of the stream cut to read (see getStreamCutRecord).
    abstract CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long var1);

    // var1 is the recording time of the stream cut to delete (see deleteStreamCutBefore).
    abstract CompletableFuture<Void> deleteStreamCutRecordData(long var1);

    // Writes the retention set; callers pass the version they previously read.
    abstract CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> var1);

    abstract CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData();

    // ---- History time series chunks and epoch records. ----

    // var1 is presumably the chunk number, matching the getter below — confirm.
    abstract CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int var1, HistoryTimeSeries var2);

    // var1 is the chunk number, var2 the ignore-cached flag (see getHistoryTimeSeriesChunk).
    abstract CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int var1, boolean var2);

    // var1 is presumably the chunk number — confirm.
    abstract CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int var1, VersionedMetadata<HistoryTimeSeries> var2);

    abstract CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord var1);

    // Writes the current-epoch record; updateCurrentEpochRecord passes the version it read.
    abstract CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> var1);

    // updateCurrentEpochRecord calls this with true; the flag presumably means "ignore cached" — confirm.
    abstract CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean var1);

    // var1 is the epoch number of the record (createEpochRecord passes epoch.getEpoch()).
    abstract CompletableFuture<Void> createEpochRecordDataIfAbsent(int var1, EpochRecord var2);

    // var1 is presumably the epoch number — confirm.
    abstract CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int var1);

    // ---- Sealed segment bookkeeping. ----

    // var1 is the shard number (see createSealedSegmentSizeMapShardIfAbsent).
    abstract CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int var1, SealedSegmentsMapShard var2);

    // var1 is the shard number (see getSealedSegmentSizeMapShard).
    abstract CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int var1);

    // var1 is the shard number; updateSealedSegmentSizes passes the version it read.
    abstract CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int var1, VersionedMetadata<SealedSegmentsMapShard> var2);

    // NOTE(review): presumably var1 holds segment ids and var2 the epoch in which they were sealed — confirm.
    abstract CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> var1, int var2);

    // NOTE(review): presumably var1 is a segment id and the Integer is its sealed epoch — confirm.
    abstract CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long var1);

    // ---- Transaction records and commit ordering. ----

    // NOTE(review): presumably var1 is the epoch and var2 the transaction id, as on getActiveTx — confirm.
    abstract CompletableFuture<Version> createNewTransaction(int var1, UUID var2, ActiveTxnRecord var3);

    // var1 is the epoch, var2 the transaction id (see processTransactionsInEpoch).
    abstract CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int var1, UUID var2);

    // NOTE(review): presumably var1 is the epoch and var2 the transaction id — confirm.
    abstract CompletableFuture<Version> updateActiveTx(int var1, UUID var2, VersionedMetadata<ActiveTxnRecord> var3);

    // NOTE(review): presumably yields the position assigned to the transaction in the commit order — confirm.
    abstract CompletableFuture<Long> addTxnToCommitOrder(UUID var1);

    // var1 holds the commit-order positions to remove (see startCommittingTransactions).
    abstract CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> var1);

    abstract CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID var1);

    // NOTE(review): presumably var1 is the epoch and var2 the transaction id — confirm.
    abstract CompletableFuture<Void> removeActiveTxEntry(int var1, UUID var2);

    abstract CompletableFuture<Void> createCompletedTxEntry(UUID var1, CompletedTxnRecord var2);

    // NOTE(review): presumably var1 is the epoch — confirm.
    abstract CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int var1);

    // Source of the candidate batch in startCommittingTransactions; entries are (transaction id, record).
    abstract CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch();

    // Yields a map from commit-order position to transaction id.
    @VisibleForTesting
    abstract CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns();

    // ---- Marker records. ----
    // NOTE(review): not used in the visible part of this class. The first long is presumably a
    // segment id and the stored Long presumably a timestamp — confirm against callers.

    abstract CompletableFuture<Void> createMarkerData(long var1, long var3);

    abstract CompletableFuture<Version> updateMarkerData(long var1, VersionedMetadata<Long> var3);

    abstract CompletableFuture<Void> removeMarkerData(long var1);

    abstract CompletableFuture<VersionedMetadata<Long>> getMarkerData(long var1);

    // ---- Epoch transition record and committing-transactions record. ----

    abstract CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord var1);

    abstract CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> var1);

    abstract CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode();

    abstract CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord var1);

    // Read by getVersionedCommitTransactionsRecord.
    abstract CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord();

    // Written by startCommittingTransactions and completeCommittingTransactions; yields the new version.
    abstract CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> var1);

    // ---- Waiting-request node. ----

    // var1 is the processor name (see createWaitingRequestIfAbsent).
    abstract CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String var1);

    // Yields the stored processor name; getWaitingRequestProcessor maps a DataNotFoundException from this call to null.
    abstract CompletableFuture<String> getWaitingRequestNode();

    abstract CompletableFuture<Void> deleteWaitingRequestNode();

    // ---- Writer mark records. ----

    // Arguments: writer id, timestamp, position (see noteWriterMark).
    abstract CompletableFuture<Void> createWriterMarkRecord(String var1, long var2, ImmutableMap<Long, Long> var4);

    // var1 is the writer id.
    abstract CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String var1);

    // Arguments: writer id, timestamp, position, a boolean (noteWriterMark passes true, shutdownWriter
    // passes false — presumably an "is alive" flag, confirm), and the version that was read.
    abstract CompletableFuture<Void> updateWriterMarkRecord(String var1, long var2, ImmutableMap<Long, Long> var4, boolean var5, Version var6);

    // Arguments: writer id and the version that was read (see removeWriter).
    abstract CompletableFuture<Void> removeWriterRecord(String var1, Version var2);

    // Decompiler artifact: a synthetic lambda body that CFR emitted as a method; going by its name
    // it belongs to getAllSegmentIds. It joins the two span futures, asks
    // segmentsBetweenStreamCutSpans for the segments between the two spans, and reduces them to the
    // set of their segment ids.
    // NOTE(review): the joins presumably do not block because the futures are expected to be complete
    // when this runs — the calling code is outside this section, confirm there.
    private /* synthetic */ CompletionStage lambda$getAllSegmentIds$43(CompletableFuture fromSpanFuture, CompletableFuture toSpanFuture, Void v) {
        Map fromSpan = (Map)fromSpanFuture.join();
        Map toSpan = (Map)toSpanFuture.join();
        return this.segmentsBetweenStreamCutSpans(fromSpan, toSpan).thenApply(x -> x.stream().map(StreamSegmentRecord::segmentId).collect(Collectors.toSet()));
    }
}

