/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction.lock.models;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
import org.apache.hudi.common.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a recurring heartbeat function on a dedicated scheduler thread for as long as a
 * monitored thread is alive and the heartbeat function keeps returning {@code true}.
 *
 * <p>The mutable {@code scheduledFuture} is guarded by this object's monitor. The
 * {@code heartbeatSemaphore} is held by the heartbeat task for the duration of a single
 * execution, which lets {@link #stopHeartbeat(boolean)} wait for an in-flight execution to
 * finish before reporting that the heartbeat has stopped.
 */
@ThreadSafe
public class LockProviderHeartbeatManager
implements HeartbeatManager {
    /** Default time, in milliseconds, that a stop request waits for an in-flight heartbeat. */
    public static final long DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS = 15000L;
    private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(LockProviderHeartbeatManager.class);
    /** Time, in seconds, that {@link #close()} waits for the scheduler to terminate gracefully. */
    private static final long SCHEDULER_TERMINATION_TIMEOUT_SECONDS = 5L;

    // Final reference; used under the monitor when scheduling and outside it in close().
    private final ScheduledExecutorService scheduler;
    private final String ownerId;
    private final Logger logger;
    private final long heartbeatTimeMs;
    private final Supplier<Boolean> heartbeatFuncToExec;
    private final long stopHeartbeatTimeoutMs;
    // Non-null while a heartbeat is considered active; see hasActiveHeartbeat().
    @GuardedBy(value="this")
    private ScheduledFuture<?> scheduledFuture;
    // Held by the heartbeat task while one execution is in progress.
    private final Semaphore heartbeatSemaphore;

    /**
     * Creates a manager with its own single-thread scheduler, the default stop timeout and the
     * class logger.
     *
     * @param ownerId             identifier used in log messages; its first six characters (when
     *                            it has at least six) are appended to the heartbeat thread name
     * @param heartbeatTimeMs     initial delay and period of the heartbeat, in milliseconds
     * @param heartbeatFuncToExec heartbeat function; returning {@code false} or throwing stops
     *                            further heartbeats
     */
    public LockProviderHeartbeatManager(String ownerId, long heartbeatTimeMs, Supplier<Boolean> heartbeatFuncToExec) {
        this(ownerId, LockProviderHeartbeatManager.createThreadScheduler(ownerId != null && ownerId.length() >= 6 ? ownerId.substring(0, 6) : ""), heartbeatTimeMs, DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, heartbeatFuncToExec, new Semaphore(1), DEFAULT_LOGGER);
    }

    /**
     * Creates a manager with every collaborator supplied by the caller.
     *
     * @param ownerId                identifier used in log messages
     * @param scheduler              executor on which the heartbeat task is scheduled
     * @param heartbeatTimeMs        initial delay and period of the heartbeat, in milliseconds
     * @param stopHeartbeatTimeoutMs how long a stop request waits for an in-flight heartbeat,
     *                               in milliseconds
     * @param heartbeatFuncToExec    heartbeat function to run on every tick
     * @param heartbeatSemaphore     semaphore held by the task during one heartbeat execution
     * @param testLogger             logger that receives all messages of this instance
     */
    @VisibleForTesting
    LockProviderHeartbeatManager(String ownerId, ScheduledExecutorService scheduler, long heartbeatTimeMs, long stopHeartbeatTimeoutMs, Supplier<Boolean> heartbeatFuncToExec, Semaphore heartbeatSemaphore, Logger testLogger) {
        this.ownerId = ownerId;
        this.heartbeatTimeMs = heartbeatTimeMs;
        this.heartbeatFuncToExec = heartbeatFuncToExec;
        this.logger = testLogger;
        this.scheduler = scheduler;
        this.heartbeatSemaphore = heartbeatSemaphore;
        this.stopHeartbeatTimeoutMs = stopHeartbeatTimeoutMs;
    }

    /** Builds a single-thread scheduler whose thread name ends with {@code shortUuid}. */
    private static ScheduledExecutorService createThreadScheduler(String shortUuid) {
        return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + shortUuid));
    }

    /**
     * Schedules the recurring heartbeat task for the given thread.
     *
     * @param threadToMonitor thread whose liveness is checked before every heartbeat
     * @return {@code true} if the task was scheduled; {@code false} if a heartbeat is already
     *         active or scheduling failed
     * @throws IllegalArgumentException if {@code threadToMonitor} is {@code null}
     */
    @Override
    public synchronized boolean startHeartbeatForThread(Thread threadToMonitor) {
        if (threadToMonitor == null) {
            throw new IllegalArgumentException("threadToMonitor cannot be null.");
        }
        if (hasActiveHeartbeat()) {
            logger.warn("Owner {}: Heartbeat is already running.", ownerId);
            return false;
        }
        try {
            scheduledFuture = scheduler.scheduleAtFixedRate(() -> heartbeatTaskRunner(threadToMonitor), heartbeatTimeMs, heartbeatTimeMs, TimeUnit.MILLISECONDS);
            logger.debug("Owner {}: Heartbeat started with interval: {} ms", ownerId, heartbeatTimeMs);
            return true;
        } catch (Exception e) {
            logger.error("Owner {}: Unable to schedule heartbeat task. {}", ownerId, e);
            return false;
        }
    }

    /**
     * Body of the scheduled task: runs one heartbeat while holding the semaphore and cancels the
     * recurring task when the heartbeat does not succeed.
     */
    private void heartbeatTaskRunner(Thread threadToMonitor) {
        if (!heartbeatSemaphore.tryAcquire()) {
            logger.error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", ownerId);
            return;
        }
        boolean heartbeatExecutionSuccessful;
        try {
            heartbeatExecutionSuccessful = executeHeartbeat(threadToMonitor);
        } finally {
            heartbeatSemaphore.release();
        }
        // The synchronized unschedule call happens only after the semaphore is released, so a
        // stopper that holds the monitor while waiting on the semaphore is never blocked by us.
        if (!heartbeatExecutionSuccessful) {
            logger.warn("Owner {}: Heartbeat function did not succeed.", ownerId);
            heartbeatTaskUnscheduleItself();
        }
    }

    /**
     * Runs the heartbeat function once.
     *
     * @return the function's result, or {@code false} if the monitored thread is no longer alive
     *         or the function threw an exception
     */
    private boolean executeHeartbeat(Thread threadToMonitor) {
        if (!threadToMonitor.isAlive()) {
            logger.warn("Owner {}: Monitored thread is no longer alive.", ownerId);
            return false;
        }
        try {
            return heartbeatFuncToExec.get();
        } catch (Exception e) {
            logger.error("Owner {}: Heartbeat function threw exception {}", ownerId, e);
            return false;
        }
    }

    /**
     * Cancels whatever future is currently stored and clears it; called from the heartbeat task
     * after an unsuccessful execution.
     *
     * <p>NOTE(review): the future is looked up at call time, not captured when the task was
     * scheduled. If a stop followed by a new start completes between this task releasing the
     * semaphore and entering this method, the newer schedule would be cancelled - confirm
     * whether that window matters to callers.
     */
    private synchronized void heartbeatTaskUnscheduleItself() {
        if (scheduledFuture != null) {
            boolean cancellationSuccessful = scheduledFuture.cancel(true);
            logger.info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", ownerId, cancellationSuccessful);
            scheduledFuture = null;
        }
    }

    /**
     * Cancels the recurring heartbeat and waits, up to the configured stop timeout, for an
     * in-flight execution to finish.
     *
     * @param mayInterruptIfRunning passed to {@code Future.cancel} for the recurring task
     * @return {@code true} if a heartbeat was active and is confirmed finished; {@code false} if
     *         there was no active heartbeat, or the wait timed out or was interrupted (the
     *         heartbeat is then still reported as active)
     */
    @Override
    public boolean stopHeartbeat(boolean mayInterruptIfRunning) {
        boolean nothingToStop = cancelRecurringHeartbeatTask(mayInterruptIfRunning);
        if (nothingToStop) {
            return false;
        }
        boolean heartbeatStillInflight = syncWaitInflightHeartbeatTaskToFinish();
        if (heartbeatStillInflight) {
            if (Thread.currentThread().isInterrupted()) {
                logger.warn("Owner {}: Heartbeat is still in flight due to interruption!", ownerId);
            } else {
                logger.error("Owner {}: Heartbeat is still in flight!", ownerId);
            }
            return false;
        }
        synchronized (this) {
            logger.debug("Owner {}: Heartbeat task successfully terminated.", ownerId);
            scheduledFuture = null;
        }
        return true;
    }

    /**
     * Requests cancellation of the recurring task.
     *
     * @return {@code true} if there was no active heartbeat to cancel; {@code false} if
     *         cancellation was requested
     */
    private synchronized boolean cancelRecurringHeartbeatTask(boolean mayInterruptIfRunning) {
        if (!hasActiveHeartbeat()) {
            logger.warn("Owner {}: No active heartbeat task to stop.", ownerId);
            return true;
        }
        boolean cancellationSuccessful = scheduledFuture.cancel(mayInterruptIfRunning);
        logger.debug("Owner {}: Requested termination of heartbeat task. Cancellation returned {}", ownerId, cancellationSuccessful);
        return false;
    }

    /**
     * Waits for the heartbeat semaphore, which is free only when no heartbeat is executing.
     *
     * @return {@code true} if the semaphore could not be acquired within the stop timeout or the
     *         wait was interrupted (the interrupt flag is restored); {@code false} otherwise
     */
    private boolean syncWaitInflightHeartbeatTaskToFinish() {
        boolean heartbeatStillInflight = true;
        try {
            heartbeatStillInflight = !heartbeatSemaphore.tryAcquire(stopHeartbeatTimeoutMs, TimeUnit.MILLISECONDS);
            if (heartbeatStillInflight) {
                logger.warn("Owner {}: Timed out while waiting for heartbeat termination.", ownerId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Owner {}: Interrupted while waiting for heartbeat termination.", ownerId);
        }
        // Hand the permit back only when it was actually acquired above.
        heartbeatSemaphore.release(heartbeatStillInflight ? 0 : 1);
        return heartbeatStillInflight;
    }

    /**
     * @return {@code true} while a scheduled heartbeat future is stored, i.e. after a successful
     *         start and until the heartbeat is stopped or unschedules itself
     */
    @Override
    public synchronized boolean hasActiveHeartbeat() {
        return scheduledFuture != null;
    }

    /**
     * Stops any active heartbeat and shuts the scheduler down, forcing shutdown if it has not
     * terminated within {@value #SCHEDULER_TERMINATION_TIMEOUT_SECONDS} seconds or the calling
     * thread is interrupted while waiting.
     */
    @Override
    public void close() throws Exception {
        synchronized (this) {
            if (hasActiveHeartbeat()) {
                stopHeartbeat(true);
            }
            // Shut down under the monitor so a concurrent start cannot schedule in between.
            scheduler.shutdown();
        }
        // Wait outside the monitor: a failed heartbeat execution needs the monitor in
        // heartbeatTaskUnscheduleItself() before its worker thread can finish, so waiting while
        // holding it would always run into the full timeout.
        try {
            if (!scheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

