/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisSnapshot;
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.utils.db.BatchOperation;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects {@link OMClientResponse}s in memory and writes them to the OM metadata store in
 * batches from a dedicated daemon thread.
 *
 * <p>Two queues are used. Callers append to {@code currentBuffer} through
 * {@link #add(OMClientResponse, long)}. The flush thread waits until {@code currentBuffer} is
 * non-empty, swaps the two queues, and then drains {@code readyBuffer} into a single
 * {@link BatchOperation} while new entries keep accumulating in the other queue.
 *
 * <p>The swap, {@code add} and the wait for work are all synchronized on this instance.
 * {@code readyBuffer} is only touched by the flush thread outside of the swap.
 */
public class OzoneManagerDoubleBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);

    /** Queue that {@link #add(OMClientResponse, long)} appends to. */
    private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
    /** Queue being drained by the flush thread; exchanged with {@code currentBuffer} per flush. */
    private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
    private final Daemon daemon;
    private final OMMetadataManager omMetadataManager;
    private final AtomicLong flushedTransactionCount = new AtomicLong(0L);
    private final AtomicLong flushIterations = new AtomicLong(0L);
    /** Loop condition of the flush thread; cleared by {@link #stop()}. */
    private volatile boolean isRunning;
    private final OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
    /** Largest batch seen so far; only read and written from {@link #updateMetrics(long)}. */
    private long maxFlushedTransactionsInOneIteration;
    private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;

    /**
     * Creates the buffer and immediately starts the flush daemon thread
     * (named {@code OMDoubleBufferFlushThread}).
     *
     * @param omMetadataManager metadata manager whose store receives the batched writes
     * @param ozoneManagerRatisSnapShot receives the last applied index after every flush
     */
    public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager, OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot) {
        this.currentBuffer = new ConcurrentLinkedQueue<>();
        this.readyBuffer = new ConcurrentLinkedQueue<>();
        this.omMetadataManager = omMetadataManager;
        this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
        this.ozoneManagerDoubleBufferMetrics = OzoneManagerDoubleBufferMetrics.create();
        // The flag must be set before the thread starts, otherwise the loop could exit at once.
        this.isRunning = true;
        this.daemon = new Daemon(this::flushTransactions);
        this.daemon.setName("OMDoubleBufferFlushThread");
        this.daemon.start();
    }

    /**
     * Body of the flush thread. Repeats until {@link #stop()} clears {@code isRunning}:
     * wait for entries, swap buffers, add every response to one batch, commit it, clear the
     * drained buffer, clean up table caches, publish the last applied index and update metrics.
     *
     * <p>Any {@link IOException} or other {@link Throwable} raised while flushing is passed to
     * {@link ExitUtils#terminate}. An interrupt is only tolerated when the buffer has already
     * been stopped.
     */
    private void flushTransactions() {
        while (isRunning) {
            try {
                // canFlush() blocks until there is work; as written it only ever returns true.
                if (!canFlush()) {
                    continue;
                }
                setReadyBuffer();

                final BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation();
                for (DoubleBufferEntry<OMClientResponse> entry : readyBuffer) {
                    try {
                        entry.getResponse().addToDBBatch(omMetadataManager, batchOperation);
                    } catch (IOException ex) {
                        terminate(ex);
                    }
                }
                omMetadataManager.getStore().commitBatchOperation(batchOperation);

                int flushedTransactionsSize = readyBuffer.size();
                flushedTransactionCount.addAndGet(flushedTransactionsSize);
                flushIterations.incrementAndGet();
                LOG.debug("Sync Iteration {} flushed transactions in this iteration {}",
                    flushIterations.get(), flushedTransactionsSize);

                // Must be computed before the buffer is cleared.
                long lastRatisTransactionIndex = readyBuffer.stream()
                    .map(DoubleBufferEntry::getTrxLogIndex)
                    .max(Long::compareTo)
                    .orElseThrow(() -> new IllegalStateException(
                        "Ready buffer is empty after swap; no transaction index to record"));

                readyBuffer.clear();
                cleanupCache(lastRatisTransactionIndex);
                ozoneManagerRatisSnapShot.updateLastAppliedIndex(lastRatisTransactionIndex);
                updateMetrics(flushedTransactionsSize);
            } catch (InterruptedException ex) {
                // Restore the interrupt status for anything further up the stack.
                Thread.currentThread().interrupt();
                if (isRunning) {
                    String message = "OMDoubleBuffer flush thread " + Thread.currentThread().getName()
                        + " encountered Interrupted exception while running";
                    ExitUtils.terminate(1, message, ex, LOG);
                } else {
                    // stop() interrupted us on purpose; the loop condition ends the thread.
                    LOG.info("OMDoubleBuffer flush thread {} is interrupted and will exit.",
                        Thread.currentThread().getName());
                }
            } catch (IOException ex) {
                terminate(ex);
            } catch (Throwable t) {
                String s = "OMDoubleBuffer flush thread " + Thread.currentThread().getName()
                    + " encountered Throwable error";
                ExitUtils.terminate(2, s, t, LOG);
            }
        }
    }

    /**
     * Invokes {@code cleanupCache} with the given index on the bucket, volume, user, open-key,
     * key, deleted, S3 and multipart-info tables.
     *
     * @param lastRatisTransactionIndex highest transaction index contained in the flushed batch
     */
    private void cleanupCache(long lastRatisTransactionIndex) {
        omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getOpenKeyTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getKeyTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getDeletedTable().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getS3Table().cleanupCache(lastRatisTransactionIndex);
        omMetadataManager.getMultipartInfoTable().cleanupCache(lastRatisTransactionIndex);
    }

    /**
     * Records one flush in the metrics object and raises the per-iteration maximum when this
     * batch is larger than any previous one.
     *
     * @param flushedTransactionsSize number of entries written by the flush just completed
     */
    private void updateMetrics(long flushedTransactionsSize) {
        ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
        ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(flushedTransactionsSize);
        if (maxFlushedTransactionsInOneIteration < flushedTransactionsSize) {
            maxFlushedTransactionsInOneIteration = flushedTransactionsSize;
            ozoneManagerDoubleBufferMetrics.setMaxNumberOfTransactionsFlushedInOneIteration(flushedTransactionsSize);
        }
    }

    /**
     * Stops the flush thread: clears the running flag, interrupts the daemon and unregisters the
     * metrics. Calling it again after the first stop only logs a message.
     */
    public synchronized void stop() {
        if (isRunning) {
            LOG.info("Stopping OMDoubleBuffer flush thread");
            // Clear the flag before interrupting so the flush thread treats the interrupt as a
            // requested shutdown rather than a failure.
            isRunning = false;
            daemon.interrupt();
            ozoneManagerDoubleBufferMetrics.unRegister();
        } else {
            LOG.info("OMDoubleBuffer flush thread is not running.");
        }
    }

    /**
     * Terminates via {@link ExitUtils#terminate} with status 1 after a flush-time I/O failure.
     *
     * @param ex the failure encountered while adding to or committing the batch
     */
    private void terminate(IOException ex) {
        String message = "During flush to DB encountered error in OMDoubleBuffer flush thread "
            + Thread.currentThread().getName();
        ExitUtils.terminate(1, message, ex, LOG);
    }

    /**
     * @return total number of entries flushed so far
     */
    public long getFlushedTransactionCount() {
        return flushedTransactionCount.get();
    }

    /**
     * @return number of completed flush iterations
     */
    public long getFlushIterations() {
        return flushIterations.get();
    }

    /**
     * Appends a response to the current buffer and wakes a thread waiting on this instance.
     *
     * @param response response to be written to the DB on the next flush
     * @param transactionIndex transaction index stored alongside the response
     */
    public synchronized void add(OMClientResponse response, long transactionIndex) {
        currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response));
        notify();
    }

    /**
     * Blocks until the current buffer holds at least one entry.
     *
     * @return always {@code true} once the buffer is non-empty
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private synchronized boolean canFlush() throws InterruptedException {
        // isEmpty() instead of size(): ConcurrentLinkedQueue.size() walks the whole queue.
        while (currentBuffer.isEmpty()) {
            wait(Long.MAX_VALUE);
        }
        return true;
    }

    /**
     * Exchanges {@code currentBuffer} and {@code readyBuffer} under this instance's monitor.
     */
    private synchronized void setReadyBuffer() {
        Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
        currentBuffer = readyBuffer;
        readyBuffer = temp;
    }

    /**
     * @return the metrics object created for this buffer
     */
    @VisibleForTesting
    public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
        return ozoneManagerDoubleBufferMetrics;
    }
}

