/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.datanode;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.annotation.Nonnull;
import jakarta.inject.Inject;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.graylog2.cluster.lock.Lock;
import org.graylog2.cluster.lock.LockService;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.datanode.DatanodeMigrationLockException;
import org.graylog2.indexer.datanode.DatanodeMigrationLockService;
import org.graylog2.indexer.datanode.DatanodeMigrationLockWaitConfig;
import org.graylog2.plugin.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DatanodeMigrationLockService} implementation backed by a {@link LockService}.
 *
 * <p>Every lock obtained through this class is remembered in {@link #activeLocks}. A single daemon
 * thread, started from the constructor, calls {@link LockService#extendLock} for each remembered
 * lock every {@link #LOCK_EXTEND_PERIOD_SECONDS} seconds until the lock is handed back via
 * {@link #release(Lock)}.
 *
 * <p>Acquiring, releasing and extending locks are all {@code synchronized} on this instance, so a
 * lock is never extended concurrently with its own release.
 */
public class DatanodeMigrationLockServiceImpl
implements DatanodeMigrationLockService {
    private static final Logger LOG = LoggerFactory.getLogger(DatanodeMigrationLockServiceImpl.class);
    /** Initial delay and period, in seconds, of the background task that extends active locks. */
    public static final int LOCK_EXTEND_PERIOD_SECONDS = 10;
    /** Prefix prepended to the index set id to form the lock resource name. */
    private static final String LOCK_RESOURCE_PREFIX = "remote-reindex-migration_";
    private final LockService lockService;
    /** Locks obtained through this service that have not been released yet. */
    private final Set<Lock> activeLocks = Collections.synchronizedSet(new HashSet<>());

    /**
     * Creates the service and starts the background thread that periodically extends active locks.
     *
     * @param lockService the lock service used to obtain, extend and release locks
     */
    @Inject
    public DatanodeMigrationLockServiceImpl(LockService lockService) {
        this.lockService = lockService;
        startLocksExtendingThread(lockService);
    }

    /**
     * Schedules {@link #extendActiveLocks(LockService)} on a single daemon thread at a fixed rate of
     * {@link #LOCK_EXTEND_PERIOD_SECONDS} seconds.
     */
    private void startLocksExtendingThread(LockService lockService) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("migration-locks-service-backend-%d")
                .setDaemon(true)
                .setUncaughtExceptionHandler(new Tools.LogUncaughtExceptionHandler(LOG))
                .build();
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        executorService.scheduleAtFixedRate(
                () -> extendActiveLocks(lockService),
                LOCK_EXTEND_PERIOD_SECONDS,
                LOCK_EXTEND_PERIOD_SECONDS,
                TimeUnit.SECONDS);
    }

    /**
     * Asks the lock service to extend every currently active lock.
     *
     * <p>A failure to extend one lock is logged and does not prevent the remaining locks from being
     * extended. This matters because a scheduled fixed-rate task is never run again once one of its
     * executions throws; letting the exception escape would silently stop all future extensions.
     */
    private synchronized void extendActiveLocks(LockService lockService) {
        long extendedLocks = 0L;
        // Iterating the synchronized set directly is safe here: every mutation of activeLocks
        // happens inside a method synchronized on this instance, whose monitor we hold.
        for (Lock lock : activeLocks) {
            try {
                if (lockService.extendLock(lock).isPresent()) {
                    extendedLocks++;
                }
            } catch (RuntimeException e) {
                LOG.warn("Failed to extend TTL of datanode migration lock {}", lock, e);
            }
        }
        if (extendedLocks > 0L) {
            LOG.trace("Extended TTL of {} datanode migration locks", extendedLocks);
        }
    }

    /**
     * Obtains the migration lock of the given index set, retrying as configured by {@code config}.
     *
     * @param indexSet the index set to lock; its config id forms the lock resource name
     * @param caller   class requesting the lock; its name becomes part of the lock context
     * @param context  additional text appended to the lock context
     * @param config   timeout, delay between attempts and retry listener
     * @return the obtained lock; hand it back with {@link #release(Lock)}
     * @throws DatanodeMigrationLockException if the lock could not be obtained
     */
    @Override
    public Lock acquireLock(IndexSet indexSet, Class<?> caller, String context, DatanodeMigrationLockWaitConfig config) {
        String resource = LOCK_RESOURCE_PREFIX + indexSet.getConfig().id();
        return waitForLock(resource, caller, context, indexSet, config);
    }

    /**
     * Makes a single attempt to lock the index set and, if successful, runs {@code runnable} while
     * holding the lock. The lock is released afterwards even if {@code runnable} throws. When the
     * lock is not obtained, the runnable is skipped and an info message is logged.
     *
     * @param indexSet the index set to lock
     * @param caller   class requesting the lock
     * @param runnable work to execute while the lock is held
     */
    @Override
    public void tryRun(IndexSet indexSet, Class<?> caller, Runnable runnable) {
        tryLock(indexSet, caller).ifPresentOrElse(lock -> {
            try {
                runnable.run();
            } finally {
                release(lock);
            }
        }, () -> LOG.info("Couldn't enquire a lock of index set {}({}) for {}, skipping execution",
                indexSet.getConfig().title(), indexSet.getConfig().id(), caller.getName()));
    }

    /**
     * Stops extending the given lock and unlocks it in the lock service.
     *
     * @param lock the lock to release
     */
    @Override
    public synchronized void release(Lock lock) {
        // Remove first so the lock is no longer tracked for extension before it is unlocked.
        activeLocks.remove(lock);
        lockService.unlock(lock);
    }

    /**
     * Makes one attempt to lock the index set, without waiting.
     *
     * @return the lock, or empty if the lock service did not grant it
     */
    private Optional<Lock> tryLock(IndexSet indexSet, Class<?> caller) {
        String resource = LOCK_RESOURCE_PREFIX + indexSet.getConfig().id();
        // NOTE(review): the context is null here, so doLock builds a lock context ending in ":null".
        return doLock(resource, caller, null);
    }

    /**
     * Requests the lock from the lock service and, if granted, starts tracking it for extension.
     * The lock context passed to the lock service is {@code caller.getName() + ":" + context}.
     */
    private synchronized Optional<Lock> doLock(String resource, Class<?> caller, String context) {
        Optional<Lock> lock = lockService.lock(resource, caller.getName() + ":" + context);
        lock.ifPresent(activeLocks::add);
        return lock;
    }

    /**
     * Repeatedly calls {@link #doLock} until a lock is granted or the configured timeout elapses.
     *
     * @throws DatanodeMigrationLockException if the retryer gives up or the attempt itself fails
     */
    private Lock waitForLock(String resource, Class<?> caller, String context, IndexSet indexSet, DatanodeMigrationLockWaitConfig waitConfig) {
        try {
            return RetryerBuilder.<Optional<Lock>>newBuilder()
                    .withRetryListener(loggingRetryListener(caller, indexSet, waitConfig))
                    .withStopStrategy(StopStrategies.stopAfterDelay(waitConfig.lockAcquireTimeout().getSeconds(), TimeUnit.SECONDS))
                    .withWaitStrategy(WaitStrategies.fixedWait(waitConfig.delayBetweenAttempts().toMillis(), TimeUnit.MILLISECONDS))
                    // An empty result means the lock was not granted, so try again.
                    .retryIfResult(Optional::isEmpty)
                    .build()
                    .call(() -> doLock(resource, caller, context))
                    .orElseThrow(() -> new DatanodeMigrationLockException("Failed to obtain index set " + indexSet.getConfig().title() + " lock"));
        } catch (RetryException | ExecutionException e) {
            throw new DatanodeMigrationLockException("Failed to obtain index set " + indexSet.getConfig().title() + " lock", e);
        }
    }

    /**
     * Builds a retry listener that forwards each attempt number to the listener supplied by
     * {@code waitConfig}.
     */
    @Nonnull
    private static RetryListener loggingRetryListener(final Class<?> caller, final IndexSet indexSet, final DatanodeMigrationLockWaitConfig waitConfig) {
        // RetryListener#onRetry is a generic method, so it cannot be expressed as a lambda.
        return new RetryListener() {
            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                waitConfig.lockAcquireListerer().onRetry(indexSet, caller, attempt.getAttemptNumber());
            }
        };
    }
}

