/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream;

import com.starrocks.data.load.stream.BatchTableRegion;
import com.starrocks.data.load.stream.DefaultStreamLoader;
import com.starrocks.data.load.stream.EnvUtils;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.LoadMetrics;
import com.starrocks.data.load.stream.StarRocksVersion;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoadStrategy;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.TransactionStreamLoader;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultStreamLoadManager
implements StreamLoadManager,
Serializable {
    private static final long serialVersionUID = 1L;
    // NOTE(review): LOG and log are two loggers bound to the same class; the methods
    // of this class use both, so neither can be dropped without touching those methods.
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamLoadManager.class);
    private static final Logger log = LoggerFactory.getLogger(DefaultStreamLoadManager.class);
    // Configuration handed to the constructor; also passed on to the stream loader in init().
    private final StreamLoadProperties properties;
    // TransactionStreamLoader when properties.isEnableTransaction() is true, DefaultStreamLoader otherwise.
    private final StreamLoader streamLoader;
    // Cached-byte threshold at which write() signals the manager thread to flush.
    private final long maxCacheBytes;
    // Twice maxCacheBytes; write() blocks while the cached byte count is at or above this value.
    private final long maxWriteBlockCacheBytes;
    // Table regions keyed by unique key, created lazily by getCacheRegion().
    private final Map<String, TableRegion> regions = new HashMap<String, TableRegion>();
    // Bytes currently cached: increased by write(), decreased by callback(StreamLoadResponse).
    private final AtomicLong currentCacheBytes = new AtomicLong(0L);
    // Running totals accumulated from load responses in callback(StreamLoadResponse).
    private final AtomicLong totalFlushRows = new AtomicLong(0L);
    private final AtomicLong numberTotalRows = new AtomicLong(0L);
    private final AtomicLong numberLoadRows = new AtomicLong(0L);
    // Chooses which regions of commitQ the manager thread flushes outside of a savepoint.
    private final StreamLoadStrategy loadStrategy;
    // Upper bound, in milliseconds, of each wait of the manager thread between scans.
    private final long scanningFrequency;
    // Thread that called flush(); the manager thread unparks it while a savepoint is pending.
    private Thread current;
    // Background thread created and started by init().
    private Thread manager;
    // True while flush() is waiting for all cached bytes to be loaded.
    private volatile boolean savepoint = false;
    // Guards the two conditions below.
    private final Lock lock = new ReentrantLock();
    // Signalled by callback(StreamLoadResponse); write() waits on it while the cache is full.
    private final Condition writable = this.lock.newCondition();
    // Signalled by write() and flush(); the manager thread waits on it between scans.
    private final Condition flushable = this.lock.newCondition();
    // ACTIVE between a successful init() and close().
    private final AtomicReference<State> state = new AtomicReference<State>(State.INACTIVE);
    // Most recent failure reported by a callback or by the manager thread; rethrown by AssertNotException().
    private volatile Throwable e;
    // Region queues walked by the manager thread: waitQ -> prepareQ -> commitQ -> waitQ.
    private final Queue<TableRegion> waitQ = new ConcurrentLinkedQueue<TableRegion>();
    private final Queue<TableRegion> prepareQ = new LinkedList<TableRegion>();
    private final Queue<TableRegion> commitQ = new LinkedList<TableRegion>();
    // Transient state below is (re)created by init().
    private transient LabelGeneratorFactory labelGeneratorFactory;
    // Set by the first write() that crosses maxCacheBytes, cleared by callback(StreamLoadResponse).
    private transient AtomicBoolean writeTriggerFlush;
    private transient LoadMetrics loadMetrics;

    /**
     * Creates a manager that uses {@link StreamLoadStrategy.DefaultLoadStrategy}
     * built from the same properties.
     *
     * @param properties stream load configuration
     */
    public DefaultStreamLoadManager(StreamLoadProperties properties) {
        this(properties, new StreamLoadStrategy.DefaultLoadStrategy(properties));
    }

    /**
     * Creates a manager with an explicit flush-selection strategy.
     *
     * @param properties   stream load configuration; decides which loader
     *                     implementation is used and supplies the cache limit and
     *                     scanning frequency
     * @param loadStrategy strategy used to pick regions to flush
     */
    public DefaultStreamLoadManager(StreamLoadProperties properties, StreamLoadStrategy loadStrategy) {
        this.properties = properties;
        if (properties.isEnableTransaction()) {
            this.streamLoader = new TransactionStreamLoader();
        } else {
            this.streamLoader = new DefaultStreamLoader();
        }
        long cacheLimit = properties.getMaxCacheBytes();
        this.maxCacheBytes = cacheLimit;
        // Writers are only blocked once the cache holds twice the flush threshold.
        this.maxWriteBlockCacheBytes = cacheLimit * 2L;
        this.scanningFrequency = properties.getScanningFrequency();
        this.loadStrategy = loadStrategy;
    }

    /**
     * Creates the transient helpers and, if the manager is currently INACTIVE,
     * starts the background "StarRocks-Sink-Manager" thread and the stream loader.
     * Calling it while the manager is already ACTIVE only re-creates the transient
     * helpers.
     */
    @Override
    public void init() {
        this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(this.properties.getLabelPrefix());
        this.writeTriggerFlush = new AtomicBoolean(false);
        this.loadMetrics = new LoadMetrics();
        if (!this.state.compareAndSet(State.INACTIVE, State.ACTIVE)) {
            return;
        }
        this.manager = new Thread(this::runManagerLoop, "StarRocks-Sink-Manager");
        this.manager.setDaemon(true);
        // Install the handler BEFORE start(): a failure thrown by the manager thread
        // must always be recorded in this.e, even if it happens immediately.
        this.manager.setUncaughtExceptionHandler((t, ee) -> {
            log.error("StarRocks-Sink-Manager error", ee);
            this.e = ee;
        });
        this.manager.start();
        log.info("StarRocks-Sink-Manager start, {}", EnvUtils.getGitInformation());
        this.streamLoader.start(this.properties, this);
    }

    /** Body of the manager thread: periodically moves regions between the queues and flushes them. */
    private void runManagerLoop() {
        long lastReportMillis = -1L; // negative means "never reported"
        log.info("manager running, scanningFrequency : {}", this.scanningFrequency);
        while (this.awaitNextScan()) {
            long now = System.currentTimeMillis();
            if (lastReportMillis < 0L || now - lastReportMillis > 4999L) {
                log.info("manager report, current Bytes : {},  waitQ : {}, prepareQ : {}, commitQ : {}", this.currentCacheBytes.get(), this.waitQ.size(), this.prepareQ.size(), this.commitQ.size());
                lastReportMillis = System.currentTimeMillis();
            }
            this.promoteReadableRegions();
            this.prepareRegions();
            boolean flushingCommit = this.flushCommittedRegions();
            this.flushOneRegionIfCacheFull(flushingCommit);
            if (this.savepoint) {
                // Wake the thread blocked in flush() so it can re-check its exit condition.
                LockSupport.unpark(this.current);
            }
        }
    }

    /**
     * Waits on {@code flushable} for at most {@code scanningFrequency} milliseconds.
     *
     * @return {@code false} when the thread was interrupted and the loop must stop
     */
    private boolean awaitNextScan() {
        this.lock.lock();
        try {
            this.flushable.await(this.scanningFrequency, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (InterruptedException interrupted) {
            // Interrupted (see close()): release a thread that may be parked in flush().
            if (this.savepoint) {
                this.savepoint = false;
                LockSupport.unpark(this.current);
            }
            return false;
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Moves readable regions from waitQ to prepareQ and ages the others. */
    private void promoteReadableRegions() {
        Iterator<TableRegion> waiting = this.waitQ.iterator();
        while (waiting.hasNext()) {
            TableRegion region = waiting.next();
            if (region.isReadable()) {
                this.prepareQ.offer(region);
                waiting.remove();
                LOG.debug("Move table region {}.{} from waitQ to prepareQ", region.getDatabase(), region.getTable());
            } else {
                region.getAndIncrementAge();
                LOG.debug("Increment age of table region {}.{} in waitQ", region.getDatabase(), region.getTable());
            }
        }
    }

    /** Moves regions from prepareQ to commitQ once prepared, or back to waitQ when no longer readable. */
    private void prepareRegions() {
        Iterator<TableRegion> preparing = this.prepareQ.iterator();
        while (preparing.hasNext()) {
            TableRegion region = preparing.next();
            if (!region.testPrepare()) {
                continue;
            }
            if (!region.isReadable()) {
                region.cancel();
                this.waitQ.offer(region);
                preparing.remove();
                LOG.debug("Move table region {}.{} from prepareQ to waitQ", region.getDatabase(), region.getTable());
                continue;
            }
            if (region.prepare()) {
                this.commitQ.offer(region);
                preparing.remove();
                LOG.debug("Move table region {}.{} from prepareQ to commitQ", region.getDatabase(), region.getTable());
            }
        }
    }

    /**
     * Flushes regions of commitQ: all of them during a savepoint, otherwise the ones
     * chosen by the load strategy. Flushed regions go back to waitQ.
     *
     * @return {@code true} if at least one flushed region reports {@code isFlushing()}
     */
    private boolean flushCommittedRegions() {
        boolean flushingCommit = false;
        if (this.savepoint) {
            Iterator<TableRegion> committed = this.commitQ.iterator();
            while (committed.hasNext()) {
                TableRegion region = committed.next();
                region.getAndIncrementAge();
                if (!region.flush()) {
                    continue;
                }
                this.waitQ.offer(region);
                if (region.isFlushing()) {
                    flushingCommit = true;
                }
                committed.remove();
                LOG.debug("Move table region {}.{} from commitQ to waitQ because of savepoint", region.getDatabase(), region.getTable());
            }
        } else {
            // NOTE(review): commitQ is modified while iterating the strategy's result;
            // this presumes select() returns a collection independent of commitQ — confirm.
            for (TableRegion region : this.loadStrategy.select(this.commitQ)) {
                if (!region.flush()) {
                    continue;
                }
                this.waitQ.offer(region);
                this.commitQ.remove(region);
                if (region.isFlushing()) {
                    flushingCommit = true;
                }
                LOG.debug("Move table region {}.{} from commitQ to waitQ for normal", region.getDatabase(), region.getTable());
            }
        }
        return flushingCommit;
    }

    /**
     * When nothing is being flushed and the cache has reached maxCacheBytes, flushes
     * one region picked from commitQ and moves it back to waitQ.
     */
    private void flushOneRegionIfCacheFull(boolean flushingCommit) {
        if (flushingCommit || this.currentCacheBytes.get() < this.maxCacheBytes) {
            return;
        }
        // NOTE(review): the comparator orders regions in DESCENDING byte order, so max()
        // yields the region with the fewest flush bytes (ties: fewest cache bytes).
        // Kept exactly as found; confirm this is the intended choice.
        TableRegion maxFlushRegion = this.commitQ.stream().max((r1, r2) -> {
            if (r1.getFlushBytes() != r2.getFlushBytes()) {
                return Long.compare(r2.getFlushBytes(), r1.getFlushBytes());
            }
            return Long.compare(r2.getCacheBytes(), r1.getCacheBytes());
        }).orElse(null);
        if (maxFlushRegion != null && maxFlushRegion.flush()) {
            this.commitQ.remove(maxFlushRegion);
            this.waitQ.offer(maxFlushRegion);
            LOG.debug("Move table region {}.{} from commitQ to waitQ for max cache bytes", maxFlushRegion.getDatabase(), maxFlushRegion.getTable());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends rows to the cache region of the given table.
     *
     * <p>After each row the cached byte count is checked: at or above
     * {@code maxWriteBlockCacheBytes} the caller is blocked until the count drops
     * below it; otherwise, at or above {@code maxCacheBytes}, the manager thread is
     * signalled once (guarded by {@code writeTriggerFlush}).
     *
     * @param uniqueKey key of the region, or {@code null} to derive it from database and table
     * @param database  target database
     * @param table     target table
     * @param rows      rows to cache, encoded as UTF-8
     * @throws RuntimeException if a previous load failed, or if the thread is
     *                          interrupted while waiting for cache space (the
     *                          interrupt flag is restored in that case)
     */
    @Override
    public void write(String uniqueKey, String database, String table, String ... rows) {
        TableRegion region = this.getCacheRegion(uniqueKey, database, table);
        for (String row : rows) {
            this.AssertNotException();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Write uniqueKey {}, database {}, table {}, row {}", new Object[]{uniqueKey == null ? "null" : uniqueKey, database, table, row});
            }
            int bytes = region.write(row.getBytes(StandardCharsets.UTF_8));
            long cachedBytes = this.currentCacheBytes.addAndGet(bytes);
            if (cachedBytes >= this.maxWriteBlockCacheBytes) {
                this.lock.lock();
                try {
                    int waitSeconds = 0;
                    while (this.currentCacheBytes.get() >= this.maxWriteBlockCacheBytes) {
                        this.AssertNotException();
                        log.info("Cache full, wait flush, currentBytes: {}, maxWriteBlockCacheBytes: {}", this.currentCacheBytes.get(), this.maxWriteBlockCacheBytes);
                        this.flushable.signal();
                        // Back off 1s, 2s, ... capped at 5s between re-checks.
                        this.writable.await(Math.min(++waitSeconds, 5), TimeUnit.SECONDS);
                    }
                }
                catch (InterruptedException ex) {
                    // Restore the interrupt flag so callers up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    this.e = ex;
                    throw new RuntimeException(ex);
                }
                finally {
                    this.lock.unlock();
                }
            } else if (cachedBytes >= this.maxCacheBytes && this.writeTriggerFlush.compareAndSet(false, true)) {
                this.lock.lock();
                try {
                    this.flushable.signal();
                }
                finally {
                    this.lock.unlock();
                }
                LOG.info("Trigger flush, currentBytes: {}, maxCacheBytes: {}", cachedBytes, this.maxCacheBytes);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles the outcome of one stream load: releases the flushed bytes from the
     * cache counter, updates the row counters and metrics, records a failure if the
     * response carries one, and wakes a writer blocked in {@code write()}.
     *
     * @param response result of a finished load
     */
    @Override
    public void callback(StreamLoadResponse response) {
        long previousBytes = response.getFlushBytes() != null
                ? this.currentCacheBytes.getAndAdd(-response.getFlushBytes().longValue())
                : this.currentCacheBytes.get();
        if (response.getFlushRows() != null) {
            this.totalFlushRows.addAndGet(response.getFlushRows());
        }
        this.writeTriggerFlush.set(false);
        log.info("pre bytes : {}, current bytes : {}, totalFlushRows : {}", new Object[]{previousBytes, this.currentCacheBytes.get(), this.totalFlushRows.get()});
        Throwable failure = response.getException();
        if (failure != null) {
            // Publish the failure BEFORE waking writers, so a writer that wakes up
            // sees it on its next check instead of going back to sleep.
            log.error("Stream load failed", failure);
            this.e = failure;
        }
        this.lock.lock();
        try {
            this.writable.signal();
        }
        finally {
            this.lock.unlock();
        }
        if (response.getBody() != null) {
            if (response.getBody().getNumberTotalRows() != null) {
                this.numberTotalRows.addAndGet(response.getBody().getNumberTotalRows());
            }
            if (response.getBody().getNumberLoadedRows() != null) {
                this.numberLoadRows.addAndGet(response.getBody().getNumberLoadedRows());
            }
        }
        if (failure != null) {
            this.loadMetrics.updateFailedLoad();
        } else {
            this.loadMetrics.updateSuccessLoad(response);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}", (Object)this.loadMetrics);
        }
    }

    /**
     * Records a load failure so that the next {@code AssertNotException()} check
     * rethrows it.
     *
     * @param e the failure to record (shadows the field of the same name)
     */
    @Override
    public void callback(Throwable e) {
        final Throwable failure = e;
        log.error("Stream load failed", failure);
        this.e = failure;
    }

    /**
     * Blocks the calling thread until the cached byte count reaches zero, asking
     * the manager thread to flush every region (savepoint mode) in the meantime.
     *
     * @throws RuntimeException if a load failure has been recorded
     */
    @Override
    public void flush() {
        log.info("Stream load manager flush");
        // Publish the waiting thread BEFORE the volatile savepoint write: the manager
        // thread reads savepoint first and then unparks `current`, so this ordering
        // guarantees it never sees the savepoint together with a stale/null thread.
        this.current = Thread.currentThread();
        this.savepoint = true;
        while (!this.check()) {
            this.AssertNotException();
            this.lock.lock();
            try {
                this.flushable.signal();
            }
            finally {
                this.lock.unlock();
            }
            LockSupport.park(this.current);
            // The manager thread clears savepoint when it is interrupted (see close()).
            if (!this.savepoint) {
                break;
            }
            try {
                for (TableRegion tableRegion : this.regions.values()) {
                    Future<?> result = tableRegion.getResult();
                    if (result == null) {
                        continue;
                    }
                    result.get();
                }
            }
            catch (InterruptedException | ExecutionException ex) {
                // Deliberately best-effort: load failures reach this class through the
                // callbacks and are rethrown by AssertNotException() on the next pass.
                log.warn("Flush get result failed", (Throwable)ex);
            }
        }
        this.savepoint = false;
    }

    /**
     * Captures a snapshot of all known regions and then clears the label of each
     * region.
     *
     * @return the snapshot taken before the labels were cleared
     */
    @Override
    public StreamLoadSnapshot snapshot() {
        StreamLoadSnapshot captured = StreamLoadSnapshot.snapshot(this.regions.values());
        this.regions.values().forEach(region -> region.setLabel(null));
        return captured;
    }

    /**
     * Delegates the prepare step for the given snapshot to the stream loader.
     *
     * @param snapshot snapshot to prepare
     * @return the loader's result
     */
    @Override
    public boolean prepare(StreamLoadSnapshot snapshot) {
        boolean prepared = this.streamLoader.prepare(snapshot);
        return prepared;
    }

    /**
     * Delegates the commit step for the given snapshot to the stream loader.
     *
     * @param snapshot snapshot to commit
     * @return the loader's result
     */
    @Override
    public boolean commit(StreamLoadSnapshot snapshot) {
        boolean committed = this.streamLoader.commit(snapshot);
        return committed;
    }

    /**
     * Aborts the given snapshot by asking the stream loader to roll it back.
     *
     * @param snapshot snapshot to abort
     * @return the loader's rollback result
     */
    @Override
    public boolean abort(StreamLoadSnapshot snapshot) {
        boolean rolledBack = this.streamLoader.rollback(snapshot);
        return rolledBack;
    }

    /**
     * Stops the manager: on the ACTIVE to INACTIVE transition it logs the final
     * counters, interrupts the manager thread and closes the stream loader.
     * Does nothing when the manager is not ACTIVE.
     */
    @Override
    public void close() {
        boolean wasActive = this.state.compareAndSet(State.ACTIVE, State.INACTIVE);
        if (!wasActive) {
            return;
        }
        log.info("Stream load manger close, current bytes : {}, flush rows : {}, numberTotalRows : {}, numberLoadRows : {}, loadMetrics: {}", this.currentCacheBytes.get(), this.totalFlushRows.get(), this.numberTotalRows.get(), this.numberLoadRows.get(), this.loadMetrics);
        this.manager.interrupt();
        this.streamLoader.close();
    }

    /**
     * Tells whether the cache is empty.
     *
     * @return {@code true} when the cached byte count is exactly zero
     */
    private boolean check() {
        long cached = this.currentCacheBytes.get();
        return cached == 0L;
    }

    /**
     * Fails fast when a load failure has been recorded: rolls back the current
     * snapshot, closes the manager and rethrows the failure.
     *
     * @throws RuntimeException wrapping the recorded failure
     */
    private void AssertNotException() {
        // Read the volatile field once so the logged and the thrown failure are the same.
        Throwable failure = this.e;
        if (failure == null) {
            return;
        }
        log.error("catch exception, wait rollback ", failure);
        try {
            this.streamLoader.rollback(this.snapshot());
        }
        finally {
            // Always shut down, even when the rollback itself throws.
            this.close();
        }
        throw new RuntimeException(failure);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the cache region for a table, creating and registering it (in the
     * region map and in waitQ) on first use.
     *
     * @param uniqueKey key of the region, or {@code null} to derive it from database and table
     * @param database  target database
     * @param table     target table
     * @return the existing or newly created region
     */
    protected TableRegion getCacheRegion(String uniqueKey, String database, String table) {
        String key = uniqueKey != null ? uniqueKey : StreamLoadUtils.getTableUniqueKey(database, table);
        // `regions` is a plain HashMap: reading it while another thread inserts is not
        // safe, so the lookup happens under the same monitor as the insertion.
        synchronized (this.regions) {
            TableRegion region = this.regions.get(key);
            if (region == null) {
                StreamLoadTableProperties tableProperties = this.properties.getTableProperties(key, database, table);
                LabelGenerator labelGenerator = this.labelGeneratorFactory.create(database, table);
                region = new BatchTableRegion(key, database, table, this, tableProperties, this.streamLoader, labelGenerator);
                this.regions.put(key, region);
                this.waitQ.offer(region);
            }
            return region;
        }
    }

    /**
     * Decides whether a batch table region should be used.
     *
     * <p>CSV format never uses it; otherwise it is used when transactions are
     * enabled, or when the version is known and is at most 2.2.
     *
     * @param format            data format of the load
     * @param enableTransaction whether transactional loading is enabled
     * @param version           server version, may be {@code null}
     * @return {@code true} if the batch table region applies
     */
    static boolean useBatchTableRegion(StreamLoadDataFormat format, boolean enableTransaction, StarRocksVersion version) {
        boolean useBatch;
        if (format instanceof StreamLoadDataFormat.CSVFormat) {
            useBatch = false;
        } else if (enableTransaction) {
            useBatch = true;
        } else if (version == null) {
            useBatch = false;
        } else if (version.getMajor() < 2) {
            useBatch = true;
        } else {
            useBatch = version.getMajor() == 2 && version.getMinor() <= 2;
        }
        return useBatch;
    }

    /** Lifecycle state of the manager: ACTIVE between init() and close(), INACTIVE otherwise. */
    enum State {
        ACTIVE,
        INACTIVE
    }
}

