/*
 * Decompiled with CFR 0.152.
 */
package com.playtika.janusgraph.aerospike.transaction;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Value;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.google.common.util.concurrent.MoreExecutors;
import com.playtika.janusgraph.aerospike.transaction.TransactionalOperations;
import com.playtika.janusgraph.aerospike.transaction.WalOperations;
import com.playtika.janusgraph.aerospike.transaction.WriteAheadLogManager;
import com.playtika.janusgraph.aerospike.transaction.WriteAheadLogManagerBasic;
import com.playtika.janusgraph.aerospike.util.NamedThreadFactory;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.locking.PermanentLockingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteAheadLogCompleter {
    private static Logger logger = LoggerFactory.getLogger(WriteAheadLogCompleter.class);
    private static final Instant JAN_01_2010 = Instant.parse("2010-01-01T00:00:00.00Z");
    private static final Value EXCLUSIVE_LOCK_KEY = Value.get((byte)0);
    private final IAerospikeClient client;
    private final WriteAheadLogManager writeAheadLogManager;
    private final long periodInMs;
    private final TransactionalOperations transactionalOperations;
    private final WritePolicy putLockPolicy;
    private final Key exclusiveLockKey;
    private final Bin exclusiveLockBin;
    private int generation = 0;
    private AtomicBoolean suspended = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("janus-aerospike", "wal"));

    public WriteAheadLogCompleter(WalOperations walOperations, TransactionalOperations transactionalOperations) {
        this.client = walOperations.getAerospikeOperations().getClient();
        this.writeAheadLogManager = transactionalOperations.getWriteAheadLogManager();
        this.transactionalOperations = transactionalOperations;
        this.putLockPolicy = this.buildPutLockPolicy(walOperations.getStaleTransactionLifetimeThresholdInMs());
        this.exclusiveLockBin = new Bin("EL", WriteAheadLogManagerBasic.getBytesFromUUID(UUID.randomUUID()));
        this.periodInMs = Duration.ofSeconds(this.putLockPolicy.expiration + 1).toMillis();
        this.exclusiveLockKey = new Key(walOperations.getWalNamespace(), walOperations.getWalSetName(), EXCLUSIVE_LOCK_KEY);
    }

    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::completeHangedTransactions, 0L, this.periodInMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        MoreExecutors.shutdownAndAwaitTermination((ExecutorService)this.scheduledExecutorService, (long)4L, (TimeUnit)TimeUnit.SECONDS);
    }

    public void suspend() {
        this.suspended.set(true);
    }

    public boolean isSuspended() {
        return this.suspended.get();
    }

    public void resume() {
        this.suspended.set(false);
    }

    private void completeHangedTransactions() {
        block10: {
            if (this.suspended.get()) {
                logger.info("WAL execution was suspended");
                return;
            }
            try {
                if (!this.acquireExclusiveLock()) break block10;
                List<WriteAheadLogManager.WalTransaction> staleTransactions = this.writeAheadLogManager.getStaleTransactions();
                logger.info("Got {} stale transactions", (Object)staleTransactions.size());
                for (WriteAheadLogManager.WalTransaction transaction : staleTransactions) {
                    if (this.suspended.get()) {
                        logger.info("WAL execution was suspended");
                        break;
                    }
                    if (Thread.currentThread().isInterrupted()) {
                        logger.info("WAL execution was interrupted");
                        break;
                    }
                    if (!this.renewExclusiveLock()) continue;
                    logger.info("Trying to complete transaction txId=[{}], timestamp=[{}]", (Object)transaction.transactionId, (Object)transaction.timestamp);
                    try {
                        this.transactionalOperations.processAndDeleteTransaction(transaction.transactionId, transaction.locks, transaction.mutations, true);
                        logger.info("Successfully complete transaction txId=[{}]", (Object)transaction.transactionId);
                    }
                    catch (PermanentLockingException be) {
                        logger.info("Failed to complete transaction txId=[{}] as it's already completed", (Object)transaction.transactionId, (Object)be);
                        this.transactionalOperations.releaseLocksAndDeleteWalTransactionOnError(transaction.locks, transaction.transactionId);
                        logger.info("released locks for transaction txId=[{}]", (Object)transaction.transactionId, (Object)be);
                    }
                    catch (Exception e) {
                        logger.error("!!! Failed to complete transaction txId=[{}], need to be investigated", (Object)transaction.transactionId, (Object)e);
                    }
                }
            }
            catch (BackendException t) {
                logger.error("Error while running completeHangedTransactions()", (Throwable)t);
                throw new RuntimeException(t);
            }
            catch (Throwable t) {
                logger.error("Error while running completeHangedTransactions()", t);
                throw t;
            }
        }
    }

    private boolean acquireExclusiveLock() {
        try {
            this.client.add(this.putLockPolicy, this.exclusiveLockKey, new Bin[]{this.exclusiveLockBin});
            ++this.generation;
            logger.info("Successfully got exclusive lock, will check for hanged transactions");
            return true;
        }
        catch (AerospikeException e) {
            if (e.getResultCode() == 5) {
                logger.debug("Failed to get exclusive lock, will try later");
                int expiration = this.client.get(null, (Key)this.exclusiveLockKey).expiration;
                logger.debug("lock will be released at {}", (Object)JAN_01_2010.plus((long)expiration, ChronoUnit.SECONDS));
                return false;
            }
            logger.error("Failed while getting exclusive lock", (Throwable)e);
            throw e;
        }
    }

    private WritePolicy buildPutLockPolicy(long expirationInMs) {
        WritePolicy putLockPolicy = new WritePolicy();
        putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
        putLockPolicy.expiration = (int)Duration.ofMillis(expirationInMs).get(ChronoUnit.SECONDS);
        if (putLockPolicy.expiration < 1) {
            throw new IllegalArgumentException("Wrong expiration for WAL lock: " + putLockPolicy.expiration);
        }
        return putLockPolicy;
    }

    private boolean renewExclusiveLock() {
        try {
            this.client.touch(this.buildTouchLockPolicy(this.putLockPolicy.expiration, this.generation++), this.exclusiveLockKey);
            logger.info("Successfully renewed exclusive lock, will process transaction");
            return true;
        }
        catch (AerospikeException e) {
            logger.error("Failed while renew exclusive lock", (Throwable)e);
            throw e;
        }
    }

    private WritePolicy buildTouchLockPolicy(int expiration, int generation) {
        WritePolicy touchLockPolicy = new WritePolicy();
        touchLockPolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
        touchLockPolicy.generation = generation;
        touchLockPolicy.expiration = expiration;
        if (touchLockPolicy.expiration < 1) {
            throw new IllegalArgumentException("Wrong expiration for WAL lock: " + touchLockPolicy.expiration);
        }
        return touchLockPolicy;
    }
}

