/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction.lock;

import java.io.Serializable;
import java.net.URI;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hudi.client.transaction.lock.StorageLockClient;
import org.apache.hudi.client.transaction.lock.audit.AuditOperationState;
import org.apache.hudi.client.transaction.lock.audit.AuditService;
import org.apache.hudi.client.transaction.lock.audit.AuditServiceFactory;
import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
import org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
import org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.StorageBasedLockConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StorageSchemes;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class StorageBasedLockProvider
implements LockProvider<StorageLockFile> {
    // File name appended to the lock folder path to build this provider's lock file path.
    public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock.json";
    // Clock-drift tolerance in milliseconds; isCurrentTimeCertainlyOlderThanDistributedTime
    // applies the same 500 ms slack when comparing the local clock to a lock expiration.
    private static final long CLOCK_DRIFT_BUFFER_MS = 500L;
    // Class-level logger that the public constructors hand to the package-private constructor.
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageBasedLockProvider.class);
    // Logger used by this instance; injected through the package-private constructor.
    private final Logger logger;
    // Client used to read and conditionally upsert the lock file.
    private final StorageLockClient storageLockClient;
    // Validity period, in seconds, added to a timestamp to compute a lock expiration.
    private final long lockValiditySecs;
    // Identifier of this provider; a lock file may only be stored locally if its owner equals it.
    private final String ownerId;
    // Full path of the lock file: lock folder path + "/" + DEFAULT_TABLE_LOCK_FILE_NAME.
    private final String lockFilePath;
    // Drives periodic renewal by invoking renewLock(); started on acquire, stopped on unlock.
    private final HeartbeatManager heartbeatManager;
    // Thread registered as a JVM shutdown hook; it runs shutdown(true).
    private final transient Thread shutdownThread;
    // Optional metrics sink; empty when no metrics object was supplied.
    private final Option<HoodieLockMetrics> hoodieLockMetrics;
    // Audit service; starts empty, is assigned in tryLock() after an acquisition while empty,
    // and is reset to empty on close.
    private Option<AuditService> auditService;
    // Table base path taken from the storage-based lock configuration.
    private final String basePath;
    // Lock file this provider believes it currently holds, or null when none.
    @GuardedBy(value="this")
    private StorageLockFile currentLockObj = null;
    // Set once shutdown has run (from close() or from the shutdown hook).
    @GuardedBy(value="this")
    private boolean isClosed = false;
    // SLF4J templates shared by the log*LockState helpers (without / with an extra info field).
    private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}";
    private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}, {}";

    /**
     * Replaces the locally tracked lock file.
     *
     * @param lockObj the lock file to remember, or {@code null} to clear the tracked lock
     * @throws HoodieLockException if {@code lockObj} is non-null and its owner differs from this provider's owner id
     */
    private synchronized void setLock(StorageLockFile lockObj) {
        boolean ownedBySomeoneElse = lockObj != null && !Objects.equals(lockObj.getOwner(), ownerId);
        if (ownedBySomeoneElse) {
            throw new HoodieLockException("Owners do not match. Current lock owner: " + ownerId + " lock path: " + lockFilePath + " owner: " + lockObj.getOwner());
        }
        currentLockObj = lockObj;
    }

    /**
     * Creates a lock provider with a random UUID owner id, the default heartbeat manager,
     * the scheme-based storage lock client loader, the class logger and no metrics.
     *
     * @param lockConfiguration lock configuration whose properties configure this provider
     * @param conf storage configuration; not read by this constructor
     */
    public StorageBasedLockProvider(LockConfiguration lockConfiguration, StorageConfiguration<?> conf) {
        this(
            UUID.randomUUID().toString(),
            lockConfiguration.getConfig(),
            // Parameterized (not raw) intersection cast: keeps the constructor reference
            // serializable while letting it type-check against (String, Long, Supplier<Boolean>).
            (Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> & Serializable) LockProviderHeartbeatManager::new,
            getStorageLockClientClassName(),
            LOGGER,
            null);
    }

    /**
     * Creates a lock provider with a random UUID owner id, the default heartbeat manager,
     * the scheme-based storage lock client loader, the class logger and the given metrics.
     *
     * @param lockConfiguration lock configuration whose properties configure this provider
     * @param conf storage configuration; not read by this constructor
     * @param metrics metrics sink to report lock events to; may be {@code null}
     */
    public StorageBasedLockProvider(LockConfiguration lockConfiguration, StorageConfiguration<?> conf, HoodieLockMetrics metrics) {
        this(
            UUID.randomUUID().toString(),
            lockConfiguration.getConfig(),
            // Parameterized (not raw) intersection cast: keeps the constructor reference
            // serializable while letting it type-check against (String, Long, Supplier<Boolean>).
            (Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> & Serializable) LockProviderHeartbeatManager::new,
            getStorageLockClientClassName(),
            LOGGER,
            metrics);
    }

    /**
     * Builds the loader that reflectively instantiates a {@link StorageLockClient}.
     * The implementation class is selected from the URI scheme of the lock file path and is
     * constructed with {@code (String ownerId, String lockFilePath, Properties lockConfig)}.
     *
     * @return a serializable loader; any failure while loading is rethrown as {@link HoodieLockException}
     */
    private static Functions.Function3<String, String, TypedProperties, StorageLockClient> getStorageLockClientClassName() {
        return (Functions.Function3<String, String, TypedProperties, StorageLockClient> & Serializable) (ownerId, lockFilePath, lockConfig) -> {
            try {
                String scheme = new URI(lockFilePath).getScheme();
                String clientClassName = getLockServiceClassName(scheme);
                Class<?>[] constructorTypes = new Class<?>[] {String.class, String.class, Properties.class};
                Object[] constructorArgs = new Object[] {ownerId, lockFilePath, lockConfig};
                return (StorageLockClient) ReflectionUtils.loadClass(clientClassName, constructorTypes, constructorArgs);
            } catch (Throwable e) {
                throw new HoodieLockException("Failed to load and initialize StorageLock", e);
            }
        };
    }

    /**
     * Resolves the storage lock client class name registered for a URI scheme.
     *
     * @param scheme URI scheme of the lock file path
     * @return the storage lock class name declared by the matching {@link StorageSchemes} entry
     * @throws HoodieNotSupportedException if no storage lock implementation exists for the scheme
     */
    @NotNull
    private static String getLockServiceClassName(String scheme) {
        Option<StorageSchemes> lockCapableScheme = StorageSchemes.getStorageLockImplementationIfExists(scheme);
        if (!lockCapableScheme.isPresent()) {
            throw new HoodieNotSupportedException("No implementation of StorageLock supports this scheme: " + scheme);
        }
        return lockCapableScheme.get().getStorageLockClass();
    }

    @VisibleForTesting
    StorageBasedLockProvider(String ownerId, TypedProperties properties, Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> heartbeatManagerLoader, Functions.Function3<String, String, TypedProperties, StorageLockClient> storageLockClientLoader, Logger logger, HoodieLockMetrics hoodieLockMetrics) {
        StorageBasedLockConfig config = new StorageBasedLockConfig.Builder().fromProperties(properties).build();
        long heartbeatPollSeconds = config.getRenewIntervalSecs();
        this.lockValiditySecs = config.getValiditySeconds();
        this.basePath = config.getHudiTableBasePath();
        String lockFolderPath = StorageLockClient.getLockFolderPath(this.basePath);
        this.lockFilePath = String.format("%s%s%s", lockFolderPath, "/", DEFAULT_TABLE_LOCK_FILE_NAME);
        this.heartbeatManager = (HeartbeatManager)heartbeatManagerLoader.apply((Object)ownerId, (Object)TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
        this.storageLockClient = (StorageLockClient)storageLockClientLoader.apply((Object)ownerId, (Object)this.lockFilePath, (Object)properties);
        this.ownerId = ownerId;
        this.logger = logger;
        this.hoodieLockMetrics = Option.ofNullable((Object)hoodieLockMetrics);
        this.auditService = Option.empty();
        this.shutdownThread = new Thread(() -> this.shutdown(true));
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
        logger.info("Instantiated new storage-based lock provider, owner: {}, lockfilePath: {}", (Object)ownerId, (Object)this.lockFilePath);
    }

    /**
     * Returns the lock file this provider currently tracks as held.
     *
     * @return the tracked lock file, or {@code null} if none is tracked
     */
    @Override
    public synchronized StorageLockFile getLock() {
        return currentLockObj;
    }

    /**
     * Repeatedly attempts to acquire the lock until it succeeds or the timeout elapses,
     * sleeping for the default retry wait time between failed attempts.
     *
     * @param time maximum time to keep retrying; a non-positive value results in no attempt
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} if the timeout elapsed first
     * @throws HoodieLockException if the thread is interrupted while waiting (the interrupt flag is restored)
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        final long startNanos = System.nanoTime();
        final long timeoutNanos = unit.toNanos(time);
        final long retryWaitMs = Long.parseLong(LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS);
        // Compare elapsed time rather than absolute nanoTime values: "now + timeout" overflows
        // for large timeouts (toNanos saturates at Long.MAX_VALUE), which would skip the loop.
        while (System.nanoTime() - startNanos < timeoutNanos) {
            try {
                logDebugLockState(LockState.ACQUIRING);
                if (tryLock()) {
                    return true;
                }
                Thread.sleep(retryWaitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockInterruptedMetric);
                throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
            }
        }
        return false;
    }

    /**
     * Closes this provider by running the regular (non-shutdown-hook) shutdown sequence.
     */
    @Override
    public synchronized void close() {
        shutdown(false);
    }

    /**
     * Shuts this provider down and marks it closed.
     * From the shutdown hook, only the held lock (if any) is expired. Otherwise the shutdown
     * hook is removed, the lock is released, and the storage client, heartbeat manager and
     * audit service are closed in that order. Each step's failure is logged, not propagated.
     *
     * @param fromShutdownHook {@code true} when invoked by the JVM shutdown hook thread
     */
    private synchronized void shutdown(boolean fromShutdownHook) {
        if (fromShutdownHook) {
            boolean shouldExpire = !isClosed && actuallyHoldsLock();
            if (shouldExpire) {
                tryExpireCurrentLock(true);
            }
            isClosed = true;
            return;
        }

        try {
            tryRemoveShutdownHook();
        } catch (IllegalStateException hookFailure) {
            logger.warn("Owner {}: Failed to remove shutdown hook, JVM is already shutting down.", ownerId, hookFailure);
        }

        try {
            unlock();
        } catch (Exception unlockFailure) {
            logger.error("Owner {}: Failed to unlock current lock.", ownerId, unlockFailure);
        }

        try {
            storageLockClient.close();
        } catch (Exception clientFailure) {
            logger.error("Owner {}: Lock service failed to close.", ownerId, clientFailure);
        }

        try {
            heartbeatManager.close();
        } catch (Exception heartbeatFailure) {
            logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, heartbeatFailure);
        }

        try {
            if (auditService.isPresent()) {
                AuditService serviceToClose = auditService.get();
                try {
                    serviceToClose.close();
                } catch (Exception closeFailure) {
                    logger.error("Owner {}: Audit service failed to close.", ownerId, closeFailure);
                }
            }
            auditService = Option.empty();
        } catch (Exception auditFailure) {
            logger.error("Owner {}: Failed to close audit service.", ownerId, auditFailure);
        }

        isClosed = true;
    }

    /**
     * Deregisters this provider's JVM shutdown hook thread. The boolean result of the
     * removal is ignored; exceptions thrown by the runtime propagate to the caller.
     */
    @VisibleForTesting
    void tryRemoveShutdownHook() {
        Runtime runtime = Runtime.getRuntime();
        runtime.removeShutdownHook(shutdownThread);
    }

    /**
     * Decides whether a lock file still represents a live lock.
     * A lock is invalid when it is flagged expired, or when the local clock is past its
     * expiration by more than the drift tolerance; the latter case is counted as a
     * dangling lock in the metrics and logged as a warning.
     *
     * @param lock the lock file to inspect; must not be {@code null}
     * @return {@code true} if the lock is neither expired nor dangling
     */
    private synchronized boolean isLockStillValid(StorageLockFile lock) {
        if (!lock.isExpired()) {
            boolean dangling = isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntilMs());
            if (!dangling) {
                return true;
            }
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockDanglingMetric);
            logWarnLockState(LockState.ACQUIRING, "Found a dangling expired lock with owner " + lock.getOwner());
        }
        return false;
    }

    /**
     * Makes a single attempt to acquire the lock.
     * Returns immediately if this provider already holds a valid lock. Otherwise it reads the
     * current lock file, gives up if that lock is still valid or unreadable, and tries a
     * conditional upsert of a fresh lock. On success the heartbeat is started and an audit
     * START record is written.
     *
     * @return {@code true} if the lock is held when this method returns, {@code false} otherwise
     * @throws HoodieLockException if the provider is closed, has no heartbeat manager, or a
     *         heartbeat is active although no lock is held
     */
    @Override
    public synchronized boolean tryLock() {
        assertHeartbeatManagerExists();
        assertUnclosed();
        logDebugLockState(LockState.ACQUIRING);
        if (actuallyHoldsLock()) {
            return true;
        }
        if (heartbeatManager.hasActiveHeartbeat()) {
            logger.error("Detected broken invariant: there is an active heartbeat without a lock being held.");
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
            throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE));
        }

        // Step 1: look at whatever lock file is currently in storage.
        Pair<LockGetResult, Option<StorageLockFile>> observed = storageLockClient.readCurrentLockFile();
        LockGetResult readStatus = observed.getLeft();
        if (readStatus == LockGetResult.UNKNOWN_ERROR) {
            logInfoLockState(LockState.FAILED_TO_ACQUIRE, "Failed to get the latest lock status");
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
            return false;
        }
        if (readStatus == LockGetResult.SUCCESS && isLockStillValid(observed.getRight().get())) {
            String heldBy = String.format("Lock already held by %s", observed.getRight().get().getOwner());
            logInfoLockState(LockState.FAILED_TO_ACQUIRE, heldBy);
            return false;
        }

        // Step 2: conditionally write our own lock over the observed state.
        long acquisitionTimestamp = getCurrentEpochMs();
        long lockExpirationMs = calculateLockExpiration(acquisitionTimestamp);
        StorageLockData candidate = new StorageLockData(false, lockExpirationMs, ownerId);
        Pair<LockUpsertResult, Option<StorageLockFile>> upsert = storageLockClient.tryUpsertLockFile(candidate, observed.getRight());
        if (upsert.getLeft() != LockUpsertResult.SUCCESS) {
            logInfoLockState(LockState.FAILED_TO_ACQUIRE);
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
            return false;
        }
        setLock(upsert.getRight().get());
        hoodieLockMetrics.ifPresent(metrics -> {
            long remainingMs = upsert.getRight().get().getValidUntilMs() - getCurrentEpochMs();
            metrics.updateLockExpirationDeadlineMetric((int) remainingMs);
        });

        // Step 3: keep the lock alive; without a heartbeat the lock is given back.
        boolean heartbeatStarted = heartbeatManager.startHeartbeatForThread(Thread.currentThread());
        if (!heartbeatStarted) {
            logErrorLockState(LockState.RELEASING, "We were unable to start the heartbeat!");
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
            tryExpireCurrentLock(false);
            return false;
        }
        logInfoLockState(LockState.ACQUIRED);
        if (auditService.isEmpty()) {
            auditService = AuditServiceFactory.createLockProviderAuditService(ownerId, basePath, storageLockClient, acquisitionTimestamp, this::calculateLockExpiration, this::actuallyHoldsLock);
        }
        recordAuditOperation(AuditOperationState.START, acquisitionTimestamp);
        return true;
    }

    /**
     * Reports whether a lock is tracked locally and that lock is still valid.
     *
     * @return {@code true} only if a lock is tracked and passes {@code isLockStillValid}
     */
    private boolean actuallyHoldsLock() {
        if (!believesLockMightBeHeld()) {
            return false;
        }
        return isLockStillValid(getLock());
    }

    /**
     * Reports whether any lock file is tracked locally, without checking its validity.
     *
     * @return {@code true} if {@link #getLock()} returns a non-null lock file
     */
    private boolean believesLockMightBeHeld() {
        StorageLockFile tracked = getLock();
        return tracked != null;
    }

    /**
     * Releases the lock if one is tracked: stops an active heartbeat, then expires the lock file.
     * Does nothing when no lock is tracked.
     *
     * @throws HoodieLockException if there is no heartbeat manager, or if stopping the heartbeat
     *         or expiring the lock did not succeed
     */
    @Override
    public synchronized void unlock() {
        assertHeartbeatManagerExists();
        if (!believesLockMightBeHeld()) {
            return;
        }

        boolean heartbeatStopped = true;
        if (heartbeatManager.hasActiveHeartbeat()) {
            logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
            heartbeatStopped = heartbeatManager.stopHeartbeat(true);
        }

        // Expiry is attempted unconditionally, even when the heartbeat did not stop.
        boolean lockExpired = tryExpireCurrentLock(false);
        if (heartbeatStopped && lockExpired) {
            return;
        }
        hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
        throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_RELEASE));
    }

    /**
     * Verifies that a heartbeat manager is present.
     *
     * @throws HoodieLockException if the heartbeat manager is {@code null}; the fatal-error metric is updated first
     */
    private void assertHeartbeatManagerExists() {
        if (heartbeatManager != null) {
            return;
        }
        hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
        throw new HoodieLockException("Unexpected null heartbeatManager");
    }

    /**
     * Verifies that this provider has not been closed.
     *
     * @throws HoodieLockException if the provider is already closed; the fatal-error metric is updated first
     */
    private void assertUnclosed() {
        if (!isClosed) {
            return;
        }
        hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
        throw new HoodieLockException("Lock provider already closed");
    }

    /**
     * Marks the currently tracked lock file as expired in storage.
     * Callers must ensure a lock is tracked; the tracked lock is dereferenced unconditionally.
     *
     * @param fromShutdownHook {@code true} when called from the shutdown hook, which skips the
     *        "heartbeat must be stopped" precondition
     * @return {@code true} if the lock is no longer tracked as held (released, or found to be
     *         acquired by another owner); {@code false} if the storage state is unknown
     * @throws HoodieLockException if a heartbeat is still active (outside the shutdown hook) or
     *         the upsert returns an unexpected result
     */
    private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
        if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
            throw new HoodieLockException("Must stop heartbeat before expire lock file");
        }
        logDebugLockState(LockState.RELEASING);
        // Same expiration as the held lock, but with the expired flag set.
        StorageLockData expiredLockData = new StorageLockData(true, getLock().getValidUntilMs(), ownerId);
        Pair<LockUpsertResult, Option<StorageLockFile>> result = storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(getLock()));
        switch (result.getLeft()) {
            case UNKNOWN_ERROR: {
                logErrorLockState(LockState.FAILED_TO_RELEASE, "Lock state is unknown.");
                hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
                return false;
            }
            case SUCCESS: {
                logInfoLockState(LockState.RELEASED);
                // Use the provider's overridable clock so END is timestamped by the same
                // source as the START and RENEW audit records.
                recordAuditOperation(AuditOperationState.END, getCurrentEpochMs());
                setLock(null);
                return true;
            }
            case ACQUIRED_BY_OTHERS: {
                logErrorLockState(LockState.RELEASED, "lock should not have been acquired by others.");
                setLock(null);
                hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric);
                return true;
            }
            default: {
                hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
                throw new HoodieLockException("Unexpected lock update result: " + result.getLeft());
            }
        }
    }

    /**
     * Extends the expiration of the tracked lock; passed to the heartbeat manager as its callback.
     *
     * @return {@code true} on a successful renewal or on an unknown (possibly transient) storage
     *         error; {@code false} if no lock is tracked, the lock was acquired by others, or
     *         any exception occurred (exceptions are logged, never propagated)
     */
    @VisibleForTesting
    protected synchronized boolean renewLock() {
        try {
            if (!believesLockMightBeHeld()) {
                logger.warn("Owner {}: Cannot renew, no lock held by this process", ownerId);
                return false;
            }
            final long previousExpiryMs = getLock().getValidUntilMs();
            final long renewalTimestamp = getCurrentEpochMs();
            final long renewedExpiryMs = calculateLockExpiration(renewalTimestamp);
            StorageLockData renewedData = new StorageLockData(false, renewedExpiryMs, ownerId);
            Pair<LockUpsertResult, Option<StorageLockFile>> renewal = storageLockClient.tryUpsertLockFile(renewedData, Option.of(getLock()));
            switch (renewal.getLeft()) {
                case SUCCESS: {
                    setLock(renewal.getRight().get());
                    // The reported margin is measured against the expiration that was just replaced.
                    hoodieLockMetrics.ifPresent(metrics -> metrics.updateLockExpirationDeadlineMetric((int) (previousExpiryMs - getCurrentEpochMs())));
                    logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.", ownerId, previousExpiryMs - getCurrentEpochMs(), lockFilePath);
                    recordAuditOperation(AuditOperationState.RENEW, renewalTimestamp);
                    return true;
                }
                case UNKNOWN_ERROR: {
                    logger.warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", ownerId);
                    hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
                    return true;
                }
                case ACQUIRED_BY_OTHERS: {
                    logger.error("Owner {}: Unable to renew lock as it is acquired by others.", ownerId);
                    hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric);
                    return false;
                }
                default: {
                    throw new HoodieLockException("Unexpected lock update result: " + renewal.getLeft());
                }
            }
        } catch (Exception e) {
            logger.error("Owner {}: Exception occurred while renewing lock", ownerId, e);
            hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
            return false;
        }
    }

    /**
     * Checks whether the local clock is past the given timestamp by more than the
     * clock-drift buffer ({@code CLOCK_DRIFT_BUFFER_MS}).
     *
     * @param epochMs timestamp to compare against, in epoch milliseconds
     * @return {@code true} if the current epoch time exceeds {@code epochMs} plus the drift buffer
     */
    protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epochMs) {
        return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS;
    }

    /**
     * Formats a lock-state message containing the owner id, lock file path, the current
     * thread's name and the given state. Used for exception messages.
     *
     * @param state lock state to report; must not be {@code null}
     * @return the formatted message
     */
    private String generateLockStateMessage(LockState state) {
        return String.format(
            "Owner %s: Lock file path %s, Thread %s, Storage based lock state %s",
            ownerId,
            lockFilePath,
            Thread.currentThread().getName(),
            state.toString());
    }

    /**
     * Logs the lock state at DEBUG level together with owner, lock file path and current thread.
     *
     * @param state lock state to log
     */
    private void logDebugLockState(LockState state) {
        Thread current = Thread.currentThread();
        logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, current, state);
    }

    /**
     * Logs the lock state at INFO level together with owner, lock file path and current thread.
     *
     * @param state lock state to log
     */
    private void logInfoLockState(LockState state) {
        Thread current = Thread.currentThread();
        logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, current, state);
    }

    /**
     * Logs the lock state plus an extra message at INFO level, together with owner,
     * lock file path and current thread.
     *
     * @param state lock state to log
     * @param msg additional information appended to the log line
     */
    private void logInfoLockState(LockState state, String msg) {
        Thread current = Thread.currentThread();
        logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, current, state, msg);
    }

    /**
     * Logs the lock state plus an extra message at WARN level, together with owner,
     * lock file path and current thread.
     *
     * @param state lock state to log
     * @param msg additional information appended to the log line
     */
    private void logWarnLockState(LockState state, String msg) {
        Thread current = Thread.currentThread();
        logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, current, state, msg);
    }

    /**
     * Logs the lock state plus an extra message at ERROR level, together with owner,
     * lock file path and current thread.
     *
     * @param state lock state to log
     * @param msg additional information appended to the log line
     */
    private void logErrorLockState(LockState state, String msg) {
        Thread current = Thread.currentThread();
        logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, current, state, msg);
    }

    /**
     * Returns the current wall-clock time; package-visible so tests can substitute the clock.
     *
     * @return current time in epoch milliseconds
     */
    @VisibleForTesting
    long getCurrentEpochMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Computes the expiration for a lock taken or renewed at the given time.
     *
     * @param timestamp acquisition or renewal time in epoch milliseconds
     * @return {@code timestamp} plus the configured validity period converted to milliseconds
     */
    private long calculateLockExpiration(long timestamp) {
        long validityMs = TimeUnit.SECONDS.toMillis(lockValiditySecs);
        return timestamp + validityMs;
    }

    /**
     * Records an audit operation if an audit service is present. Failures are logged as
     * warnings (message only) and never propagated.
     *
     * @param state audit operation state to record
     * @param timestamp time of the operation in epoch milliseconds
     */
    private void recordAuditOperation(AuditOperationState state, long timestamp) {
        // Read the field once so the presence check and the use see the same Option.
        Option<AuditService> current = auditService;
        if (!current.isPresent()) {
            return;
        }
        try {
            current.get().recordOperation(state, timestamp);
        } catch (Exception e) {
            logger.warn("Owner {}: Failed to record audit operation {}: {}", ownerId, state, e.getMessage());
        }
    }
}

