/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.client.StoreType;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.ZKStoreHelper;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperBucketStore
implements BucketStore {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ZookeeperBucketStore.class);
    private static final String ROOT_PATH = "/";
    private static final String OWNERSHIP_CHILD_PATH = "ownership";
    private final ImmutableMap<BucketStore.ServiceType, Integer> bucketCountMap;
    private final ZKStoreHelper storeHelper;

    ZookeeperBucketStore(ImmutableMap<BucketStore.ServiceType, Integer> bucketCountMap, CuratorFramework client, Executor executor) {
        this.bucketCountMap = bucketCountMap;
        this.storeHelper = new ZKStoreHelper(client, executor);
    }

    @Override
    public StoreType getStoreType() {
        return StoreType.Zookeeper;
    }

    @Override
    public int getBucketCount(BucketStore.ServiceType serviceType) {
        return (Integer)this.bucketCountMap.get((Object)serviceType);
    }

    public CompletableFuture<Void> createBucketsRoot(BucketStore.ServiceType serviceType) {
        String bucketRootPath = this.getBucketRootPath(serviceType);
        String bucketOwnershipPath = this.getBucketOwnershipPath(serviceType);
        return Futures.toVoid((CompletableFuture)this.storeHelper.createZNodeIfNotExist(bucketRootPath).thenCompose(x -> this.storeHelper.createZNodeIfNotExist(bucketOwnershipPath)));
    }

    public CompletableFuture<Void> createBucket(BucketStore.ServiceType serviceType, int bucketId) {
        String bucketPath = this.getBucketPath(serviceType, bucketId);
        return Futures.toVoid((CompletableFuture)this.createBucketsRoot(serviceType).thenCompose(x -> this.storeHelper.createZNodeIfNotExist(bucketPath)));
    }

    @Override
    public CompletableFuture<Set<String>> getStreamsForBucket(BucketStore.ServiceType serviceType, int bucket, Executor executor) {
        String bucketPath = this.getBucketPath(serviceType, bucket);
        return this.storeHelper.getChildren(bucketPath).thenApply(list -> list.stream().map(this::decodedScopedStreamName).collect(Collectors.toSet()));
    }

    @Override
    public CompletableFuture<Void> addStreamToBucketStore(BucketStore.ServiceType serviceType, String scope, String stream, Executor executor) {
        int bucketCount = (Integer)this.bucketCountMap.get((Object)serviceType);
        int bucket = BucketStore.getBucket(scope, stream, bucketCount);
        String bucketPath = this.getBucketPath(serviceType, bucket);
        String streamPath = ZKPaths.makePath((String)bucketPath, (String)this.encodedScopedStreamName(scope, stream));
        return this.storeHelper.checkExists(streamPath).thenCompose(exists -> {
            if (exists.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            return Futures.toVoid(this.storeHelper.createZNodeIfNotExist(streamPath));
        });
    }

    @Override
    public CompletableFuture<Void> removeStreamFromBucketStore(BucketStore.ServiceType serviceType, String scope, String stream, Executor executor) {
        int bucketCount = (Integer)this.bucketCountMap.get((Object)serviceType);
        int bucket = BucketStore.getBucket(scope, stream, bucketCount);
        String bucketPath = this.getBucketPath(serviceType, bucket);
        String streamPath = ZKPaths.makePath((String)bucketPath, (String)this.encodedScopedStreamName(scope, stream));
        return Futures.exceptionallyExpecting(this.storeHelper.deleteNode(streamPath), e -> Exceptions.unwrap((Throwable)e) instanceof StoreException.DataNotFoundException, null);
    }

    public CompletableFuture<Boolean> takeBucketOwnership(BucketStore.ServiceType serviceType, int bucketId, String processId) {
        String bucketPath = ZKPaths.makePath((String)this.getBucketOwnershipPath(serviceType), (String)("" + bucketId));
        return this.storeHelper.createEphemeralZNode(bucketPath, SerializationUtils.serialize((Serializable)((Object)processId))).thenCompose(created -> {
            if (!created.booleanValue()) {
                return this.storeHelper.getData(bucketPath, x -> x).thenApply(data -> SerializationUtils.deserialize((byte[])((byte[])data.getObject())).equals(processId));
            }
            return CompletableFuture.completedFuture(true);
        });
    }

    public PathChildrenCache getBucketPathChildrenCache(BucketStore.ServiceType serviceType, int bucketId) {
        return this.storeHelper.getPathChildrenCache(this.getBucketPath(serviceType, bucketId), true);
    }

    public PathChildrenCache getServiceOwnershipPathChildrenCache(BucketStore.ServiceType serviceType) {
        return this.storeHelper.getPathChildrenCache(this.getBucketOwnershipPath(serviceType), true);
    }

    public StreamImpl getStreamFromPath(String path) {
        String scopedStream = this.decodedScopedStreamName(ZKPaths.getNodeFromPath((String)path));
        String[] splits = scopedStream.split(ROOT_PATH);
        return new StreamImpl(splits[0], splits[1]);
    }

    private String encodedScopedStreamName(String scope, String stream) {
        String scopedStreamName = BucketStore.getScopedStreamName(scope, stream);
        return Base64.getEncoder().encodeToString(scopedStreamName.getBytes());
    }

    private String decodedScopedStreamName(String encodedScopedStreamName) {
        return new String(Base64.getDecoder().decode(encodedScopedStreamName));
    }

    private String getBucketRootPath(BucketStore.ServiceType serviceType) {
        return ZKPaths.makePath((String)ROOT_PATH, (String)serviceType.getName());
    }

    private String getBucketOwnershipPath(BucketStore.ServiceType serviceType) {
        String bucketRootPath = this.getBucketRootPath(serviceType);
        return ZKPaths.makePath((String)bucketRootPath, (String)OWNERSHIP_CHILD_PATH);
    }

    private String getBucketPath(BucketStore.ServiceType serviceType, int bucket) {
        String bucketRootPath = ZKPaths.makePath((String)ROOT_PATH, (String)serviceType.getName());
        return ZKPaths.makePath((String)bucketRootPath, (String)("" + bucket));
    }

    @SuppressFBWarnings(justification="generated code")
    public ZKStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}

