/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.ZKStoreHelper;
import io.pravega.controller.util.RetryHelper;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ZKGarbageCollector
extends AbstractService
implements AutoCloseable {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKGarbageCollector.class);
    private static final String GC_ROOT = "/garbagecollection/%s";
    private static final String GUARD_PATH = "/garbagecollection/%s/guard";
    private final ZKStoreHelper zkStoreHelper;
    private final AtomicReference<CompletableFuture<Void>> gcLoop;
    private final CompletableFuture<Void> latch = new CompletableFuture();
    private final Supplier<CompletableFuture<Void>> gcProcessingSupplier;
    private final String gcName;
    private final String guardPath;
    private final AtomicReference<NodeCache> watch;
    private final ScheduledExecutorService gcExecutor;
    private final long periodInMillis;
    private final AtomicInteger currentBatch;
    private final AtomicInteger latestVersion;

    ZKGarbageCollector(String gcName, ZKStoreHelper zkStoreHelper, Supplier<CompletableFuture<Void>> gcProcessingSupplier, Duration gcPeriod) {
        Preconditions.checkNotNull((Object)zkStoreHelper);
        Preconditions.checkNotNull(gcProcessingSupplier);
        Preconditions.checkArgument((gcPeriod != null && !gcPeriod.isNegative() ? 1 : 0) != 0);
        this.gcName = gcName;
        this.guardPath = String.format(GUARD_PATH, gcName);
        this.watch = new AtomicReference();
        this.zkStoreHelper = zkStoreHelper;
        this.gcProcessingSupplier = gcProcessingSupplier;
        this.periodInMillis = gcPeriod.toMillis();
        this.gcExecutor = Executors.newSingleThreadScheduledExecutor();
        this.gcLoop = new AtomicReference();
        this.currentBatch = new AtomicInteger(0);
        this.latestVersion = new AtomicInteger(0);
    }

    protected void doStart() {
        RetryHelper.withRetriesAsync(() -> ((CompletableFuture)this.zkStoreHelper.createZNodeIfNotExist(this.guardPath).thenCompose(v -> this.fetchVersion().thenAccept(r -> this.currentBatch.set(this.latestVersion.get())))).thenAccept(v -> this.watch.compareAndSet(null, this.registerWatch(this.guardPath))), RetryHelper.RETRYABLE_PREDICATE, 5, this.gcExecutor).whenComplete((r, e) -> {
            if (e == null) {
                this.notifyStarted();
                this.gcLoop.set(Futures.loop(() -> ((ZKGarbageCollector)this).isRunning(), () -> Futures.delayedFuture(this::process, (long)this.periodInMillis, (ScheduledExecutorService)this.gcExecutor), (Executor)this.gcExecutor));
            } else {
                this.notifyFailed((Throwable)e);
            }
            this.latch.complete(null);
        });
    }

    protected void doStop() {
        this.latch.thenAccept(v -> {
            CompletableFuture<Void> gcLoopFuture = this.gcLoop.updateAndGet(x -> {
                if (x != null) {
                    x.cancel(true);
                    x.whenComplete((r, e) -> {
                        if (e != null && !(Exceptions.unwrap((Throwable)e) instanceof CancellationException)) {
                            log.error("Exception while trying to stop GC {}", (Object)this.gcName, e);
                            this.notifyFailed((Throwable)e);
                        } else {
                            this.notifyStopped();
                        }
                    });
                }
                return x;
            });
            if (gcLoopFuture == null) {
                this.notifyStopped();
            }
        });
    }

    int getLatestBatch() {
        return this.currentBatch.get();
    }

    @VisibleForTesting
    CompletableFuture<Void> process() {
        return ((CompletableFuture)((CompletableFuture)this.zkStoreHelper.setData(this.guardPath, new byte[0], new Version.IntVersion(this.latestVersion.get())).thenComposeAsync(r -> {
            log.info("Acquired guard, starting GC iteration for {}", (Object)this.gcName);
            return this.gcProcessingSupplier.get();
        }, (Executor)this.gcExecutor)).exceptionally(e -> {
            Throwable unwrap = Exceptions.unwrap((Throwable)e);
            if (unwrap instanceof StoreException.WriteConflictException) {
                log.debug("Unable to acquire guard. Will try in next cycle.");
            } else if (unwrap instanceof StoreException.StoreConnectionException) {
                log.info("StoreConnectionException thrown during Garbage Collection iteration for {}.", (Object)this.gcName);
            } else {
                log.warn("Exception thrown during Garbage Collection iteration for {}. Log and ignore.", (Object)this.gcName, (Object)unwrap);
            }
            return null;
        })).thenCompose(v -> this.fetchVersion());
    }

    @VisibleForTesting
    CompletableFuture<Void> fetchVersion() {
        return this.zkStoreHelper.getData(this.guardPath, x -> x).thenAccept(data -> this.latestVersion.set(data.getVersion().asIntVersion().getIntValue()));
    }

    @VisibleForTesting
    void setVersion(int newVersion) {
        this.latestVersion.set(newVersion);
    }

    @VisibleForTesting
    int getVersion() {
        return this.latestVersion.get();
    }

    private NodeCache registerWatch(String watchPath) {
        NodeCache nodeCache = new NodeCache(this.zkStoreHelper.getClient(), watchPath);
        NodeCacheListener watchListener = () -> {
            this.currentBatch.set(nodeCache.getCurrentData().getStat().getVersion());
            log.debug("Current batch for {} changed to {}", (Object)this.gcName, (Object)this.currentBatch.get());
        };
        nodeCache.getListenable().addListener((Object)watchListener);
        nodeCache.start();
        return nodeCache;
    }

    @Override
    public void close() {
        this.watch.getAndUpdate(x -> {
            if (x != null) {
                try {
                    x.close();
                }
                catch (IOException e) {
                    throw Exceptions.sneakyThrow((Throwable)e);
                }
            }
            return x;
        });
        this.gcExecutor.shutdown();
    }
}

