/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.PersistentStreamBase;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.ZKStoreHelper;
import io.pravega.controller.store.stream.ZkOrderedStore;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ZKStream
extends PersistentStreamBase {
    // Class logger. The FindBugs suppression and its justification are carried over as found.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKStream.class);
    // Path templates below take (scope, stream) as format arguments unless noted otherwise.
    private static final String SCOPE_PATH = "/store/%s";
    private static final String STREAM_PATH = "/store/%s/%s";
    private static final String CREATION_TIME_PATH = "/store/%s/%s/creationTime";
    private static final String CONFIGURATION_PATH = "/store/%s/%s/configuration";
    private static final String TRUNCATION_PATH = "/store/%s/%s/truncation";
    private static final String STATE_PATH = "/store/%s/%s/state";
    private static final String EPOCH_TRANSITION_PATH = "/store/%s/%s/epochTransition";
    private static final String RETENTION_SET_PATH = "/store/%s/%s/retention";
    private static final String RETENTION_STREAM_CUT_RECORD_PATH = "/store/%s/%s/retentionCuts";
    private static final String CURRENT_EPOCH_RECORD = "/store/%s/%s/currentEpochRecord";
    private static final String EPOCH_RECORD = "/store/%s/%s/epochRecords";
    private static final String HISTORY_TIMESERIES_CHUNK_PATH = "/store/%s/%s/historyTimeSeriesChunks";
    private static final String SEGMENTS_SEALED_SIZE_MAP_SHARD_PATH = "/store/%s/%s/segmentsSealedSizeMapShardPath";
    private static final String SEGMENT_SEALED_EPOCH_PATH = "/store/%s/%s/segmentSealedEpochPath";
    private static final String COMMITTING_TXNS_PATH = "/store/%s/%s/committingTxns";
    private static final String WRITER_POSITIONS_PATH = "/store/%s/%s/writerPositions";
    private static final String WAITING_REQUEST_PROCESSOR_PATH = "/store/%s/%s/waitingRequestProcessor";
    private static final String MARKER_PATH = "/store/%s/%s/markers";
    private static final String ID_PATH = "/store/%s/%s/id";
    // NOTE(review): the second placeholder is an upper-case %S, so String.format upper-cases the stream
    // name in this path, unlike every other template here. Confirm whether that is intended before
    // touching it; changing the conversion would change the path used for active transactions.
    private static final String STREAM_ACTIVE_TX_PATH = "/transactions/activeTx/%s/%S";
    // Format arguments are (batch number, scope, stream).
    private static final String STREAM_COMPLETED_TX_BATCH_PATH = "/transactions/completedTx/batches/%d/%s/%s";
    // Helper through which all reads and writes in this class are issued.
    private final ZKStoreHelper store;
    // Fully formatted paths for this (scope, stream), computed once in the constructor.
    @VisibleForTesting
    private final String creationPath;
    private final String configurationPath;
    private final String truncationPath;
    private final String statePath;
    private final String epochTransitionPath;
    private final String committingTxnsPath;
    private final String waitingRequestProcessorPath;
    private final String activeTxRoot;
    private final String markerPath;
    private final String idPath;
    private final String streamPath;
    private final String retentionSetPath;
    // The "...Format" fields keep a trailing "/%d" that is filled in per record by the accessor methods.
    private final String retentionStreamCutRecordPathFormat;
    private final String currentEpochRecordPath;
    private final String epochRecordPathFormat;
    private final String historyTimeSeriesChunkPathFormat;
    private final String segmentSealedEpochPathFormat;
    private final String segmentsSealedSizeMapShardPathFormat;
    private final String writerPositionsPath;
    // Supplies the batch number used when building the completed-transaction path.
    private final Supplier<Integer> currentBatchSupplier;
    // Executor handed to the ordered-commit helper of the base class.
    private final Executor executor;
    // Ordered store used to add and remove transactions from the commit order.
    private final ZkOrderedStore txnCommitOrderer;
    // Holds an id string; created empty in the constructor and reset to null by refresh().
    private final AtomicReference<String> idRef;

    /**
     * Creates a stream handle whose batch supplier always yields 0.
     * Delegates to the supplier-based constructor.
     *
     * @param scopeName        scope of the stream
     * @param streamName       name of the stream
     * @param storeHelper      helper used for all store access
     * @param executor         executor retained for asynchronous work
     * @param txnCommitOrderer ordered store used for transaction commit ordering
     */
    @VisibleForTesting
    ZKStream(String scopeName, String streamName, ZKStoreHelper storeHelper, Executor executor, ZkOrderedStore txnCommitOrderer) {
        this(scopeName, streamName, storeHelper, () -> 0, executor, txnCommitOrderer);
    }

    /**
     * Creates a stream handle with explicit chunk and shard sizes and a batch supplier that always yields 0.
     *
     * @param scopeName        scope of the stream
     * @param streamName       name of the stream
     * @param storeHelper      helper used for all store access
     * @param chunkSize        chunk size forwarded to the base class
     * @param shardSize        shard size forwarded to the base class
     * @param executor         executor retained for asynchronous work
     * @param txnCommitOrderer ordered store used for transaction commit ordering
     */
    @VisibleForTesting
    ZKStream(String scopeName, String streamName, ZKStoreHelper storeHelper, int chunkSize, int shardSize, Executor executor, ZkOrderedStore txnCommitOrderer) {
        this(scopeName, streamName, storeHelper, () -> 0, chunkSize, shardSize, executor, txnCommitOrderer);
    }

    /**
     * Creates a stream handle with a caller-provided batch supplier, using 1000 for both
     * the chunk size and the shard size.
     *
     * @param scopeName            scope of the stream
     * @param streamName           name of the stream
     * @param storeHelper          helper used for all store access
     * @param currentBatchSupplier supplies the batch number for completed-transaction paths
     * @param executor             executor retained for asynchronous work
     * @param txnCommitOrderer     ordered store used for transaction commit ordering
     */
    @VisibleForTesting
    ZKStream(String scopeName, String streamName, ZKStoreHelper storeHelper, Supplier<Integer> currentBatchSupplier, Executor executor, ZkOrderedStore txnCommitOrderer) {
        this(scopeName, streamName, storeHelper, currentBatchSupplier, 1000, 1000, executor, txnCommitOrderer);
    }

    /**
     * Primary constructor: forwards scope, stream, chunk size and shard size to the base class and
     * pre-computes every path this stream uses.
     *
     * <p>Paths whose field name ends in {@code Format} keep a trailing {@code /%d} placeholder that
     * the accessor methods fill in per record.
     *
     * @param scopeName            scope of the stream
     * @param streamName           name of the stream
     * @param storeHelper          helper used for all store access
     * @param currentBatchSupplier supplies the batch number for completed-transaction paths
     * @param chunkSize            chunk size forwarded to the base class
     * @param shardSize            shard size forwarded to the base class
     * @param executor             executor retained for asynchronous work
     * @param txnCommitOrderer     ordered store used for transaction commit ordering
     */
    @VisibleForTesting
    ZKStream(String scopeName, String streamName, ZKStoreHelper storeHelper, Supplier<Integer> currentBatchSupplier, int chunkSize, int shardSize, Executor executor, ZkOrderedStore txnCommitOrderer) {
        super(scopeName, streamName, chunkSize, shardSize);
        this.store = storeHelper;
        this.currentBatchSupplier = currentBatchSupplier;
        this.executor = executor;
        this.txnCommitOrderer = txnCommitOrderer;
        // Diamond instead of a raw AtomicReference so the field's element type is checked.
        this.idRef = new AtomicReference<>();

        // Fixed per-stream nodes.
        this.streamPath = String.format(STREAM_PATH, scopeName, streamName);
        this.creationPath = String.format(CREATION_TIME_PATH, scopeName, streamName);
        this.configurationPath = String.format(CONFIGURATION_PATH, scopeName, streamName);
        this.truncationPath = String.format(TRUNCATION_PATH, scopeName, streamName);
        this.statePath = String.format(STATE_PATH, scopeName, streamName);
        this.retentionSetPath = String.format(RETENTION_SET_PATH, scopeName, streamName);
        this.epochTransitionPath = String.format(EPOCH_TRANSITION_PATH, scopeName, streamName);
        this.activeTxRoot = String.format(STREAM_ACTIVE_TX_PATH, scopeName, streamName);
        this.committingTxnsPath = String.format(COMMITTING_TXNS_PATH, scopeName, streamName);
        this.waitingRequestProcessorPath = String.format(WAITING_REQUEST_PROCESSOR_PATH, scopeName, streamName);
        this.markerPath = String.format(MARKER_PATH, scopeName, streamName);
        this.idPath = String.format(ID_PATH, scopeName, streamName);
        this.currentEpochRecordPath = String.format(CURRENT_EPOCH_RECORD, scopeName, streamName);
        this.writerPositionsPath = String.format(WRITER_POSITIONS_PATH, scopeName, streamName);

        // Templates that still need a numeric suffix at call time.
        this.retentionStreamCutRecordPathFormat = String.format(RETENTION_STREAM_CUT_RECORD_PATH, scopeName, streamName) + "/%d";
        this.epochRecordPathFormat = String.format(EPOCH_RECORD, scopeName, streamName) + "/%d";
        this.historyTimeSeriesChunkPathFormat = String.format(HISTORY_TIMESERIES_CHUNK_PATH, scopeName, streamName) + "/%d";
        this.segmentSealedEpochPathFormat = String.format(SEGMENT_SEALED_EPOCH_PATH, scopeName, streamName) + "/%d";
        this.segmentsSealedSizeMapShardPathFormat = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_PATH, scopeName, streamName) + "/%d";
    }

    /**
     * Counts the ongoing transactions of this stream across all epochs.
     *
     * <p>Lists the children of the active-transaction root, parses each child name as an epoch
     * number, counts the children of each epoch node and adds the counts up. The chain is kept
     * fully typed; no raw {@code CompletableFuture} cast is needed.
     *
     * @return future with the total count; 0 when the root has no children
     */
    @Override
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        return store.getChildren(activeTxRoot)
                .thenCompose(epochs -> Futures.allOfWithResults(epochs.stream()
                        .map(epoch -> getNumberOfOngoingTransactions(Integer.parseInt(epoch)))
                        .collect(Collectors.toList())))
                .thenApply(counts -> counts.stream().reduce(0, Integer::sum));
    }

    /**
     * Counts the child nodes under the given epoch's active-transaction path.
     *
     * @param epoch epoch whose transactions are counted
     * @return future with the number of children found
     */
    private CompletableFuture<Integer> getNumberOfOngoingTransactions(int epoch) {
        final String epochPath = getEpochPath(epoch);
        return store.getChildren(epochPath).thenApply(List::size);
    }

    /**
     * Removes this stream's metadata by asking the store to delete the tree at the stream path.
     *
     * @return future that completes when the store call completes
     */
    @Override
    public CompletableFuture<Void> deleteStream() {
        final String root = streamPath;
        return store.deleteTree(root);
    }

    /**
     * Determines how far a previous creation of this stream got.
     *
     * <ul>
     *   <li>No creation-time node: responds {@code NEW} with the supplied configuration and time.</li>
     *   <li>Creation time stored but no configuration node: responds {@code NEW} with the supplied
     *       configuration and the <em>stored</em> creation time.</li>
     *   <li>Both present: defers to {@code handleConfigExists}, telling it whether the stored
     *       creation time equals the supplied one.</li>
     * </ul>
     *
     * @param configuration         configuration the caller wants to create the stream with
     * @param creationTime          creation time supplied by the caller
     * @param startingSegmentNumber starting segment number echoed into the response
     * @return future with the creation status response
     */
    @Override
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration configuration, long creationTime, int startingSegmentNumber) {
        return store.checkExists(creationPath).thenCompose(creationTimeStored -> {
            if (creationTimeStored) {
                return getCreationTime().thenCompose(storedTime -> store.checkExists(configurationPath).thenCompose(hasConfig -> {
                    if (!hasConfig) {
                        return CompletableFuture.completedFuture(
                                new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, configuration, storedTime, startingSegmentNumber));
                    }
                    return handleConfigExists(storedTime, startingSegmentNumber, storedTime == creationTime);
                }));
            }
            return CompletableFuture.completedFuture(
                    new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, configuration, creationTime, startingSegmentNumber));
        });
    }

    /**
     * Creates the stream's root node through {@code createZNodeIfNotExist}, discarding the result.
     *
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createStreamMetadata() {
        final String root = getStreamPath();
        return Futures.toVoid(store.createZNodeIfNotExist(root));
    }

    /**
     * Builds the creation response for a stream whose configuration node already exists.
     *
     * <p>While the stream has no state node, or its state is {@code UNKNOWN} or {@code CREATING},
     * the status is {@code NEW} when the creation times matched and {@code EXISTS_CREATING}
     * otherwise. Any other state yields {@code EXISTS_ACTIVE}. The response always carries the
     * stored configuration.
     *
     * @param creationTime          creation time to report
     * @param startingSegmentNumber starting segment number to report
     * @param creationTimeMatched   whether the stored creation time equals the caller's
     * @return future with the creation status response
     */
    private CompletableFuture<CreateStreamResponse> handleConfigExists(long creationTime, int startingSegmentNumber, boolean creationTimeMatched) {
        final CreateStreamResponse.CreateStatus pendingStatus;
        if (creationTimeMatched) {
            pendingStatus = CreateStreamResponse.CreateStatus.NEW;
        } else {
            pendingStatus = CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        }
        return getConfiguration().thenCompose(config -> store.checkExists(statePath).thenCompose(hasState -> {
            if (hasState) {
                return getState(false).thenApply(state -> {
                    final boolean stillCreating = state.equals(State.UNKNOWN) || state.equals(State.CREATING);
                    final CreateStreamResponse.CreateStatus reported =
                            stillCreating ? pendingStatus : CreateStreamResponse.CreateStatus.EXISTS_ACTIVE;
                    return new CreateStreamResponse(reported, config, creationTime, startingSegmentNumber);
                });
            }
            return CompletableFuture.completedFuture(
                    new CreateStreamResponse(pendingStatus, config, creationTime, startingSegmentNumber));
        }));
    }

    /**
     * Reads the stream's creation time through the store's cached read, decoding the node content
     * as a long starting at offset 0.
     *
     * @return future with the stored creation time
     */
    @Override
    public CompletableFuture<Long> getCreationTime() {
        return getId().thenCompose(id ->
                store.getCachedData(creationPath, id, bytes -> BitConverter.readLong(bytes, 0))
                        .thenApply(VersionedMetadata::getObject));
    }

    /**
     * Writes the serialized retention set to the retention node via {@code createZNodeIfNotExist}.
     *
     * @param data retention set to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet data) {
        final byte[] serialized = data.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(retentionSetPath, serialized));
    }

    /**
     * Reads the retention set node with an uncached {@code getData} call.
     *
     * @return future with the deserialized retention set and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData() {
        final String path = retentionSetPath;
        return store.getData(path, RetentionSet::fromBytes);
    }

    /**
     * Overwrites the retention set node, passing the caller's version to {@code setData}.
     *
     * @param retention new retention set together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> retention) {
        final byte[] serialized = retention.getObject().toBytes();
        return store.setData(retentionSetPath, serialized, retention.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Stores a stream cut record under the retention-cuts node named after its recording time.
     *
     * @param recordingTime time used as the node name
     * @param record        record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createStreamCutRecordData(long recordingTime, StreamCutRecord record) {
        final String cutPath = String.format(retentionStreamCutRecordPathFormat, recordingTime);
        final byte[] serialized = record.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(cutPath, serialized));
    }

    /**
     * Reads the stream cut record for a recording time through the store's cached read.
     *
     * @param recordingTime time that names the record node
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long recordingTime) {
        final String cutPath = String.format(retentionStreamCutRecordPathFormat, recordingTime);
        return getId().thenCompose(id ->
                store.getCachedData(cutPath, id, StreamCutRecord::fromBytes));
    }

    /**
     * Deletes the stream cut record for a recording time and, once the delete succeeds,
     * invalidates the cache entry for the same path.
     *
     * @param recordingTime time that names the record node
     * @return future that completes after the cache entry has been invalidated
     */
    @Override
    CompletableFuture<Void> deleteStreamCutRecordData(long recordingTime) {
        final String cutPath = String.format(retentionStreamCutRecordPathFormat, recordingTime);
        return getId().thenCompose(id ->
                store.deletePath(cutPath, false)
                        .thenAccept(ignored -> store.invalidateCache(cutPath, id)));
    }

    /**
     * Stores a history time series chunk under the node named after its chunk number.
     *
     * @param chunkNumber chunk number used as the node name
     * @param data        chunk to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int chunkNumber, HistoryTimeSeries data) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, chunkNumber);
        final byte[] serialized = data.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(chunkPath, serialized));
    }

    /**
     * Reads a history time series chunk, either directly or through the store's cached read.
     *
     * @param chunkNumber  chunk number that names the node
     * @param ignoreCached {@code true} to use {@code getData}, {@code false} to use {@code getCachedData}
     * @return future with the deserialized chunk and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int chunkNumber, boolean ignoreCached) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, chunkNumber);
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<HistoryTimeSeries>> chunk = ignoreCached
                    ? store.getData(chunkPath, HistoryTimeSeries::fromBytes)
                    : store.getCachedData(chunkPath, id, HistoryTimeSeries::fromBytes);
            return chunk;
        });
    }

    /**
     * Overwrites a history time series chunk, passing the caller's version to {@code setData}.
     *
     * @param chunkNumber chunk number that names the node
     * @param data        new chunk together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int chunkNumber, VersionedMetadata<HistoryTimeSeries> data) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, chunkNumber);
        final byte[] serialized = data.getObject().toBytes();
        return store.setData(chunkPath, serialized, data.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Stores the current epoch pointer. Only the epoch number (4 bytes) is written to the
     * current-epoch node, not the record itself.
     *
     * @param data epoch record whose epoch number is stored
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord data) {
        final byte[] encodedEpoch = new byte[Integer.BYTES];
        BitConverter.writeInt(encodedEpoch, 0, data.getEpoch());
        return Futures.toVoid(store.createZNodeIfNotExist(currentEpochRecordPath, encodedEpoch));
    }

    /**
     * Overwrites the current epoch pointer with the epoch number of the given record,
     * passing the caller's version to {@code setData}.
     *
     * @param data epoch record together with the version the pointer was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> data) {
        final byte[] encodedEpoch = new byte[Integer.BYTES];
        BitConverter.writeInt(encodedEpoch, 0, data.getObject().getEpoch());
        return store.setData(currentEpochRecordPath, encodedEpoch, data.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Resolves the current epoch record in two steps: read the epoch number stored in the
     * current-epoch node, then load that epoch's record via {@code getEpochRecord}.
     *
     * <p>The returned version is the version of the pointer node, not of the epoch record node.
     *
     * @param ignoreCached {@code true} to read the pointer with {@code getData}, {@code false} to
     *                     read it with {@code getCachedData}
     * @return future with the epoch record paired with the pointer node's version
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean ignoreCached) {
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<Integer>> pointer;
            if (ignoreCached) {
                pointer = store.getData(currentEpochRecordPath, bytes -> BitConverter.readInt(bytes, 0));
            } else {
                pointer = store.getCachedData(currentEpochRecordPath, id, bytes -> BitConverter.readInt(bytes, 0));
            }
            return pointer.thenCompose(versionedEpoch ->
                    getEpochRecord(versionedEpoch.getObject())
                            .thenApply(record -> new VersionedMetadata<EpochRecord>(record, versionedEpoch.getVersion())));
        });
    }

    /**
     * Stores an epoch record under the node named after the epoch number.
     *
     * @param epoch epoch number used as the node name
     * @param data  record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int epoch, EpochRecord data) {
        final String recordPath = String.format(epochRecordPathFormat, epoch);
        final byte[] serialized = data.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(recordPath, serialized));
    }

    /**
     * Reads the record of the given epoch through the store's cached read.
     *
     * @param epoch epoch number that names the node
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int epoch) {
        final String recordPath = String.format(epochRecordPathFormat, epoch);
        return getId().thenCompose(id ->
                store.getCachedData(recordPath, id, EpochRecord::fromBytes));
    }

    /**
     * Stores a sealed-segment-sizes shard under the node named after the shard number.
     *
     * @param shard shard number used as the node name
     * @param data  shard to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int shard, SealedSegmentsMapShard data) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, shard);
        final byte[] serialized = data.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(shardPath, serialized));
    }

    /**
     * Reads a sealed-segment-sizes shard with an uncached {@code getData} call.
     *
     * @param shard shard number that names the node
     * @return future with the deserialized shard and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int shard) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, shard);
        return store.getData(shardPath, SealedSegmentsMapShard::fromBytes);
    }

    /**
     * Overwrites a sealed-segment-sizes shard, passing the caller's version to {@code setData}.
     *
     * @param shard shard number that names the node
     * @param data  new shard together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int shard, VersionedMetadata<SealedSegmentsMapShard> data) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, shard);
        final byte[] serialized = data.getObject().toBytes();
        return store.setData(shardPath, serialized, data.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Records, for every segment in the collection, the epoch in which it was sealed.
     *
     * <p>One write is issued per segment and the writes are awaited together. The pending writes
     * are held in a typed list rather than being passed through a raw {@code Collection} cast.
     *
     * @param segmentToSeal ids of the segments being sealed
     * @param epoch         epoch to record for each segment
     * @return future that completes when all per-segment writes have completed
     */
    @Override
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> segmentToSeal, int epoch) {
        final List<CompletableFuture<Void>> pendingWrites = segmentToSeal.stream()
                .map(segmentId -> createSegmentSealedEpochRecordData(segmentId, epoch))
                .collect(Collectors.toList());
        return Futures.allOf(pendingWrites);
    }

    /**
     * Writes the sealing epoch (4 bytes) to the node named after the segment id.
     *
     * @param segmentToSeal id of the sealed segment, used as the node name
     * @param epoch         epoch to store
     * @return future that completes when the store call completes
     */
    CompletableFuture<Void> createSegmentSealedEpochRecordData(long segmentToSeal, int epoch) {
        final byte[] encodedEpoch = new byte[Integer.BYTES];
        BitConverter.writeInt(encodedEpoch, 0, epoch);
        final String segmentPath = String.format(segmentSealedEpochPathFormat, segmentToSeal);
        return Futures.toVoid(store.createZNodeIfNotExist(segmentPath, encodedEpoch));
    }

    /**
     * Reads the epoch stored for a sealed segment through the store's cached read,
     * decoding the node content as an int at offset 0.
     *
     * @param segmentId id of the segment that names the node
     * @return future with the stored epoch and the node's version
     */
    @Override
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long segmentId) {
        final String segmentPath = String.format(segmentSealedEpochPathFormat, segmentId);
        return getId().thenCompose(id ->
                store.getCachedData(segmentPath, id, bytes -> BitConverter.readInt(bytes, 0)));
    }

    /**
     * Writes the serialized epoch transition record to its node via {@code createZNodeIfNotExist}.
     *
     * @param epochTransition record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransition) {
        final byte[] serialized = epochTransition.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(epochTransitionPath, serialized));
    }

    /**
     * Overwrites the epoch transition node, passing the caller's version to {@code setData}.
     *
     * @param epochTransition new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> epochTransition) {
        final byte[] serialized = epochTransition.getObject().toBytes();
        return store.setData(epochTransitionPath, serialized, epochTransition.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads the epoch transition node with an uncached {@code getData} call.
     *
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode() {
        final String path = epochTransitionPath;
        return store.getData(path, EpochTransitionRecord::fromBytes);
    }

    /**
     * Writes the creation time as an 8-byte value to the creation-time node
     * via {@code createZNodeIfNotExist}.
     *
     * @param creationTime creation time to store
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> storeCreationTimeIfAbsent(long creationTime) {
        final byte[] encodedTime = new byte[Long.BYTES];
        BitConverter.writeLong(encodedTime, 0, creationTime);
        return Futures.toVoid(store.createZNodeIfNotExist(creationPath, encodedTime));
    }

    /**
     * Writes the serialized configuration record to the configuration node
     * via {@code createZNodeIfNotExist}.
     *
     * @param configuration record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord configuration) {
        final byte[] serialized = configuration.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(configurationPath, serialized));
    }

    /**
     * Writes the serialized state record to the state node via {@code createZNodeIfNotExist}.
     *
     * @param state record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    public CompletableFuture<Void> createStateIfAbsent(StateRecord state) {
        final byte[] serialized = state.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(statePath, serialized));
    }

    /**
     * Stores a marker timestamp (8 bytes) for a segment under the marker root.
     *
     * <p>After the write succeeds, the cache entry for the marker node itself is invalidated.
     * This is the same key {@code removeMarkerData} invalidates; previously the marker root was
     * invalidated, which is not a path any marker is stored at.
     *
     * @param segmentId id of the segment, used as the node name
     * @param timestamp marker timestamp to store
     * @return future that completes after the cache entry has been invalidated
     */
    @Override
    public CompletableFuture<Void> createMarkerData(long segmentId, long timestamp) {
        final String path = ZKPaths.makePath(markerPath, String.format("%d", segmentId));
        final byte[] b = new byte[Long.BYTES];
        BitConverter.writeLong(b, 0, timestamp);
        return getId().thenCompose(id ->
                store.createZNodeIfNotExist(path, b)
                        .thenAccept(x -> store.invalidateCache(path, id)));
    }

    /**
     * Overwrites a segment's marker timestamp, passing the caller's version to {@code setData}.
     *
     * @param segmentId id of the segment that names the node
     * @param data      new timestamp together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateMarkerData(long segmentId, VersionedMetadata<Long> data) {
        final String segmentMarkerPath = ZKPaths.makePath(markerPath, String.format("%d", segmentId));
        final byte[] encodedTime = new byte[Long.BYTES];
        final long timestamp = data.getObject();
        BitConverter.writeLong(encodedTime, 0, timestamp);
        return store.setData(segmentMarkerPath, encodedTime, data.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads a segment's marker timestamp with an uncached {@code getData} call.
     *
     * <p>A missing marker is not an error: when the read fails with
     * {@code StoreException.DataNotFoundException} the returned future completes with {@code null}.
     * Any other failure is propagated with its unwrapped cause.
     *
     * @param segmentId id of the segment that names the node
     * @return future with the marker and its version, or {@code null} when no marker exists
     */
    @Override
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long segmentId) {
        final CompletableFuture<VersionedMetadata<Long>> outcome = new CompletableFuture<>();
        final String segmentMarkerPath = ZKPaths.makePath(markerPath, String.format("%d", segmentId));
        store.getData(segmentMarkerPath, bytes -> BitConverter.readLong(bytes, 0)).whenComplete((marker, failure) -> {
            if (failure == null) {
                outcome.complete(marker);
                return;
            }
            final Throwable cause = Exceptions.unwrap(failure);
            if (cause instanceof StoreException.DataNotFoundException) {
                // Absent marker: report "no value" instead of failing.
                outcome.complete(null);
            } else {
                outcome.completeExceptionally(cause);
            }
        });
        return outcome;
    }

    /**
     * Deletes a segment's marker node and invalidates the cache entry for the same path.
     * The invalidation runs whether the delete succeeded or failed.
     *
     * @param segmentId id of the segment that names the node
     * @return future carrying the outcome of the delete
     */
    @Override
    CompletableFuture<Void> removeMarkerData(long segmentId) {
        final String segmentMarkerPath = ZKPaths.makePath(markerPath, String.format("%d", segmentId));
        return getId().thenCompose(id ->
                store.deletePath(segmentMarkerPath, false)
                        .whenComplete((ignoredResult, ignoredError) -> store.invalidateCache(segmentMarkerPath, id)));
    }

    /**
     * Collects the active transactions of every epoch into a single map.
     *
     * <p>Each child of the active-transaction root is parsed as an epoch number and resolved via
     * {@link #getTxnInEpoch(int)}; the per-epoch maps are then merged. The merged map is declared
     * with its element types instead of as a raw {@code HashMap}.
     *
     * @return future with all active transactions keyed by transaction id
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return store.getChildren(activeTxRoot)
                .thenCompose(epochs -> Futures.allOfWithResults(epochs.stream()
                                .map(epoch -> getTxnInEpoch(Integer.parseInt(epoch)))
                                .collect(Collectors.toList()))
                        .thenApply(perEpoch -> {
                            Map<UUID, ActiveTxnRecord> merged = new HashMap<>();
                            perEpoch.forEach(merged::putAll);
                            return merged;
                        }));
    }

    /**
     * Loads every active transaction record stored under the given epoch.
     *
     * <p>Steps performed by the single chained expression below:
     * <ol>
     *   <li>List the children of the epoch path; a {@code DataNotFoundException} is treated as
     *       an empty list.</li>
     *   <li>Read each child's record with {@code getData}; a {@code DataNotFoundException} on an
     *       individual read is replaced by the {@code empty} sentinel obtained from
     *       {@code getEmptyData()}.</li>
     *   <li>Drop entries equal to the sentinel, parse the remaining child names as UUIDs and
     *       unwrap the records.</li>
     * </ol>
     *
     * @param epoch epoch whose transactions are loaded
     * @return future with the transactions of that epoch keyed by transaction id
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int epoch) {
        // Sentinel standing in for a transaction node that disappeared between listing and reading.
        VersionedMetadata empty = ZKStream.getEmptyData();
        return Futures.exceptionallyExpecting(this.store.getChildren(this.getEpochPath(epoch)), e -> Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException, Collections.emptyList()).thenCompose(txIds -> Futures.allOfWithResults(txIds.stream().collect(Collectors.toMap(txId -> txId, txId -> Futures.exceptionallyExpecting(this.store.getData(this.getActiveTxPath(epoch, (String)txId), ActiveTxnRecord::fromBytes), e -> Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException, (Object)empty)))).thenApply(txnMap -> txnMap.entrySet().stream().filter(x -> !((VersionedMetadata)x.getValue()).equals(empty)).collect(Collectors.toMap(x -> UUID.fromString((String)x.getKey()), x -> (ActiveTxnRecord)((VersionedMetadata)x.getValue()).getObject()))));
    }

    /**
     * Delegates to the base-class helper, supplying this stream's commit orderer and executor.
     *
     * @return future with the ordered entries produced by the helper
     */
    @Override
    public CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch() {
        final ZkOrderedStore orderer = txnCommitOrderer;
        final Executor callbackExecutor = executor;
        return super.getOrderedCommittingTxnInLowestEpochHelper(orderer, callbackExecutor);
    }

    /**
     * Delegates to the base-class helper, supplying this stream's commit orderer.
     *
     * @return future with the map produced by the helper
     */
    @Override
    @VisibleForTesting
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns() {
        final ZkOrderedStore orderer = txnCommitOrderer;
        return super.getAllOrderedCommittingTxnsHelper(orderer);
    }

    /**
     * Creates the node for a new active transaction under the given epoch.
     *
     * @param epoch     epoch the transaction belongs to
     * @param txId      transaction id, used as the node name
     * @param txnRecord record to serialize
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> createNewTransaction(int epoch, UUID txId, ActiveTxnRecord txnRecord) {
        final String txnPath = getActiveTxPath(epoch, txId.toString());
        final byte[] serialized = txnRecord.toBytes();
        return store.createZNodeIfNotExist(txnPath, serialized, true)
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads an active transaction record with an uncached {@code getData} call.
     *
     * @param epoch epoch the transaction belongs to
     * @param txId  transaction id that names the node
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int epoch, UUID txId) {
        final String txnPath = getActiveTxPath(epoch, txId.toString());
        return store.getData(txnPath, ActiveTxnRecord::fromBytes);
    }

    /**
     * Overwrites an active transaction record, passing the caller's version to {@code setData}.
     *
     * @param epoch epoch the transaction belongs to
     * @param txId  transaction id that names the node
     * @param data  new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateActiveTx(int epoch, UUID txId, VersionedMetadata<ActiveTxnRecord> data) {
        final String txnPath = getActiveTxPath(epoch, txId.toString());
        final byte[] serialized = data.getObject().toBytes();
        return store.setData(txnPath, serialized, data.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Deletes the node of an active transaction, calling {@code deletePath} with its boolean
     * flag set to {@code true}.
     *
     * @param epoch epoch the transaction belongs to
     * @param txId  transaction id that names the node
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> removeActiveTxEntry(int epoch, UUID txId) {
        final String txnPath = getActiveTxPath(epoch, txId.toString());
        return store.deletePath(txnPath, true);
    }

    /**
     * Adds the transaction id to the commit orderer for this scope and stream.
     *
     * @param txId transaction to enqueue
     * @return future with the value returned by the orderer's {@code addEntity}
     */
    @Override
    CompletableFuture<Long> addTxnToCommitOrder(UUID txId) {
        final String entity = txId.toString();
        return txnCommitOrderer.addEntity(getScope(), getName(), entity);
    }

    /**
     * Removes the given positions from the commit orderer for this scope and stream.
     *
     * @param orderedPositions positions to remove
     * @return future that completes when the orderer call completes
     */
    @Override
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> orderedPositions) {
        final ZkOrderedStore orderer = txnCommitOrderer;
        return orderer.removeEntities(getScope(), getName(), orderedPositions);
    }

    /**
     * Stores a completed transaction record under the batch currently reported by the batch supplier.
     *
     * @param txId     transaction id, used as the node name
     * @param complete record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createCompletedTxEntry(UUID txId, CompletedTxnRecord complete) {
        final Integer batch = currentBatchSupplier.get();
        final String batchRoot = String.format(STREAM_COMPLETED_TX_BATCH_PATH, batch, getScope(), getName());
        final String txnPath = ZKPaths.makePath(batchRoot, txId.toString());
        return Futures.toVoid(store.createZNodeIfNotExist(txnPath, complete.toBytes()));
    }

    /**
     * Looks up a completed transaction record by searching every completed-transaction batch.
     *
     * <p>The children of {@code /transactions/completedTx/batches} are listed and, for each batch,
     * the record path for this scope, stream and transaction id is read through the store's cached
     * read. The first non-null result is returned.
     *
     * @param txId transaction id to look up
     * @return future with the record and its version; fails with a {@code DATA_NOT_FOUND}
     *         {@code StoreException} when no batch holds the transaction
     */
    @Override
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID txId) {
        return this.getId().thenCompose(id -> ((CompletableFuture)this.store.getChildren("/transactions/completedTx/batches").thenCompose(children -> Futures.allOfWithResults(children.stream().map(child -> {
            // Each child name is a batch number; build this transaction's path inside that batch.
            String root = String.format(STREAM_COMPLETED_TX_BATCH_PATH, Long.parseLong(child), this.getScope(), this.getName());
            String path = ZKPaths.makePath((String)root, (String)txId.toString());
            return this.store.getCachedData(path, (String)id, CompletedTxnRecord::fromBytes).exceptionally(e -> {
                // A batch that does not hold the transaction contributes null; it is filtered out below.
                if (Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException) {
                    return null;
                }
                // Any other failure is logged and rethrown, failing the combined lookup.
                log.error("Exception while trying to fetch completed transaction status", e);
                throw new CompletionException((Throwable)e);
            });
        }).collect(Collectors.toList())))).thenCompose(result -> {
            Optional<VersionedMetadata> any = result.stream().filter(Objects::nonNull).findFirst();
            if (any.isPresent()) {
                return CompletableFuture.completedFuture(any.get());
            }
            // Thrown inside the stage, so it surfaces as an exceptionally completed future.
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Completed Txn not found");
        }));
    }

    /**
     * Writes the serialized truncation record to the truncation node
     * via {@code createZNodeIfNotExist}.
     *
     * @param truncationRecord record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord truncationRecord) {
        final byte[] serialized = truncationRecord.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(truncationPath, serialized));
    }

    /**
     * Overwrites the truncation node and, once the write succeeds, invalidates its cache entry.
     *
     * @param truncationRecord new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> truncationRecord) {
        return getId().thenCompose(id -> {
            final byte[] serialized = truncationRecord.getObject().toBytes();
            return store.setData(truncationPath, serialized, truncationRecord.getVersion()).thenApply(written -> {
                store.invalidateCache(truncationPath, id);
                final int newVersion = written;
                return new Version.IntVersion(newVersion);
            });
        });
    }

    /**
     * Reads the truncation record, either directly or through the store's cached read.
     *
     * @param ignoreCached {@code true} to use {@code getData}, {@code false} to use {@code getCachedData}
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean ignoreCached) {
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<StreamTruncationRecord>> record = ignoreCached
                    ? store.getData(truncationPath, StreamTruncationRecord::fromBytes)
                    : store.getCachedData(truncationPath, id, StreamTruncationRecord::fromBytes);
            return record;
        });
    }

    /**
     * Overwrites the configuration node and, once the write succeeds, invalidates its cache entry.
     *
     * @param configuration new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> configuration) {
        return getId().thenCompose(id -> {
            final byte[] serialized = configuration.getObject().toBytes();
            return store.setData(configurationPath, serialized, configuration.getVersion()).thenApply(written -> {
                store.invalidateCache(configurationPath, id);
                final int newVersion = written;
                return new Version.IntVersion(newVersion);
            });
        });
    }

    /**
     * Reads the configuration record, either directly or through the store's cached read.
     *
     * @param ignoreCached {@code true} to use {@code getData}, {@code false} to use {@code getCachedData}
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean ignoreCached) {
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> record = ignoreCached
                    ? store.getData(configurationPath, StreamConfigurationRecord::fromBytes)
                    : store.getCachedData(configurationPath, id, StreamConfigurationRecord::fromBytes);
            return record;
        });
    }

    /**
     * Overwrites the state node and, once the write succeeds, invalidates its cache entry.
     *
     * @param state new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> state) {
        return getId().thenCompose(id -> {
            final byte[] serialized = state.getObject().toBytes();
            return store.setData(statePath, serialized, state.getVersion()).thenApply(written -> {
                store.invalidateCache(statePath, id);
                final int newVersion = written;
                return new Version.IntVersion(newVersion);
            });
        });
    }

    /**
     * Reads the state record, either directly or through the store's cached read.
     *
     * @param ignoreCached {@code true} to use {@code getData}, {@code false} to use {@code getCachedData}
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean ignoreCached) {
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<StateRecord>> record = ignoreCached
                    ? store.getData(statePath, StateRecord::fromBytes)
                    : store.getCachedData(statePath, id, StateRecord::fromBytes);
            return record;
        });
    }

    /**
     * Writes the serialized committing-transactions record to its node
     * via {@code createZNodeIfNotExist}.
     *
     * @param committingTxns record to serialize
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTxns) {
        final byte[] serialized = committingTxns.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(committingTxnsPath, serialized));
    }

    /**
     * Reads the committing-transactions record with an uncached {@code getData} call.
     *
     * @return future with the deserialized record and its version
     */
    @Override
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord() {
        final String path = committingTxnsPath;
        return store.getData(path, CommittingTransactionsRecord::fromBytes);
    }

    /**
     * Overwrites the committing-transactions record, passing the caller's version to {@code setData}.
     *
     * @param update new record together with the version it was read at
     * @return future with the value returned by the store, wrapped as an {@code IntVersion}
     */
    @Override
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> update) {
        final byte[] serialized = update.getObject().toBytes();
        return store.setData(committingTxnsPath, serialized, update.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Stores the waiting request processor's name, UTF-8 encoded, in its node
     * via {@code createZNodeIfNotExist}.
     *
     * @param waitingRequestProcessor processor name to store
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String waitingRequestProcessor) {
        final byte[] encodedName = waitingRequestProcessor.getBytes(StandardCharsets.UTF_8);
        return Futures.toVoid(store.createZNodeIfNotExist(waitingRequestProcessorPath, encodedName));
    }

    /**
     * Reads the waiting request processor's name, decoding the node content as UTF-8.
     *
     * @return future with the stored processor name
     */
    @Override
    CompletableFuture<String> getWaitingRequestNode() {
        return store.getData(waitingRequestProcessorPath,
                        bytes -> StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString())
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Deletes the waiting request processor node, calling {@code deletePath} with its boolean
     * flag set to {@code false}.
     *
     * @return future that completes when the store call completes
     */
    @Override
    CompletableFuture<Void> deleteWaitingRequestNode() {
        final String path = waitingRequestProcessorPath;
        return store.deletePath(path, false);
    }

    /**
     * Creates a node for the given writer holding a serialized {@code WriterMark} built from the
     * supplied timestamp and position.
     *
     * @param writer    writer name; it becomes the last element of the node path.
     * @param timestamp timestamp passed to the {@code WriterMark} constructor.
     * @param position  position map passed to the {@code WriterMark} constructor.
     * @return a future that completes when {@code store.createZNode} completes; its result is discarded.
     */
    @Override
    CompletableFuture<Void> createWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position) {
        final String path = getWriterPath(writer);
        final byte[] serializedMark = new WriterMark(timestamp, position).toBytes();
        return Futures.toVoid(store.createZNode(path, serializedMark));
    }

    /**
     * Deletes the node of the given writer via {@code store.deleteNode}.
     *
     * @param writer  writer name; it becomes the last element of the node path.
     * @param version version handed to the store together with the path.
     * @return the future returned by the store.
     */
    @Override
    public CompletableFuture<Void> removeWriterRecord(String writer, Version version) {
        return store.deleteNode(getWriterPath(writer), version);
    }

    /**
     * Reads the node of the given writer and deserializes it with {@code WriterMark.fromBytes}.
     *
     * @param writer writer name; it becomes the last element of the node path.
     * @return a future holding the writer's mark together with its version.
     */
    @Override
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String writer) {
        return store.getData(getWriterPath(writer), WriterMark::fromBytes);
    }

    /**
     * Overwrites the node of the given writer with a serialized {@code WriterMark} built from the
     * supplied timestamp, position and liveness flag.
     *
     * @param writer    writer name; it becomes the last element of the node path.
     * @param timestamp timestamp passed to the {@code WriterMark} constructor.
     * @param position  position map passed to the {@code WriterMark} constructor.
     * @param isAlive   liveness flag passed to the {@code WriterMark} constructor.
     * @param version   version handed to {@code store.setData} together with the new content.
     * @return a future that completes when the store call completes; its result is discarded.
     */
    @Override
    CompletableFuture<Void> updateWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position, boolean isAlive, Version version) {
        final String path = getWriterPath(writer);
        final byte[] serializedMark = new WriterMark(timestamp, position, isAlive).toBytes();
        return Futures.toVoid(store.setData(path, serializedMark, version));
    }

    /**
     * Builds the node path for a writer by appending its name to {@code writerPositionsPath}.
     *
     * @param writer writer name used as the child element.
     * @return the path produced by {@code ZKPaths.makePath}.
     */
    private String getWriterPath(String writer) {
        final String writerNodePath = ZKPaths.makePath(writerPositionsPath, writer);
        return writerNodePath;
    }

    /**
     * Collects the mark of every writer that has a child node under {@code writerPositionsPath}.
     *
     * <p>A writer whose lookup fails with a condition matched by
     * {@code AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE} is temporarily mapped to
     * {@code WriterMark.EMPTY}; entries equal to {@code WriterMark.EMPTY} are then removed from
     * the result.
     *
     * @return a future holding a map from writer name to that writer's mark.
     */
    @Override
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks() {
        return store.getChildren(writerPositionsPath)
                .thenCompose(writers -> Futures.allOfWithResults(
                                writers.stream().collect(Collectors.toMap(
                                        writer -> writer,
                                        // The fallback is passed as a WriterMark (no Object cast) so the
                                        // future's element type and the fallback type agree.
                                        writer -> Futures.exceptionallyExpecting(
                                                getWriterMark(writer),
                                                AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                                                WriterMark.EMPTY))))
                        .thenApply(marks -> marks.entrySet().stream()
                                // Drop the placeholders inserted for writers whose data was not found.
                                .filter(entry -> !entry.getValue().equals(WriterMark.EMPTY))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
    }

    /**
     * Clears the remembered stream id and invalidates the cache entries that were keyed by it.
     *
     * <p>{@code idRef} is reset to {@code null}; the value it held (or an empty string when it held
     * {@code null}) is used as the key when invalidating the state, configuration, truncation,
     * epoch-transition, committing-transactions and current-epoch-record paths, in that order.
     */
    @Override
    public void refresh() {
        final String previousId = idRef.getAndSet(null);
        final String cacheId = previousId == null ? "" : previousId;
        final String[] cachedPaths = {
            statePath,
            configurationPath,
            truncationPath,
            epochTransitionPath,
            committingTxnsPath,
            currentEpochRecordPath
        };
        for (String path : cachedPaths) {
            store.invalidateCache(path, cacheId);
        }
    }

    /**
     * Resolves the identifier that is passed to the store's cache calls for this stream.
     *
     * <p>A non-empty value already held in {@code idRef} is returned immediately. Otherwise the
     * stream position is read, converted to a string, stored in {@code idRef} if that reference is
     * still {@code null}, and returned. When the read fails with a
     * {@code StoreException.DataNotFoundException} the future completes with an empty string
     * instead of failing.
     *
     * @return a future holding the id, or an empty string when the position node was not found.
     */
    private CompletableFuture<String> getId() {
        final String cachedId = idRef.get();
        if (!Strings.isNullOrEmpty(cachedId)) {
            return CompletableFuture.completedFuture(cachedId);
        }
        // Keep the future fully typed: no raw CompletableFuture / Object casts, so the
        // compiler checks that both the loaded value and the fallback are Strings.
        final CompletableFuture<String> loadedId = getStreamPosition().thenApply(position -> {
            final String positionId = position.toString();
            // compareAndSet(null, ...) leaves idRef untouched if an id was already published.
            idRef.compareAndSet(null, positionId);
            return positionId;
        });
        return Futures.exceptionallyExpecting(loadedId,
                e -> Exceptions.unwrap(e) instanceof StoreException.DataNotFoundException, "");
    }

    /**
     * Builds the node path of an active transaction: {@code activeTxRoot}, then the epoch
     * rendered as a decimal string, then the transaction id.
     *
     * @param epoch epoch used as the intermediate path element.
     * @param txId  transaction id used as the last path element.
     * @return the path produced by two nested {@code ZKPaths.makePath} calls.
     */
    @VisibleForTesting
    String getActiveTxPath(int epoch, String txId) {
        final String epochPath = ZKPaths.makePath(activeTxRoot, Integer.toString(epoch));
        return ZKPaths.makePath(epochPath, txId);
    }

    /**
     * Builds the node path of an epoch by appending its decimal string to {@code activeTxRoot}.
     *
     * @param epoch epoch number used as the child element.
     * @return the path produced by {@code ZKPaths.makePath}.
     */
    private String getEpochPath(int epoch) {
        final String epochName = String.valueOf(epoch);
        return ZKPaths.makePath(activeTxRoot, epochName);
    }

    /**
     * Creates the node at {@code idPath} through {@code store.createZNodeIfNotExist}, storing the
     * stream position as a 4-byte value written by {@code BitConverter.writeInt}.
     *
     * @param streamPosition position to encode into the node.
     * @return a future that completes when the store call completes; its result is discarded.
     */
    CompletableFuture<Void> createStreamPositionNodeIfAbsent(int streamPosition) {
        final byte[] encodedPosition = new byte[Integer.BYTES];
        BitConverter.writeInt(encodedPosition, 0, streamPosition);
        return Futures.toVoid(store.createZNodeIfNotExist(idPath, encodedPosition));
    }

    /**
     * Reads the node at {@code idPath} and decodes an int from the start of its content using
     * {@code BitConverter.readInt}.
     *
     * @return a future holding the decoded position; the version returned by the store is dropped.
     */
    CompletableFuture<Integer> getStreamPosition() {
        return store.getData(idPath, raw -> BitConverter.readInt(raw, 0))
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Builds a placeholder {@code VersionedMetadata} whose object is {@code null} and whose
     * version is a {@code Version.IntVersion} of {@link Integer#MIN_VALUE}.
     *
     * @param <T> element type of the returned metadata; inferred from the call site.
     * @return a new placeholder instance.
     */
    private static <T> VersionedMetadata<T> getEmptyData() {
        // The diamond lets the compiler infer T from the return type; an explicit
        // VersionedMetadata<Object> is not assignable to VersionedMetadata<T>.
        return new VersionedMetadata<>(null, new Version.IntVersion(Integer.MIN_VALUE));
    }

    /**
     * Returns the value of the {@code creationPath} field.
     *
     * @return the creation path held by this instance.
     */
    @SuppressFBWarnings(justification="generated code")
    String getCreationPath() {
        return creationPath;
    }

    /**
     * Returns the value of the {@code streamPath} field.
     *
     * @return the stream path held by this instance.
     */
    @SuppressFBWarnings(justification="generated code")
    String getStreamPath() {
        return streamPath;
    }
}

