/*
 * Decompiled with CFR 0.152.
 */
package nosql.batch.update.wal;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import nosql.batch.update.lock.LockingException;
import nosql.batch.update.util.AsyncUtil;
import nosql.batch.update.wal.CompletionStatistic;
import nosql.batch.update.wal.ExclusiveLocker;
import nosql.batch.update.wal.WalRecord;
import nosql.batch.update.wal.WalTimeRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically looks for stale write-ahead-log (WAL) batches and tries to complete them.
 *
 * <p>A completion pass only does work while {@link ExclusiveLocker#acquire()} returns
 * {@code true}; the lock is re-checked before every batch. Passes can be paused with
 * {@link #suspend()} and re-enabled with {@link #resume()}.
 *
 * <p>Subclasses supply the storage-specific operations: listing time ranges, listing the
 * stale batches of a range, completing a batch, and cleaning up a batch whose completion
 * failed with a {@link LockingException}.
 *
 * @param <LOCKS> type of the locks stored in a WAL record
 * @param <UPDATES> type of the updates stored in a WAL record
 * @param <BATCH_ID> type of the batch identifier
 */
public abstract class AbstractWriteAheadLogCompleter<LOCKS, UPDATES, BATCH_ID> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractWriteAheadLogCompleter.class);
    private final Duration staleBatchesThreshold;
    private final int batchSize;
    private final ExclusiveLocker exclusiveLocker;
    private final ScheduledExecutorService scheduledExecutorService;
    /** Handle of the periodic task; stays {@code null} until {@link #start()} is called. */
    private ScheduledFuture<?> scheduledFuture;
    private final AtomicBoolean suspended = new AtomicBoolean(false);

    /**
     * @param staleBatchesThreshold passed to {@link #getTimeRanges(Duration, int)}; also
     *     determines the scheduling period used by {@link #start()}
     * @param batchSize passed to {@link #getTimeRanges(Duration, int)} (logged as the
     *     maximum chunk size)
     * @param exclusiveLocker lock that must be held for a pass to process batches
     * @param scheduledExecutorService executor that runs the periodic completion task;
     *     it is shut down by {@link #shutdown()}
     */
    public AbstractWriteAheadLogCompleter(Duration staleBatchesThreshold, int batchSize, ExclusiveLocker exclusiveLocker, ScheduledExecutorService scheduledExecutorService) {
        this.staleBatchesThreshold = staleBatchesThreshold;
        this.batchSize = batchSize;
        this.exclusiveLocker = exclusiveLocker;
        this.scheduledExecutorService = scheduledExecutorService;
    }

    /**
     * Schedules {@link #completeHangedTransactions()} at a fixed rate, starting immediately,
     * with a period of {@code staleBatchesThreshold + 100 ms}.
     *
     * <p>Only the most recently scheduled task is remembered for cancellation, so this
     * method is expected to be called once.
     */
    public void start() {
        long periodMillis = this.staleBatchesThreshold.toMillis() + 100L;
        this.scheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(
                this::completeHangedTransactions, 0L, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the periodic task (interrupting it if it is running), shuts down the executor,
     * then releases and shuts down the exclusive locker.
     *
     * <p>Safe to call even if {@link #start()} was never invoked.
     */
    public void shutdown() {
        ScheduledFuture<?> future = this.scheduledFuture;
        // start() may never have been called; still shut down the executor and the locker.
        if (future != null) {
            future.cancel(true);
        }
        AsyncUtil.shutdownAndAwaitTermination(this.scheduledExecutorService);
        try {
            this.exclusiveLocker.release();
        } finally {
            // Shut the locker down even when releasing the lock fails.
            this.exclusiveLocker.shutdown();
        }
    }

    /**
     * Marks completion as suspended and releases the exclusive lock. A pass that is
     * currently running stops before it handles its next batch.
     */
    public void suspend() {
        logger.info("WAL completion is suspended");
        this.suspended.set(true);
        this.exclusiveLocker.release();
    }

    /**
     * @return {@code true} if {@link #suspend()} was called and {@link #resume()} has not
     *     been called since
     */
    public boolean isSuspended() {
        return this.suspended.get();
    }

    /** Clears the suspended flag so that subsequent passes process batches again. */
    public void resume() {
        logger.info("WAL completion is resumed");
        this.suspended.set(false);
    }

    /**
     * Runs one completion pass over all stale batches.
     *
     * <p>The pass ends early, returning the counters gathered so far, as soon as completion
     * is suspended or the current thread is interrupted. A batch is skipped when the
     * exclusive lock cannot be (re-)acquired for it. Any {@link Throwable} escaping the pass
     * is logged and not rethrown, so a scheduled execution never fails with an exception.
     *
     * @return counters of stale, completed, ignored and failed batches seen during this
     *     pass; all zeros if completion was suspended on entry
     */
    public CompletionStatistic completeHangedTransactions() {
        if (this.suspended.get()) {
            logger.info("WAL completion was suspended");
            return new CompletionStatistic(0, 0, 0, 0);
        }
        int staleBatchesCount = 0;
        int completeBatchesCount = 0;
        int ignoredBatchesCount = 0;
        int errorBatchesCount = 0;
        try {
            if (this.exclusiveLocker.acquire()) {
                List<WalTimeRange> timeRanges = this.getTimeRanges(this.staleBatchesThreshold, this.batchSize);
                logger.info("Got {} chunks of stale transactions. Max chunk size {}", timeRanges.size(), this.batchSize);
                for (WalTimeRange timeRange : timeRanges) {
                    // Do not query further ranges once a stop was requested.
                    if (this.isStopRequested()) {
                        return new CompletionStatistic(staleBatchesCount, completeBatchesCount, ignoredBatchesCount, errorBatchesCount);
                    }
                    List<WalRecord<LOCKS, UPDATES, BATCH_ID>> staleBatches = this.getStaleBatchesForRange(timeRange);
                    staleBatchesCount += staleBatches.size();
                    logger.info("Processing {} stale transactions", staleBatches.size());
                    for (WalRecord<LOCKS, UPDATES, BATCH_ID> batch : staleBatches) {
                        if (this.isStopRequested()) {
                            return new CompletionStatistic(staleBatchesCount, completeBatchesCount, ignoredBatchesCount, errorBatchesCount);
                        }
                        // Re-check the lock for every batch; skip the batch if it is not held.
                        if (!this.exclusiveLocker.acquire()) {
                            continue;
                        }
                        logger.info("Trying to complete batch batchId=[{}], timestamp=[{}]", batch.batchId, batch.timestamp);
                        try {
                            this.processAndDeleteTransactions(batch);
                            ++completeBatchesCount;
                            logger.info("Successfully complete batch batchId=[{}]", batch.batchId);
                        } catch (LockingException be) {
                            logger.info("Failed to complete batch batchId=[{}] as it's already completed", batch.batchId, be);
                            this.releaseLocksAndDeleteWalTransactionOnError(batch);
                            ++ignoredBatchesCount;
                            logger.info("released locks for batch batchId=[{}]", batch.batchId, be);
                        } catch (Exception e) {
                            ++errorBatchesCount;
                            logger.error("!!! Failed to complete batch batchId=[{}], need to be investigated", batch.batchId, e);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error("Error while running completeHangedTransactions()", t);
        }
        return new CompletionStatistic(staleBatchesCount, completeBatchesCount, ignoredBatchesCount, errorBatchesCount);
    }

    /**
     * Reports (and logs) whether the running pass must stop because completion was suspended
     * or the current thread was interrupted. The interrupt flag is left untouched.
     */
    private boolean isStopRequested() {
        if (this.suspended.get()) {
            logger.info("WAL completion was suspended");
            return true;
        }
        if (Thread.currentThread().isInterrupted()) {
            logger.info("WAL completion was interrupted");
            return true;
        }
        return false;
    }

    /**
     * Called when completing {@code batch} failed with a {@link LockingException}.
     *
     * @param batch the WAL record whose completion failed
     */
    protected abstract void releaseLocksAndDeleteWalTransactionOnError(WalRecord<LOCKS, UPDATES, BATCH_ID> batch);

    /**
     * Completes the given stale batch. A {@link LockingException} thrown from here makes the
     * batch count as ignored; any other exception makes it count as an error.
     *
     * @param batch the WAL record to complete
     */
    protected abstract void processAndDeleteTransactions(WalRecord<LOCKS, UPDATES, BATCH_ID> batch);

    /**
     * Returns the time ranges ("chunks") of stale transactions to process in one pass.
     *
     * @param staleBatchesThreshold the threshold this completer was constructed with
     * @param batchSize the batch size this completer was constructed with
     * @return the ranges to iterate over
     */
    protected abstract List<WalTimeRange> getTimeRanges(Duration staleBatchesThreshold, int batchSize);

    /**
     * Returns the stale batches that belong to the given time range.
     *
     * @param timeRange one of the ranges returned by {@link #getTimeRanges(Duration, int)}
     * @return the WAL records to try to complete
     */
    protected abstract List<WalRecord<LOCKS, UPDATES, BATCH_ID>> getStaleBatchesForRange(WalTimeRange timeRange);
}

