/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.curator5.org.apache.curator.RetryPolicy;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkLockFactory
implements TriggerLockFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ZkLockFactory.class);
    private static final String LOCK_BASE_PATH = "/iceberg/flink/maintenance/locks/";
    private final String connectString;
    private final String lockId;
    private final int sessionTimeoutMs;
    private final int connectionTimeoutMs;
    private final int baseSleepTimeMs;
    private final int maxRetries;
    private transient CuratorFramework client;
    private transient SharedCount taskSharedCount;
    private transient SharedCount recoverySharedCount;
    private volatile boolean isOpen;

    public ZkLockFactory(String connectString, String lockId, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs, int maxRetries) {
        Preconditions.checkNotNull((Object)connectString, (Object)"Zookeeper connection string cannot be null");
        Preconditions.checkNotNull((Object)lockId, (Object)"Lock ID cannot be null");
        Preconditions.checkArgument((sessionTimeoutMs >= 0 ? 1 : 0) != 0, (String)"Session timeout must be positive, got: %s", (int)sessionTimeoutMs);
        Preconditions.checkArgument((connectionTimeoutMs >= 0 ? 1 : 0) != 0, (String)"Connection timeout must be positive, got: %s", (int)connectionTimeoutMs);
        Preconditions.checkArgument((baseSleepTimeMs >= 0 ? 1 : 0) != 0, (String)"Base sleep time must be positive, got: %s", (int)baseSleepTimeMs);
        Preconditions.checkArgument((maxRetries >= 0 ? 1 : 0) != 0, (String)"Max retries must be non-negative, got: %s", (int)maxRetries);
        this.connectString = connectString;
        this.lockId = lockId;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxRetries = maxRetries;
    }

    @Override
    public void open() {
        if (this.isOpen) {
            LOG.debug("ZkLockFactory already opened for lockId: {}.", (Object)this.lockId);
            return;
        }
        this.client = CuratorFrameworkFactory.builder().connectString(this.connectString).sessionTimeoutMs(this.sessionTimeoutMs).connectionTimeoutMs(this.connectionTimeoutMs).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries)).build();
        this.client.start();
        try {
            if (!this.client.blockUntilConnected(this.connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Connection to Zookeeper timed out");
            }
            this.taskSharedCount = new SharedCount(this.client, this.getTaskSharePath(), 0);
            this.recoverySharedCount = new SharedCount(this.client, this.getRecoverySharedPath(), 0);
            this.taskSharedCount.start();
            this.recoverySharedCount.start();
            this.isOpen = true;
            LOG.info("ZkLockFactory initialized for lockId: {}.", (Object)this.lockId);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while connecting to Zookeeper", e);
        }
        catch (Exception e) {
            this.closeQuietly();
            throw new RuntimeException("Failed to initialize SharedCount", e);
        }
    }

    private String getTaskSharePath() {
        return LOCK_BASE_PATH + this.lockId + "/task";
    }

    private String getRecoverySharedPath() {
        return LOCK_BASE_PATH + this.lockId + "/recovery";
    }

    private void closeQuietly() {
        try {
            this.close();
        }
        catch (Exception e) {
            LOG.warn("Failed to close ZkLockFactory for lockId: {}", (Object)this.lockId, (Object)e);
        }
    }

    @Override
    public TriggerLockFactory.Lock createLock() {
        return new ZkLock(this.getTaskSharePath(), this.taskSharedCount);
    }

    @Override
    public TriggerLockFactory.Lock createRecoveryLock() {
        return new ZkLock(this.getRecoverySharedPath(), this.recoverySharedCount);
    }

    @Override
    public void close() throws IOException {
        try {
            if (this.taskSharedCount != null) {
                this.taskSharedCount.close();
            }
            if (this.recoverySharedCount != null) {
                this.recoverySharedCount.close();
            }
        }
        finally {
            if (this.client != null) {
                this.client.close();
            }
            this.isOpen = false;
        }
    }

    private static class ZkLock
    implements TriggerLockFactory.Lock {
        private final SharedCount sharedCount;
        private final String lockPath;
        private static final int LOCKED = 1;
        private static final int UNLOCKED = 0;

        private ZkLock(String lockPath, SharedCount sharedCount) {
            this.lockPath = lockPath;
            this.sharedCount = sharedCount;
        }

        @Override
        public boolean tryLock() {
            VersionedValue versionedValue = this.sharedCount.getVersionedValue();
            if (ZkLock.isHeld((VersionedValue<Integer>)versionedValue)) {
                LOG.debug("Lock is already held for path: {}", (Object)this.lockPath);
                return false;
            }
            try {
                boolean acquired = this.sharedCount.trySetCount(versionedValue, 1);
                if (!acquired) {
                    LOG.debug("Failed to acquire lock for path: {}", (Object)this.lockPath);
                }
                return acquired;
            }
            catch (Exception e) {
                LOG.warn("Failed to acquire Zookeeper lock", (Throwable)e);
                return false;
            }
        }

        @Override
        public boolean isHeld() {
            return ZkLock.isHeld((VersionedValue<Integer>)this.sharedCount.getVersionedValue());
        }

        private static boolean isHeld(VersionedValue<Integer> versionedValue) {
            try {
                return (Integer)versionedValue.getValue() == 1;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to check Zookeeper lock status", e);
            }
        }

        @Override
        public void unlock() {
            try {
                this.sharedCount.setCount(0);
                LOG.debug("Released lock for path: {}", (Object)this.lockPath);
            }
            catch (Exception e) {
                LOG.warn("Failed to release lock for path: {}", (Object)this.lockPath, (Object)e);
                throw new RuntimeException("Failed to release lock", e);
            }
        }
    }
}

