/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.PersistentStreamBase;
import io.pravega.controller.store.stream.PravegaTablesStoreHelper;
import io.pravega.controller.store.stream.PravegaTablesStreamMetadataStore;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.ZkOrderedStore;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PravegaTablesStream
extends PersistentStreamBase {
    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(PravegaTablesStream.class);

    // Table-name templates. The trailing %s is filled with the stream id by the
    // get*TableName helpers; the transactions-in-epoch template additionally takes the epoch (%d).
    private static final String METADATA_TABLE = "metadata.#.%s";
    private static final String EPOCHS_WITH_TRANSACTIONS_TABLE = "epochsWithTransactions.#.%s";
    private static final String WRITERS_POSITIONS_TABLE = "writersPositions.#.%s";
    private static final String TRANSACTIONS_IN_EPOCH_TABLE_FORMAT = "transactionsInEpoch-%d.#.%s";

    // Keys (or key templates) of entries stored in the stream's metadata table.
    private static final String CREATION_TIME_KEY = "creationTime";
    private static final String CONFIGURATION_KEY = "configuration";
    private static final String TRUNCATION_KEY = "truncation";
    private static final String STATE_KEY = "state";
    private static final String EPOCH_TRANSITION_KEY = "epochTransition";
    private static final String RETENTION_SET_KEY = "retention";
    // Formatted with the stream cut's recording time.
    private static final String RETENTION_STREAM_CUT_RECORD_KEY_FORMAT = "retentionCuts-%s";
    // The entry under this key holds only the current epoch number (4 bytes), not the full record.
    private static final String CURRENT_EPOCH_KEY = "currentEpochRecord";
    private static final String EPOCH_RECORD_KEY_FORMAT = "epochRecord-%d";
    private static final String HISTORY_TIMESERES_CHUNK_FORMAT = "historyTimeSeriesChunk-%d";
    private static final String SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT = "segmentsSealedSizeMapShard-%d";
    // Formatted with a segment id; the value is the epoch (4 bytes) in which the segment was sealed.
    private static final String SEGMENT_SEALED_EPOCH_KEY_FORMAT = "segmentSealedEpochPath-%d";
    private static final String COMMITTING_TRANSACTIONS_RECORD_KEY = "committingTxns";
    // Formatted with a segment id; the value is an 8-byte timestamp.
    private static final String SEGMENT_MARKER_PATH_FORMAT = "markers-%d";
    private static final String WAITING_REQUEST_PROCESSOR_PATH = "waitingRequestProcessor";

    // Key templates taking scope, stream and (for the second one) the transaction id.
    // NOTE(review): STREAM_KEY_PREFIX is not referenced in the visible part of the file.
    private static final String STREAM_KEY_PREFIX = "Key.#.%s.#.%s.#.";
    private static final String COMPLETED_TRANSACTIONS_KEY_FORMAT = "Key.#.%s.#.%s.#./%s";

    // Helper through which all table reads and writes in this class are issued.
    private final PravegaTablesStoreHelper storeHelper;
    // Supplies the batch number used to pick the completed-transactions batch table.
    private final Supplier<Integer> currentBatchSupplier;
    // Supplies the name of the table that maps stream names to their ids within the scope.
    private final Supplier<CompletableFuture<String>> streamsInScopeTableNameSupplier;
    // Lazily populated stream id (see getId()); null until first resolved.
    private final AtomicReference<String> idRef;
    // Ordered store used to track the commit order of transactions.
    private final ZkOrderedStore txnCommitOrderer;
    // Executor used when iterating asynchronously over table keys.
    private final ScheduledExecutorService executor;

    /**
     * Convenience constructor that delegates to the full constructor, passing 1000 for both
     * the chunk size and the shard size.
     */
    @VisibleForTesting
    PravegaTablesStream(String scopeName,
                        String streamName,
                        PravegaTablesStoreHelper storeHelper,
                        ZkOrderedStore txnCommitOrderer,
                        Supplier<Integer> currentBatchSupplier,
                        Supplier<CompletableFuture<String>> streamsInScopeTableNameSupplier,
                        ScheduledExecutorService executor) {
        this(scopeName, streamName, storeHelper, txnCommitOrderer, currentBatchSupplier,
                1000, 1000,
                streamsInScopeTableNameSupplier, executor);
    }

    /**
     * Creates a stream handle backed by Pravega tables.
     *
     * @param scopeName                       scope the stream belongs to (passed to the base class)
     * @param streamName                      stream name (passed to the base class)
     * @param storeHelper                     helper used for all table operations
     * @param txnCommitOrderer                ordered store tracking transaction commit order
     * @param currentBatchSupplier            supplies the current completed-transactions batch number
     * @param chunkSize                       chunk size passed to the base class
     * @param shardSize                       shard size passed to the base class
     * @param streamsInScopeTableNameSupplier supplies the name of the streams-in-scope table
     * @param executor                        executor used for asynchronous iteration
     */
    @VisibleForTesting
    PravegaTablesStream(String scopeName, String streamName, PravegaTablesStoreHelper storeHelper, ZkOrderedStore txnCommitOrderer, Supplier<Integer> currentBatchSupplier, int chunkSize, int shardSize, Supplier<CompletableFuture<String>> streamsInScopeTableNameSupplier, ScheduledExecutorService executor) {
        super(scopeName, streamName, chunkSize, shardSize);
        this.storeHelper = storeHelper;
        this.txnCommitOrderer = txnCommitOrderer;
        this.currentBatchSupplier = currentBatchSupplier;
        this.streamsInScopeTableNameSupplier = streamsInScopeTableNameSupplier;
        // Diamond keeps the reference typed as AtomicReference<String>; an
        // AtomicReference<Object> is not assignable to the field.
        this.idRef = new AtomicReference<>(null);
        this.executor = executor;
    }

    /**
     * Resolves the stream's id. A non-empty id already held in {@code idRef} is returned
     * immediately; otherwise the id is read (as a UUID) from the streams-in-scope table under
     * the stream's name, stored in {@code idRef} if it is still unset, and the lookup is repeated.
     *
     * @return future completing with the stream id as a string
     */
    private CompletableFuture<String> getId() {
        String cachedId = idRef.get();
        if (Strings.isNullOrEmpty(cachedId)) {
            return streamsInScopeTableNameSupplier.get()
                    .thenCompose(scopeTable -> storeHelper.getEntry(scopeTable, getName(), bytes -> BitConverter.readUUID(bytes, 0)))
                    .thenComposeAsync(entry -> {
                        // Only the first writer wins; the re-read below returns whatever was stored.
                        idRef.compareAndSet(null, entry.getObject().toString());
                        return getId();
                    });
        }
        return CompletableFuture.completedFuture(cachedId);
    }

    /** Returns a future with the qualified name of this stream's metadata table. */
    private CompletableFuture<String> getMetadataTable() {
        return getId().thenApply(streamId -> getMetadataTableName(streamId));
    }

    /**
     * Builds the qualified metadata table name for the given stream id.
     *
     * @param id stream id substituted into the metadata table template
     */
    private String getMetadataTableName(String id) {
        String tableSuffix = String.format(METADATA_TABLE, id);
        return StreamSegmentNameUtils.getQualifiedTableName("_system", getScope(), getName(), tableSuffix);
    }

    /** Returns a future with the qualified name of the epochs-with-transactions table. */
    private CompletableFuture<String> getEpochsWithTransactionsTable() {
        return getId().thenApply(streamId -> getEpochsWithTransactionsTableName(streamId));
    }

    /**
     * Builds the qualified epochs-with-transactions table name for the given stream id.
     *
     * @param id stream id substituted into the table template
     */
    private String getEpochsWithTransactionsTableName(String id) {
        String tableSuffix = String.format(EPOCHS_WITH_TRANSACTIONS_TABLE, id);
        return StreamSegmentNameUtils.getQualifiedTableName("_system", getScope(), getName(), tableSuffix);
    }

    /**
     * Returns a future with the qualified name of the transactions table for an epoch.
     *
     * @param epoch epoch whose transactions table is wanted
     */
    private CompletableFuture<String> getTransactionsInEpochTable(int epoch) {
        return getId().thenApply(streamId -> getTransactionsInEpochTableName(epoch, streamId));
    }

    /**
     * Builds the qualified transactions-in-epoch table name.
     *
     * @param epoch epoch substituted into the table template
     * @param id    stream id substituted into the table template
     */
    private String getTransactionsInEpochTableName(int epoch, String id) {
        String tableSuffix = String.format(TRANSACTIONS_IN_EPOCH_TABLE_FORMAT, epoch, id);
        return StreamSegmentNameUtils.getQualifiedTableName("_system", getScope(), getName(), tableSuffix);
    }

    /** Returns a future with the qualified name of the writers-positions table. */
    private CompletableFuture<String> getWritersTable() {
        return getId().thenApply(streamId -> getWritersTableName(streamId));
    }

    /**
     * Builds the qualified writers-positions table name for the given stream id.
     *
     * @param id stream id substituted into the table template
     */
    private String getWritersTableName(String id) {
        String tableSuffix = String.format(WRITERS_POSITIONS_TABLE, id);
        return StreamSegmentNameUtils.getQualifiedTableName("_system", getScope(), getName(), tableSuffix);
    }

    /**
     * Completes the commit of the transactions listed in the given committing record.
     * <p>
     * When the record lists at least one transaction this, in order: generates marks for the
     * transactions, writes a COMMITTED completed-transaction entry for each of them, removes
     * them from the transactions table of the record's epoch, and tries to remove the
     * transaction tables of older epochs. In all cases the committing-transactions record is
     * finally reset to {@code CommittingTransactionsRecord.EMPTY} using the supplied version.
     *
     * @param record versioned committing-transactions record being completed
     * @return future completing when the record has been reset
     */
    @Override
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> record) {
        long time = System.currentTimeMillis();
        CommittingTransactionsRecord committing = record.getObject();
        Map<String, byte[]> completedRecords = committing.getTransactionsToCommit().stream()
                .collect(Collectors.toMap(UUID::toString, x -> new CompletedTxnRecord(time, TxnStatus.COMMITTED).toBytes()));

        // Typed as CompletableFuture<Void> so the final thenCompose matches the return type.
        CompletableFuture<Void> future;
        if (committing.getTransactionsToCommit().isEmpty()) {
            future = CompletableFuture.completedFuture(null);
        } else {
            future = generateMarksForTransactions(committing)
                    .thenCompose(v -> createCompletedTxEntries(completedRecords))
                    .thenCompose(x -> getTransactionsInEpochTable(committing.getEpoch())
                            .thenCompose(table -> storeHelper.removeEntries(table, completedRecords.keySet())))
                    .thenCompose(x -> tryRemoveOlderTransactionsInEpochTables(epoch -> epoch < committing.getEpoch()));
        }
        return future.thenCompose(x -> Futures.toVoid(updateCommittingTxnRecord(
                new VersionedMetadata<>(CommittingTransactionsRecord.EMPTY, record.getVersion()))));
    }

    /**
     * Creates the stream's metadata, epochs-with-transactions and writers-positions tables
     * and logs their names at debug level once all three creations have completed.
     */
    @Override
    CompletableFuture<Void> createStreamMetadata() {
        return getId().thenCompose(streamId -> {
            String metadataTable = getMetadataTableName(streamId);
            String epochWithTxnTable = getEpochsWithTransactionsTableName(streamId);
            String writersPositionsTable = getWritersTableName(streamId);
            CompletableFuture<Void> allCreated = CompletableFuture.allOf(
                    storeHelper.createTable(metadataTable),
                    storeHelper.createTable(epochWithTxnTable),
                    storeHelper.createTable(writersPositionsTable));
            return allCreated.thenAccept(ignored -> log.debug("stream {}/{} metadata tables {}, {} & {} created",
                    getScope(), getName(), metadataTable, epochWithTxnTable, writersPositionsTable));
        });
    }

    /**
     * Determines the creation status of this stream from what is already stored.
     * <ul>
     *   <li>No stored creation time: NEW with the supplied configuration and creation time.</li>
     *   <li>Stored creation time but no stored configuration: NEW with the supplied
     *       configuration and the stored creation time.</li>
     *   <li>Both stored: the status is decided by {@code handleConfigExists}.</li>
     * </ul>
     *
     * @param configuration         configuration requested by the caller
     * @param creationTime          creation time requested by the caller
     * @param startingSegmentNumber starting segment number echoed in the response
     */
    @Override
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration configuration, long creationTime, int startingSegmentNumber) {
        return storeHelper.expectingDataNotFound(getCreationTime(), null).thenCompose(storedCreationTime -> {
            if (storedCreationTime == null) {
                return CompletableFuture.completedFuture(new CreateStreamResponse(
                        CreateStreamResponse.CreateStatus.NEW, configuration, creationTime, startingSegmentNumber));
            }
            return storeHelper.expectingDataNotFound(getConfiguration(), null).thenCompose(storedConfig -> {
                if (storedConfig == null) {
                    return CompletableFuture.completedFuture(new CreateStreamResponse(
                            CreateStreamResponse.CreateStatus.NEW, configuration, storedCreationTime, startingSegmentNumber));
                }
                boolean creationTimeMatched = storedCreationTime == creationTime;
                return handleConfigExists(storedCreationTime, storedConfig, startingSegmentNumber, creationTimeMatched);
            });
        });
    }

    /**
     * Builds the create response for a stream whose configuration is already stored.
     * If the stored state is absent, UNKNOWN or CREATING the status is NEW when the creation
     * times matched and EXISTS_CREATING otherwise; for any other state it is EXISTS_ACTIVE.
     *
     * @param creationTime          stored creation time
     * @param config                stored configuration
     * @param startingSegmentNumber starting segment number echoed in the response
     * @param creationTimeMatched   whether the stored and requested creation times are equal
     */
    private CompletableFuture<CreateStreamResponse> handleConfigExists(long creationTime, StreamConfiguration config, int startingSegmentNumber, boolean creationTimeMatched) {
        CreateStreamResponse.CreateStatus stillCreatingStatus;
        if (creationTimeMatched) {
            stillCreatingStatus = CreateStreamResponse.CreateStatus.NEW;
        } else {
            stillCreatingStatus = CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        }
        return storeHelper.expectingDataNotFound(getState(true), null).thenApply(state -> {
            boolean stillCreating = state == null || state.equals(State.UNKNOWN) || state.equals(State.CREATING);
            CreateStreamResponse.CreateStatus resolved = stillCreating
                    ? stillCreatingStatus
                    : CreateStreamResponse.CreateStatus.EXISTS_ACTIVE;
            return new CreateStreamResponse(resolved, config, creationTime, startingSegmentNumber);
        });
    }

    /**
     * Reads the stream's creation time (an 8-byte long) from the metadata table through the
     * store helper's cached read.
     */
    @Override
    public CompletableFuture<Long> getCreationTime() {
        return getMetadataTable()
                .thenCompose(table -> storeHelper.getCachedData(table, CREATION_TIME_KEY, bytes -> BitConverter.readLong(bytes, 0)))
                .thenApply(versioned -> versioned.getObject());
    }

    /**
     * Deletes every table created for this stream, whether or not it is empty.
     * <ol>
     *   <li>Tries to remove every per-epoch transactions table (data-not-found tolerated).</li>
     *   <li>Deletes the epochs-with-transactions table (data-not-found tolerated).</li>
     *   <li>Deletes the writers-positions table (data-not-found tolerated). This table is
     *       created by {@code createStreamMetadata} and would otherwise be left behind.</li>
     *   <li>Deletes the metadata table last; a failure here is propagated.</li>
     * </ol>
     *
     * @return future completing when the metadata table has been deleted
     */
    @Override
    public CompletableFuture<Void> deleteStream() {
        return getId().thenCompose(id -> storeHelper.expectingDataNotFound(tryRemoveOlderTransactionsInEpochTables(epoch -> true), null)
                .thenCompose(v -> storeHelper.expectingDataNotFound(
                        storeHelper.deleteTable(getEpochsWithTransactionsTableName(id), false), null))
                .thenCompose(v -> storeHelper.expectingDataNotFound(
                        storeHelper.deleteTable(getWritersTableName(id), false), null))
                .thenCompose(v -> storeHelper.deleteTable(getMetadataTableName(id), false)));
    }

    /**
     * Adds the retention set to the metadata table if no entry exists, then invalidates the
     * cached entry for that key.
     *
     * @param data retention set to serialize and store
     */
    @Override
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet data) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, RETENTION_SET_KEY, data.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, RETENTION_SET_KEY));
        });
    }

    /** Reads the versioned retention set from the metadata table using an uncached read. */
    @Override
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData() {
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.getEntry(table, RETENTION_SET_KEY, RetentionSet::fromBytes);
        });
    }

    /**
     * Updates the retention set entry using the supplied version, invalidates the cached
     * entry, and returns the version produced by the update.
     *
     * @param retention retention set together with the version it was read at
     */
    @Override
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> retention) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> updated = storeHelper.updateEntry(
                    table, RETENTION_SET_KEY, retention.getObject().toBytes(), retention.getVersion());
            return updated.thenApply(newVersion -> {
                storeHelper.invalidateCache(table, RETENTION_SET_KEY);
                return newVersion;
            });
        });
    }

    /**
     * Adds a stream cut record keyed by its recording time if no entry exists, then
     * invalidates the cached entry for that key.
     *
     * @param recordingTime recording time used to build the key
     * @param record        stream cut record to serialize and store
     */
    @Override
    CompletableFuture<Void> createStreamCutRecordData(long recordingTime, StreamCutRecord record) {
        String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, recordingTime);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, cutKey, record.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, cutKey));
        });
    }

    /**
     * Reads the stream cut record for a recording time through the store helper's cached read.
     *
     * @param recordingTime recording time used to build the key
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long recordingTime) {
        String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, recordingTime);
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.getCachedData(table, cutKey, StreamCutRecord::fromBytes);
        });
    }

    /**
     * Removes the stream cut record for a recording time and then invalidates its cached entry.
     *
     * @param recordingTime recording time used to build the key
     */
    @Override
    CompletableFuture<Void> deleteStreamCutRecordData(long recordingTime) {
        String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, recordingTime);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Void> removed = storeHelper.removeEntry(table, cutKey);
            return removed.thenAccept(ignored -> storeHelper.invalidateCache(table, cutKey));
        });
    }

    /**
     * Adds a history time-series chunk if no entry exists for the chunk number, then
     * invalidates the cached entry for that key.
     *
     * @param chunkNumber chunk number used to build the key
     * @param data        chunk to serialize and store
     */
    @Override
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int chunkNumber, HistoryTimeSeries data) {
        String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, chunkNumber);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, chunkKey, data.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, chunkKey));
        });
    }

    /**
     * Reads a history time-series chunk.
     *
     * @param chunkNumber  chunk number used to build the key
     * @param ignoreCached when true the uncached {@code getEntry} read is used, otherwise
     *                     the cached read
     */
    @Override
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int chunkNumber, boolean ignoreCached) {
        String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, chunkNumber);
        return getMetadataTable().thenCompose(table -> ignoreCached
                ? storeHelper.getEntry(table, chunkKey, HistoryTimeSeries::fromBytes)
                : storeHelper.getCachedData(table, chunkKey, HistoryTimeSeries::fromBytes));
    }

    /**
     * Updates a history time-series chunk using the supplied version, invalidates the cached
     * entry, and returns the version produced by the update.
     *
     * @param chunkNumber chunk number used to build the key
     * @param data        chunk together with the version it was read at
     */
    @Override
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int chunkNumber, VersionedMetadata<HistoryTimeSeries> data) {
        String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, chunkNumber);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> updated = storeHelper.updateEntry(
                    table, chunkKey, data.getObject().toBytes(), data.getVersion());
            return updated.thenApply(newVersion -> {
                storeHelper.invalidateCache(table, chunkKey);
                return newVersion;
            });
        });
    }

    /**
     * Stores the epoch number of the given record (as 4 bytes) under the current-epoch key if
     * no entry exists, then invalidates the cached entry for that key.
     *
     * @param data epoch record whose epoch number is stored
     */
    @Override
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord data) {
        byte[] serializedEpoch = new byte[4];
        BitConverter.writeInt(serializedEpoch, 0, data.getEpoch());
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, CURRENT_EPOCH_KEY, serializedEpoch);
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, CURRENT_EPOCH_KEY));
        });
    }

    /**
     * Overwrites the current-epoch entry with the epoch number of the given record (4 bytes)
     * using the supplied version, invalidates the cached entry, and returns the new version.
     *
     * @param data epoch record together with the version of the current-epoch entry
     */
    @Override
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> data) {
        byte[] serializedEpoch = new byte[4];
        BitConverter.writeInt(serializedEpoch, 0, data.getObject().getEpoch());
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> updated = storeHelper.updateEntry(
                    table, CURRENT_EPOCH_KEY, serializedEpoch, data.getVersion());
            return updated.thenApply(newVersion -> {
                storeHelper.invalidateCache(table, CURRENT_EPOCH_KEY);
                return newVersion;
            });
        });
    }

    /**
     * Reads the current epoch number from the metadata table, loads the epoch record for that
     * number, and pairs the record with the version of the current-epoch entry.
     *
     * @param ignoreCached when true the epoch number is read with the uncached
     *                     {@code getEntry}, otherwise with the cached read
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean ignoreCached) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<VersionedMetadata<Integer>> epochNumber;
            if (ignoreCached) {
                epochNumber = storeHelper.getEntry(table, CURRENT_EPOCH_KEY, bytes -> BitConverter.readInt(bytes, 0));
            } else {
                epochNumber = storeHelper.getCachedData(table, CURRENT_EPOCH_KEY, bytes -> BitConverter.readInt(bytes, 0));
            }
            return epochNumber.thenCompose(versionedEpoch -> getEpochRecord(versionedEpoch.getObject())
                    .thenApply(epochRecord -> new VersionedMetadata<>(epochRecord, versionedEpoch.getVersion())));
        });
    }

    /**
     * Adds the epoch record if no entry exists and invalidates its cached entry. When the
     * record's epoch equals its reference epoch, the transactions table for {@code epoch} is
     * created afterwards.
     *
     * @param epoch epoch number used to build the key
     * @param data  epoch record to serialize and store
     */
    @Override
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int epoch, EpochRecord data) {
        String epochKey = String.format(EPOCH_RECORD_KEY_FORMAT, epoch);
        CompletableFuture<Void> recordStored = getMetadataTable().thenCompose(table ->
                storeHelper.addNewEntryIfAbsent(table, epochKey, data.toBytes())
                        .thenAccept(ignored -> storeHelper.invalidateCache(table, epochKey)));
        return recordStored.thenCompose(ignored -> {
            if (data.getEpoch() != data.getReferenceEpoch()) {
                return CompletableFuture.completedFuture(null);
            }
            return createTransactionsInEpochTable(epoch);
        });
    }

    /**
     * Reads the epoch record for an epoch through the store helper's cached read.
     *
     * @param epoch epoch number used to build the key
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int epoch) {
        return getMetadataTable().thenCompose(table ->
                storeHelper.getCachedData(table, String.format(EPOCH_RECORD_KEY_FORMAT, epoch), EpochRecord::fromBytes));
    }

    /**
     * Adds a sealed-segment-sizes map shard if no entry exists for the shard number, then
     * invalidates the cached entry for that key.
     *
     * @param shard shard number used to build the key
     * @param data  shard to serialize and store
     */
    @Override
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int shard, SealedSegmentsMapShard data) {
        String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, shard);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, shardKey, data.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, shardKey));
        });
    }

    /**
     * Reads a sealed-segment-sizes map shard using an uncached read.
     *
     * @param shard shard number used to build the key
     */
    @Override
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int shard) {
        String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, shard);
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.getEntry(table, shardKey, SealedSegmentsMapShard::fromBytes);
        });
    }

    /**
     * Updates a sealed-segment-sizes map shard using the supplied version, invalidates the
     * cached entry, and returns the version produced by the update.
     *
     * @param shard shard number used to build the key
     * @param data  shard together with the version it was read at
     */
    @Override
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int shard, VersionedMetadata<SealedSegmentsMapShard> data) {
        String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, shard);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> updated = storeHelper.updateEntry(
                    table, shardKey, data.getObject().toBytes(), data.getVersion());
            return updated.thenApply(newVersion -> {
                storeHelper.invalidateCache(table, shardKey);
                return newVersion;
            });
        });
    }

    /**
     * Records, for every given segment, the epoch in which it was sealed. One entry per
     * segment id is added (if absent) to the metadata table; all entries share the same
     * 4-byte encoding of {@code epoch}.
     *
     * @param segmentsToSeal ids of the segments being sealed
     * @param epoch          epoch stored as the value of each entry
     */
    @Override
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> segmentsToSeal, int epoch) {
        byte[] serializedEpoch = new byte[4];
        BitConverter.writeInt(serializedEpoch, 0, epoch);
        Map<String, byte[]> sealedEntries = segmentsToSeal.stream().collect(Collectors.toMap(
                segmentId -> String.format(SEGMENT_SEALED_EPOCH_KEY_FORMAT, segmentId),
                segmentId -> serializedEpoch));
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.addNewEntriesIfAbsent(table, sealedEntries);
        });
    }

    /**
     * Reads the epoch (a 4-byte int) in which a segment was sealed, through the store
     * helper's cached read.
     *
     * @param segmentId segment id used to build the key
     */
    @Override
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long segmentId) {
        String sealedKey = String.format(SEGMENT_SEALED_EPOCH_KEY_FORMAT, segmentId);
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.getCachedData(table, sealedKey, bytes -> BitConverter.readInt(bytes, 0));
        });
    }

    /**
     * Adds the epoch transition record if no entry exists, then invalidates the cached entry
     * for that key.
     *
     * @param epochTransition record to serialize and store
     */
    @Override
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransition) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, EPOCH_TRANSITION_KEY, epochTransition.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, EPOCH_TRANSITION_KEY));
        });
    }

    /**
     * Updates the epoch transition record using the supplied version, invalidates the cached
     * entry, and returns the version produced by the update.
     *
     * @param epochTransition record together with the version it was read at
     */
    @Override
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> epochTransition) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> updated = storeHelper.updateEntry(
                    table, EPOCH_TRANSITION_KEY, epochTransition.getObject().toBytes(), epochTransition.getVersion());
            return updated.thenApply(newVersion -> {
                storeHelper.invalidateCache(table, EPOCH_TRANSITION_KEY);
                return newVersion;
            });
        });
    }

    /** Reads the versioned epoch transition record using an uncached read. */
    @Override
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode() {
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.getEntry(table, EPOCH_TRANSITION_KEY, EpochTransitionRecord::fromBytes);
        });
    }

    /**
     * Stores the creation time (as 8 bytes) if no entry exists, then invalidates the cached
     * entry for that key.
     *
     * @param creationTime creation time to store
     */
    @Override
    CompletableFuture<Void> storeCreationTimeIfAbsent(long creationTime) {
        byte[] serializedTime = new byte[8];
        BitConverter.writeLong(serializedTime, 0, creationTime);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, CREATION_TIME_KEY, serializedTime);
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, CREATION_TIME_KEY));
        });
    }

    /**
     * Adds the configuration record if no entry exists, then invalidates the cached entry
     * for that key.
     *
     * @param configuration configuration record to serialize and store
     */
    @Override
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord configuration) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, CONFIGURATION_KEY, configuration.toBytes());
            return added.thenAccept(ignored -> storeHelper.invalidateCache(table, CONFIGURATION_KEY));
        });
    }

    /**
     * Adds the state record if no entry exists. No cache invalidation is performed here.
     *
     * @param state state record to serialize and store
     */
    @Override
    public CompletableFuture<Void> createStateIfAbsent(StateRecord state) {
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, STATE_KEY, state.toBytes());
            return Futures.toVoid(added);
        });
    }

    /**
     * Adds a marker entry for a segment, holding the timestamp as 8 bytes, if no entry exists.
     *
     * @param segmentId segment id used to build the key
     * @param timestamp timestamp stored as the marker value
     */
    @Override
    public CompletableFuture<Void> createMarkerData(long segmentId, long timestamp) {
        String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, segmentId);
        byte[] serializedTimestamp = new byte[8];
        BitConverter.writeLong(serializedTimestamp, 0, timestamp);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<Version> added = storeHelper.addNewEntryIfAbsent(table, markerKey, serializedTimestamp);
            return Futures.toVoid(added);
        });
    }

    /**
     * Updates a segment's marker entry with a new 8-byte value using the supplied version.
     *
     * @param segmentId segment id used to build the key
     * @param data      new marker value together with the version it was read at
     * @return future with the version produced by the update
     */
    @Override
    CompletableFuture<Version> updateMarkerData(long segmentId, VersionedMetadata<Long> data) {
        String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, segmentId);
        byte[] serializedMarker = new byte[8];
        BitConverter.writeLong(serializedMarker, 0, data.getObject());
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.updateEntry(table, markerKey, serializedMarker, data.getVersion());
        });
    }

    /**
     * Reads a segment's marker (an 8-byte long) using an uncached read. A data-not-found
     * failure is converted into a {@code null} result by {@code expectingDataNotFound}.
     *
     * @param segmentId segment id used to build the key
     */
    @Override
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long segmentId) {
        String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, segmentId);
        return getMetadataTable().thenCompose(table -> {
            CompletableFuture<VersionedMetadata<Long>> marker =
                    storeHelper.getEntry(table, markerKey, bytes -> BitConverter.readLong(bytes, 0));
            return storeHelper.expectingDataNotFound(marker, null);
        });
    }

    /**
     * Removes a segment's marker entry from the metadata table.
     *
     * @param segmentId segment id used to build the key
     */
    @Override
    CompletableFuture<Void> removeMarkerData(long segmentId) {
        String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, segmentId);
        return getMetadataTable().thenCompose(table -> {
            return storeHelper.removeEntry(table, markerKey);
        });
    }

    /**
     * Collects the active transactions of every epoch listed in the epochs-with-transactions
     * table into a single map.
     *
     * @return future with a map from transaction id to its active transaction record
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return getEpochsWithTransactions()
                .thenCompose(epochsWithTransactions -> Futures.allOfWithResults(
                        epochsWithTransactions.stream().map(this::getTxnInEpoch).collect(Collectors.toList())))
                .thenApply(list -> {
                    // Typed map instead of a raw HashMap so the result is checked by the compiler.
                    Map<UUID, ActiveTxnRecord> map = new HashMap<>();
                    list.forEach(map::putAll);
                    return map;
                });
    }

    /**
     * Lists the epochs recorded in the epochs-with-transactions table. Every key of that
     * table is parsed as an integer epoch number.
     *
     * @return future with the list of epoch numbers
     */
    private CompletableFuture<List<Integer>> getEpochsWithTransactions() {
        return getEpochsWithTransactionsTable().thenCompose(epochWithTxnTable -> {
            // Typed list instead of a raw ArrayList so the result is checked by the compiler.
            List<Integer> epochsWithTransactions = new ArrayList<>();
            return storeHelper.getAllKeys(epochWithTxnTable).collectRemaining(x -> {
                epochsWithTransactions.add(Integer.parseInt(x));
                // Returning true keeps the iteration going until all keys are consumed.
                return true;
            }).thenApply(v -> epochsWithTransactions);
        });
    }

    /**
     * Counts the ongoing transactions across all epochs listed in the epochs-with-transactions
     * table by summing the per-epoch counts.
     *
     * @return future with the total number of ongoing transactions
     */
    @Override
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        // Typed list: a raw list erases the element type and breaks the reduction below.
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        return getEpochsWithTransactionsTable()
                .thenCompose(epochsWithTxn -> storeHelper.getAllKeys(epochsWithTxn)
                        .forEachRemaining(x -> futures.add(getNumberOfOngoingTransactions(Integer.parseInt(x))), executor)
                        .thenCompose(v -> Futures.allOfWithResults(futures)
                                .thenApply(list -> list.stream().reduce(0, Integer::sum))));
    }

    /**
     * Counts the keys in the transactions table of one epoch.
     *
     * @param epoch epoch whose transactions are counted
     * @return future with the number of keys seen
     */
    private CompletableFuture<Integer> getNumberOfOngoingTransactions(int epoch) {
        AtomicInteger seen = new AtomicInteger(0);
        return getTransactionsInEpochTable(epoch).thenCompose(table ->
                storeHelper.getAllKeys(table)
                        .forEachRemaining(key -> seen.incrementAndGet(), executor)
                        .thenApply(ignored -> seen.get()));
    }

    /**
     * Delegates to the base-class helper, supplying this stream's transaction commit orderer
     * and executor.
     */
    @Override
    public CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch() {
        return super.getOrderedCommittingTxnInLowestEpochHelper(this.txnCommitOrderer, this.executor);
    }

    /**
     * Delegates to the base-class helper, supplying this stream's transaction commit orderer.
     */
    @Override
    @VisibleForTesting
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns() {
        return super.getAllOrderedCommittingTxnsHelper(this.txnCommitOrderer);
    }

    /**
     * Reads every entry of the transactions table for an epoch. A data-not-found failure is
     * converted into an empty map by {@code expectingDataNotFound}.
     *
     * @param epoch epoch whose transactions are read
     * @return future with a map from transaction id to its active transaction record
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int epoch) {
        // Typed map instead of a raw ConcurrentHashMap so entries are checked by the compiler.
        Map<UUID, ActiveTxnRecord> result = new ConcurrentHashMap<>();
        return getTransactionsInEpochTable(epoch)
                .thenCompose(tableName -> storeHelper.expectingDataNotFound(
                        storeHelper.getAllEntries(tableName, ActiveTxnRecord::fromBytes).collectRemaining(x -> {
                            result.put(UUID.fromString(x.getKey()), x.getValue().getObject());
                            // Returning true keeps the iteration going until all entries are consumed.
                            return true;
                        }).thenApply(v -> result), Collections.emptyMap()));
    }

    /**
     * Adds an active transaction record to the transactions table of an epoch if no entry
     * exists for the transaction id.
     *
     * @param epoch     epoch whose transactions table receives the entry
     * @param txId      transaction id used as the key
     * @param txnRecord record to serialize and store
     * @return future with the version returned by the store helper
     */
    @Override
    public CompletableFuture<Version> createNewTransaction(int epoch, UUID txId, ActiveTxnRecord txnRecord) {
        return getTransactionsInEpochTable(epoch).thenCompose(table -> {
            return storeHelper.addNewEntryIfAbsent(table, txId.toString(), txnRecord.toBytes());
        });
    }

    /**
     * Registers an epoch in the epochs-with-transactions table (empty value, added if absent)
     * and then creates the transactions table for that epoch.
     *
     * @param epoch epoch to register
     */
    private CompletableFuture<Void> createTransactionsInEpochTable(int epoch) {
        return getEpochsWithTransactionsTable()
                .thenCompose(epochsTable -> storeHelper.addNewEntryIfAbsent(epochsTable, Integer.toString(epoch), new byte[0]))
                .thenCompose(registered -> getTransactionsInEpochTable(epoch)
                        .thenCompose(txnTable -> storeHelper.createTable(txnTable)));
    }

    /**
     * Reads the active transaction record for a transaction from its epoch's transactions
     * table using an uncached read.
     *
     * @param epoch epoch whose transactions table is read
     * @param txId  transaction id used as the key
     */
    @Override
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int epoch, UUID txId) {
        return getTransactionsInEpochTable(epoch).thenCompose(table -> {
            return storeHelper.getEntry(table, txId.toString(), ActiveTxnRecord::fromBytes);
        });
    }

    /**
     * Updates the active transaction record for a transaction using the supplied version.
     *
     * @param epoch epoch whose transactions table is updated
     * @param txId  transaction id used as the key
     * @param data  new record together with the version it was read at
     * @return future with the version produced by the update
     */
    @Override
    CompletableFuture<Version> updateActiveTx(int epoch, UUID txId, VersionedMetadata<ActiveTxnRecord> data) {
        return getTransactionsInEpochTable(epoch).thenCompose(table -> {
            return storeHelper.updateEntry(table, txId.toString(), data.getObject().toBytes(), data.getVersion());
        });
    }

    /**
     * Adds the transaction id to the commit orderer for this scope and stream.
     *
     * @param txId transaction to add
     * @return future with the value returned by the orderer's {@code addEntity}
     */
    @Override
    CompletableFuture<Long> addTxnToCommitOrder(UUID txId) {
        String scope = getScope();
        String stream = getName();
        return txnCommitOrderer.addEntity(scope, stream, txId.toString());
    }

    /**
     * Removes the given positions from the commit orderer for this scope and stream.
     *
     * @param orderedPositions positions to remove
     */
    @Override
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> orderedPositions) {
        String scope = getScope();
        String stream = getName();
        return txnCommitOrderer.removeEntities(scope, stream, orderedPositions);
    }

    /**
     * Removes a transaction's entry from its epoch's transactions table and then tries to
     * remove the transactions tables of all epochs lower than {@code epoch}.
     *
     * @param epoch epoch the transaction belongs to
     * @param txId  transaction id used as the key
     */
    @Override
    CompletableFuture<Void> removeActiveTxEntry(int epoch, UUID txId) {
        CompletableFuture<Void> entryRemoved = getTransactionsInEpochTable(epoch)
                .thenCompose(table -> storeHelper.removeEntry(table, txId.toString()));
        return entryRemoved.thenCompose(ignored -> tryRemoveOlderTransactionsInEpochTables(candidate -> candidate < epoch));
    }

    /**
     * Tries to remove the transactions table of every registered epoch that satisfies the
     * predicate.
     *
     * @param epochPredicate selects the epochs whose tables should be removed
     * @return future completing when all removal attempts have completed
     */
    private CompletableFuture<Void> tryRemoveOlderTransactionsInEpochTables(Predicate<Integer> epochPredicate) {
        return getEpochsWithTransactions().thenCompose(epochs -> {
            return Futures.allOf(epochs.stream()
                    .filter(epochPredicate)
                    .map(this::tryRemoveTransactionsInEpochTable)
                    .collect(Collectors.toList()));
        });
    }

    /**
     * Tries to delete the transactions table of an epoch, passing {@code true} as the second
     * argument of {@code deleteTable}. The table counts as deleted when the delete succeeds
     * or fails with data-not-found; a data-not-empty failure leaves everything in place; any
     * other failure is rethrown wrapped in a {@link CompletionException}. When the table
     * counts as deleted, the epoch's entry is removed from the epochs-with-transactions table.
     *
     * @param epoch epoch whose transactions table should be removed
     */
    private CompletableFuture<Void> tryRemoveTransactionsInEpochTable(int epoch) {
        return getTransactionsInEpochTable(epoch).thenCompose(epochTable ->
                storeHelper.deleteTable(epochTable, true)
                        .handle((result, error) -> {
                            if (error == null || PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE.test(error)) {
                                return true;
                            }
                            if (AbstractStreamMetadataStore.DATA_NOT_EMPTY_PREDICATE.test(error)) {
                                return false;
                            }
                            throw new CompletionException(error);
                        })
                        .thenCompose(deleted -> {
                            if (!deleted) {
                                return CompletableFuture.completedFuture(null);
                            }
                            return getEpochsWithTransactionsTable()
                                    .thenCompose(epochsTable -> storeHelper.removeEntry(epochsTable, Integer.toString(epoch)));
                        }));
    }

    /**
     * Records a single completed transaction by delegating to the batch variant with a
     * one-element map.
     *
     * @param txId     transaction id; its string form is the map key
     * @param complete completed-transaction record to serialize
     * @return the future returned by {@code createCompletedTxEntries}
     */
    @Override
    CompletableFuture<Void> createCompletedTxEntry(UUID txId, CompletedTxnRecord complete) {
        Map<String, byte[]> singleEntry = Collections.singletonMap(txId.toString(), complete.toBytes());
        return this.createCompletedTxEntries(singleEntry);
    }

    /**
     * Adds the given completed-transaction records to the completed-transactions table of the
     * batch currently reported by {@code currentBatchSupplier}.
     * <p>
     * If the first add fails with an error matching {@code DATA_NOT_FOUND_PREDICATE}, the batch
     * table is created via {@code tryCreateBatchTable} and the add is issued once more.
     *
     * @param complete serialized records keyed by transaction id string
     * @return future that completes when the entries have been added
     */
    private CompletableFuture<Void> createCompletedTxEntries(Map<String, byte[]> complete) {
        Integer batch = this.currentBatchSupplier.get();
        String tableName = PravegaTablesStream.getCompletedTransactionsBatchTableName(batch);
        // Re-key every record with the key built from this stream's scope, name and the transaction id.
        Map<String, byte[]> map = complete.entrySet().stream().collect(Collectors.toMap(x -> PravegaTablesStream.getCompletedTransactionKey(this.getScope(), this.getName(), (String)x.getKey()), Map.Entry::getValue));
        // First attempt assumes the batch table exists; on a data-not-found failure, create it and retry once.
        return Futures.toVoid((CompletableFuture)Futures.exceptionallyComposeExpecting(this.storeHelper.addNewEntriesIfAbsent(tableName, map), (Predicate)PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, () -> this.tryCreateBatchTable(batch).thenCompose(v -> this.storeHelper.addNewEntriesIfAbsent(tableName, map)))).exceptionally(e -> {
            // Any remaining failure is surfaced to the caller wrapped in a CompletionException.
            throw new CompletionException((Throwable)e);
        });
    }

    /**
     * Builds the key of a completed-transaction entry by applying
     * {@code COMPLETED_TRANSACTIONS_KEY_FORMAT} to the scope, stream and transaction id, in that order.
     *
     * @param scope  scope name
     * @param stream stream name
     * @param txnId  transaction id as a string
     * @return the formatted key
     */
    @VisibleForTesting
    static String getCompletedTransactionKey(String scope, String stream, String txnId) {
        Object[] keyParts = {scope, stream, txnId};
        return String.format(COMPLETED_TRANSACTIONS_KEY_FORMAT, keyParts);
    }

    /**
     * Returns the qualified name of the completed-transactions table for the given batch:
     * {@code completedTransactionsBatch-<batch>} qualified under {@code _system}.
     * <p>
     * The batch number is rendered with plain string concatenation, which always yields ASCII
     * digits. {@code String.format("%d", ...)} localizes digits according to the JVM's default
     * locale, so the table name could differ between hosts with different locale settings.
     *
     * @param batch batch number
     * @return qualified table name for the batch
     */
    @VisibleForTesting
    static String getCompletedTransactionsBatchTableName(int batch) {
        String batchTable = "completedTransactionsBatch-" + batch;
        return StreamSegmentNameUtils.getQualifiedTableName("_system", new String[]{batchTable});
    }

    /**
     * Sets up the storage for a completed-transactions batch in three sequential steps:
     * creates the batches root table, adds (if absent) an entry keyed by the batch number with
     * an empty value to that root table, and finally creates the batch's own table.
     *
     * @param batch batch number
     * @return future that completes when the batch table creation call has completed
     */
    private CompletableFuture<Void> tryCreateBatchTable(int batch) {
        String batchTable = PravegaTablesStream.getCompletedTransactionsBatchTableName(batch);
        // Order matters: root table first, then the batch's entry in it, then the batch table itself.
        return ((CompletableFuture)((CompletableFuture)this.storeHelper.createTable(PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE).thenAccept(v -> log.debug("batches root table {} created", (Object)PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE))).thenCompose(v -> this.storeHelper.addNewEntryIfAbsent(PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE, Integer.toString(batch), new byte[0]))).thenCompose(v -> this.storeHelper.createTable(batchTable));
    }

    /**
     * Looks up the completed-transaction record of {@code txId} across all batches listed in
     * the completed-transactions batches table.
     * <p>
     * Every batch table is queried through the cached read path; a batch in which the key is
     * missing contributes {@code null}. The first non-null result, in the order the batch keys
     * were listed, is returned.
     *
     * @param txId transaction id
     * @return future with the record; it fails with a {@code DATA_NOT_FOUND} store exception
     *         when no batch contains the transaction
     */
    @Override
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID txId) {
        // Filled by the key iteration below and only read after that iteration's future completes.
        List<Integer> batches = new ArrayList<>();
        return this.storeHelper.getAllKeys(PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE)
                .collectRemaining(batchKey -> {
                    batches.add(Integer.parseInt(batchKey));
                    return true;
                })
                .thenCompose(v -> {
                    // The key is the same for every batch, so build it once rather than per batch.
                    String key = PravegaTablesStream.getCompletedTransactionKey(this.getScope(), this.getName(), txId.toString());
                    return Futures.allOfWithResults(batches.stream().map(batch -> {
                        String table = PravegaTablesStream.getCompletedTransactionsBatchTableName(batch);
                        return this.storeHelper.expectingDataNotFound(
                                this.storeHelper.getCachedData(table, key, CompletedTxnRecord::fromBytes), null);
                    }).collect(Collectors.toList()));
                })
                .thenApply(results -> results.stream()
                        .filter(Objects::nonNull)
                        .findFirst()
                        .orElseThrow(() -> StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Completed Txn not found")));
    }

    /**
     * Adds the serialized truncation record under {@code TRUNCATION_KEY} in the metadata table
     * using the store helper's add-if-absent call.
     *
     * @param truncationRecord record to serialize and store
     * @return future that completes when the add-if-absent call has completed
     */
    @Override
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord truncationRecord) {
        return this.getMetadataTable().thenCompose(tableName ->
                Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(tableName, TRUNCATION_KEY, truncationRecord.toBytes())));
    }

    /**
     * Updates the truncation record stored under {@code TRUNCATION_KEY} and, once the update
     * has succeeded, invalidates the cached copy of that key.
     *
     * @param truncationRecord new record together with the version handed to the store helper
     * @return future with the version returned by the store helper's {@code updateEntry}
     */
    @Override
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> truncationRecord) {
        return this.getMetadataTable().thenCompose(tableName -> this.storeHelper
                .updateEntry(tableName, TRUNCATION_KEY, truncationRecord.getObject().toBytes(), truncationRecord.getVersion())
                .thenApply(newVersion -> {
                    // Drop the stale cached value only after the write went through.
                    this.storeHelper.invalidateCache(tableName, TRUNCATION_KEY);
                    return newVersion;
                }));
    }

    /**
     * Reads the truncation record stored under {@code TRUNCATION_KEY} in the metadata table.
     *
     * @param ignoreCached when {@code true} the record is read with {@code getEntry};
     *                     otherwise it is read with {@code getCachedData}
     * @return future with the versioned truncation record
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean ignoreCached) {
        return this.getMetadataTable().thenCompose(tableName -> {
            if (!ignoreCached) {
                return this.storeHelper.getCachedData(tableName, TRUNCATION_KEY, StreamTruncationRecord::fromBytes);
            }
            return this.storeHelper.getEntry(tableName, TRUNCATION_KEY, StreamTruncationRecord::fromBytes);
        });
    }

    /**
     * Updates the configuration record stored under {@code CONFIGURATION_KEY} and, once the
     * update has succeeded, invalidates the cached copy of that key.
     *
     * @param configuration new record together with the version handed to the store helper
     * @return future with the version returned by the store helper's {@code updateEntry}
     */
    @Override
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> configuration) {
        return this.getMetadataTable().thenCompose(tableName -> this.storeHelper
                .updateEntry(tableName, CONFIGURATION_KEY, configuration.getObject().toBytes(), configuration.getVersion())
                .thenApply(newVersion -> {
                    // Drop the stale cached value only after the write went through.
                    this.storeHelper.invalidateCache(tableName, CONFIGURATION_KEY);
                    return newVersion;
                }));
    }

    /**
     * Reads the configuration record stored under {@code CONFIGURATION_KEY} in the metadata table.
     *
     * @param ignoreCached when {@code true} the record is read with {@code getEntry};
     *                     otherwise it is read with {@code getCachedData}
     * @return future with the versioned configuration record
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean ignoreCached) {
        return this.getMetadataTable().thenCompose(tableName -> {
            if (!ignoreCached) {
                return this.storeHelper.getCachedData(tableName, CONFIGURATION_KEY, StreamConfigurationRecord::fromBytes);
            }
            return this.storeHelper.getEntry(tableName, CONFIGURATION_KEY, StreamConfigurationRecord::fromBytes);
        });
    }

    /**
     * Updates the state record stored under {@code STATE_KEY} and, once the update has
     * succeeded, invalidates the cached copy of that key.
     *
     * @param state new record together with the version handed to the store helper
     * @return future with the version returned by the store helper's {@code updateEntry}
     */
    @Override
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> state) {
        return this.getMetadataTable().thenCompose(tableName -> this.storeHelper
                .updateEntry(tableName, STATE_KEY, state.getObject().toBytes(), state.getVersion())
                .thenApply(newVersion -> {
                    // Drop the stale cached value only after the write went through.
                    this.storeHelper.invalidateCache(tableName, STATE_KEY);
                    return newVersion;
                }));
    }

    /**
     * Reads the state record stored under {@code STATE_KEY} in the metadata table.
     *
     * @param ignoreCached when {@code true} the record is read with {@code getEntry};
     *                     otherwise it is read with {@code getCachedData}
     * @return future with the versioned state record
     */
    @Override
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean ignoreCached) {
        return this.getMetadataTable().thenCompose(tableName -> {
            if (!ignoreCached) {
                return this.storeHelper.getCachedData(tableName, STATE_KEY, StateRecord::fromBytes);
            }
            return this.storeHelper.getEntry(tableName, STATE_KEY, StateRecord::fromBytes);
        });
    }

    /**
     * Adds the serialized committing-transactions record under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table using the store helper's
     * add-if-absent call.
     *
     * @param committingTxns record to serialize and store
     * @return future that completes when the add-if-absent call has completed
     */
    @Override
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTxns) {
        return this.getMetadataTable().thenCompose(tableName ->
                Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(tableName, COMMITTING_TRANSACTIONS_RECORD_KEY, committingTxns.toBytes())));
    }

    /**
     * Reads the committing-transactions record stored under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table using {@code getEntry}.
     *
     * @return future with the versioned committing-transactions record
     */
    @Override
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord() {
        return this.getMetadataTable().thenCompose(tableName ->
                this.storeHelper.getEntry(tableName, COMMITTING_TRANSACTIONS_RECORD_KEY, CommittingTransactionsRecord::fromBytes));
    }

    /**
     * Updates the committing-transactions record stored under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table.
     *
     * @param update new record together with the version handed to the store helper
     * @return future with the version returned by the store helper's {@code updateEntry}
     */
    @Override
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> update) {
        return this.getMetadataTable().thenCompose(tableName ->
                this.storeHelper.updateEntry(tableName, COMMITTING_TRANSACTIONS_RECORD_KEY, update.getObject().toBytes(), update.getVersion()));
    }

    /**
     * Stores the waiting request processor name, encoded as UTF-8, under
     * {@code WAITING_REQUEST_PROCESSOR_PATH} in the metadata table using the store helper's
     * add-if-absent call.
     *
     * @param waitingRequestProcessor processor name to store
     * @return future that completes when the add-if-absent call has completed
     */
    @Override
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String waitingRequestProcessor) {
        return this.getMetadataTable().thenCompose(tableName -> {
            byte[] encodedName = waitingRequestProcessor.getBytes(StandardCharsets.UTF_8);
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(tableName, WAITING_REQUEST_PROCESSOR_PATH, encodedName));
        });
    }

    /**
     * Reads the entry stored under {@code WAITING_REQUEST_PROCESSOR_PATH} in the metadata table,
     * decodes its bytes as UTF-8 and returns the decoded string without its version.
     *
     * @return future with the stored waiting request processor name
     */
    @Override
    CompletableFuture<String> getWaitingRequestNode() {
        return this.getMetadataTable()
                .thenCompose(tableName -> this.storeHelper.getEntry(tableName, WAITING_REQUEST_PROCESSOR_PATH,
                        bytes -> StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString()))
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Removes the entry stored under {@code WAITING_REQUEST_PROCESSOR_PATH} from the metadata table.
     *
     * @return the future produced by the store helper's {@code removeEntry} call
     */
    @Override
    CompletableFuture<Void> deleteWaitingRequestNode() {
        return this.getMetadataTable().thenCompose(tableName ->
                this.storeHelper.removeEntry(tableName, WAITING_REQUEST_PROCESSOR_PATH));
    }

    /**
     * Creates a new entry for {@code writer} in the writers table holding a {@link WriterMark}
     * built from the given timestamp and position.
     *
     * @param writer    writer id, used as the entry key
     * @param timestamp timestamp of the mark
     * @param position  position of the mark
     * @return future that completes when the store helper's {@code addNewEntry} has completed
     */
    @Override
    CompletableFuture<Void> createWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position) {
        WriterMark initialMark = new WriterMark(timestamp, position);
        return Futures.toVoid(this.getWritersTable()
                .thenCompose(writersTable -> this.storeHelper.addNewEntry(writersTable, writer, initialMark.toBytes())));
    }

    /**
     * Removes the entry of {@code writer} from the writers table, passing {@code version}
     * through to the store helper's {@code removeEntry}.
     *
     * @param writer  writer id, used as the entry key
     * @param version version handed to the store helper
     * @return the future produced by the store helper's {@code removeEntry} call
     */
    @Override
    public CompletableFuture<Void> removeWriterRecord(String writer, Version version) {
        return this.getWritersTable().thenCompose(writersTable ->
                this.storeHelper.removeEntry(writersTable, writer, version));
    }

    /**
     * Reads the {@link WriterMark} entry of {@code writer} from the writers table using
     * {@code getEntry}.
     *
     * @param writer writer id, used as the entry key
     * @return future with the versioned writer mark
     */
    @Override
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String writer) {
        return this.getWritersTable().thenCompose(writersTable ->
                this.storeHelper.getEntry(writersTable, writer, WriterMark::fromBytes));
    }

    /**
     * Replaces the entry of {@code writer} in the writers table with a {@link WriterMark} built
     * from the given timestamp, position and liveness flag.
     *
     * @param writer    writer id, used as the entry key
     * @param timestamp timestamp of the mark
     * @param position  position of the mark
     * @param isAlive   liveness flag stored in the mark
     * @param version   version handed to the store helper's {@code updateEntry}
     * @return future that completes when the update has completed
     */
    @Override
    CompletableFuture<Void> updateWriterMarkRecord(String writer, long timestamp, ImmutableMap<Long, Long> position, boolean isAlive, Version version) {
        WriterMark updatedMark = new WriterMark(timestamp, position, isAlive);
        return Futures.toVoid(this.getWritersTable()
                .thenCompose(writersTable -> this.storeHelper.updateEntry(writersTable, writer, updatedMark.toBytes(), version)));
    }

    /**
     * Collects every entry of the writers table into a map from writer id to its
     * {@link WriterMark}; versions are discarded.
     *
     * @return future with the map of all writer marks; the map is only handed out after the
     *         iteration over the table has completed
     */
    @Override
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks() {
        // Parameterized so the compiler checks both the puts below and the returned map type.
        Map<String, WriterMark> marksByWriter = new ConcurrentHashMap<>();
        return this.getWritersTable()
                .thenCompose(writersTable -> this.storeHelper.getAllEntries(writersTable, WriterMark::fromBytes)
                        .collectRemaining(entry -> {
                            marksByWriter.put(entry.getKey(), entry.getValue().getObject());
                            // Returning true keeps the iteration going until all entries are consumed.
                            return true;
                        }))
                .thenApply(v -> marksByWriter);
    }

    /**
     * Clears the id held in {@code idRef} and, if an id was set, invalidates the cached
     * metadata-table entries for the state, configuration, truncation, epoch-transition,
     * committing-transactions and current-epoch keys of that id's metadata table.
     */
    @Override
    public void refresh() {
        String id = this.idRef.getAndSet(null);
        if (Strings.isNullOrEmpty(id)) {
            // No id was set, so there is no metadata table whose cache entries need dropping.
            return;
        }
        String[] cachedKeys = {
            STATE_KEY,
            CONFIGURATION_KEY,
            TRUNCATION_KEY,
            EPOCH_TRANSITION_KEY,
            COMMITTING_TRANSACTIONS_RECORD_KEY,
            CURRENT_EPOCH_KEY
        };
        for (String key : cachedKeys) {
            this.storeHelper.invalidateCache(this.getMetadataTableName(id), key);
        }
    }
}

