/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.master.region;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.org.apache.hadoop.hbase.Abortable;
import org.apache.hudi.org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hudi.org.apache.hadoop.hbase.master.region.MasterRegionUtils;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hudi.org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hudi.org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class MasterRegionFlusherAndCompactor
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);
    private final Configuration conf;
    private final Abortable abortable;
    private final HRegion region;
    private final AtomicLong changesAfterLastFlush = new AtomicLong(0L);
    private final long flushSize;
    private final long flushPerChanges;
    private final long flushIntervalMs;
    private final int compactMin;
    private final Path globalArchivePath;
    private final String archivedHFileSuffix;
    private final Thread flushThread;
    private final Lock flushLock = new ReentrantLock();
    private final Condition flushCond = this.flushLock.newCondition();
    private boolean flushRequest = false;
    private long lastFlushTime;
    private final ExecutorService compactExecutor;
    private final Lock compactLock = new ReentrantLock();
    private boolean compactRequest = false;
    private volatile boolean closed = false;

    MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region, long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin, Path globalArchivePath, String archivedHFileSuffix) {
        this.conf = conf;
        this.abortable = abortable;
        this.region = region;
        this.flushSize = flushSize;
        this.flushPerChanges = flushPerChanges;
        this.flushIntervalMs = flushIntervalMs;
        this.compactMin = compactMin;
        this.globalArchivePath = globalArchivePath;
        this.archivedHFileSuffix = archivedHFileSuffix;
        this.flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
        this.flushThread.setDaemon(true);
        this.flushThread.start();
        this.compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true).build());
        LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}", flushSize, flushPerChanges, flushIntervalMs, compactMin);
    }

    static void setupConf(Configuration conf, long flushSize, long flushPerChanges, long flushIntervalMs) {
        conf.setLong("hbase.hregion.memstore.flush.size", flushSize);
        conf.setLong("hbase.regionserver.flush.per.changes", flushPerChanges);
        conf.setLong("hbase.regionserver.optionalcacheflushinterval", flushIntervalMs);
        LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize, flushPerChanges, flushIntervalMs);
    }

    private void moveHFileToGlobalArchiveDir() throws IOException {
        FileSystem fs = this.region.getRegionFileSystem().getFileSystem();
        for (HStore store : this.region.getStores()) {
            store.closeAndArchiveCompactedFiles();
            Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(this.conf, this.region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
            Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(this.globalArchivePath, this.region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
            try {
                MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir, this.archivedHFileSuffix);
            }
            catch (IOException e) {
                LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir, globalStoreArchiveDir, e);
            }
        }
    }

    private void compact() {
        try {
            this.region.compact(true);
            this.moveHFileToGlobalArchiveDir();
        }
        catch (IOException e) {
            LOG.error("Failed to compact master local region", e);
        }
        this.compactLock.lock();
        try {
            if (this.needCompaction()) {
                this.compactExecutor.execute(this::compact);
            } else {
                this.compactRequest = false;
            }
        }
        finally {
            this.compactLock.unlock();
        }
    }

    private boolean needCompaction() {
        for (Store store : this.region.getStores()) {
            if (store.getStorefilesCount() < this.compactMin) continue;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushLoop() {
        this.lastFlushTime = EnvironmentEdgeManager.currentTime();
        while (!this.closed) {
            this.flushLock.lock();
            try {
                while (!this.flushRequest) {
                    long waitTimeMs = this.lastFlushTime + this.flushIntervalMs - EnvironmentEdgeManager.currentTime();
                    if (waitTimeMs <= 0L) {
                        this.flushRequest = true;
                        break;
                    }
                    this.flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
                    if (!this.closed) continue;
                    return;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                continue;
            }
            finally {
                this.flushLock.unlock();
                continue;
            }
            assert (this.flushRequest);
            this.changesAfterLastFlush.set(0L);
            try {
                this.region.flush(true);
                this.lastFlushTime = EnvironmentEdgeManager.currentTime();
            }
            catch (IOException e) {
                LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
                this.abortable.abort("Failed to flush master local region", e);
                return;
            }
            this.compactLock.lock();
            try {
                if (!this.compactRequest && this.needCompaction()) {
                    this.compactRequest = true;
                    this.compactExecutor.execute(this::compact);
                }
            }
            finally {
                this.compactLock.unlock();
            }
            this.flushLock.lock();
            try {
                if (this.shouldFlush(this.changesAfterLastFlush.get())) continue;
                this.flushRequest = false;
            }
            finally {
                this.flushLock.unlock();
            }
        }
    }

    private boolean shouldFlush(long changes) {
        long offHeapSize;
        boolean flush;
        long heapSize = this.region.getMemStoreHeapSize();
        boolean bl = flush = heapSize + (offHeapSize = this.region.getMemStoreOffHeapSize()) >= this.flushSize || changes > this.flushPerChanges;
        if (flush && LOG.isTraceEnabled()) {
            LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}", heapSize + offHeapSize, this.flushSize, changes, this.flushPerChanges);
        }
        return flush;
    }

    void onUpdate() {
        long changes = this.changesAfterLastFlush.incrementAndGet();
        if (this.shouldFlush(changes)) {
            this.requestFlush();
        }
    }

    void requestFlush() {
        this.flushLock.lock();
        try {
            if (this.flushRequest) {
                return;
            }
            this.flushRequest = true;
            this.flushCond.signalAll();
        }
        finally {
            this.flushLock.unlock();
        }
    }

    @Override
    public void close() {
        this.closed = true;
        this.flushThread.interrupt();
        this.compactExecutor.shutdown();
    }
}

