/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream.v2;

import com.starrocks.data.load.stream.StreamLoadStrategy;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.v2.FlushReason;
import com.starrocks.data.load.stream.v2.TransactionTableRegion;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlushAndCommitStrategy
implements StreamLoadStrategy {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FlushAndCommitStrategy.class);
    private final long expectDelayTime;
    private final long scanFrequency;
    private final long ageThreshold;
    private final long maxCacheBytes;
    private final boolean enableAutoCommit;
    private final AtomicLong numAgeTriggerFlush = new AtomicLong(0L);
    private final AtomicLong numCacheTriggerFlush = new AtomicLong(0L);
    private final AtomicLong numTableTriggerFlush = new AtomicLong(0L);

    public FlushAndCommitStrategy(StreamLoadProperties properties, boolean enableAutoCommit) {
        this.expectDelayTime = properties.getExpectDelayTime();
        this.scanFrequency = properties.getScanningFrequency();
        this.ageThreshold = this.expectDelayTime / this.scanFrequency;
        this.maxCacheBytes = properties.getMaxCacheBytes();
        this.enableAutoCommit = enableAutoCommit;
        LOG.info("{}", (Object)this);
    }

    @Override
    public List<TableRegion> select(Iterable<TableRegion> regions) {
        throw new UnsupportedOperationException();
    }

    public List<SelectFlushResult> selectFlushRegions(Queue<TransactionTableRegion> regions, long currentCacheBytes) {
        TransactionTableRegion region;
        ArrayList<SelectFlushResult> flushRegions = new ArrayList<SelectFlushResult>();
        for (TransactionTableRegion region2 : regions) {
            if (this.shouldCommit(region2)) {
                this.numAgeTriggerFlush.getAndIncrement();
                flushRegions.add(new SelectFlushResult(FlushReason.COMMIT, region2));
                LOG.debug("Choose region {} to flush because the region should commit, age: {}, threshold: {}, scanFreq: {}, expectDelayTime: {}", new Object[]{region2.getUniqueKey(), region2.getAge(), this.ageThreshold, this.scanFrequency, this.expectDelayTime});
                continue;
            }
            FlushReason reason = region2.shouldFlush();
            if (reason == FlushReason.NONE) continue;
            this.numTableTriggerFlush.getAndIncrement();
            flushRegions.add(new SelectFlushResult(reason, region2));
            LOG.debug("Choose region {} to flush because the region itself decide to flush, age: {}, threshold: {}, scanFreq: {}, expectDelayTime: {}, reason: {}", new Object[]{region2.getUniqueKey(), region2.getAge(), this.ageThreshold, this.scanFrequency, this.expectDelayTime, reason});
        }
        if (flushRegions.isEmpty() && currentCacheBytes >= this.maxCacheBytes && (region = (TransactionTableRegion)regions.stream().max(Comparator.comparingLong(TableRegion::getCacheBytes)).orElse(null)) != null) {
            this.numCacheTriggerFlush.getAndIncrement();
            flushRegions.add(new SelectFlushResult(FlushReason.CACHE_FULL, region));
            LOG.debug("Choose region {} to flush because it's force flush, age: {}, threshold: {}, scanFreq: {}, expectDelayTime: {}", new Object[]{region.getUniqueKey(), region.getAge(), this.ageThreshold, this.scanFrequency, this.expectDelayTime});
        }
        return flushRegions;
    }

    public boolean shouldCommit(TableRegion region) {
        return this.enableAutoCommit && region.getAge() > this.ageThreshold;
    }

    public String toString() {
        return "FlushAndCommitStrategy{expectDelayTime=" + this.expectDelayTime + ", scanFrequency=" + this.scanFrequency + ", ageThreshold=" + this.ageThreshold + ", maxCacheBytes=" + this.maxCacheBytes + ", enableAutoCommit=" + this.enableAutoCommit + ", numAgeTriggerFlush=" + this.numAgeTriggerFlush + ", numCacheTriggerFlush=" + this.numCacheTriggerFlush + ", numTableTriggerFlush=" + this.numTableTriggerFlush + '}';
    }

    public static class SelectFlushResult {
        private final FlushReason reason;
        private TransactionTableRegion region;

        public SelectFlushResult(FlushReason reason, TransactionTableRegion region) {
            this.reason = reason;
            this.region = region;
        }

        public FlushReason getReason() {
            return this.reason;
        }

        public TransactionTableRegion getRegion() {
            return this.region;
        }
    }
}

