/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.Int96;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.PravegaTablesScope;
import io.pravega.controller.store.stream.PravegaTablesStoreHelper;
import io.pravega.controller.store.stream.PravegaTablesStream;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.ZKGarbageCollector;
import io.pravega.controller.store.stream.ZKStoreHelper;
import io.pravega.controller.store.stream.ZkInt96Counter;
import io.pravega.controller.store.stream.ZkOrderedStore;
import io.pravega.controller.util.Config;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PravegaTablesStreamMetadataStore
extends AbstractStreamMetadataStore {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(PravegaTablesStreamMetadataStore.class);
    static final String SEPARATOR = ".#.";
    static final String SCOPES_TABLE = StreamSegmentNameUtils.getQualifiedTableName((String)"_system", (String[])new String[]{"scopes"});
    static final String DELETED_STREAMS_TABLE = StreamSegmentNameUtils.getQualifiedTableName((String)"_system", (String[])new String[]{"deletedStreams"});
    static final String COMPLETED_TRANSACTIONS_BATCHES_TABLE = StreamSegmentNameUtils.getQualifiedTableName((String)"_system", (String[])new String[]{"completedTransactionsBatches"});
    static final String COMPLETED_TRANSACTIONS_BATCH_TABLE_FORMAT = "completedTransactionsBatch-%d";
    private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
    private final ZkInt96Counter counter;
    private final AtomicReference<ZKGarbageCollector> completedTxnGCRef;
    private final ZKGarbageCollector completedTxnGC;
    @VisibleForTesting
    private final PravegaTablesStoreHelper storeHelper;
    private final ZkOrderedStore orderer;
    private final ScheduledExecutorService executor;

    @VisibleForTesting
    PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework client, ScheduledExecutorService executor, GrpcAuthHelper authHelper) {
        this(segmentHelper, client, executor, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS), authHelper);
    }

    @VisibleForTesting
    PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework curatorClient, ScheduledExecutorService executor, Duration gcPeriod, GrpcAuthHelper authHelper) {
        super(new ZKHostIndex(curatorClient, "/hostTxnIndex", executor), new ZKHostIndex(curatorClient, "/hostRequestIndex", executor));
        ZKStoreHelper zkStoreHelper = new ZKStoreHelper(curatorClient, executor);
        this.orderer = new ZkOrderedStore("txnCommitOrderer", zkStoreHelper, executor);
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, zkStoreHelper, this::gcCompletedTxn, gcPeriod);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
        this.completedTxnGCRef = new AtomicReference<ZKGarbageCollector>(this.completedTxnGC);
        this.counter = new ZkInt96Counter(zkStoreHelper);
        this.storeHelper = new PravegaTablesStoreHelper(segmentHelper, authHelper, executor);
        this.executor = executor;
    }

    @VisibleForTesting
    CompletableFuture<Void> gcCompletedTxn() {
        ArrayList batches = new ArrayList();
        return this.withCompletion(this.storeHelper.expectingDataNotFound(((CompletableFuture)this.storeHelper.getAllKeys(COMPLETED_TRANSACTIONS_BATCHES_TABLE).collectRemaining(batches::add).thenApply(v -> this.findStaleBatches(batches))).thenCompose(toDeleteList -> {
            log.debug("deleting batches {} on new scheme", toDeleteList);
            return Futures.allOf((Collection)toDeleteList.stream().map(toDelete -> {
                String table = StreamSegmentNameUtils.getQualifiedTableName((String)"_system", (String[])new String[]{String.format(COMPLETED_TRANSACTIONS_BATCH_TABLE_FORMAT, Long.parseLong(toDelete))});
                return this.storeHelper.deleteTable(table, false);
            }).collect(Collectors.toList())).thenCompose(v -> this.storeHelper.removeEntries(COMPLETED_TRANSACTIONS_BATCHES_TABLE, (Collection<String>)toDeleteList));
        }), null), this.executor);
    }

    @VisibleForTesting
    List<String> findStaleBatches(List<String> batches) {
        if (batches.size() > 2) {
            int biggestIndex = Integer.MIN_VALUE;
            int secondIndex = Integer.MIN_VALUE;
            long biggest = Long.MIN_VALUE;
            long second = Long.MIN_VALUE;
            for (int i = 0; i < batches.size(); ++i) {
                long element = Long.parseLong(batches.get(i));
                if (element > biggest) {
                    secondIndex = biggestIndex;
                    second = biggest;
                    biggest = element;
                    biggestIndex = i;
                    continue;
                }
                if (element <= second) continue;
                secondIndex = i;
                second = element;
            }
            ArrayList<String> list = new ArrayList<String>(batches);
            list.remove(biggestIndex);
            if (biggestIndex < secondIndex) {
                list.remove(secondIndex - 1);
            } else {
                list.remove(secondIndex);
            }
            return list;
        }
        return new ArrayList<String>();
    }

    @VisibleForTesting
    void setCompletedTxnGCRef(ZKGarbageCollector garbageCollector) {
        this.completedTxnGCRef.set(garbageCollector);
    }

    @Override
    PravegaTablesStream newStream(String scope, String name) {
        return new PravegaTablesStream(scope, name, this.storeHelper, this.orderer, this.completedTxnGCRef.get()::getLatestBatch, () -> ((PravegaTablesScope)this.getScope(scope)).getStreamsInScopeTableName(), this.executor);
    }

    @Override
    CompletableFuture<Int96> getNextCounter() {
        return this.withCompletion(this.counter.getNextCounter(), this.executor);
    }

    @Override
    CompletableFuture<Boolean> checkScopeExists(String scope) {
        return this.withCompletion(this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(SCOPES_TABLE, scope, x -> x).thenApply(v -> true), false), this.executor);
    }

    @Override
    public CompletableFuture<CreateStreamResponse> createStream(String scope, String name, StreamConfiguration configuration, long createTimestamp, OperationContext context, Executor executor) {
        return this.withCompletion(((PravegaTablesScope)this.getScope(scope)).addStreamToScope(name).thenCompose(id -> super.createStream(scope, name, configuration, createTimestamp, context, executor)), executor);
    }

    @Override
    public CompletableFuture<Void> deleteStream(String scope, String name, OperationContext context, Executor executor) {
        return this.withCompletion(super.deleteStream(scope, name, context, executor).thenCompose(status -> ((PravegaTablesScope)this.getScope(scope)).removeStreamFromScope(name).thenApply(v -> status)), executor);
    }

    @Override
    Version getEmptyVersion() {
        return Version.LongVersion.EMPTY;
    }

    @Override
    Version parseVersionData(byte[] data) {
        return Version.IntVersion.fromBytes(data);
    }

    @Override
    PravegaTablesScope newScope(String scopeName) {
        return new PravegaTablesScope(scopeName, this.storeHelper);
    }

    @Override
    public CompletableFuture<String> getScopeConfiguration(String scopeName) {
        return this.withCompletion(this.storeHelper.getEntry(SCOPES_TABLE, scopeName, x -> x).thenApply(x -> scopeName), this.executor);
    }

    @Override
    public CompletableFuture<List<String>> listScopes() {
        ArrayList scopes = new ArrayList();
        return this.withCompletion(Futures.exceptionallyComposeExpecting((CompletableFuture)this.storeHelper.getAllKeys(SCOPES_TABLE).collectRemaining(scopes::add).thenApply(v -> scopes), (Predicate)DATA_NOT_FOUND_PREDICATE, () -> this.storeHelper.createTable(SCOPES_TABLE).thenApply(v -> Collections.emptyList())), this.executor);
    }

    @Override
    public CompletableFuture<Boolean> checkStreamExists(String scopeName, String streamName) {
        return this.withCompletion(((PravegaTablesScope)this.getScope(scopeName)).checkStreamExistsInScope(streamName), this.executor);
    }

    @Override
    public CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String scopeName, String streamName) {
        return this.withCompletion(this.storeHelper.getEntry(DELETED_STREAMS_TABLE, this.getScopedStreamName(scopeName, streamName), x -> BitConverter.readInt((byte[])x, (int)0)).handle((data, ex) -> {
            if (ex == null) {
                return (Integer)data.getObject() + 1;
            }
            if (Exceptions.unwrap((Throwable)ex) instanceof StoreException.DataNotFoundException) {
                return 0;
            }
            log.error("Problem found while getting a safe starting segment number for {}.", (Object)this.getScopedStreamName(scopeName, streamName), ex);
            throw new CompletionException((Throwable)ex);
        }), this.executor);
    }

    @Override
    CompletableFuture<Void> recordLastStreamSegment(String scope, String stream, int lastActiveSegment, OperationContext context, Executor executor) {
        String key = this.getScopedStreamName(scope, stream);
        byte[] maxSegmentNumberBytes = new byte[4];
        BitConverter.writeInt((byte[])maxSegmentNumberBytes, (int)0, (int)lastActiveSegment);
        return this.withCompletion(this.storeHelper.createTable(DELETED_STREAMS_TABLE).thenCompose(created -> this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(DELETED_STREAMS_TABLE, key, x -> BitConverter.readInt((byte[])x, (int)0)), null).thenCompose(existing -> {
            log.debug("Recording last segment {} for stream {}/{} on deletion.", new Object[]{lastActiveSegment, scope, stream});
            if (existing != null) {
                int oldLastActiveSegment = (Integer)existing.getObject();
                Preconditions.checkArgument((lastActiveSegment >= oldLastActiveSegment ? 1 : 0) != 0, (String)"Old last active segment ({}) for {}/{} is higher than current one {}.", (Object)oldLastActiveSegment, (Object)scope, (Object)stream, (Object)lastActiveSegment);
                return Futures.toVoid(this.storeHelper.updateEntry(DELETED_STREAMS_TABLE, key, maxSegmentNumberBytes, existing.getVersion()));
            }
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(DELETED_STREAMS_TABLE, key, maxSegmentNumberBytes));
        })), executor);
    }

    @Override
    public void close() {
        this.completedTxnGC.stopAsync();
        this.completedTxnGC.awaitTerminated();
    }

    @SuppressFBWarnings(justification="generated code")
    PravegaTablesStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}

