/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.lang.Int96;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.store.index.HostIndex;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.OperationContextImpl;
import io.pravega.controller.store.stream.PersistentStreamBase;
import io.pravega.controller.store.stream.ScaleMetadata;
import io.pravega.controller.store.stream.Scope;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Stream;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.VersionedTransactionData;
import io.pravega.controller.store.stream.WriterTimestampResponse;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ControllerEventSerializer;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractStreamMetadataStore
implements StreamMetadataStore {
    // Logger shared by every instance of this store.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(AbstractStreamMetadataStore.class);
    // True when the throwable, after Exceptions.unwrap, is a StoreException.DataNotFoundException.
    public static final Predicate<Throwable> DATA_NOT_FOUND_PREDICATE = e -> Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException;
    // True when the throwable, after Exceptions.unwrap, is a StoreException.DataNotEmptyException.
    public static final Predicate<Throwable> DATA_NOT_EMPTY_PREDICATE = e -> Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotEmptyException;
    // NOTE(review): not referenced in the visible part of this file; presumably the delimiter used when a
    // TxnResource is encoded as a string - confirm against getTxnResourceString/getTxnResource.
    private static final String RESOURCE_PART_SEPARATOR = "_%_";
    // Scope objects keyed by scope name; the cache itself is built in the constructor.
    private final LoadingCache<String, Scope> scopeCache;
    // Stream objects keyed by a (scope, stream name) pair. Configured with a maximum of 10000 entries and
    // with refreshAfterWrite / expireAfterWrite both set to 10 minutes. deleteStream invalidates the entry
    // of the stream it removed.
    private final LoadingCache<Pair<String, String>, Stream> cache = CacheBuilder.newBuilder().maximumSize(10000L).refreshAfterWrite(10L, TimeUnit.MINUTES).expireAfterWrite(10L, TimeUnit.MINUTES).build((CacheLoader)new CacheLoader<Pair<String, String>, Stream>(){

        // Builds the Stream for a key by calling newStream(scope, name); any exception raised while doing
        // so is rethrown wrapped in a RuntimeException.
        @ParametersAreNonnullByDefault
        public Stream load(Pair<String, String> input) {
            try {
                return AbstractStreamMetadataStore.this.newStream((String)input.getKey(), (String)input.getValue());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    // Index consulted by the transaction-related index methods (addTxnToIndex, removeTxnFromIndex, ...).
    private final HostIndex hostTxnIndex;
    // Index consulted by the task-related index methods (addRequestToIndex, removeTaskFromIndex, ...).
    private final HostIndex hostTaskIndex;
    // Converts ControllerEvent instances to and from ByteBuffer for storage in hostTaskIndex.
    private final ControllerEventSerializer controllerEventSerializer;

    /**
     * Initialises the store with the host indexes it delegates to.
     *
     * <p>Besides storing both indexes, this creates a fresh {@link ControllerEventSerializer} and builds
     * the scope cache: at most 1000 entries, with {@code refreshAfterWrite} and {@code expireAfterWrite}
     * both set to 10 minutes. The cache loader calls {@link #newScope(String)} and rethrows any exception
     * wrapped in a {@link RuntimeException}.
     *
     * @param hostTxnIndex  index used by the transaction index operations
     * @param hostTaskIndex index used by the task index operations
     */
    protected AbstractStreamMetadataStore(HostIndex hostTxnIndex, HostIndex hostTaskIndex) {
        this.hostTxnIndex = hostTxnIndex;
        this.hostTaskIndex = hostTaskIndex;
        this.controllerEventSerializer = new ControllerEventSerializer();

        // The loader is only invoked lazily by the cache, never from within this constructor.
        CacheLoader<String, Scope> scopeLoader = new CacheLoader<String, Scope>() {

            @ParametersAreNonnullByDefault
            public Scope load(String scopeName) {
                try {
                    return AbstractStreamMetadataStore.this.newScope(scopeName);
                } catch (Exception failure) {
                    throw new RuntimeException(failure);
                }
            }
        };
        this.scopeCache = CacheBuilder.newBuilder()
                .maximumSize(1000L)
                .refreshAfterWrite(10L, TimeUnit.MINUTES)
                .expireAfterWrite(10L, TimeUnit.MINUTES)
                .build(scopeLoader);
    }

    /**
     * Creates the {@link Scope} object for a scope name. Implemented by concrete stores; invoked by the
     * scope cache loader set up in the constructor.
     *
     * @param var1 name of the scope
     * @return the scope object for that name
     */
    abstract Scope newScope(String var1);

    /**
     * Creates an operation context for a stream.
     *
     * <p>The stream is resolved with {@code getStream(scope, name, null)} (no pre-existing context) and
     * wrapped in a new {@link OperationContextImpl}.
     *
     * @param scope scope of the stream
     * @param name  stream name
     * @return a new context wrapping the resolved stream
     */
    @Override
    public OperationContext createContext(String scope, String name) {
        return new OperationContextImpl(getStream(scope, name, null));
    }

    /**
     * Creates a stream inside an existing scope.
     *
     * <p>A starting segment number is first obtained from {@code getSafeStartingSegmentNumberFor}; then
     * {@code checkScopeExists} is consulted. If the scope is reported missing the result is
     * {@code Futures.failedFuture} carrying a {@code DATA_NOT_FOUND} store exception, otherwise the
     * stream's {@code create} is invoked. The inner future is routed through {@code withCompletion}.
     *
     * @param scope           scope of the stream
     * @param name            stream name
     * @param configuration   configuration handed to {@code create}
     * @param createTimestamp creation timestamp handed to {@code create}
     * @param context         operation context handed to {@code getStream}
     * @param executor        executor handed to {@code withCompletion}
     * @return future with the response produced by the stream's {@code create}
     */
    @Override
    public CompletableFuture<CreateStreamResponse> createStream(String scope, String name, StreamConfiguration configuration, long createTimestamp, OperationContext context, Executor executor) {
        return getSafeStartingSegmentNumberFor(scope, name).thenCompose(firstSegmentNumber -> withCompletion(
                checkScopeExists(scope).thenCompose(scopeExists -> {
                    if (!scopeExists.booleanValue()) {
                        return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "scope does not exist"));
                    }
                    return getStream(scope, name, context).create(configuration, createTimestamp, (int) firstSegmentNumber);
                }),
                executor));
    }

    /**
     * Deletes a stream and drops it from the stream cache.
     *
     * <p>Steps, in order: read the active epoch with {@code getActiveEpoch(true)}, take the largest
     * segment number among its segments and pass it to {@code recordLastStreamSegment}; failures matching
     * {@link #DATA_NOT_FOUND_PREDICATE} in that part are replaced by a {@code null} result. Then the
     * stream's {@code delete()} is run through {@code withCompletion}, and finally the
     * {@code (scope, name)} cache entry is invalidated.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream} and {@code recordLastStreamSegment}
     * @param executor executor handed to {@code recordLastStreamSegment} and {@code withCompletion}
     * @return future completing after the cache entry has been invalidated
     */
    @Override
    public CompletableFuture<Void> deleteStream(String scope, String name, OperationContext context, Executor executor) {
        Stream stream = getStream(scope, name, context);
        return Futures.exceptionallyExpecting(
                        stream.getActiveEpoch(true)
                                .thenApply(activeEpoch -> activeEpoch.getSegments().stream()
                                        .map(StreamSegmentRecord::getSegmentNumber)
                                        .reduce(Integer::max)
                                        .get())
                                .thenCompose(highestSegment -> recordLastStreamSegment(scope, name, (int) highestSegment, context, executor)),
                        DATA_NOT_FOUND_PREDICATE,
                        null)
                .thenCompose(ignored -> withCompletion(stream.delete(), executor))
                .thenAccept(ignored -> cache.invalidate(new ImmutablePair<>(scope, name)));
    }

    /**
     * Reads the creation time of a stream by delegating to the stream's {@code getCreationTime()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the value reported by the stream
     */
    @Override
    public CompletableFuture<Long> getCreationTime(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getCreationTime(),
                executor);
    }

    /**
     * Requests a state change on a stream by delegating to the stream's {@code updateState(state)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param state    state to set
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future tracking the update
     */
    @Override
    public CompletableFuture<Void> setState(String scope, String name, State state, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).updateState(state),
                executor);
    }

    /**
     * Reads the state of a stream by delegating to the stream's {@code getState(ignoreCached)}.
     *
     * @param scope        scope of the stream
     * @param name         stream name
     * @param ignoreCached flag forwarded unchanged to the stream
     * @param context      operation context handed to {@code getStream}
     * @param executor     executor handed to {@code withCompletion}
     * @return future with the stream state
     */
    @Override
    public CompletableFuture<State> getState(String scope, String name, boolean ignoreCached, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getState(ignoreCached),
                executor);
    }

    /**
     * Updates the state of a stream against a previously read versioned state, delegating to the
     * stream's {@code updateVersionedState(previous, state)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param state    state to set
     * @param previous versioned state the update is based on
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned state returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(String scope, String name, State state, VersionedMetadata<State> previous, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).updateVersionedState(previous, state),
                executor);
    }

    /**
     * Reads the versioned state of a stream by delegating to the stream's {@code getVersionedState()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned state
     */
    @Override
    public CompletableFuture<VersionedMetadata<State>> getVersionedState(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getVersionedState(),
                executor);
    }

    /**
     * Creates a scope and translates the outcome into a {@code CreateScopeStatus}.
     *
     * <p>The returned future never carries the failure itself: no exception maps to {@code SUCCESS}, a
     * {@code DataExistsException} (either the exception itself or its direct cause) maps to
     * {@code SCOPE_EXISTS}, and anything else is logged at debug level and mapped to {@code FAILURE}.
     *
     * @param scopeName name of the scope to create
     * @return future with the resulting status
     */
    @Override
    public CompletableFuture<Controller.CreateScopeStatus> createScope(String scopeName) {
        return getScope(scopeName).createScope().handle((ignored, failure) -> {
            Controller.CreateScopeStatus.Status status;
            if (failure == null) {
                status = Controller.CreateScopeStatus.Status.SUCCESS;
            } else if (failure instanceof StoreException.DataExistsException
                    || failure.getCause() instanceof StoreException.DataExistsException) {
                status = Controller.CreateScopeStatus.Status.SCOPE_EXISTS;
            } else {
                log.debug("Create scope failed due to ", failure);
                status = Controller.CreateScopeStatus.Status.FAILURE;
            }
            return Controller.CreateScopeStatus.newBuilder().setStatus(status).build();
        });
    }

    /**
     * Deletes a scope and translates the outcome into a {@code DeleteScopeStatus}.
     *
     * <p>The failure (if any) is first passed through {@code Exceptions.unwrap}. No exception maps to
     * {@code SUCCESS}, {@code DataNotFoundException} to {@code SCOPE_NOT_FOUND},
     * {@code DataNotEmptyException} to {@code SCOPE_NOT_EMPTY}; anything else is logged at debug level
     * and mapped to {@code FAILURE}. The returned future therefore never carries the failure itself.
     *
     * @param scopeName name of the scope to delete
     * @return future with the resulting status
     */
    @Override
    public CompletableFuture<Controller.DeleteScopeStatus> deleteScope(String scopeName) {
        return getScope(scopeName).deleteScope().handle((result, e) -> {
            Throwable ex = Exceptions.unwrap(e);
            Controller.DeleteScopeStatus.Status status;
            if (ex == null) {
                status = Controller.DeleteScopeStatus.Status.SUCCESS;
            } else if (ex instanceof StoreException.DataNotFoundException) {
                status = Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND;
            } else if (ex instanceof StoreException.DataNotEmptyException) {
                status = Controller.DeleteScopeStatus.Status.SCOPE_NOT_EMPTY;
            } else {
                // The throwable is the trailing argument, so the logger records it as the exception
                // (with stack trace) rather than substituting it into the message. A "{}" placeholder
                // would stay unfilled and show up literally, hence none is used here.
                log.debug("DeleteScope failed due to ", ex);
                status = Controller.DeleteScopeStatus.Status.FAILURE;
            }
            return Controller.DeleteScopeStatus.newBuilder().setStatus(status).build();
        });
    }

    /**
     * Lists the streams of a scope together with their configurations.
     *
     * <p>For every stream name reported by the scope, the stream is resolved with a {@code null}
     * context and its configuration is fetched. A {@code DataNotFoundException} while fetching a
     * configuration is tolerated: that stream is simply left out of the result. Any other failure
     * fails the returned future.
     *
     * @param scopeName name of the scope
     * @return future with a map from stream name (as reported by {@code Stream.getName()}) to its
     *         configuration, containing only streams whose configuration could be read
     */
    @Override
    public CompletableFuture<Map<String, StreamConfiguration>> listStreamsInScope(String scopeName) {
        return getScope(scopeName).listStreamsInScope().thenCompose(streams -> {
            // Fully parameterized instead of the raw HashMap<String, CompletionStage>, so the compiler
            // checks the element types all the way down to the final map.
            Map<String, CompletableFuture<Optional<StreamConfiguration>>> pending = new HashMap<>();
            for (String streamName : streams) {
                Stream stream = getStream(scopeName, streamName, null);
                // A missing configuration becomes null, then Optional.empty(), and is filtered out below.
                pending.put(stream.getName(), Futures.exceptionallyExpecting(stream.getConfiguration(), e -> e instanceof StoreException.DataNotFoundException, null).thenApply(Optional::ofNullable));
            }
            return Futures.allOfWithResults(pending).thenApply(resolved -> resolved.entrySet().stream()
                    .filter(entry -> entry.getValue().isPresent())
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get())));
        });
    }

    /**
     * Lists streams of a scope page by page, delegating to the scope's
     * {@code listStreams(limit, continuationToken, executor)}.
     *
     * @param scopeName         name of the scope
     * @param continuationToken token forwarded unchanged to the scope
     * @param limit             limit forwarded unchanged to the scope
     * @param executor          executor forwarded to the scope
     * @return future with the pair returned by the scope (a list of names and a string)
     */
    @Override
    public CompletableFuture<Pair<List<String>, String>> listStream(String scopeName, String continuationToken, int limit, Executor executor) {
        return getScope(scopeName)
                .listStreams(limit, continuationToken, executor);
    }

    /**
     * Starts truncating a stream at a stream cut by delegating to the stream's
     * {@code startTruncation(streamCut)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param streamCut stream cut forwarded to the stream
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> startTruncation(String scope, String name, Map<Long, Long> streamCut, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).startTruncation(streamCut),
                executor);
    }

    /**
     * Completes a truncation by delegating to the stream's {@code completeTruncation(record)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param record   versioned truncation record forwarded to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> completeTruncation(String scope, String name, VersionedMetadata<StreamTruncationRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).completeTruncation(record),
                executor);
    }

    /**
     * Reads the versioned truncation record of a stream via the stream's {@code getTruncationRecord()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned truncation record
     */
    @Override
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getTruncationRecord(),
                executor);
    }

    /**
     * Starts a configuration update by delegating to the stream's
     * {@code startUpdateConfiguration(configuration)}.
     *
     * @param scope         scope of the stream
     * @param name          stream name
     * @param configuration new configuration forwarded to the stream
     * @param context       operation context handed to {@code getStream}
     * @param executor      executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> startUpdateConfiguration(String scope, String name, StreamConfiguration configuration, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).startUpdateConfiguration(configuration),
                executor);
    }

    /**
     * Completes a configuration update by delegating to the stream's
     * {@code completeUpdateConfiguration(existing)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param existing versioned configuration record forwarded to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> completeUpdateConfiguration(String scope, String name, VersionedMetadata<StreamConfigurationRecord> existing, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).completeUpdateConfiguration(existing),
                executor);
    }

    /**
     * Reads the configuration of a stream via the stream's {@code getConfiguration()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the stream configuration
     */
    @Override
    public CompletableFuture<StreamConfiguration> getConfiguration(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getConfiguration(),
                executor);
    }

    /**
     * Reads the versioned configuration record of a stream via the stream's
     * {@code getVersionedConfigurationRecord()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned configuration record
     */
    @Override
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationRecord(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getVersionedConfigurationRecord(),
                executor);
    }

    /**
     * Reports whether a stream is sealed.
     *
     * <p>Reads the state with {@code getState(true)} and compares it to {@link State#SEALED}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with {@code true} when the state equals {@code SEALED}
     */
    @Override
    public CompletableFuture<Boolean> isSealed(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context)
                        .getState(true)
                        .thenApply(current -> current.equals(State.SEALED)),
                executor);
    }

    /**
     * Requests a state update to {@link State#SEALED} via the stream's {@code updateState}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future tracking the update
     */
    @Override
    public CompletableFuture<Void> setSealed(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).updateState(State.SEALED),
                executor);
    }

    /**
     * Looks up one segment record via the stream's {@code getSegment(segmentId)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param segmentId id of the segment
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future with the segment record
     */
    @Override
    public CompletableFuture<StreamSegmentRecord> getSegment(String scope, String name, long segmentId, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getSegment(segmentId),
                executor);
    }

    /**
     * Retrieves the segment ids reported by the stream's {@code getAllSegmentIds()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the set of segment ids
     */
    @Override
    public CompletableFuture<Set<Long>> getAllSegmentIds(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getAllSegmentIds(),
                executor);
    }

    /**
     * Retrieves the result of the stream's {@code getSegmentsAtHead()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with a map from segment record to a long value, as returned by the stream
     */
    @Override
    public CompletableFuture<Map<StreamSegmentRecord, Long>> getSegmentsAtHead(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, name, context).getSegmentsAtHead(), executor);
    }

    /**
     * Retrieves the currently active segments of a stream.
     *
     * <p>The state is read with {@code getState(true)}; the follow-up step runs through
     * {@code thenComposeAsync} on {@code executor}. A stream in state {@link State#SEALED} yields an
     * empty list, any other state yields the stream's {@code getActiveSegments()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor used for the asynchronous step and handed to {@code withCompletion}
     * @return future with the active segment records (empty when sealed)
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getActiveSegments(String scope, String name, OperationContext context, Executor executor) {
        Stream target = getStream(scope, name, context);
        return withCompletion(target.getState(true).thenComposeAsync(current -> {
            if (!State.SEALED.equals(current)) {
                return target.getActiveSegments();
            }
            return CompletableFuture.completedFuture(Collections.emptyList());
        }, executor), executor);
    }

    /**
     * Retrieves the segments of one epoch via the stream's {@code getSegmentsInEpoch(epoch)}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param epoch    epoch number
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the segment records of that epoch
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsInEpoch(String scope, String stream, int epoch, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, stream, context).getSegmentsInEpoch(epoch), executor);
    }

    /**
     * Retrieves the result of the stream's {@code getSuccessorsWithPredecessors(segmentId)}.
     *
     * @param scope      scope of the stream
     * @param streamName stream name
     * @param segmentId  id of the segment whose successors are requested
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with a map from segment record to a list of segment ids, as returned by the stream
     */
    @Override
    public CompletableFuture<Map<StreamSegmentRecord, List<Long>>> getSuccessors(String scope, String streamName, long segmentId, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, streamName, context).getSuccessorsWithPredecessors(segmentId), executor);
    }

    /**
     * Retrieves the segments between two stream cuts via the stream's
     * {@code getSegmentsBetweenStreamCuts(from, to)}.
     *
     * @param scope      scope of the stream
     * @param streamName stream name
     * @param from       first stream cut, forwarded unchanged
     * @param to         second stream cut, forwarded unchanged
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the segment records returned by the stream
     */
    @Override
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsBetweenStreamCuts(String scope, String streamName, Map<Long, Long> from, Map<Long, Long> to, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, streamName, context).getSegmentsBetweenStreamCuts(from, to), executor);
    }

    /**
     * Validates a stream cut via the stream's {@code isStreamCutValid(streamCut)}.
     *
     * @param scope      scope of the stream
     * @param streamName stream name
     * @param streamCut  stream cut to validate
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the validity verdict returned by the stream
     */
    @Override
    public CompletableFuture<Boolean> isStreamCutValid(String scope, String streamName, Map<Long, Long> streamCut, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, streamName, context).isStreamCutValid(streamCut), executor);
    }

    /**
     * Submits a scale request via the stream's
     * {@code submitScale(sealedSegments, newRanges, scaleTimestamp, record)}.
     *
     * @param scope          scope of the stream
     * @param name           stream name
     * @param sealedSegments ids of the segments to seal
     * @param newRanges      key ranges for the new segments
     * @param scaleTimestamp timestamp forwarded to the stream
     * @param record         versioned epoch transition record forwarded to the stream
     * @param context        operation context handed to {@code getStream}
     * @param executor       executor handed to {@code withCompletion}
     * @return future with the versioned epoch transition record returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(String scope, String name, List<Long> sealedSegments, List<Map.Entry<Double, Double>> newRanges, long scaleTimestamp, VersionedMetadata<EpochTransitionRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).submitScale(sealedSegments, newRanges, scaleTimestamp, record),
                executor);
    }

    /**
     * Reads the versioned epoch transition record via the stream's {@code getEpochTransition()}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned epoch transition record
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition(String scope, String stream, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(
                getStream(scope, stream, context).getEpochTransition(),
                executor);
    }

    /**
     * Starts a scale operation via the stream's {@code startScale(isManualScale, record, state)}.
     *
     * @param scope         scope of the stream
     * @param name          stream name
     * @param isManualScale flag forwarded unchanged to the stream
     * @param record        versioned epoch transition record forwarded to the stream
     * @param state         versioned state forwarded to the stream
     * @param context       operation context handed to {@code getStream}
     * @param executor      executor handed to {@code withCompletion}
     * @return future with the versioned epoch transition record returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(String scope, String name, boolean isManualScale, VersionedMetadata<EpochTransitionRecord> record, VersionedMetadata<State> state, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).startScale(isManualScale, record, state),
                executor);
    }

    /**
     * Runs the stream's {@code scaleCreateNewEpoch(record)} step of a scale operation.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param record   versioned epoch transition record forwarded to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned epoch transition record returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpochs(String scope, String name, VersionedMetadata<EpochTransitionRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).scaleCreateNewEpoch(record),
                executor);
    }

    /**
     * Runs the stream's {@code scaleOldSegmentsSealed} step and then reports scale metrics.
     *
     * <p>Once the sealing future completes normally, two metric reports are started: the number of
     * active segments and the split/merge counts obtained from {@code findNumSplitsMerges}. That
     * continuation is attached to, but not part of, the returned future, so the caller neither waits
     * for the metric reports nor sees their failures.
     *
     * @param scope              scope of the stream
     * @param name               stream name
     * @param sealedSegmentSizes map forwarded unchanged to the stream
     * @param record             versioned epoch transition record forwarded to the stream
     * @param context            operation context handed to {@code getStream} and the metric lookups
     * @param executor           executor handed to {@code withCompletion} and the metric lookups
     * @return the future of the sealing step itself
     */
    @Override
    public CompletableFuture<Void> scaleSegmentsSealed(String scope, String name, Map<Long, Long> sealedSegmentSizes, VersionedMetadata<EpochTransitionRecord> record, OperationContext context, Executor executor) {
        CompletableFuture<Void> sealed = withCompletion(getStream(scope, name, context).scaleOldSegmentsSealed(sealedSegmentSizes, record), executor);
        sealed.thenCompose(ignored -> {
            CompletableFuture<Void> activeReported = getActiveSegments(scope, name, context, executor)
                    .thenAccept(segments -> StreamMetrics.reportActiveSegments(scope, name, segments.size()));
            CompletableFuture<Void> splitsMergesReported = findNumSplitsMerges(scope, name, context, executor)
                    .thenAccept(counts -> StreamMetrics.reportSegmentSplitsAndMerges(scope, name, (Long) counts.getKey(), (Long) counts.getValue()));
            return CompletableFuture.allOf(activeReported, splitsMergesReported);
        });
        return sealed;
    }

    /**
     * Completes a scale operation via the stream's {@code completeScale(record)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param record   versioned epoch transition record forwarded to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> completeScale(String scope, String name, VersionedMetadata<EpochTransitionRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).completeScale(record),
                executor);
    }

    /**
     * Starts a rolling transaction via the stream's {@code startRollingTxn(activeEpoch, existing)}.
     *
     * @param scope       scope of the stream
     * @param stream      stream name
     * @param activeEpoch epoch number forwarded to the stream
     * @param existing    versioned committing-transactions record forwarded to the stream
     * @param context     operation context handed to {@code getStream}
     * @param executor    executor handed to {@code withCompletion}
     * @return future with the versioned committing-transactions record returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(String scope, String stream, int activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(
                getStream(scope, stream, context).startRollingTxn(activeEpoch, existing),
                executor);
    }

    /**
     * Runs the stream's {@code rollingTxnCreateDuplicateEpochs(sealedTxnEpochSegments, time, record)}
     * step of a rolling transaction.
     *
     * @param scope                  scope of the stream
     * @param name                   stream name
     * @param sealedTxnEpochSegments map forwarded unchanged to the stream
     * @param time                   timestamp forwarded to the stream
     * @param record                 versioned committing-transactions record forwarded to the stream
     * @param context                operation context handed to {@code getStream}
     * @param executor               executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(String scope, String name, Map<Long, Long> sealedTxnEpochSegments, long time, VersionedMetadata<CommittingTransactionsRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).rollingTxnCreateDuplicateEpochs(sealedTxnEpochSegments, time, record),
                executor);
    }

    /**
     * Completes a rolling transaction via the stream's
     * {@code completeRollingTxn(sealedActiveEpochSegments, record)}.
     *
     * @param scope                     scope of the stream
     * @param name                      stream name
     * @param sealedActiveEpochSegments map forwarded unchanged to the stream
     * @param record                    versioned committing-transactions record forwarded to the stream
     * @param context                   operation context handed to {@code getStream}
     * @param executor                  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> completeRollingTxn(String scope, String name, Map<Long, Long> sealedActiveEpochSegments, VersionedMetadata<CommittingTransactionsRecord> record, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).completeRollingTxn(sealedActiveEpochSegments, record),
                executor);
    }

    /**
     * Adds a stream cut to the retention set via the stream's
     * {@code addStreamCutToRetentionSet(streamCut)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param streamCut stream cut record to add
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> addStreamCutToRetentionSet(String scope, String name, StreamCutRecord streamCut, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, name, context).addStreamCutToRetentionSet(streamCut), executor);
    }

    /**
     * Reads the retention set of a stream via the stream's {@code getRetentionSet()}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the retention set
     */
    @Override
    public CompletableFuture<RetentionSet> getRetentionSet(String scope, String name, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, name, context).getRetentionSet(), executor);
    }

    /**
     * Resolves a stream cut reference via the stream's {@code getStreamCutRecord(reference)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param reference reference record to resolve
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future with the stream cut record
     */
    @Override
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(String scope, String name, StreamCutReferenceRecord reference, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, name, context).getStreamCutRecord(reference), executor);
    }

    /**
     * Delegates to the stream's {@code deleteStreamCutBefore(streamCut)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param streamCut reference record forwarded to the stream
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> deleteStreamCutBefore(String scope, String name, StreamCutReferenceRecord streamCut, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, name, context).deleteStreamCutBefore(streamCut), executor);
    }

    /**
     * Retrieves the result of the stream's {@code getSizeTillStreamCut(streamCut, reference)}.
     *
     * @param scope     scope of the stream
     * @param name      stream name
     * @param streamCut stream cut forwarded to the stream
     * @param reference optional stream cut record forwarded unchanged to the stream
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future with the size reported by the stream
     */
    @Override
    public CompletableFuture<Long> getSizeTillStreamCut(String scope, String name, Map<Long, Long> streamCut, Optional<StreamCutRecord> reference, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(getStream(scope, name, context).getSizeTillStreamCut(streamCut, reference), executor);
    }

    /**
     * Generates a new transaction id for a stream.
     *
     * <p>Takes the next value from {@code getNextCounter()} and passes its most and least significant
     * parts ({@code getMsb()}, {@code getLsb()}) to the stream's {@code generateNewTxnId}.
     *
     * @param scopeName  scope of the stream
     * @param streamName stream name
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the generated transaction id
     */
    @Override
    public CompletableFuture<UUID> generateTransactionId(String scopeName, String streamName, OperationContext context, Executor executor) {
        // Resolve the stream before asking for the counter, as the lambda below only runs later.
        Stream target = getStream(scopeName, streamName, context);
        return withCompletion(
                getNextCounter().thenCompose(counter -> target.generateNewTxnId(counter.getMsb(), counter.getLsb())),
                executor);
    }

    /**
     * Creates a transaction via the stream's {@code createTransaction(txnId, lease, maxExecutionTime)}.
     *
     * @param scopeName        scope of the stream
     * @param streamName       stream name
     * @param txnId            id of the transaction
     * @param lease            lease value forwarded to the stream
     * @param maxExecutionTime maximum execution time forwarded to the stream
     * @param context          operation context handed to {@code getStream}
     * @param executor         executor handed to {@code withCompletion}
     * @return future with the versioned transaction data
     */
    @Override
    public CompletableFuture<VersionedTransactionData> createTransaction(String scopeName, String streamName, UUID txnId, long lease, long maxExecutionTime, OperationContext context, Executor executor) {
        return withCompletion(getStream(scopeName, streamName, context).createTransaction(txnId, lease, maxExecutionTime), executor);
    }

    /**
     * Pings a transaction via the stream's {@code pingTransaction(txData, lease)}.
     *
     * @param scopeName  scope of the stream
     * @param streamName stream name
     * @param txData     versioned transaction data forwarded to the stream
     * @param lease      lease value forwarded to the stream
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the versioned transaction data returned by the stream
     */
    @Override
    public CompletableFuture<VersionedTransactionData> pingTransaction(String scopeName, String streamName, VersionedTransactionData txData, long lease, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scopeName, streamName, context).pingTransaction(txData, lease),
                executor);
    }

    /**
     * Reads the data of a transaction via the stream's {@code getTransactionData(txId)}.
     *
     * @param scopeName  scope of the stream
     * @param streamName stream name
     * @param txId       id of the transaction
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the versioned transaction data
     */
    @Override
    public CompletableFuture<VersionedTransactionData> getTransactionData(String scopeName, String streamName, UUID txId, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scopeName, streamName, context).getTransactionData(txId),
                executor);
    }

    /**
     * Reads the status of a transaction via the stream's {@code checkTransactionStatus(txId)}.
     *
     * @param scopeName  scope of the stream
     * @param streamName stream name
     * @param txId       id of the transaction
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the transaction status
     */
    @Override
    public CompletableFuture<TxnStatus> transactionStatus(String scopeName, String streamName, UUID txId, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scopeName, streamName, context).checkTransactionStatus(txId),
                executor);
    }

    /**
     * Commits a transaction directly on the stream; exposed for tests only.
     *
     * <p>The resolved stream is cast to {@link PersistentStreamBase}, so a stream of any other type
     * results in a {@link ClassCastException}.
     *
     * @param scope      scope of the stream
     * @param streamName stream name
     * @param txId       id of the transaction
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the transaction status returned by {@code commitTransaction(txId)}
     */
    @VisibleForTesting
    public CompletableFuture<TxnStatus> commitTransaction(String scope, String streamName, UUID txId, OperationContext context, Executor executor) {
        PersistentStreamBase persistentStream = (PersistentStreamBase) getStream(scope, streamName, context);
        return withCompletion(persistentStream.commitTransaction(txId), executor);
    }

    /**
     * Seals a transaction via the stream's
     * {@code sealTransaction(txId, commit, version, writerId, timestamp)}.
     *
     * @param scopeName  scope of the stream
     * @param streamName stream name
     * @param txId       id of the transaction
     * @param commit     flag forwarded unchanged to the stream
     * @param version    optional version forwarded unchanged to the stream
     * @param writerId   writer id forwarded to the stream
     * @param timestamp  timestamp forwarded to the stream
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the (status, integer) entry returned by the stream
     */
    @Override
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(String scopeName, String streamName, UUID txId, boolean commit, Optional<Version> version, String writerId, long timestamp, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scopeName, streamName, context).sealTransaction(txId, commit, version, writerId, timestamp),
                executor);
    }

    /**
     * Aborts a transaction via the stream's {@code abortTransaction(txId)}.
     *
     * @param scope      scope of the stream
     * @param streamName stream name
     * @param txId       id of the transaction
     * @param context    operation context handed to {@code getStream}
     * @param executor   executor handed to {@code withCompletion}
     * @return future with the transaction status returned by the stream
     */
    @Override
    public CompletableFuture<TxnStatus> abortTransaction(String scope, String streamName, UUID txId, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, streamName, context).abortTransaction(txId), executor);
    }

    /**
     * Adds a transaction resource to the host transaction index.
     *
     * <p>The entity key is {@code getTxnResourceString(txn)} and the stored bytes are
     * {@code version.toBytes()}; when {@code version} is {@code null} the value of
     * {@code getEmptyVersion()} is stored instead.
     *
     * @param hostId  host the transaction is indexed under
     * @param txn     transaction resource
     * @param version version to store, may be {@code null}
     * @return future returned by the index
     */
    @Override
    public CompletableFuture<Void> addTxnToIndex(String hostId, TxnResource txn, Version version) {
        String resourceKey = getTxnResourceString(txn);
        // Obtained unconditionally, exactly as Optional.orElse evaluates its argument eagerly.
        Version fallback = getEmptyVersion();
        Version effective = version != null ? version : fallback;
        return hostTxnIndex.addEntity(hostId, resourceKey, effective.toBytes());
    }

    /**
     * Removes a transaction resource from the host transaction index.
     *
     * @param hostId            host the transaction is indexed under
     * @param txn               transaction resource; keyed by {@code getTxnResourceString(txn)}
     * @param deleteEmptyParent flag forwarded unchanged to the index
     * @return future returned by the index
     */
    @Override
    public CompletableFuture<Void> removeTxnFromIndex(String hostId, TxnResource txn, boolean deleteEmptyParent) {
        return hostTxnIndex.removeEntity(
                hostId,
                getTxnResourceString(txn),
                deleteEmptyParent);
    }

    /**
     * Picks one transaction resource at random from those indexed under a host.
     *
     * <p>A {@code null} or empty entity list gives an empty {@link Optional}; otherwise an index is
     * drawn from a generator created by {@code RandomFactory.create()} and the entity at that position
     * is converted with {@code getTxnResource}.
     *
     * @param hostId host whose entries are consulted
     * @return future with the randomly chosen resource, or empty when the host has none
     */
    @Override
    public CompletableFuture<Optional<TxnResource>> getRandomTxnFromIndex(String hostId) {
        return hostTxnIndex.getEntities(hostId).thenApply(entities -> {
            if (entities == null || entities.size() <= 0) {
                return Optional.empty();
            }
            int pick = RandomFactory.create().nextInt(entities.size());
            return Optional.of(getTxnResource((String) entities.get(pick)));
        });
    }

    /**
     * Reads the version stored in the host transaction index for a transaction resource.
     *
     * <p>The stored bytes are decoded with {@code parseVersionData}. The result is {@code null} when no
     * data is stored, when decoding yields {@code null}, or when the decoded version equals
     * {@code getEmptyVersion()}.
     *
     * @param hostId   host the transaction is indexed under
     * @param resource transaction resource; keyed by {@code getTxnResourceString(resource)}
     * @return future with the stored version, or {@code null} as described above
     */
    @Override
    public CompletableFuture<Version> getTxnVersionFromIndex(String hostId, TxnResource resource) {
        return hostTxnIndex.getEntityData(hostId, getTxnResourceString(resource)).thenApply(data -> {
            if (data == null) {
                return null;
            }
            Version parsed = parseVersionData(data);
            if (parsed == null || parsed.equals(getEmptyVersion())) {
                return null;
            }
            return parsed;
        });
    }

    /**
     * Removes a host from the host transaction index.
     *
     * @param hostId host to remove
     * @return future returned by the index's {@code removeHost}
     */
    @Override
    public CompletableFuture<Void> removeHostFromIndex(String hostId) {
        return hostTxnIndex.removeHost(hostId);
    }

    /**
     * Lists the hosts known to the host transaction index.
     *
     * @return future with the host ids returned by the index's {@code getHosts()}
     */
    @Override
    public CompletableFuture<Set<String>> listHostsOwningTxn() {
        return hostTxnIndex.getHosts();
    }

    /**
     * Stores a serialized controller event in the host task index.
     *
     * <p>The event is serialized with the controller event serializer and exactly the readable bytes of
     * the resulting buffer (position to limit) are stored. {@code ByteBuffer.array()} is deliberately
     * not used: it exposes the whole backing array, ignoring the buffer's offset, position and limit,
     * and it fails for read-only or direct buffers.
     *
     * @param hostId host the task is indexed under
     * @param id     entity id of the task
     * @param task   event to serialize and store
     * @return future returned by the index's {@code addEntity}
     */
    @Override
    public CompletableFuture<Void> addRequestToIndex(String hostId, String id, ControllerEvent task) {
        ByteBuffer serialized = this.controllerEventSerializer.toByteBuffer(task);
        byte[] payload = new byte[serialized.remaining()];
        // Read through a duplicate so the serializer's buffer keeps its own position.
        serialized.duplicate().get(payload);
        return this.hostTaskIndex.addEntity(hostId, id, payload);
    }

    /**
     * Removes a task from the host task index; the index's delete-empty-parent flag is always
     * passed as {@code true}.
     *
     * @param hostId host the task is indexed under
     * @param id     entity id of the task
     * @return future returned by the index's {@code removeEntity}
     */
    @Override
    public CompletableFuture<Void> removeTaskFromIndex(String hostId, String id) {
        return hostTaskIndex.removeEntity(hostId, id, true);
    }

    /**
     * Loads pending tasks indexed under a host.
     *
     * <p>At most {@code limit} ids from the host's entity list are considered; each one is loaded with
     * {@code getControllerTask} and the individual futures are combined with
     * {@code Futures.allOfWithResults}.
     *
     * @param hostId host whose tasks are loaded
     * @param limit  maximum number of task ids to load
     * @return future with a map from task id to the deserialized event
     */
    @Override
    public CompletableFuture<Map<String, ControllerEvent>> getPendingsTaskForHost(String hostId, int limit) {
        return hostTaskIndex.getEntities(hostId).thenCompose(taskIds -> Futures.allOfWithResults(
                taskIds.stream()
                        .limit(limit)
                        .collect(Collectors.toMap(
                                taskId -> taskId,
                                taskId -> getControllerTask(hostId, (String) taskId)))));
    }

    /**
     * Loads one task from the host task index and deserializes it.
     *
     * @param hostId host the task is indexed under
     * @param id     entity id of the task
     * @return future with the event decoded from the stored bytes
     */
    private CompletableFuture<ControllerEvent> getControllerTask(String hostId, String id) {
        return hostTaskIndex.getEntityData(hostId, id).thenApply(stored -> {
            ByteBuffer wrapped = ByteBuffer.wrap(stored);
            return controllerEventSerializer.fromByteBuffer(wrapped);
        });
    }

    /**
     * Removes a host from the host task index.
     *
     * @param hostId host to remove
     * @return future returned by the index's {@code removeHost}
     */
    @Override
    public CompletableFuture<Void> removeHostFromTaskIndex(String hostId) {
        return hostTaskIndex.removeHost(hostId);
    }

    /**
     * Lists the hosts known to the host task index.
     *
     * @return future with the host ids returned by the index's {@code getHosts()}
     */
    @Override
    public CompletableFuture<Set<String>> listHostsWithPendingTask() {
        return hostTaskIndex.getHosts();
    }

    /**
     * Sets the cold marker of a segment via the stream's {@code setColdMarker(segmentId, timestamp)}.
     *
     * @param scope     scope of the stream
     * @param stream    stream name
     * @param segmentId id of the segment
     * @param timestamp marker value; {@link #isCold} compares it against the current time
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> markCold(String scope, String stream, long segmentId, long timestamp, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, stream, context).setColdMarker(segmentId, timestamp),
                executor);
    }

    /**
     * Reports whether a segment is currently marked cold.
     *
     * <p>The segment counts as cold when the stream's {@code getColdMarker(segmentId)} yields a
     * non-null value that is strictly greater than {@code System.currentTimeMillis()}.
     *
     * @param scope     scope of the stream
     * @param stream    stream name
     * @param segmentId id of the segment
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future with {@code true} when a marker exists and lies in the future
     */
    @Override
    public CompletableFuture<Boolean> isCold(String scope, String stream, long segmentId, OperationContext context, Executor executor) {
        return withCompletion(getStream(scope, stream, context).getColdMarker(segmentId).thenApply(coldUntil -> {
            if (coldUntil == null) {
                return false;
            }
            return coldUntil > System.currentTimeMillis();
        }), executor);
    }

    /**
     * Removes the cold marker of a segment via the stream's {@code removeColdMarker(segmentId)}.
     *
     * @param scope     scope of the stream
     * @param stream    stream name
     * @param segmentId id of the segment
     * @param context   operation context handed to {@code getStream}
     * @param executor  executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> removeMarker(String scope, String stream, long segmentId, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, stream, context).removeColdMarker(segmentId),
                executor);
    }

    /**
     * Retrieves the active transactions of a stream via the stream's {@code getActiveTxns()}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with a map from transaction id to its active-transaction record
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns(String scope, String stream, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, stream, context).getActiveTxns(),
                executor);
    }

    /**
     * Retrieves scale metadata via the stream's {@code getScaleMetadata(from, to)}.
     *
     * @param scope    scope of the stream
     * @param name     stream name
     * @param from     lower bound forwarded unchanged to the stream
     * @param to       upper bound forwarded unchanged to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the scale metadata entries returned by the stream
     */
    @Override
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(String scope, String name, long from, long to, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, name, context).getScaleMetadata(from, to),
                executor);
    }

    /**
     * Reads the active epoch of a stream via the stream's {@code getActiveEpoch(ignoreCached)}.
     *
     * @param scope        scope of the stream
     * @param stream       stream name
     * @param context      operation context handed to {@code getStream}
     * @param ignoreCached flag forwarded unchanged to the stream
     * @param executor     executor handed to {@code withCompletion}
     * @return future with the active epoch record
     */
    @Override
    public CompletableFuture<EpochRecord> getActiveEpoch(String scope, String stream, OperationContext context, boolean ignoreCached, Executor executor) {
        return withCompletion(
                getStream(scope, stream, context).getActiveEpoch(ignoreCached),
                executor);
    }

    /**
     * Reads one epoch record via the stream's {@code getEpochRecord(epoch)}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param epoch    epoch number
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the epoch record
     */
    @Override
    public CompletableFuture<EpochRecord> getEpoch(String scope, String stream, int epoch, OperationContext context, Executor executor) {
        return withCompletion(
                getStream(scope, stream, context).getEpochRecord(epoch),
                executor);
    }

    /**
     * Starts committing transactions via the stream's {@code startCommittingTransactions()}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned committing-transactions record returned by the stream
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startCommitTransactions(String scope, String stream, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(
                getStream(scope, stream, context).startCommittingTransactions(),
                executor);
    }

    /**
     * Reads the versioned committing-transactions record via the stream's
     * {@code getVersionedCommitTransactionsRecord()}.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future with the versioned committing-transactions record
     */
    @Override
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommittingTransactionsRecord(String scope, String stream, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(
                getStream(scope, stream, context).getVersionedCommitTransactionsRecord(),
                executor);
    }

    /**
     * Records commit offsets for a transaction via the stream's
     * {@code recordCommitOffsets(txnId, commitOffsets)}.
     *
     * @param scope         scope of the stream
     * @param stream        stream name
     * @param txnId         id of the transaction
     * @param commitOffsets map forwarded unchanged to the stream
     * @param context       operation context handed to {@code getStream}
     * @param executor      executor handed to {@code withCompletion}
     * @return future tracking the operation
     */
    @Override
    public CompletableFuture<Void> recordCommitOffsets(String scope, String stream, UUID txnId, Map<Long, Long> commitOffsets, OperationContext context, ScheduledExecutorService executor) {
        return withCompletion(
                getStream(scope, stream, context).recordCommitOffsets(txnId, commitOffsets),
                executor);
    }

    /**
     * Completes the commit of transactions and then reports the open-transaction count.
     *
     * <p>After the stream's {@code completeCommittingTransactions(record)} finishes normally, the
     * number of ongoing transactions is fetched and handed to
     * {@code TransactionMetrics.reportOpenTransactions}. That lookup is only started by the callback;
     * the returned future does not wait for it to finish.
     *
     * @param scope    scope of the stream
     * @param stream   stream name
     * @param record   versioned committing-transactions record forwarded to the stream
     * @param context  operation context handed to {@code getStream}
     * @param executor executor handed to {@code withCompletion}
     * @return future completing once the metric lookup has been started
     */
    @Override
    public CompletableFuture<Void> completeCommitTransactions(String scope, String stream, VersionedMetadata<CommittingTransactionsRecord> record, OperationContext context, ScheduledExecutorService executor) {
        Stream target = getStream(scope, stream, context);
        return withCompletion(target.completeCommittingTransactions(record), executor)
                .thenAccept(ignored -> target.getNumberOfOngoingTransactions()
                        .thenAccept(openCount -> TransactionMetrics.reportOpenTransactions(scope, stream, openCount)));
    }

    /**
     * Delegates to {@code Stream.createWaitingRequestIfAbsent} for the resolved stream.
     *
     * @param scope         scope name used to resolve the stream
     * @param stream        stream name used to resolve the stream
     * @param processorName processor name passed through to the stream
     * @param context       operation context; when non-null its stream handle is used
     * @param executor      executor on which the returned future is completed
     * @return future that completes when the stream-level call completes
     */
    @Override
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String scope, String stream, String processorName, OperationContext context, ScheduledExecutorService executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<Void> created = target.createWaitingRequestIfAbsent(processorName);
        return this.withCompletion(created, executor);
    }

    /**
     * Reads the waiting-request processor recorded on the resolved stream.
     *
     * @param scope    scope name used to resolve the stream
     * @param stream   stream name used to resolve the stream
     * @param context  operation context; when non-null its stream handle is used
     * @param executor executor on which the returned future is completed
     * @return future holding the string returned by {@code Stream.getWaitingRequestProcessor()}
     */
    @Override
    public CompletableFuture<String> getWaitingRequestProcessor(String scope, String stream, OperationContext context, ScheduledExecutorService executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<String> processor = target.getWaitingRequestProcessor();
        return this.withCompletion(processor, executor);
    }

    /**
     * Delegates to {@code Stream.deleteWaitingRequestConditionally} for the resolved stream.
     *
     * @param scope         scope name used to resolve the stream
     * @param stream        stream name used to resolve the stream
     * @param processorName processor name passed through to the stream
     * @param context       operation context; when non-null its stream handle is used
     * @param executor      executor on which the returned future is completed
     * @return future that completes when the stream-level call completes
     */
    @Override
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String scope, String stream, String processorName, OperationContext context, ScheduledExecutorService executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<Void> deleted = target.deleteWaitingRequestConditionally(processorName);
        return this.withCompletion(deleted, executor);
    }

    /**
     * Forwards a writer's mark (timestamp plus position map) to the resolved stream.
     *
     * @param scope     scope name used to resolve the stream
     * @param stream    stream name used to resolve the stream
     * @param writer    writer identifier passed through to {@code Stream.noteWriterMark}
     * @param timestamp timestamp passed through unchanged
     * @param position  position map passed through unchanged
     * @param context   operation context; when non-null its stream handle is used
     * @param executor  executor on which the returned future is completed
     * @return future holding the response produced by the stream
     */
    @Override
    public CompletableFuture<WriterTimestampResponse> noteWriterMark(String scope, String stream, String writer, long timestamp, Map<Long, Long> position, OperationContext context, Executor executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<WriterTimestampResponse> noted = target.noteWriterMark(writer, timestamp, position);
        return this.withCompletion(noted, executor);
    }

    /**
     * Delegates to {@code Stream.shutdownWriter} for the resolved stream.
     *
     * @param scope    scope name used to resolve the stream
     * @param stream   stream name used to resolve the stream
     * @param writer   writer identifier passed through to the stream
     * @param context  operation context; when non-null its stream handle is used
     * @param executor executor on which the returned future is completed
     * @return future that completes when the stream-level call completes
     */
    @Override
    public CompletableFuture<Void> shutdownWriter(String scope, String stream, String writer, OperationContext context, Executor executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<Void> shutdown = target.shutdownWriter(writer);
        return this.withCompletion(shutdown, executor);
    }

    /**
     * Delegates to {@code Stream.removeWriter} for the resolved stream.
     *
     * @param scope      scope name used to resolve the stream
     * @param stream     stream name used to resolve the stream
     * @param writer     writer identifier passed through to the stream
     * @param writerMark writer mark passed through to the stream
     * @param context    operation context; when non-null its stream handle is used
     * @param executor   executor on which the returned future is completed
     * @return future that completes when the stream-level call completes
     */
    @Override
    public CompletableFuture<Void> removeWriter(String scope, String stream, String writer, WriterMark writerMark, OperationContext context, Executor executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<Void> removed = target.removeWriter(writer, writerMark);
        return this.withCompletion(removed, executor);
    }

    /**
     * Reads the mark stored for one writer on the resolved stream.
     *
     * @param scope    scope name used to resolve the stream
     * @param stream   stream name used to resolve the stream
     * @param writer   writer identifier passed through to {@code Stream.getWriterMark}
     * @param context  operation context; when non-null its stream handle is used
     * @param executor executor on which the returned future is completed
     * @return future holding the writer's mark
     */
    @Override
    public CompletableFuture<WriterMark> getWriterMark(String scope, String stream, String writer, OperationContext context, Executor executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<WriterMark> mark = target.getWriterMark(writer);
        return this.withCompletion(mark, executor);
    }

    /**
     * Reads every writer mark of the resolved stream.
     *
     * @param scope    scope name used to resolve the stream
     * @param stream   stream name used to resolve the stream
     * @param context  operation context; when non-null its stream handle is used
     * @param executor executor on which the returned future is completed
     * @return future holding the map returned by {@code Stream.getAllWriterMarks()}
     */
    @Override
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks(String scope, String stream, OperationContext context, Executor executor) {
        final Stream target = this.getStream(scope, stream, context);
        final CompletableFuture<Map<String, WriterMark>> marks = target.getAllWriterMarks();
        return this.withCompletion(marks, executor);
    }

    /**
     * Resolves the {@code Stream} handle for a scope/stream pair.
     *
     * <p>Without a context the handle is looked up in {@code cache} under an
     * {@code ImmutablePair(scope, name)} key and {@code refresh()} is called on it before it is
     * returned. With a context the context's own stream is returned as-is (no refresh); when JVM
     * assertions are enabled it is additionally checked to carry the requested scope and name.
     *
     * @param scope   scope name
     * @param name    stream name
     * @param context operation context, may be {@code null}
     * @return the stream handle to operate on
     */
    protected Stream getStream(String scope, String name, OperationContext context) {
        if (context == null) {
            final Stream cached = (Stream) this.cache.getUnchecked(new ImmutablePair<>(scope, name));
            cached.refresh();
            return cached;
        }
        final Stream fromContext = context.getStream();
        assert fromContext.getScope().equals(scope);
        assert fromContext.getName().equals(name);
        return fromContext;
    }

    /**
     * Puts the given stream into {@code cache}, keyed by an {@code ImmutablePair} of its own
     * scope and name.
     *
     * @param stream stream handle to store in the cache
     */
    @VisibleForTesting
    void setStream(Stream stream) {
        final String scopeName = stream.getScope();
        final String streamName = stream.getName();
        this.cache.put(new ImmutablePair<>(scopeName, streamName), stream);
    }

    /**
     * Looks up the {@code Scope} handle for a scope name in {@code scopeCache} and calls
     * {@code refresh()} on it before returning it.
     *
     * @param scopeName name of the scope
     * @return the refreshed scope handle
     */
    protected Scope getScope(String scopeName) {
        final Scope cached = (Scope) this.scopeCache.getUnchecked(scopeName);
        cached.refresh();
        return cached;
    }

    /**
     * Republishes the outcome of {@code future} on a fresh future that is completed by a task
     * run through {@code executor}.
     *
     * <p>Callers that chain further stages on the returned future without naming an executor
     * therefore continue from the executor's task (or from their own thread if the result is
     * already available) rather than from whichever thread completed {@code future}.
     *
     * @param future   source future whose value or failure is mirrored
     * @param executor executor used to run the completion callback
     * @param <T>      result type
     * @return a new future completing with the same value, or the same throwable, as {@code future}
     */
    protected <T> CompletableFuture<T> withCompletion(CompletableFuture<T> future, Executor executor) {
        // Properly parameterised (the raw type forced unchecked calls and a redundant Throwable cast).
        final CompletableFuture<T> result = new CompletableFuture<>();
        future.whenCompleteAsync((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        }, executor);
        return result;
    }

    /**
     * Sums the splits and merges over every scale-metadata entry of a stream.
     *
     * <p>The entries are obtained from {@code getScaleMetadata} over the range
     * {@code [0, Long.MAX_VALUE]}.
     *
     * @param scopeName  scope name forwarded to {@code getScaleMetadata}
     * @param streamName stream name forwarded to {@code getScaleMetadata}
     * @param context    operation context forwarded to {@code getScaleMetadata}
     * @param executor   executor forwarded to {@code getScaleMetadata}
     * @return future holding an entry whose key is the total number of splits and whose value is
     *         the total number of merges
     */
    private CompletableFuture<AbstractMap.SimpleEntry<Long, Long>> findNumSplitsMerges(String scopeName, String streamName, OperationContext context, Executor executor) {
        return this.getScaleMetadata(scopeName, streamName, 0L, Long.MAX_VALUE, context, executor).thenApply(scaleMetadataList -> {
            long splits = 0L;
            long merges = 0L;
            for (ScaleMetadata scaleEvent : scaleMetadataList) {
                merges += scaleEvent.getMerges();
                splits += scaleEvent.getSplits();
            }
            return new AbstractMap.SimpleEntry<Long, Long>(splits, merges);
        });
    }

    /**
     * Implementation hook that asynchronously produces an {@code Integer}.
     *
     * <p>NOTE(review): judging by the name, this is presumably the segment number from which a new
     * stream may safely start numbering its segments, and {@code var1}/{@code var2} are presumably
     * the scope and stream names (parameter names were lost in decompilation) — confirm against
     * the concrete stores.
     */
    abstract CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String var1, String var2);

    /**
     * Implementation hook that asynchronously records a value for a stream and yields no result.
     *
     * <p>NOTE(review): judging by the name and types, {@code var1}/{@code var2} are presumably the
     * scope and stream names, {@code var3} the last segment number of the stream, and
     * {@code var4}/{@code var5} the operation context and executor (parameter names were lost in
     * decompilation) — confirm against the concrete stores.
     */
    abstract CompletableFuture<Void> recordLastStreamSegment(String var1, String var2, int var3, OperationContext var4, Executor var5);

    /**
     * Implementation hook that builds a {@code Stream} handle from two strings.
     *
     * <p>NOTE(review): {@code var1}/{@code var2} are presumably the scope and stream names, and
     * this is presumably the factory used to populate the stream cache — confirm against the
     * cache loader and the concrete stores.
     */
    abstract Stream newStream(String var1, String var2);

    /**
     * Implementation hook that asynchronously produces an {@code Int96} value.
     *
     * <p>NOTE(review): judging by the name, presumably the next value of a store-backed counter —
     * confirm uniqueness/ordering guarantees against the concrete stores.
     */
    abstract CompletableFuture<Int96> getNextCounter();

    /**
     * Implementation hook that asynchronously produces a {@code Boolean} for a string argument.
     *
     * <p>NOTE(review): judging by the name, {@code var1} is presumably a scope name and the result
     * presumably tells whether that scope exists — confirm against the concrete stores.
     */
    abstract CompletableFuture<Boolean> checkScopeExists(String var1);

    /**
     * Renders a transaction resource as a string, using {@code RESOURCE_PART_SEPARATOR} as the
     * separator handed to {@code TxnResource.toString}.
     *
     * @param txn transaction resource to render
     * @return the string form produced by the resource
     */
    private String getTxnResourceString(TxnResource txn) {
        final String rendered = txn.toString(RESOURCE_PART_SEPARATOR);
        return rendered;
    }

    /**
     * Parses a string into a transaction resource via {@code TxnResource.parse}, using
     * {@code RESOURCE_PART_SEPARATOR} as the separator.
     *
     * @param str string form of a transaction resource
     * @return the parsed resource
     */
    private TxnResource getTxnResource(String str) {
        final TxnResource parsed = TxnResource.parse(str, RESOURCE_PART_SEPARATOR);
        return parsed;
    }

    /**
     * Builds the scoped name of a stream in the form {@code scope/stream}.
     *
     * @param scope  scope name
     * @param stream stream name
     * @return the two names joined by a single forward slash
     */
    String getScopedStreamName(String scope, String stream) {
        final String scopedName = String.format("%s/%s", scope, stream);
        return scopedName;
    }

    /**
     * Implementation hook that returns a {@code Version}.
     *
     * <p>NOTE(review): judging by the name, presumably the store-specific value representing
     * "no version yet" — confirm against the concrete stores.
     */
    abstract Version getEmptyVersion();

    /**
     * Implementation hook that builds a {@code Version} from a byte array.
     *
     * <p>NOTE(review): {@code var1} is presumably the serialized form of a version as written by
     * the same store (parameter name was lost in decompilation) — confirm the expected encoding
     * against the concrete stores.
     */
    abstract Version parseVersionData(byte[] var1);
}

