/*
 * Decompiled with CFR 0.152.
 */
package nosql.batch.update.aerospike.wal;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Value;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager;
import nosql.batch.update.util.AsyncUtil;
import nosql.batch.update.wal.ExclusiveLocker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AerospikeExclusiveLocker
implements ExclusiveLocker {
    private static final Logger logger = LoggerFactory.getLogger(AerospikeExclusiveLocker.class);
    private static final Instant JAN_01_2010 = Instant.parse("2010-01-01T00:00:00.00Z");
    private static final Value EXCLUSIVE_LOCK_KEY = Value.get((byte)0);
    private final IAerospikeClient client;
    private final Duration exclusiveLockTtl;
    private final ScheduledExecutorService scheduledExecutorService;
    private final WritePolicy putLockPolicy;
    private final Bin exclusiveLockBin;
    private final Key exclusiveLockKey;
    private final AtomicInteger generation = new AtomicInteger(0);
    private final AtomicReference<ScheduledFuture> scheduledFuture = new AtomicReference();

    public AerospikeExclusiveLocker(IAerospikeClient client, String namespace, String setName) {
        this(client, namespace, setName, Executors.newSingleThreadScheduledExecutor(), Duration.ofSeconds(60L));
    }

    public AerospikeExclusiveLocker(IAerospikeClient client, String namespace, String setName, ScheduledExecutorService scheduledExecutorService, Duration exclusiveLockTtl) {
        this.client = client;
        this.exclusiveLockTtl = exclusiveLockTtl;
        this.scheduledExecutorService = scheduledExecutorService;
        this.putLockPolicy = this.buildPutLockPolicy();
        this.exclusiveLockBin = new Bin("EL", AerospikeWriteAheadLogManager.getBytesFromUUID(UUID.randomUUID()));
        this.exclusiveLockKey = new Key(namespace, setName, EXCLUSIVE_LOCK_KEY);
    }

    public boolean acquire() {
        if (this.generation.get() > 0) {
            return true;
        }
        try {
            this.client.put(this.putLockPolicy, this.exclusiveLockKey, new Bin[]{this.exclusiveLockBin});
            this.generation.incrementAndGet();
            logger.info("Successfully got exclusive WAL lock");
            this.scheduledFuture.set(this.scheduledExecutorService.scheduleAtFixedRate(this::upgradeLock, this.exclusiveLockTtl.getSeconds() / 2L, this.exclusiveLockTtl.getSeconds() / 2L, TimeUnit.SECONDS));
            return true;
        }
        catch (AerospikeException e) {
            if (e.getResultCode() == 5) {
                logger.debug("Failed to get exclusive WAL lock, will try later");
                int expiration = this.client.get(null, (Key)this.exclusiveLockKey).expiration;
                logger.debug("WAL lock will be released at {}", (Object)JAN_01_2010.plus((long)expiration, ChronoUnit.SECONDS));
                return false;
            }
            logger.error("Failed while getting exclusive WAL lock", (Throwable)e);
            throw e;
        }
    }

    public void release() {
        if (this.generation.get() > 0) {
            this.client.delete(null, this.exclusiveLockKey);
            this.reset();
        }
    }

    private WritePolicy buildPutLockPolicy() {
        WritePolicy putLockPolicy = new WritePolicy();
        putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
        putLockPolicy.expiration = (int)this.exclusiveLockTtl.get(ChronoUnit.SECONDS);
        return putLockPolicy;
    }

    private void upgradeLock() {
        try {
            this.client.touch(this.buildTouchLockPolicy(), this.exclusiveLockKey);
            this.generation.incrementAndGet();
            logger.info("Successfully upgraded WAL lock");
        }
        catch (AerospikeException e) {
            logger.error("Failed while upgrading WAL lock", (Throwable)e);
            this.reset();
            throw e;
        }
    }

    private void reset() {
        this.generation.set(0);
        if (this.scheduledFuture.get() != null) {
            this.scheduledFuture.get().cancel(false);
            this.scheduledFuture.set(null);
        }
    }

    private WritePolicy buildTouchLockPolicy() {
        WritePolicy touchLockPolicy = new WritePolicy();
        touchLockPolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
        touchLockPolicy.generation = this.generation.get();
        touchLockPolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
        touchLockPolicy.expiration = (int)this.exclusiveLockTtl.get(ChronoUnit.SECONDS);
        return touchLockPolicy;
    }

    public void shutdown() {
        AsyncUtil.shutdownAndAwaitTermination((ExecutorService)this.scheduledExecutorService);
    }
}

