/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.common.naming;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamespaceBundleFactory {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);
    // Hash function applied to names by getLongHashCode.
    private final HashFunction hashFunc;
    // Async cache of bundles keyed by namespace; entries are loaded through loadBundles.
    private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
    // Broker service handle giving access to the metadata stores and the namespace service.
    private final PulsarService pulsar;
    // Policies cache obtained from the configuration metadata store; read by copyToLocalPolicies.
    private final MetadataCache<Policies> policiesCache;

    public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
        this.hashFunc = hashFunc;
        this.bundlesCache = Caffeine.newBuilder().recordStats().buildAsync(this::loadBundles);
        CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
        pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        this.pulsar = pulsar;
        this.policiesCache = pulsar.getConfigurationMetadataStore().getMetadataCache(Policies.class);
    }

    /**
     * Cache loader: resolves the bundles of a namespace from the local metadata store.
     *
     * <p>When no broker service or configuration cache is available, bundles built without local
     * policies are returned immediately. Otherwise the {@code /admin/local-policies/<namespace>}
     * entry is read; if it exists it is parsed by {@code readBundles}, and if it does not exist
     * {@code copyToLocalPolicies} is asked to create it.
     *
     * @param namespace namespace whose bundles are being loaded
     * @param executor executor supplied by the cache; not used here
     * @return future completed with the bundles, or exceptionally if reading or parsing fails
     */
    private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace, Executor executor) {
        final String localPoliciesPath = AdminResource.joinPath("/admin/local-policies", namespace.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", namespace);
        }
        if (this.pulsar == null || this.pulsar.getConfigurationCache() == null) {
            final NamespaceBundles withoutLocalPolicies = this.getBundles(namespace, Optional.empty());
            return CompletableFuture.completedFuture(withoutLocalPolicies);
        }

        final CompletableFuture<NamespaceBundles> loaded = new CompletableFuture<>();
        this.pulsar.getLocalMetadataStore().get(localPoliciesPath).thenAccept(stored -> {
            if (!stored.isPresent()) {
                // No local policies yet: create them and relay that outcome.
                this.copyToLocalPolicies(namespace)
                        .thenAccept(loaded::complete)
                        .exceptionally(error -> {
                            loaded.completeExceptionally(error);
                            return null;
                        });
                return;
            }
            try {
                final byte[] payload = stored.get().getValue();
                final long version = stored.get().getStat().getVersion();
                loaded.complete(this.readBundles(namespace, payload, version));
            } catch (IOException e) {
                loaded.completeExceptionally(e);
            }
        }).exceptionally(error -> {
            loaded.completeExceptionally(error);
            return null;
        });
        return loaded;
    }

    /**
     * Deserializes stored local policies and builds the namespace bundles from them.
     *
     * @param namespace namespace the policies belong to
     * @param value serialized {@link LocalPolicies}
     * @param version metadata-store version of the stored entry
     * @return bundles built from the decoded policies paired with {@code version}
     * @throws IOException if {@code value} cannot be deserialized
     */
    private NamespaceBundles readBundles(NamespaceName namespace, byte[] value, long version) throws IOException {
        final LocalPolicies decoded = ObjectMapperFactory.getThreadLocal().readValue(value, LocalPolicies.class);
        final Optional<Pair<LocalPolicies, Long>> versionedPolicies = Optional.of(Pair.of(decoded, version));
        final NamespaceBundles bundles = this.getBundles(namespace, versionedPolicies);
        if (LOG.isDebugEnabled()) {
            final Object shownBundles = decoded.bundles.getBoundaries() != null ? decoded.bundles : "null";
            LOG.debug("[{}] Get bundles from getLocalZkCacheService: bundles: {}, version: {}",
                    namespace, shownBundles, bundles.getVersion());
        }
        return bundles;
    }

    /**
     * Creates the local-policies entry of a namespace from the bundles in its {@link Policies}.
     *
     * <p>If the policies cache has no entry for the namespace, bundles built without local policies
     * are returned. Otherwise a {@link LocalPolicies} holding the policies' bundles is serialized and
     * written to {@code /admin/local-policies/<namespace>} with expected version {@code -1}, and the
     * resulting bundles carry the version reported by that write.
     *
     * @param namespace namespace to copy the bundle data for
     * @return future with the resulting bundles; fails if serialization or the write fails
     */
    private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespace) {
        final String policiesPath = AdminResource.path("policies", namespace.toString());
        return this.policiesCache.get(policiesPath).thenCompose(optPolicies -> {
            if (!optPolicies.isPresent()) {
                return CompletableFuture.completedFuture(this.getBundles(namespace, Optional.empty()));
            }

            final LocalPolicies localPolicies = new LocalPolicies(optPolicies.get().bundles, null, null);
            final String localPath = AdminResource.joinPath("/admin/local-policies", namespace.toString());

            final byte[] serialized;
            try {
                serialized = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
            } catch (IOException e) {
                return FutureUtil.failedFuture(e);
            }

            return this.pulsar.getLocalMetadataStore()
                    .put(localPath, serialized, Optional.of(-1L))
                    .thenApply(stat -> this.getBundles(namespace, Optional.of(Pair.of(localPolicies, stat.getVersion()))));
        });
    }

    /**
     * Metadata-store listener that evicts the cached bundles of a namespace when something under
     * {@code /admin/local-policies} changes.
     *
     * <p>Paths under that prefix which do not resolve to a namespace (for example the prefix itself,
     * which has too few segments) are logged and skipped, so that a parsing failure is never thrown
     * back into the metadata store's notification delivery.
     *
     * @param n notification received from the local metadata store
     */
    private void handleMetadataStoreNotification(Notification n) {
        final String path = n.getPath();
        if (!path.startsWith("/admin/local-policies")) {
            return;
        }

        NamespaceName namespace;
        try {
            namespace = NamespaceName.get(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path));
        } catch (RuntimeException e) {
            // Either the path has fewer segments than expected or the remainder is not a namespace name.
            LOG.warn("Ignoring local policies notification for path {}: it does not identify a namespace", path, e);
            return;
        }

        try {
            LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace);
            this.bundlesCache.synchronous().invalidate(namespace);
        } catch (Exception e) {
            LOG.error("Failed to update the policy change for ns {}", namespace, e);
        }
    }

    /**
     * Reports whether the ownership cache of this broker holds the given bundle.
     *
     * @param nsBundle bundle to look up
     * @return {@code true} if an owned bundle is found; {@code false} if none is found or no broker
     *         service is set
     */
    private boolean isOwner(NamespaceBundle nsBundle) {
        if (this.pulsar == null) {
            return false;
        }
        return this.pulsar.getNamespaceService().getOwnershipCache().getOwnedBundle(nsBundle) != null;
    }

    /**
     * Discards the cached bundles entry of the given namespace.
     *
     * @param namespace namespace whose cache entry is invalidated
     */
    public void invalidateBundleCache(NamespaceName namespace) {
        this.bundlesCache
                .synchronous()
                .invalidate(namespace);
    }

    /**
     * Looks up the bundles of a namespace through the async cache.
     *
     * @param nsname namespace to look up
     * @return the cache's future for the bundles of {@code nsname}
     */
    public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname) {
        final CompletableFuture<NamespaceBundles> pending = this.bundlesCache.get(nsname);
        return pending;
    }

    /**
     * Looks up the bundles of a namespace through the synchronous view of the cache.
     *
     * @param nsname namespace to look up
     * @return bundles of {@code nsname}
     * @throws Exception declared for callers; failures of the underlying load surface here
     */
    public NamespaceBundles getBundles(NamespaceName nsname) throws Exception {
        final NamespaceBundles bundles = this.bundlesCache.synchronous().get(nsname);
        return bundles;
    }

    /**
     * Returns the bundles of a namespace only if the cache already holds them.
     *
     * @param nsname namespace to look up
     * @return the cached bundles, or an empty {@link Optional} when the lookup yields {@code null}
     */
    public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName nsname) {
        final NamespaceBundles cached = this.bundlesCache.synchronous().getIfPresent(nsname);
        return Optional.ofNullable(cached);
    }

    /**
     * Creates a bundle of the given namespace covering the given hash range.
     *
     * @param nsname namespace the bundle belongs to
     * @param hashRange key range of the bundle
     * @return a new {@link NamespaceBundle} tied to this factory
     */
    public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
        final NamespaceBundle bundle = new NamespaceBundle(nsname, hashRange, this);
        return bundle;
    }

    /**
     * Creates a bundle from a namespace name and a textual range of the form {@code <lower>_<upper>}.
     *
     * <p>Both endpoints are parsed with {@link Long#decode(String)}. The lower bound is closed; the
     * upper bound is closed only when it equals {@code NamespaceBundles.FULL_UPPER_BOUND} and open
     * otherwise. Any text after a second {@code _} is ignored, as before.
     *
     * @param namespace namespace name
     * @param bundleRange range such as {@code 0x00000000_0xffffffff}
     * @return the bundle for that namespace and range
     * @throws IllegalArgumentException if the range lacks {@code _}, lacks either endpoint, or an
     *         endpoint is not a decodable number
     */
    public NamespaceBundle getBundle(String namespace, String bundleRange) {
        Preconditions.checkArgument(bundleRange.contains("_"), "Invalid bundle range");
        final String[] boundaries = bundleRange.split("_");
        // split() drops trailing empty strings, so "0x00000000_" yields one element and "_" yields
        // none; reject those here instead of failing with ArrayIndexOutOfBoundsException below.
        Preconditions.checkArgument(boundaries.length >= 2, "Invalid bundle range");

        final Long lowerEndpoint = Long.decode(boundaries[0]);
        final Long upperEndpoint = Long.decode(boundaries[1]);
        final BoundType upperBoundType =
                upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN;
        final Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, upperBoundType);
        return this.getBundle(NamespaceName.get(namespace), hashRange);
    }

    /**
     * Returns the full bundle of a namespace, resolving its bundles through the synchronous cache view.
     *
     * @param fqnn namespace to look up
     * @return the result of {@code getFullBundle()} on the namespace's bundles
     * @throws Exception declared for callers; failures of the underlying load surface here
     */
    public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        final NamespaceBundles bundles = this.bundlesCache.synchronous().get(fqnn);
        return bundles.getFullBundle();
    }

    /**
     * Asynchronous counterpart of {@link #getFullBundle(NamespaceName)}.
     *
     * @param fqnn namespace to look up
     * @return future with the full bundle of the namespace's bundles
     */
    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
        final CompletableFuture<NamespaceBundles> pendingBundles = this.bundlesCache.get(fqnn);
        return pendingBundles.thenApply(NamespaceBundles::getFullBundle);
    }

    /**
     * Hashes a name with this factory's hash function.
     *
     * @param name text to hash; hashed as UTF-8
     * @return the hash code converted with {@code padToLong()}
     */
    public long getLongHashCode(String name) {
        final long hash = this.hashFunc
                .hashString(name, Charsets.UTF_8)
                .padToLong();
        return hash;
    }

    /**
     * Builds bundles for a namespace directly from bundle data, without local policies.
     *
     * @param nsname namespace the bundles belong to
     * @param bundleData bundle data converted with {@code NamespaceBundles.getPartitions}
     * @return a new {@link NamespaceBundles} with an empty local-policies value
     */
    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
        return new NamespaceBundles(
                nsname,
                this,
                Optional.empty(),
                NamespaceBundles.getPartitions(bundleData));
    }

    /**
     * Builds bundles for a namespace from optional local policies and their version.
     *
     * @param nsname namespace the bundles belong to
     * @param localPolicies local policies paired with their version, or empty when there are none
     * @return a new {@link NamespaceBundles} tied to this factory
     */
    private NamespaceBundles getBundles(NamespaceName nsname, Optional<Pair<LocalPolicies, Long>> localPolicies) {
        final NamespaceBundles bundles = new NamespaceBundles(nsname, this, localPolicies);
        return bundles;
    }

    /**
     * Computes the bundles of a namespace after splitting one of its bundles.
     *
     * <p>Without a {@code splitBoundary} the target range is cut into {@code argNumBundles} segments
     * of equal size (integer division, so the last segment absorbs the remainder). With a
     * {@code splitBoundary} the range is cut into exactly two bundles at that key and
     * {@code argNumBundles} is ignored. Nothing is written anywhere; only the new layout is computed.
     *
     * @param targetBundle bundle to split; must not be {@code null} and must have a namespace
     * @param argNumBundles number of bundles to produce when no boundary is given; at least 1
     * @param splitBoundary key to split at, strictly inside the target range, or {@code null}
     * @return future with the new bundles and the sub-list of bundles that replaced the target;
     *         the future yields {@code null} when the target range is not one of the namespace's
     *         current bundles
     * @throws NullPointerException if {@code targetBundle} or its namespace is {@code null}
     * @throws IllegalArgumentException if the bundle cannot be split further, the boundary is outside
     *         the bundle's range, or the number of bundles is not positive
     */
    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(NamespaceBundle targetBundle, int argNumBundles, Long splitBoundary) {
        // Null checks come first: every later check dereferences targetBundle.
        Preconditions.checkNotNull(targetBundle, "can't split null bundle");
        Preconditions.checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
        Preconditions.checkArgument(this.canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
        if (splitBoundary != null) {
            Preconditions.checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), "The given fixed key must between the key range of the %s bundle", targetBundle);
        }
        // A fixed boundary always produces exactly two bundles.
        final int numBundles = splitBoundary != null ? 2 : argNumBundles;
        // Fail fast: a non-positive count would otherwise only fail later, inside the async stage.
        Preconditions.checkArgument(numBundles >= 1, "number of bundles must be positive but was %s", (Object) numBundles);

        final NamespaceName nsname = targetBundle.getNamespaceObject();
        return this.bundlesCache.get(nsname).thenApply(sourceBundle -> {
            final int lastIndex = sourceBundle.partitions.length - 1;
            final long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
            final Range<Long> range = targetBundle.getKeyRange();
            int pos = 0;
            int splitPartition = -1;
            for (int i = 0; i < lastIndex; ++i) {
                final long minVal = sourceBundle.partitions[i];
                final long maxVal = sourceBundle.partitions[i + 1];
                if (minVal == range.lowerEndpoint() && maxVal == range.upperEndpoint()) {
                    // Target range found: emit its lower bound followed by the new inner boundaries.
                    splitPartition = i;
                    final long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal;
                    partitions[pos++] = minVal;
                    long curPartition = minVal + segSize;
                    for (int j = 0; j < numBundles - 1; ++j) {
                        partitions[pos++] = curPartition;
                        curPartition += segSize;
                    }
                } else {
                    partitions[pos++] = minVal;
                }
            }
            partitions[pos] = sourceBundle.partitions[lastIndex];

            if (splitPartition == -1) {
                // The target range is not one of the current bundles of the namespace.
                return null;
            }
            final NamespaceBundles splitNsBundles = new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
            final List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition, splitPartition + numBundles);
            return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splitNsBundles, splitBundles);
        });
    }

    /**
     * Tells whether a bundle's key range is wide enough to be split.
     *
     * @param bundle bundle to inspect
     * @return {@code true} when upper endpoint minus lower endpoint is greater than 1
     */
    public boolean canSplitBundle(NamespaceBundle bundle) {
        final Range<Long> keyRange = bundle.getKeyRange();
        final long width = keyRange.upperEndpoint() - keyRange.lowerEndpoint();
        return width > 1L;
    }

    /**
     * Verifies that a sorted set of boundaries spans the full hash range.
     *
     * @param partitions boundaries in sorted order
     * @throws IllegalArgumentException if the set is empty, its first element is not
     *         {@code 0x00000000}, or its last element is not {@code 0xffffffff}
     */
    public static void validateFullRange(SortedSet<String> partitions) {
        // first()/last() throw NoSuchElementException on an empty set; report that case as an
        // invalid argument like any other bad range.
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("bundle boundaries must not be empty");
        }
        final String first = partitions.first();
        final String last = partitions.last();
        if (!first.equals("0x00000000") || !last.equals("0xffffffff")) {
            throw new IllegalArgumentException(
                    "bundle boundaries must span 0x00000000 to 0xffffffff but span " + first + " to " + last);
        }
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param pulsar broker service the factory is bound to
     * @param hashFunc hash function used for name hashing
     * @return a new {@link NamespaceBundleFactory}
     */
    public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
        final NamespaceBundleFactory factory = new NamespaceBundleFactory(pulsar, hashFunc);
        return factory;
    }

    /**
     * Tells whether a textual bundle range is the full range {@code 0x00000000_0xffffffff}.
     *
     * @param bundleRange range text to compare
     * @return {@code true} only on an exact match
     */
    public static boolean isFullBundle(String bundleRange) {
        final String fullRange = String.format("%s_%s", "0x00000000", "0xffffffff");
        return bundleRange.equals(fullRange);
    }

    /**
     * Returns the textual form of the full bundle range.
     *
     * @return {@code 0x00000000_0xffffffff}
     */
    public static String getDefaultBundleRange() {
        final String defaultRange = String.format("%s_%s", "0x00000000", "0xffffffff");
        return defaultRange;
    }

    /**
     * Extracts the namespace portion of a policies path.
     *
     * <p>The path is split on {@code /} into at most six pieces, the first three pieces are dropped
     * (for {@code /admin/local-policies/...} these are the empty string before the leading slash,
     * {@code admin} and {@code local-policies}), and the remaining pieces are re-joined with
     * {@code /}. An empty path is returned unchanged.
     *
     * @param path policies path
     * @return the remaining pieces joined with {@code /}
     * @throws java.util.NoSuchElementException if a non-empty path has fewer than three pieces
     */
    public static String getNamespaceFromPoliciesPath(String path) {
        if (path.isEmpty()) {
            return path;
        }
        final Iterator<String> pieces = Splitter.on("/").limit(6).split(path).iterator();
        // Drop the three leading pieces that precede the namespace.
        for (int skipped = 0; skipped < 3; skipped++) {
            pieces.next();
        }
        return Joiner.on("/").join(pieces);
    }
}

