/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.PersistentStreamBase;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.controller.util.Config;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;

public class InMemoryStream
extends PersistentStreamBase {
    // Creation timestamp; Long.MIN_VALUE is the sentinel for "not yet stored".
    private final AtomicLong creationTime = new AtomicLong(Long.MIN_VALUE);

    // Monitor for the stream-level metadata fields annotated @GuardedBy("lock") below.
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private VersionedMetadata<StreamConfigurationRecord> configuration;
    @GuardedBy(value="lock")
    private VersionedMetadata<StreamTruncationRecord> truncationRecord;
    @GuardedBy(value="lock")
    private VersionedMetadata<StateRecord> state;
    @GuardedBy(value="lock")
    private VersionedMetadata<EpochRecord> currentEpochRecord;
    // Epoch records keyed by epoch number.
    @GuardedBy(value="lock")
    private Map<Integer, VersionedMetadata<EpochRecord>> epochRecords = new HashMap<Integer, VersionedMetadata<EpochRecord>>();
    // History time series chunks keyed by chunk number.
    @GuardedBy(value="lock")
    private Map<Integer, VersionedMetadata<HistoryTimeSeries>> historyTimeSeries = new HashMap<Integer, VersionedMetadata<HistoryTimeSeries>>();
    @GuardedBy(value="lock")
    private VersionedMetadata<RetentionSet> retentionSet;
    // Stream cut records keyed by the long key passed to createStreamCutRecordData
    // (looked up by "recordingTime" in the getter).
    @GuardedBy(value="lock")
    private final Map<Long, VersionedMetadata<StreamCutRecord>> streamCutRecords = new HashMap<Long, VersionedMetadata<StreamCutRecord>>();
    // Sealed-segment size map shards keyed by shard number.
    @GuardedBy(value="lock")
    private final Map<Integer, VersionedMetadata<SealedSegmentsMapShard>> sealedSegmentsShards = new HashMap<Integer, VersionedMetadata<SealedSegmentsMapShard>>();
    // Segment id -> epoch recorded when the segment was sealed.
    @GuardedBy(value="lock")
    private final Map<Long, VersionedMetadata<Integer>> segmentSealingEpochs = new HashMap<Long, VersionedMetadata<Integer>>();
    @GuardedBy(value="lock")
    private VersionedMetadata<EpochTransitionRecord> epochTransition;
    @GuardedBy(value="lock")
    private VersionedMetadata<CommittingTransactionsRecord> committingTxnRecord;
    @GuardedBy(value="lock")
    private String waitingRequestNode;

    // Monitor for the transaction bookkeeping annotated @GuardedBy("txnsLock").
    private final Object txnsLock = new Object();
    // Active transactions keyed by transaction id.
    @GuardedBy(value="txnsLock")
    private final Map<UUID, VersionedMetadata<ActiveTxnRecord>> activeTxns = new HashMap<UUID, VersionedMetadata<ActiveTxnRecord>>();
    // Source of positions handed out by addTxnToCommitOrder.
    private final AtomicLong counter = new AtomicLong();
    // Commit-order position -> transaction id; a concurrent map, accessed without txnsLock.
    private final ConcurrentHashMap<Long, UUID> transactionCommitOrder = new ConcurrentHashMap();
    // Completed transactions; built in the constructor with write-based expiry.
    @GuardedBy(value="txnsLock")
    private final Cache<UUID, VersionedMetadata<CompletedTxnRecord>> completedTxns;

    // Monitor for the markers map.
    private final Object markersLock = new Object();
    // Segment id -> versioned marker timestamp.
    @GuardedBy(value="markersLock")
    private final Map<Long, VersionedMetadata<Long>> markers = new HashMap<Long, VersionedMetadata<Long>>();
    // Epoch -> ids of the transactions registered under that epoch.
    @GuardedBy(value="txnsLock")
    private final Map<Integer, Set<UUID>> epochTxnMap = new HashMap<Integer, Set<UUID>>();

    // Monitor for the writer marks map.
    private final Object writersLock = new Object();
    // Writer marks keyed by a String (presumably the writer id - confirm against callers).
    @GuardedBy(value="writersLock")
    private final Map<String, VersionedMetadata<WriterMark>> writerMarks = new HashMap<String, VersionedMetadata<WriterMark>>();

    /**
     * Creates a stream whose completed-transaction TTL is
     * {@code Config.COMPLETED_TRANSACTION_TTL_IN_HOURS} hours, expressed in milliseconds.
     *
     * @param scope scope name, forwarded to the delegated constructor
     * @param name  stream name, forwarded to the delegated constructor
     */
    InMemoryStream(String scope, String name) {
        this(scope, name, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS).toMillis());
    }

    /**
     * Test-only constructor with explicit chunk and shard sizes; the completed-transaction TTL is
     * {@code Config.COMPLETED_TRANSACTION_TTL_IN_HOURS} hours, expressed in milliseconds.
     *
     * @param scope     scope name, forwarded to the delegated constructor
     * @param name      stream name, forwarded to the delegated constructor
     * @param chunkSize chunk size, forwarded unchanged
     * @param shardSize shard size, forwarded unchanged
     */
    @VisibleForTesting
    InMemoryStream(String scope, String name, int chunkSize, int shardSize) {
        this(scope, name, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS).toMillis(), chunkSize, shardSize);
    }

    /**
     * Test-only constructor with an explicit completed-transaction TTL; chunk size and shard size
     * both default to 1000.
     *
     * @param scope           scope name, forwarded to the delegated constructor
     * @param name            stream name, forwarded to the delegated constructor
     * @param completedTxnTTL completed-transaction TTL in milliseconds
     */
    @VisibleForTesting
    InMemoryStream(String scope, String name, long completedTxnTTL) {
        this(scope, name, completedTxnTTL, 1000, 1000);
    }

    /**
     * Primary constructor: forwards scope, name and sizes to the superclass and builds the
     * completed-transaction cache.
     *
     * @param scope           scope name, forwarded to the superclass
     * @param name            stream name, forwarded to the superclass
     * @param completedTxnTTL time, in milliseconds, after which a completed-transaction entry expires
     * @param chunkSize       chunk size, forwarded to the superclass
     * @param shardSize       shard size, forwarded to the superclass
     */
    @VisibleForTesting
    InMemoryStream(String scope, String name, long completedTxnTTL, int chunkSize, int shardSize) {
        super(scope, name, chunkSize, shardSize);
        // Entries are evicted completedTxnTTL milliseconds after they were written.
        this.completedTxns = CacheBuilder.newBuilder().expireAfterWrite(completedTxnTTL, TimeUnit.MILLISECONDS).build();
    }

    /**
     * Counts the transactions currently held in the active-transaction map.
     *
     * @return an already-completed future carrying the size of {@code activeTxns}
     */
    @Override
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        final int ongoing;
        synchronized (this.txnsLock) {
            ongoing = this.activeTxns.size();
        }
        return CompletableFuture.completedFuture(ongoing);
    }

    /**
     * No-op in this implementation: the method body is empty.
     */
    @Override
    public void refresh() {
    }

    /**
     * No-op in this implementation: nothing is removed from this object.
     *
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> deleteStream() {
        final CompletableFuture<Void> done = new CompletableFuture<Void>();
        done.complete(null);
        return done;
    }

    /**
     * Reports whether stream metadata has already been recorded and, if so, how far creation got.
     *
     * <p>The stored creation time, configuration and state are snapshotted while holding
     * {@code lock}; the decision is then made outside the lock:
     * <ul>
     *   <li>no creation time stored ({@code Long.MIN_VALUE}): {@code NEW} with the supplied
     *       {@code timestamp};</li>
     *   <li>creation time stored but no configuration: {@code NEW} with the stored creation time;</li>
     *   <li>both stored: delegated to {@code handleStreamMetadataExists}.</li>
     * </ul>
     *
     * @param configuration         configuration echoed back in the {@code NEW} responses
     * @param timestamp             creation timestamp proposed by the caller
     * @param startingSegmentNumber starting segment number echoed back in every response
     * @return an already-completed future carrying the response
     */
    @Override
    CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration configuration, long timestamp, int startingSegmentNumber) {
        final long storedCreationTime;
        final StreamConfigurationRecord storedConfig;
        final VersionedMetadata<StateRecord> storedState;
        synchronized (this.lock) {
            storedCreationTime = this.creationTime.get();
            storedConfig = this.configuration == null ? null : this.configuration.getObject();
            storedState = this.state;
        }

        final CompletableFuture<CreateStreamResponse> response = new CompletableFuture<CreateStreamResponse>();
        if (storedCreationTime == Long.MIN_VALUE) {
            // Nothing recorded yet: the caller's timestamp becomes the creation time in the response.
            response.complete(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, configuration, timestamp, startingSegmentNumber));
        } else if (storedConfig == null) {
            // Creation time recorded but no configuration: still NEW, but with the recorded time.
            response.complete(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, configuration, storedCreationTime, startingSegmentNumber));
        } else {
            this.handleStreamMetadataExists(timestamp, response, storedCreationTime, startingSegmentNumber, storedConfig.getStreamConfiguration(), storedState);
        }
        return response;
    }

    /**
     * No-op in this implementation: no state is touched.
     *
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createStreamMetadata() {
        final CompletableFuture<Void> done = new CompletableFuture<Void>();
        done.complete(null);
        return done;
    }

    /**
     * Completes {@code result} with the create status for a stream whose creation time and
     * configuration are already stored.
     *
     * <p>If a state exists and is neither {@code UNKNOWN} nor {@code CREATING}, the status is
     * {@code EXISTS_ACTIVE}. Otherwise (no state, or still {@code UNKNOWN}/{@code CREATING}) the
     * status is {@code NEW} when the stored creation time equals {@code timestamp}, and
     * {@code EXISTS_CREATING} when it differs.
     *
     * @param timestamp             creation timestamp proposed by the caller
     * @param result                future to complete with the response
     * @param time                  stored creation time; also the time reported in the response
     * @param startingSegmentNumber starting segment number echoed back in the response
     * @param config                stored stream configuration, reported in the response
     * @param currentState          stored state, or {@code null} if none has been created
     */
    private void handleStreamMetadataExists(long timestamp, CompletableFuture<CreateStreamResponse> result, long time, int startingSegmentNumber, StreamConfiguration config, VersionedMetadata<StateRecord> currentState) {
        final boolean pastCreation;
        if (currentState == null) {
            pastCreation = false;
        } else {
            final State stateVal = currentState.getObject().getState();
            pastCreation = !(stateVal.equals(State.UNKNOWN) || stateVal.equals(State.CREATING));
        }

        final CreateStreamResponse.CreateStatus status;
        if (pastCreation) {
            status = CreateStreamResponse.CreateStatus.EXISTS_ACTIVE;
        } else if (time == timestamp) {
            // Same timestamp as the stored one: treated as a continuation of the same creation.
            status = CreateStreamResponse.CreateStatus.NEW;
        } else {
            status = CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        }
        result.complete(new CreateStreamResponse(status, config, time, startingSegmentNumber));
    }

    /**
     * Records the creation time only if none has been stored yet.
     *
     * @param timestamp creation time to store when the current value is still {@code Long.MIN_VALUE}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> storeCreationTimeIfAbsent(long timestamp) {
        final CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        // Atomic "set if still the sentinel"; a later call with a different timestamp has no effect.
        this.creationTime.compareAndSet(Long.MIN_VALUE, timestamp);
        return done;
    }

    /**
     * Reads the stored creation time.
     *
     * @return an already-completed future carrying the current value of {@code creationTime}
     *         ({@code Long.MIN_VALUE} if none has been stored)
     */
    @Override
    public CompletableFuture<Long> getCreationTime() {
        final long stored = this.creationTime.get();
        return CompletableFuture.completedFuture(stored);
    }

    /**
     * Stores the initial configuration at version 0 unless a configuration already exists.
     *
     * @param config configuration record to store; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord config) {
        Preconditions.checkNotNull(config);
        synchronized (this.lock) {
            final boolean absent = this.configuration == null;
            if (absent) {
                this.configuration = new VersionedMetadata<StreamConfigurationRecord>(config, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Stores the initial truncation record at version 0 unless one already exists.
     *
     * @param truncation truncation record to store; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord truncation) {
        Preconditions.checkNotNull(truncation);
        synchronized (this.lock) {
            final boolean absent = this.truncationRecord == null;
            if (absent) {
                this.truncationRecord = new VersionedMetadata<StreamTruncationRecord>(truncation, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Replaces the stored configuration if the caller's version matches the stored version.
     *
     * @param newConfig new configuration together with the version the caller last read;
     *                  must not be {@code null}
     * @return a completed future carrying the version of the newly stored record, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when no configuration exists,
     *         or with {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> newConfig) {
        Preconditions.checkNotNull(newConfig);
        synchronized (this.lock) {
            final VersionedMetadata<StreamConfigurationRecord> existing = this.configuration;
            if (existing == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
            }
            if (!Objects.equals(existing.getVersion(), newConfig.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, this.getName()));
            }
            // The stored record is rebuilt from the new object and the current version, then
            // passed through updatedCopy (defined elsewhere in this class) to obtain the next one.
            this.configuration = this.updatedCopy(new VersionedMetadata<StreamConfigurationRecord>(newConfig.getObject(), existing.getVersion()));
            return CompletableFuture.completedFuture(this.configuration.getVersion());
        }
    }

    /**
     * Returns the stored configuration record.
     *
     * @param ignoreCached not consulted by this implementation
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when no configuration exists
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean ignoreCached) {
        synchronized (this.lock) {
            final VersionedMetadata<StreamConfigurationRecord> current = this.configuration;
            if (current != null) {
                return CompletableFuture.completedFuture(current);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Replaces the stored truncation record if the caller's version matches the stored version.
     *
     * @param truncationRecord new record together with the version the caller last read;
     *                         must not be {@code null}
     * @return a completed future carrying the version of the newly stored record, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when no record exists, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> truncationRecord) {
        Preconditions.checkNotNull(truncationRecord);
        // Computed before taking the lock, exactly once, regardless of the outcome below.
        final VersionedMetadata<StreamTruncationRecord> next = this.updatedCopy(truncationRecord);
        synchronized (this.lock) {
            final VersionedMetadata<StreamTruncationRecord> existing = this.truncationRecord;
            if (existing == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "truncation record not found"));
            }
            if (!Objects.equals(existing.getVersion(), truncationRecord.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, this.getName()));
            }
            this.truncationRecord = next;
            return CompletableFuture.completedFuture(next.getVersion());
        }
    }

    /**
     * Returns the stored truncation record.
     *
     * @param ignoreCached not consulted by this implementation
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when no record exists
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean ignoreCached) {
        synchronized (this.lock) {
            final VersionedMetadata<StreamTruncationRecord> current = this.truncationRecord;
            if (current != null) {
                return CompletableFuture.completedFuture(current);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Stores the initial state record at version 0 unless a state already exists.
     *
     * @param state state record to store; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createStateIfAbsent(StateRecord state) {
        Preconditions.checkNotNull(state);
        synchronized (this.lock) {
            final boolean absent = this.state == null;
            if (absent) {
                this.state = new VersionedMetadata<StateRecord>(state, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Replaces the stored state if the caller's version matches the stored version.
     *
     * <p>When no state has been created yet the returned future fails with
     * {@code DATA_NOT_FOUND}, matching {@code setConfigurationData} and
     * {@code setTruncationData}, instead of throwing a {@code NullPointerException} from
     * inside the synchronized block.
     *
     * @param newState new state together with the version the caller last read;
     *                 must not be {@code null}
     * @return a completed future carrying the version of the newly stored record, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when no state exists, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> newState) {
        Preconditions.checkNotNull(newState);
        CompletableFuture<Version> result = new CompletableFuture<Version>();
        synchronized (this.lock) {
            if (this.state == null) {
                // Nothing to update: report it through the future rather than dereferencing null.
                result.completeExceptionally(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
            } else if (Objects.equals(this.state.getVersion(), newState.getVersion())) {
                this.state = this.updatedCopy(newState);
                result.complete(this.state.getVersion());
            } else {
                result.completeExceptionally(StoreException.create(StoreException.Type.WRITE_CONFLICT, this.getName()));
            }
        }
        return result;
    }

    /**
     * Returns the stored state record.
     *
     * @param ignoreCached not consulted by this implementation
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when no state exists
     */
    @Override
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean ignoreCached) {
        synchronized (this.lock) {
            final VersionedMetadata<StateRecord> current = this.state;
            if (current != null) {
                return CompletableFuture.completedFuture(current);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Stores a stream cut record at version 0 under {@code key} unless one is already present.
     *
     * <p>The stream state must already exist; otherwise a {@code NullPointerException} is
     * thrown. The state field is guarded by {@code lock}, so the check is performed while
     * holding it.
     * NOTE(review): the precondition may have been intended for {@code tData} rather than the
     * stream state - confirm before changing it.
     *
     * @param key   key under which the record is stored
     * @param tData stream cut record to store
     * @return an already-completed future carrying {@code null}
     * @throws NullPointerException if no state has been created for this stream
     */
    @Override
    CompletableFuture<Void> createStreamCutRecordData(long key, StreamCutRecord tData) {
        synchronized (this.lock) {
            Preconditions.checkNotNull(this.state);
            this.streamCutRecords.putIfAbsent(key, new VersionedMetadata<StreamCutRecord>(tData, new Version.IntVersion(0)));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up the stream cut record stored under {@code recordingTime}.
     *
     * @param recordingTime key of the record
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the key is absent
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long recordingTime) {
        synchronized (this.lock) {
            if (this.streamCutRecords.containsKey(recordingTime)) {
                return CompletableFuture.completedFuture(this.streamCutRecords.get(recordingTime));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Removes the stream cut record stored under {@code recordingTime}; a missing key is not
     * an error.
     *
     * @param recordingTime key of the record to remove
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> deleteStreamCutRecordData(long recordingTime) {
        synchronized (this.lock) {
            this.streamCutRecords.remove(recordingTime);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Stores a history time series chunk at version 0 unless the chunk number is already present.
     *
     * @param chunkNumber chunk number used as the key
     * @param data        chunk contents; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int chunkNumber, HistoryTimeSeries data) {
        Preconditions.checkNotNull(data);
        final VersionedMetadata<HistoryTimeSeries> initial = new VersionedMetadata<HistoryTimeSeries>(data, new Version.IntVersion(0));
        synchronized (this.lock) {
            this.historyTimeSeries.putIfAbsent(chunkNumber, initial);
        }
        final CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        return done;
    }

    /**
     * Looks up a history time series chunk.
     *
     * @param chunkNumber  chunk number to look up
     * @param ignoreCached not consulted by this implementation
     * @return a completed future carrying the stored chunk, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the chunk is absent
     */
    @Override
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int chunkNumber, boolean ignoreCached) {
        synchronized (this.lock) {
            if (this.historyTimeSeries.containsKey(chunkNumber)) {
                return CompletableFuture.completedFuture(this.historyTimeSeries.get(chunkNumber));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Replaces a history time series chunk if the caller's version matches the stored version.
     *
     * @param historyChunk chunk number to update
     * @param updated      new chunk contents together with the version the caller last read;
     *                     neither the wrapper nor its object may be {@code null}
     * @return a completed future carrying the version of the newly stored chunk, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when the chunk is absent, or
     *         with {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int historyChunk, VersionedMetadata<HistoryTimeSeries> updated) {
        Preconditions.checkNotNull(updated);
        Preconditions.checkNotNull(updated.getObject());
        // Computed before taking the lock, exactly once, regardless of the outcome below.
        final VersionedMetadata<HistoryTimeSeries> next = this.updatedCopy(updated);
        synchronized (this.lock) {
            if (!this.historyTimeSeries.containsKey(historyChunk)) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "History timeseries chunk for stream: " + this.getName()));
            }
            final Version storedVersion = this.historyTimeSeries.get(historyChunk).getVersion();
            if (!storedVersion.equals(updated.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "History time series for stream: " + this.getName()));
            }
            this.historyTimeSeries.put(historyChunk, next);
            return CompletableFuture.completedFuture(next.getVersion());
        }
    }

    /**
     * Stores the current epoch record at version 0 unless one already exists.
     *
     * @param data epoch record to store; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord data) {
        Preconditions.checkNotNull(data);
        synchronized (this.lock) {
            final boolean absent = this.currentEpochRecord == null;
            if (absent) {
                this.currentEpochRecord = new VersionedMetadata<EpochRecord>(data, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Replaces the current epoch record if the caller's version matches the stored version.
     *
     * @param updated new record together with the version the caller last read; neither the
     *                wrapper nor its object may be {@code null}
     * @return a completed future carrying the version of the newly stored record, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when no record exists, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> updated) {
        Preconditions.checkNotNull(updated);
        Preconditions.checkNotNull(updated.getObject());
        // Computed before taking the lock, exactly once, regardless of the outcome below.
        final VersionedMetadata<EpochRecord> next = this.updatedCopy(updated);
        synchronized (this.lock) {
            if (this.currentEpochRecord == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "current epoch record for stream: " + this.getName()));
            }
            final Version storedVersion = this.currentEpochRecord.getVersion();
            if (!storedVersion.equals(updated.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "current epoch record for stream: " + this.getName()));
            }
            this.currentEpochRecord = next;
            return CompletableFuture.completedFuture(next.getVersion());
        }
    }

    /**
     * Returns the current epoch record.
     *
     * @param ignoreCached not consulted by this implementation
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when no record exists
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean ignoreCached) {
        synchronized (this.lock) {
            final VersionedMetadata<EpochRecord> current = this.currentEpochRecord;
            if (current != null) {
                return CompletableFuture.completedFuture(current);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Stores an epoch record at version 0 unless the epoch is already present.
     *
     * @param epoch epoch number used as the key
     * @param data  epoch record to store; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int epoch, EpochRecord data) {
        Preconditions.checkNotNull(data);
        final VersionedMetadata<EpochRecord> initial = new VersionedMetadata<EpochRecord>(data, new Version.IntVersion(0));
        synchronized (this.lock) {
            this.epochRecords.putIfAbsent(epoch, initial);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up the record stored for {@code epoch}.
     *
     * @param epoch epoch number to look up
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the epoch is absent
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int epoch) {
        synchronized (this.lock) {
            if (this.epochRecords.containsKey(epoch)) {
                return CompletableFuture.completedFuture(this.epochRecords.get(epoch));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Stores a sealed-segment size map shard at version 0 unless the shard is already present.
     *
     * @param shardNumber shard number used as the key
     * @param data        shard contents; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int shardNumber, SealedSegmentsMapShard data) {
        Preconditions.checkNotNull(data);
        final VersionedMetadata<SealedSegmentsMapShard> initial = new VersionedMetadata<SealedSegmentsMapShard>(data, new Version.IntVersion(0));
        synchronized (this.lock) {
            this.sealedSegmentsShards.putIfAbsent(shardNumber, initial);
        }
        final CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        return done;
    }

    /**
     * Looks up a sealed-segment size map shard.
     *
     * @param shard shard number to look up
     * @return a completed future carrying the stored shard, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the shard is absent
     */
    @Override
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int shard) {
        synchronized (this.lock) {
            if (this.sealedSegmentsShards.containsKey(shard)) {
                return CompletableFuture.completedFuture(this.sealedSegmentsShards.get(shard));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Replaces a sealed-segment size map shard if the caller's version matches the stored version.
     *
     * <p>The write-conflict message names the sealed segment size map shard, so a conflict on
     * this record is not reported as a history time series conflict.
     *
     * @param shard   shard number to update
     * @param updated new shard contents together with the version the caller last read; neither
     *                the wrapper nor its object may be {@code null}
     * @return a completed future carrying the version of the newly stored shard, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when the shard is absent, or
     *         with {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int shard, VersionedMetadata<SealedSegmentsMapShard> updated) {
        Preconditions.checkNotNull(updated);
        Preconditions.checkNotNull(updated.getObject());
        CompletableFuture<Version> result = new CompletableFuture<Version>();
        // Computed before taking the lock, exactly once, regardless of the outcome below.
        VersionedMetadata<SealedSegmentsMapShard> copy = this.updatedCopy(updated);
        synchronized (this.lock) {
            if (!this.sealedSegmentsShards.containsKey(shard)) {
                result.completeExceptionally(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "sealed segment size map shard for stream: " + this.getName()));
            } else if (this.sealedSegmentsShards.get(shard).getVersion().equals(updated.getVersion())) {
                this.sealedSegmentsShards.put(shard, copy);
                result.complete(copy.getVersion());
            } else {
                result.completeExceptionally(StoreException.create(StoreException.Type.WRITE_CONFLICT, "sealed segment size map shard for stream: " + this.getName()));
            }
        }
        return result;
    }

    /**
     * Records, for every segment in {@code segmentToSeal}, the epoch in which it was sealed.
     * Segments that already have a record keep their existing one.
     *
     * @param segmentToSeal ids of the segments being sealed
     * @param epoch         epoch to record for each segment
     * @return a future that completes once every per-segment record future has completed
     */
    @Override
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> segmentToSeal, int epoch) {
        // Kept fully typed: no raw Collection cast, so the call to allOf is checked by the compiler.
        List<CompletableFuture<Void>> pending = segmentToSeal.stream()
                .map(segment -> this.createSegmentSealedEpochRecordData(segment, epoch))
                .collect(Collectors.toList());
        return Futures.allOf(pending);
    }

    /**
     * Records the sealing epoch of one segment at version 0 unless a record already exists.
     *
     * <p>{@code epoch} is a primitive, so no null check is needed (the previous check boxed the
     * value and could never fail).
     *
     * @param segment id of the sealed segment
     * @param epoch   epoch to record
     * @return an already-completed future carrying {@code null}
     */
    private CompletableFuture<Void> createSegmentSealedEpochRecordData(long segment, int epoch) {
        synchronized (this.lock) {
            this.segmentSealingEpochs.putIfAbsent(segment, new VersionedMetadata<Integer>(epoch, new Version.IntVersion(0)));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up the sealing epoch recorded for a segment.
     *
     * @param segmentId id of the segment
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the segment has no record
     */
    @Override
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long segmentId) {
        synchronized (this.lock) {
            if (this.segmentSealingEpochs.containsKey(segmentId)) {
                return CompletableFuture.completedFuture(this.segmentSealingEpochs.get(segmentId));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /**
     * Registers a transaction: stores its record at version 0 (unless the id is already present)
     * and adds the id to the set of transactions for {@code epoch}.
     *
     * @param epoch epoch under which the transaction is registered
     * @param txId  transaction id; must not be {@code null}
     * @param data  active transaction record to store
     * @return an already-completed future carrying version 0
     */
    @Override
    CompletableFuture<Version> createNewTransaction(int epoch, UUID txId, ActiveTxnRecord data) {
        Preconditions.checkNotNull(txId);
        final VersionedMetadata<ActiveTxnRecord> initial = new VersionedMetadata<ActiveTxnRecord>(data, new Version.IntVersion(0));
        synchronized (this.txnsLock) {
            this.activeTxns.putIfAbsent(txId, initial);
            // Create the per-epoch set on first use, then record the id in it.
            this.epochTxnMap.computeIfAbsent(epoch, key -> new HashSet<UUID>()).add(txId);
        }
        final CompletableFuture<Version> created = new CompletableFuture<Version>();
        created.complete(new Version.IntVersion(0));
        return created;
    }

    /**
     * Looks up an active transaction by id.
     *
     * @param epoch not consulted by this implementation
     * @param txId  transaction id
     * @return a completed future carrying the stored record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the transaction is not active
     */
    @Override
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int epoch, UUID txId) {
        synchronized (this.txnsLock) {
            if (this.activeTxns.containsKey(txId)) {
                return CompletableFuture.completedFuture(this.activeTxns.get(txId));
            }
            final String description = "Stream: " + this.getName() + " Transaction: " + txId.toString();
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, description));
        }
    }

    /**
     * Replaces an active transaction record if the caller's version matches the stored version.
     *
     * <p>Each outcome completes the returned future exactly once. The earlier form completed it
     * inside a {@code Map.compute} lambda and then called {@code complete} again afterwards;
     * that second call never had any effect and wrongly suggested success on a conflict.
     *
     * @param epoch not consulted by this implementation
     * @param txId  transaction id
     * @param data  new record together with the version the caller last read;
     *              must not be {@code null}
     * @return a completed future carrying the version of the newly stored record, or a future
     *         failed with a {@code DATA_NOT_FOUND} store exception when the transaction is not
     *         active, or with {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateActiveTx(int epoch, UUID txId, VersionedMetadata<ActiveTxnRecord> data) {
        Preconditions.checkNotNull(data);
        // Computed before taking the lock, exactly once, regardless of the outcome below.
        final VersionedMetadata<ActiveTxnRecord> updatedCopy = this.updatedCopy(data);
        synchronized (this.txnsLock) {
            final VersionedMetadata<ActiveTxnRecord> existing = this.activeTxns.get(txId);
            if (existing == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Transaction: " + txId.toString()));
            }
            if (!data.getVersion().equals(existing.getVersion())) {
                // Stale version: leave the stored record untouched.
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "Stream: " + this.getName() + " transaction id : " + txId));
            }
            this.activeTxns.put(txId, updatedCopy);
            return CompletableFuture.completedFuture(updatedCopy.getVersion());
        }
    }

    /**
     * Assigns the next commit-order position to a transaction.
     *
     * @param txId transaction id to record
     * @return an already-completed future carrying the position taken from {@code counter}
     */
    @Override
    CompletableFuture<Long> addTxnToCommitOrder(UUID txId) {
        // getAndIncrement hands out each position once, even under concurrent calls.
        final long position = this.counter.getAndIncrement();
        this.transactionCommitOrder.put(position, txId);
        final CompletableFuture<Long> assigned = CompletableFuture.completedFuture(position);
        return assigned;
    }

    /**
     * Removes the given positions from the commit-order map; unknown positions are ignored.
     *
     * @param positions commit-order positions to remove
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> positions) {
        for (Long position : positions) {
            this.transactionCommitOrder.remove(position);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up a completed transaction in the expiring cache.
     *
     * <p>The cached value is kept fully typed; no raw {@code VersionedMetadata} or unchecked
     * cast is involved.
     *
     * @param txId transaction id; must not be {@code null}
     * @return a completed future carrying the cached record, or a future failed with a
     *         {@code DATA_NOT_FOUND} store exception when the cache holds no entry for the id
     */
    @Override
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID txId) {
        Preconditions.checkNotNull(txId);
        synchronized (this.txnsLock) {
            final VersionedMetadata<CompletedTxnRecord> value = this.completedTxns.getIfPresent(txId);
            if (value == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Transaction: " + txId.toString()));
            }
            return CompletableFuture.completedFuture(value);
        }
    }

    /**
     * Removes a transaction from the active set and from the per-epoch index.
     *
     * <p>Safe to call when {@code epoch} has no entry in the index (for example on a repeated
     * removal). The earlier form dereferenced {@code epochTxnMap.get(epoch)} unconditionally and
     * threw a {@code NullPointerException} in that case.
     *
     * @param epoch epoch under which the transaction was registered
     * @param txId  transaction id; must not be {@code null}
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> removeActiveTxEntry(int epoch, UUID txId) {
        Preconditions.checkNotNull(txId);
        synchronized (this.txnsLock) {
            this.activeTxns.remove(txId);
            // Returning null from computeIfPresent drops the epoch entry once its set is empty.
            this.epochTxnMap.computeIfPresent(epoch, (key, txns) -> {
                txns.remove(txId);
                return txns.isEmpty() ? null : txns;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Stores a completed transaction record at version 0 unless the cache already holds an
     * entry for the id.
     *
     * <p>The key is passed as a {@code UUID} and the value is fully typed, matching the
     * declared {@code Cache<UUID, VersionedMetadata<CompletedTxnRecord>>}.
     *
     * @param txId     transaction id; must not be {@code null}
     * @param complete completed transaction record to store
     * @return an already-completed future carrying {@code null}
     */
    @Override
    CompletableFuture<Void> createCompletedTxEntry(UUID txId, CompletedTxnRecord complete) {
        Preconditions.checkNotNull(txId);
        synchronized (this.txnsLock) {
            // Check-then-put is done under txnsLock so an existing entry is never overwritten.
            if (this.completedTxns.getIfPresent(txId) == null) {
                this.completedTxns.put(txId, new VersionedMetadata<CompletedTxnRecord>(complete, new Version.IntVersion(0)));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a marker timestamp for a segment at version 0 if the segment has
     * no marker yet; an existing marker is kept.
     *
     * @param segmentId segment the marker belongs to
     * @param timestamp marker value to store
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> createMarkerData(long segmentId, long timestamp) {
        VersionedMetadata<Long> initial = new VersionedMetadata<Long>(timestamp, new Version.IntVersion(0));
        synchronized (this.markersLock) {
            this.markers.putIfAbsent(segmentId, initial);
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces a segment's marker when the caller's version matches the stored one.
     *
     * @param segmentId segment whose marker is updated
     * @param data      new marker value together with the version the caller last read
     * @return a future holding the new version on success; failed with
     *         {@code DATA_NOT_FOUND} when the segment has no marker, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateMarkerData(long segmentId, VersionedMetadata<Long> data) {
        // Built before taking the lock, exactly as the version bump was before.
        VersionedMetadata<Long> replacement = this.updatedCopy(data);
        synchronized (this.markersLock) {
            if (!this.markers.containsKey(segmentId)) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Segment number: " + segmentId));
            }
            VersionedMetadata<Long> current = this.markers.get(segmentId);
            if (!current.getVersion().equals(data.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "Stream: " + this.getName() + " Segment number: " + segmentId));
            }
            this.markers.put(segmentId, replacement);
            return CompletableFuture.completedFuture(replacement.getVersion());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deletes the marker stored for a segment, if any.
     *
     * @param segmentId segment whose marker is removed
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> removeMarkerData(long segmentId) {
        synchronized (this.markersLock) {
            this.markers.remove(segmentId);
            return CompletableFuture.completedFuture(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the versioned marker stored for a segment.
     *
     * @param segmentId segment whose marker is requested
     * @return a future holding the marker, or a future failed with
     *         {@code DATA_NOT_FOUND} when the segment has none
     */
    @Override
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long segmentId) {
        synchronized (this.markersLock) {
            if (this.markers.containsKey(segmentId)) {
                return CompletableFuture.completedFuture(this.markers.get(segmentId));
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + this.getName() + " Segment: " + segmentId));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a read-only copy of the active transactions, with the version
     * wrapper stripped from each record.
     *
     * @return an already-completed future holding an unmodifiable map from
     *         transaction id to its active record
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        synchronized (this.txnsLock) {
            Map<UUID, ActiveTxnRecord> snapshot = this.activeTxns.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getObject()));
            return CompletableFuture.completedFuture(Collections.unmodifiableMap(snapshot));
        }
    }

    /**
     * Collects the transactions that are in {@code COMMITTING} state, keeps only
     * those belonging to the smallest epoch seen among them, and returns them
     * sorted by commit order.
     *
     * <p>While scanning the commit-order map, stale positions are gathered and
     * removed afterwards. A position is stale when its transaction is
     * {@code COMMITTING} under a different commit order, or when its status is
     * {@code COMMITTED}, {@code ABORTING}, {@code ABORTED} or {@code UNKNOWN}.
     * A transaction missing from the active map is read as
     * {@code ActiveTxnRecord.EMPTY}. {@code OPEN} transactions are left untouched.
     *
     * @return an already-completed future holding the ordered entries; empty when
     *         no transaction is committing
     */
    @Override
    CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch() {
        List<Long> toPurge = new ArrayList<>();
        Map<UUID, ActiveTxnRecord> committing = new HashMap<>();
        AtomicInteger smallestEpoch = new AtomicInteger(Integer.MAX_VALUE);
        this.transactionCommitOrder.forEach((order, txId) -> {
            int epoch = RecordHelper.getTransactionEpoch(txId);
            ActiveTxnRecord record;
            synchronized (this.txnsLock) {
                record = this.activeTxns.containsKey(txId) ? this.activeTxns.get(txId).getObject() : ActiveTxnRecord.EMPTY;
            }
            switch (record.getTxnStatus()) {
                case COMMITTING:
                    if (record.getCommitOrder() == order.longValue()) {
                        committing.put(txId, record);
                        if (epoch < smallestEpoch.get()) {
                            smallestEpoch.set(epoch);
                        }
                    } else {
                        // Committing, but under another position: this one is stale.
                        toPurge.add(order);
                    }
                    break;
                case OPEN:
                    break;
                case COMMITTED:
                case ABORTING:
                case ABORTED:
                case UNKNOWN:
                    toPurge.add(order);
                    break;
            }
        });
        // Purge only after the traversal so the map is not modified from inside forEach.
        toPurge.forEach(this.transactionCommitOrder::remove);
        List<Map.Entry<UUID, ActiveTxnRecord>> ordered = committing.entrySet().stream()
                .filter(entry -> RecordHelper.getTransactionEpoch(entry.getKey()) == smallestEpoch.get())
                .sorted(Comparator.comparing((Map.Entry<UUID, ActiveTxnRecord> entry) -> entry.getValue().getCommitOrder()))
                .collect(Collectors.toList());
        return CompletableFuture.completedFuture(ordered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exposes the commit-order map as a read-only view.
     *
     * <p>The returned map wraps the underlying map and is not a copy, so later
     * changes to the commit order are visible through it.
     *
     * @return an already-completed future holding the unmodifiable view
     */
    @Override
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns() {
        synchronized (this.txnsLock) {
            Map<Long, UUID> readOnlyView = Collections.unmodifiableMap(this.transactionCommitOrder);
            return CompletableFuture.completedFuture(readOnlyView);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the active transactions indexed under the given epoch.
     *
     * @param epoch epoch to look up in the per-epoch index
     * @return an already-completed future holding an unmodifiable map from
     *         transaction id to its active record; an empty map when the epoch
     *         has no index entry
     */
    @Override
    CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int epoch) {
        synchronized (this.txnsLock) {
            Set<UUID> transactions = this.epochTxnMap.get(epoch);
            if (transactions == null) {
                return CompletableFuture.completedFuture(Collections.<UUID, ActiveTxnRecord>emptyMap());
            }
            // Only ids that are still present in the active map end up in the result.
            Map<UUID, ActiveTxnRecord> inEpoch = this.activeTxns.entrySet().stream()
                    .filter(entry -> transactions.contains(entry.getKey()))
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getObject()));
            return CompletableFuture.completedFuture(Collections.unmodifiableMap(inEpoch));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the retention set at version 0 only when none is stored yet.
     *
     * <p>An existing retention set is left as it is, so calling this again does
     * not discard stored data or reset its version.
     *
     * @param retention retention set to store; must not be null
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet retention) {
        Preconditions.checkNotNull(retention);
        synchronized (this.lock) {
            if (this.retentionSet == null) {
                this.retentionSet = new VersionedMetadata<RetentionSet>(retention, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the stored retention set.
     *
     * @return a future holding the versioned retention set, or a future failed
     *         with {@code DATA_NOT_FOUND} when none has been stored
     */
    @Override
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData() {
        synchronized (this.lock) {
            if (this.retentionSet != null) {
                return CompletableFuture.completedFuture(this.retentionSet);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.getName()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the stored retention set when the caller's version matches the
     * stored version.
     *
     * @param retention new retention set plus the version the caller last read;
     *                  neither the wrapper nor its payload may be null
     * @return a future holding the new version on success; failed with
     *         {@code DATA_NOT_FOUND} when nothing is stored, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> retention) {
        Preconditions.checkNotNull(retention);
        Preconditions.checkNotNull(retention.getObject());
        VersionedMetadata<RetentionSet> replacement = this.updatedCopy(retention);
        synchronized (this.lock) {
            if (this.retentionSet == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "retentionSet for stream: " + this.getName()));
            }
            if (!this.retentionSet.getVersion().equals(retention.getVersion())) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "retentionSet for stream: " + this.getName()));
            }
            this.retentionSet = replacement;
            return CompletableFuture.completedFuture(replacement.getVersion());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the epoch transition record at version 0 when none is stored yet;
     * an existing record is left unchanged.
     *
     * @param epochTransitionData record to store; must not be null
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransitionData) {
        Preconditions.checkNotNull(epochTransitionData);
        synchronized (this.lock) {
            if (this.epochTransition == null) {
                this.epochTransition = new VersionedMetadata<EpochTransitionRecord>(epochTransitionData, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the stored epoch transition record when the caller's version
     * matches the stored version.
     *
     * @param record new record plus the version the caller last read; must not be null
     * @return a future holding the new version on success; failed with
     *         {@code DATA_NOT_FOUND} when nothing is stored, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> record) {
        Preconditions.checkNotNull(record);
        VersionedMetadata<EpochTransitionRecord> replacement = this.updatedCopy(record);
        synchronized (this.lock) {
            if (this.epochTransition == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "epoch transition not found"));
            }
            boolean sameVersion = Objects.equals(this.epochTransition.getVersion(), record.getVersion());
            if (!sameVersion) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "epoch transition version mismatch"));
            }
            this.epochTransition = replacement;
            return CompletableFuture.completedFuture(replacement.getVersion());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the stored epoch transition record.
     *
     * @return a future holding the versioned record, or a future failed with
     *         {@code DATA_NOT_FOUND} when none has been stored
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode() {
        synchronized (this.lock) {
            if (this.epochTransition != null) {
                return CompletableFuture.completedFuture(this.epochTransition);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "epoch transition not found"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the committing-transactions record at version 0 when none is
     * stored yet; an existing record is left unchanged.
     *
     * @param committingTxns record to store; must not be null
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTxns) {
        Preconditions.checkNotNull(committingTxns);
        synchronized (this.lock) {
            boolean absent = this.committingTxnRecord == null;
            if (absent) {
                this.committingTxnRecord = new VersionedMetadata<CommittingTransactionsRecord>(committingTxns, new Version.IntVersion(0));
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the stored committing-transactions record.
     *
     * @return a future holding the versioned record, or a future failed with
     *         {@code DATA_NOT_FOUND} when none has been stored
     */
    @Override
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord() {
        synchronized (this.lock) {
            if (this.committingTxnRecord != null) {
                return CompletableFuture.completedFuture(this.committingTxnRecord);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "committing transactions not found"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the stored committing-transactions record when the caller's
     * version matches the stored version.
     *
     * @param record new record plus the version the caller last read; must not be null
     * @return a future holding the new version on success; failed with
     *         {@code DATA_NOT_FOUND} when nothing is stored, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> record) {
        Preconditions.checkNotNull(record);
        VersionedMetadata<CommittingTransactionsRecord> replacement = this.updatedCopy(record);
        synchronized (this.lock) {
            if (this.committingTxnRecord == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "committing txn not found"));
            }
            boolean sameVersion = Objects.equals(this.committingTxnRecord.getVersion(), record.getVersion());
            if (!sameVersion) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "committing txn version mismatch"));
            }
            this.committingTxnRecord = replacement;
            return CompletableFuture.completedFuture(replacement.getVersion());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the waiting-request value when none is stored yet; an existing
     * value is left unchanged.
     *
     * @param data value to store
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String data) {
        synchronized (this.lock) {
            boolean absent = this.waitingRequestNode == null;
            if (absent) {
                this.waitingRequestNode = data;
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the stored waiting-request value.
     *
     * @return a future holding the value, or a future failed with
     *         {@code DATA_NOT_FOUND} when none is stored
     */
    @Override
    CompletableFuture<String> getWaitingRequestNode() {
        synchronized (this.lock) {
            if (this.waitingRequestNode != null) {
                return CompletableFuture.completedFuture(this.waitingRequestNode);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "waiting request node not found"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Clears the stored waiting-request value, whether or not one was set.
     *
     * @return an already-completed future
     */
    @Override
    CompletableFuture<Void> deleteWaitingRequestNode() {
        synchronized (this.lock) {
            this.waitingRequestNode = null;
            return CompletableFuture.completedFuture(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a new mark for a writer at version 0.
     *
     * @param writer    writer identifier
     * @param timestamp mark timestamp
     * @param position  mark position map passed to the {@code WriterMark} constructor
     * @return an already-completed future on success, or a future failed with
     *         {@code DATA_EXISTS} when the writer already has a mark
     */
    @Override
    CompletableFuture<Void> createWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position) {
        WriterMark mark = new WriterMark(timestamp, position);
        synchronized (this.writersLock) {
            boolean alreadyPresent = this.writerMarks.get(writer) != null;
            if (alreadyPresent) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_EXISTS, "writer mark exists"));
            }
            this.writerMarks.put(writer, new VersionedMetadata<WriterMark>(mark, new Version.IntVersion(0)));
        }
        return CompletableFuture.completedFuture(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a writer's mark when the supplied version matches the stored one.
     * A writer without a stored mark is treated as success.
     *
     * @param writer  writer identifier
     * @param version version the caller last read
     * @return an already-completed future on success, or a future failed with
     *         {@code WRITE_CONFLICT} when a mark exists under a different version
     */
    @Override
    public CompletableFuture<Void> removeWriterRecord(String writer, Version version) {
        synchronized (this.writersLock) {
            VersionedMetadata<WriterMark> current = this.writerMarks.get(writer);
            boolean removable = current == null || Objects.equals(current.getVersion(), version);
            if (!removable) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "writer mark version mismatch"));
            }
            this.writerMarks.remove(writer);
            return CompletableFuture.completedFuture(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the versioned mark stored for a writer.
     *
     * @param writer writer identifier
     * @return a future holding the mark, or a future failed with
     *         {@code DATA_NOT_FOUND} when the writer has none
     */
    @Override
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String writer) {
        synchronized (this.writersLock) {
            VersionedMetadata<WriterMark> found = this.writerMarks.get(writer);
            if (found != null) {
                return CompletableFuture.completedFuture(found);
            }
            return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "writer mark not found"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Copies every writer's mark into a new map, dropping the version wrapper.
     *
     * @return an already-completed future holding a map from writer identifier
     *         to its mark
     */
    @Override
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks() {
        synchronized (this.writersLock) {
            Map<String, WriterMark> marksByWriter = this.writerMarks.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getObject()));
            return CompletableFuture.completedFuture(marksByWriter);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces a writer's mark when the supplied version matches the stored one.
     *
     * @param writer    writer identifier
     * @param timestamp new mark timestamp
     * @param position  new mark position map
     * @param isAlive   liveness flag passed to the {@code WriterMark} constructor
     * @param version   version the caller last read
     * @return an already-completed future on success; failed with
     *         {@code DATA_NOT_FOUND} when the writer has no mark, or with
     *         {@code WRITE_CONFLICT} when the versions differ
     */
    @Override
    CompletableFuture<Void> updateWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position, boolean isAlive, Version version) {
        // The replacement, with its bumped version, is built before the lock is taken.
        WriterMark mark = new WriterMark(timestamp, position, isAlive);
        VersionedMetadata<WriterMark> replacement = this.updatedCopy(new VersionedMetadata<WriterMark>(mark, version));
        synchronized (this.writersLock) {
            VersionedMetadata<WriterMark> current = this.writerMarks.get(writer);
            if (current == null) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "writer mark not found"));
            }
            if (!Objects.equals(current.getVersion(), version)) {
                return Futures.failedFuture(StoreException.create(StoreException.Type.WRITE_CONFLICT, "writer mark version mismatch"));
            }
            this.writerMarks.put(writer, replacement);
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Builds a new versioned wrapper around the same payload with the integer
     * version increased by one.
     *
     * @param input wrapper whose payload and version are read
     * @param <T>   payload type
     * @return a new wrapper carrying {@code input}'s payload at the next version
     */
    private <T> VersionedMetadata<T> updatedCopy(VersionedMetadata<T> input) {
        T payload = input.getObject();
        int nextVersion = input.getVersion().asIntVersion().getIntValue() + 1;
        return new VersionedMetadata<T>(payload, new Version.IntVersion(nextVersion));
    }
}

