/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.coordination.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LockManagerImpl<T>
implements LockManager<T> {
    private static final Logger log = LoggerFactory.getLogger(LockManagerImpl.class);
    private final Map<String, ResourceLockImpl<T>> locks = new ConcurrentHashMap<String, ResourceLockImpl<T>>();
    private final MetadataStoreExtended store;
    private final MetadataCache<T> cache;
    private final MetadataSerde<T> serde;
    private State state = State.Ready;

    LockManagerImpl(MetadataStoreExtended store, Class<T> clazz) {
        this.store = store;
        this.cache = store.getMetadataCache(clazz);
        this.serde = new JSONMetadataSerdeSimpleType(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
        store.registerSessionListener(this::handleSessionEvent);
        store.registerListener(this::handleDataNotification);
    }

    @Override
    public CompletableFuture<Optional<T>> readLock(String path) {
        return this.cache.get(path);
    }

    @Override
    public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
        ResourceLockImpl<T> lock = new ResourceLockImpl<T>(this.store, this.serde, path, value);
        CompletableFuture result = new CompletableFuture();
        ((CompletableFuture)lock.acquire().thenRun(() -> {
            LockManagerImpl lockManagerImpl = this;
            synchronized (lockManagerImpl) {
                if (this.state == State.Ready) {
                    this.locks.put(path, lock);
                    lock.getLockExpiredFuture().thenRun(() -> {
                        log.info("Released resource lock on {}", (Object)path);
                        LockManagerImpl lockManagerImpl = this;
                        synchronized (lockManagerImpl) {
                            this.locks.remove(path, lock);
                        }
                    });
                } else {
                    lock.release();
                }
            }
            result.complete(lock);
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                result.completeExceptionally(new MetadataStoreException.LockBusyException("Resource at " + path + " is already locked"));
            } else {
                result.completeExceptionally(ex.getCause());
            }
            return null;
        });
        return result;
    }

    private void handleSessionEvent(SessionEvent se) {
        if (se == SessionEvent.SessionReestablished) {
            log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
            this.locks.values().forEach(ResourceLockImpl::revalidate);
        } else if (se == SessionEvent.Reconnected) {
            log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
            this.locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
        }
    }

    private void handleDataNotification(Notification n) {
        ResourceLockImpl<T> lock;
        if (n.getType() == NotificationType.Deleted && (lock = this.locks.get(n.getPath())) != null) {
            lock.lockWasInvalidated();
        }
    }

    @Override
    public CompletableFuture<List<String>> listLocks(String path) {
        return this.cache.getChildren(path);
    }

    @Override
    public void close() throws Exception {
        try {
            this.asyncClose().join();
        }
        catch (CompletionException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        HashMap<String, ResourceLockImpl<T>> locks;
        LockManagerImpl lockManagerImpl = this;
        synchronized (lockManagerImpl) {
            if (this.state != State.Ready) {
                return CompletableFuture.completedFuture(null);
            }
            locks = new HashMap<String, ResourceLockImpl<T>>(this.locks);
            this.state = State.Closed;
        }
        return FutureUtils.collect(locks.values().stream().map(ResourceLock::release).collect(Collectors.toList())).thenApply(x -> null);
    }

    private static enum State {
        Ready,
        Closed;

    }
}

