/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream.v2;

import com.starrocks.data.load.stream.DefaultStreamLoader;
import com.starrocks.data.load.stream.EnvUtils;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.LoadMetrics;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.TransactionStreamLoader;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import com.starrocks.data.load.stream.v2.FlushAndCommitStrategy;
import com.starrocks.data.load.stream.v2.FlushReason;
import com.starrocks.data.load.stream.v2.StreamLoadListener;
import com.starrocks.data.load.stream.v2.TransactionTableRegion;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamLoadManagerV2
implements StreamLoadManager,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamLoadManagerV2.class);
    private static final long serialVersionUID = 1L;
    private final StreamLoadProperties properties;
    private final boolean enableAutoCommit;
    private final StreamLoader streamLoader;
    private final int maxRetries;
    private final int retryIntervalInMs;
    private final long maxCacheBytes;
    private final long maxWriteBlockCacheBytes;
    private final Map<String, TableRegion> regions = new ConcurrentHashMap<String, TableRegion>();
    private final AtomicLong currentCacheBytes = new AtomicLong(0L);
    private final AtomicLong totalFlushRows = new AtomicLong(0L);
    private final AtomicLong numberTotalRows = new AtomicLong(0L);
    private final AtomicLong numberLoadRows = new AtomicLong(0L);
    private final FlushAndCommitStrategy flushAndCommitStrategy;
    private final long scanningFrequency;
    private Thread current;
    private Thread manager;
    private volatile boolean savepoint = false;
    private volatile boolean allRegionsCommitted;
    private final Lock lock = new ReentrantLock();
    private final Condition writable = this.lock.newCondition();
    private final Condition flushable = this.lock.newCondition();
    private final AtomicReference<State> state = new AtomicReference<State>(State.INACTIVE);
    private volatile Throwable e;
    private final Queue<TransactionTableRegion> flushQ = new ConcurrentLinkedQueue<TransactionTableRegion>();
    private transient AtomicBoolean writeTriggerFlush;
    private transient LoadMetrics loadMetrics;
    private transient StreamLoadListener streamLoadListener;
    private transient LabelGeneratorFactory labelGeneratorFactory;

    public StreamLoadManagerV2(StreamLoadProperties properties, boolean enableAutoCommit) {
        this.properties = properties;
        if (!enableAutoCommit && !properties.isEnableTransaction()) {
            throw new IllegalArgumentException("You must use transaction stream load if not enable auto-commit");
        }
        this.enableAutoCommit = enableAutoCommit;
        if (!enableAutoCommit) {
            this.streamLoader = new TransactionStreamLoader();
            this.maxRetries = 0;
            this.retryIntervalInMs = 0;
        } else {
            this.streamLoader = properties.getMaxRetries() > 0 || !properties.isEnableTransaction() ? new DefaultStreamLoader() : new TransactionStreamLoader();
            this.maxRetries = properties.getMaxRetries();
            this.retryIntervalInMs = properties.getRetryIntervalInMs();
        }
        this.maxCacheBytes = properties.getMaxCacheBytes();
        this.maxWriteBlockCacheBytes = 2L * this.maxCacheBytes;
        this.scanningFrequency = properties.getScanningFrequency();
        this.flushAndCommitStrategy = new FlushAndCommitStrategy(properties, enableAutoCommit);
    }

    /**
     * Initializes the transient state and, on the first call while inactive, starts the
     * background "StarRocks-Sink-Manager" thread and the stream loader.
     *
     * <p>A default label generator factory (built from the configured label prefix) is installed
     * only if none was set beforehand. The write-trigger flag and the load metrics are re-created
     * on every call; the thread and the loader are started only when the state moves from
     * {@code INACTIVE} to {@code ACTIVE}.
     */
    @Override
    public void init() {
        if (this.labelGeneratorFactory == null) {
            this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(this.properties.getLabelPrefix());
        }
        this.writeTriggerFlush = new AtomicBoolean(false);
        this.loadMetrics = new LoadMetrics();
        if (this.state.compareAndSet(State.INACTIVE, State.ACTIVE)) {
            this.manager = new Thread(this::lambda$init$0, "StarRocks-Sink-Manager");
            this.manager.setDaemon(true);
            // Install the handler BEFORE start(): otherwise a failure in the thread's first
            // moments could be dispatched to the default handler and never be stored in 'e'.
            this.manager.setUncaughtExceptionHandler((t, ee) -> {
                LOG.error("StarRocks-Sink-Manager error", ee);
                this.e = ee;
            });
            this.manager.start();
            LOG.info("StarRocks-Sink-Manager start, enableAutoCommit: {}, streamLoader: {}, {}", new Object[]{this.enableAutoCommit, this.streamLoader.getClass().getName(), EnvUtils.getGitInformation()});
            this.streamLoader.start(this.properties, this);
        }
    }

    /**
     * Sets the listener that {@code callback(StreamLoadResponse)} notifies with each load
     * response. When no listener is set (the field is {@code null}), responses are not forwarded.
     *
     * @param streamLoadListener the listener to notify, or {@code null} for none
     */
    public void setStreamLoadListener(StreamLoadListener streamLoadListener) {
        this.streamLoadListener = streamLoadListener;
    }

    /**
     * Sets the factory used to create a label generator for each newly cached table region.
     * If the field is still {@code null} when {@code init()} runs, {@code init()} installs a
     * default factory built from the configured label prefix.
     *
     * @param labelGeneratorFactory the factory to use
     */
    public void setLabelGeneratorFactory(LabelGeneratorFactory labelGeneratorFactory) {
        this.labelGeneratorFactory = labelGeneratorFactory;
    }

    /**
     * Buffers the given rows in the table's region and applies back-pressure on the caller.
     *
     * <p>For each row, a previously recorded failure is rethrown first. The row is then encoded
     * as UTF-8, written to the region, and the global cached-byte counter is increased by the
     * value the region returns. Afterwards:
     * <ul>
     *   <li>if the counter has reached {@code maxWriteBlockCacheBytes}, the caller signals
     *       {@code flushable} and waits on {@code writable} (timeout growing from 1 to 5 seconds)
     *       until the counter drops below that limit;</li>
     *   <li>otherwise, if the counter has reached {@code maxCacheBytes} and no write-triggered
     *       flush is pending, {@code flushable} is signalled once.</li>
     * </ul>
     *
     * @param uniqueKey key of the table region; when {@code null} it is derived from
     *                  {@code database} and {@code table}
     * @param database  target database
     * @param table     target table
     * @param rows      rows to buffer
     * @throws RuntimeException if a load failure was recorded earlier, or if the calling thread
     *                          is interrupted while blocked (its interrupt status is restored)
     */
    @Override
    public void write(String uniqueKey, String database, String table, String ... rows) {
        TableRegion region = this.getCacheRegion(uniqueKey, database, table);
        for (String row : rows) {
            this.AssertNotException();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Write uniqueKey {}, database {}, table {}, row {}", new Object[]{uniqueKey == null ? "null" : uniqueKey, database, table, row});
            }
            int bytes = region.write(row.getBytes(StandardCharsets.UTF_8));
            long cachedBytes = this.currentCacheBytes.addAndGet(bytes);
            if (cachedBytes >= this.maxWriteBlockCacheBytes) {
                long startTime = System.nanoTime();
                this.lock.lock();
                try {
                    int idx = 0;
                    while (this.currentCacheBytes.get() >= this.maxWriteBlockCacheBytes) {
                        this.AssertNotException();
                        LOG.info("Cache full, wait flush, currentBytes: {}, maxWriteBlockCacheBytes: {}", (Object)this.currentCacheBytes.get(), (Object)this.maxWriteBlockCacheBytes);
                        this.flushable.signal();
                        this.writable.await(Math.min(++idx, 5), TimeUnit.SECONDS);
                    }
                } catch (InterruptedException ex) {
                    // await() cleared the interrupt status when it threw; restore it so code
                    // further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                    this.e = ex;
                    throw new RuntimeException(ex);
                } finally {
                    this.lock.unlock();
                }
                this.loadMetrics.updateWriteBlock(1, System.nanoTime() - startTime);
                continue;
            }
            // Only the writer that flips the flag from false to true signals the manager.
            if (cachedBytes < this.maxCacheBytes || !this.writeTriggerFlush.compareAndSet(false, true)) {
                continue;
            }
            this.lock.lock();
            try {
                this.flushable.signal();
            } finally {
                this.lock.unlock();
            }
            this.loadMetrics.updateWriteTriggerFlush(1);
            LOG.info("Trigger flush, currentBytes: {}, maxCacheBytes: {}", (Object)cachedBytes, (Object)this.maxCacheBytes);
        }
    }

    /**
     * Handles the response of one load.
     *
     * <p>In order: subtracts the flushed bytes from the cached-byte counter (when the response
     * reports them), adds the flushed rows to the running total, clears the write-trigger flag,
     * signals one thread waiting on {@code writable}, records the response's exception (if any)
     * as the manager failure, accumulates the row counters from the response body, updates the
     * load metrics, and finally forwards the response to the registered listener.
     *
     * @param response the load response to process
     */
    @Override
    public void callback(StreamLoadResponse response) {
        final long cacheByteBeforeFlush;
        if (response.getFlushBytes() == null) {
            cacheByteBeforeFlush = currentCacheBytes.get();
        } else {
            cacheByteBeforeFlush = currentCacheBytes.getAndAdd(-response.getFlushBytes().longValue());
        }
        if (response.getFlushRows() != null) {
            totalFlushRows.addAndGet(response.getFlushRows());
        }
        writeTriggerFlush.set(false);
        LOG.info("Receive load response, cacheByteBeforeFlush: {}, currentCacheBytes: {}, totalFlushRows : {}",
                cacheByteBeforeFlush, currentCacheBytes.get(), totalFlushRows.get());

        // Wake a writer that may be blocked on a full cache.
        lock.lock();
        try {
            writable.signal();
        } finally {
            lock.unlock();
        }

        if (response.getException() != null) {
            LOG.error("Stream load failed", response.getException());
            this.e = response.getException();
        }

        if (response.getBody() != null) {
            if (response.getBody().getNumberTotalRows() != null) {
                numberTotalRows.addAndGet(response.getBody().getNumberTotalRows());
            }
            if (response.getBody().getNumberLoadedRows() != null) {
                numberLoadRows.addAndGet(response.getBody().getNumberLoadedRows());
            }
        }

        if (response.getException() == null) {
            loadMetrics.updateSuccessLoad(response);
        } else {
            loadMetrics.updateFailedLoad();
        }

        if (streamLoadListener != null) {
            streamLoadListener.onResponse(response);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{}", loadMetrics);
        }
    }

    /**
     * Records a load failure that is reported without a response.
     *
     * <p>The throwable is logged and stored as the manager failure; {@code write} and
     * {@code flush} check that field through {@code AssertNotException()} and fail once it is set.
     *
     * @param e the failure to record
     */
    @Override
    public void callback(Throwable e) {
        LOG.error("Stream load failed", e);
        this.e = e;
    }

    /**
     * Returns the failure currently stored by the callbacks, the manager thread's
     * uncaught-exception handler, or an interrupted write.
     *
     * @return the stored failure, or {@code null} if none has been recorded
     */
    public Throwable getException() {
        return e;
    }

    /**
     * Forces all cached data out and blocks until the savepoint is finished.
     *
     * <p>The method raises the {@code savepoint} flag, then repeatedly signals the manager thread
     * and parks until {@code isSavepointFinished()} holds: the cached-byte counter is zero and,
     * in auto-commit mode, the manager has reported all regions committed. After each wake-up it
     * waits for the pending result of every region. The loop also ends early if the manager
     * cleared {@code savepoint} itself, which it does when it is interrupted.
     *
     * @throws RuntimeException if a load failure has been recorded
     */
    @Override
    public void flush() {
        LOG.info("Stream load manager flush");
        // Publish the waiting thread and reset the commit flag BEFORE the volatile write to
        // 'savepoint'. The manager reads 'savepoint' first, so once it sees true it is guaranteed
        // to see this thread in 'current' (a plain field) and cannot have its
        // 'allRegionsCommitted = true' overwritten by a late reset.
        this.current = Thread.currentThread();
        this.allRegionsCommitted = false;
        this.savepoint = true;
        while (!this.isSavepointFinished()) {
            this.AssertNotException();
            this.lock.lock();
            try {
                this.flushable.signal();
            } finally {
                this.lock.unlock();
            }
            LockSupport.park(this.current);
            if (!this.savepoint) {
                // The manager cleared the flag itself (it was interrupted); stop waiting.
                break;
            }
            try {
                for (TableRegion tableRegion : this.regions.values()) {
                    Future<?> result = tableRegion.getResult();
                    if (result == null) {
                        continue;
                    }
                    result.get();
                }
            } catch (InterruptedException | ExecutionException ex) {
                // Best effort, as before: only log here. The loop condition and
                // AssertNotException() decide whether flush() continues or fails.
                LOG.warn("Flush get result failed", (Throwable)ex);
            }
        }
        this.savepoint = false;
    }

    /**
     * Builds a snapshot from all cached regions and then clears the label of every region.
     *
     * @return the snapshot taken before the labels were cleared
     */
    @Override
    public StreamLoadSnapshot snapshot() {
        final StreamLoadSnapshot captured = StreamLoadSnapshot.snapshot(regions.values());
        regions.values().forEach(tableRegion -> tableRegion.setLabel(null));
        return captured;
    }

    /**
     * Returns the stream loader chosen by the constructor.
     *
     * @return the loader used by this manager
     */
    public StreamLoader getStreamLoader() {
        return streamLoader;
    }

    /**
     * Delegates the prepare step for the given snapshot to the stream loader.
     *
     * @param snapshot the snapshot to prepare
     * @return the value returned by the loader's {@code prepare}
     */
    @Override
    public boolean prepare(StreamLoadSnapshot snapshot) {
        final boolean prepared = streamLoader.prepare(snapshot);
        return prepared;
    }

    /**
     * Delegates the commit step for the given snapshot to the stream loader.
     *
     * @param snapshot the snapshot to commit
     * @return the value returned by the loader's {@code commit}
     */
    @Override
    public boolean commit(StreamLoadSnapshot snapshot) {
        final boolean committed = streamLoader.commit(snapshot);
        return committed;
    }

    /**
     * Aborts the given snapshot by delegating to the stream loader's {@code rollback}.
     *
     * @param snapshot the snapshot to abort
     * @return the value returned by the loader's {@code rollback}
     */
    @Override
    public boolean abort(StreamLoadSnapshot snapshot) {
        final boolean rolledBack = streamLoader.rollback(snapshot);
        return rolledBack;
    }

    /**
     * Shuts the manager down if it is active: interrupts the manager thread and closes the
     * stream loader. Does nothing when the state is not {@code ACTIVE}, so repeated calls
     * are harmless.
     */
    @Override
    public void close() {
        if (!state.compareAndSet(State.ACTIVE, State.INACTIVE)) {
            return;
        }
        LOG.info("StreamLoadManagerV2 close, loadMetrics: {}, flushAndCommit: {}", loadMetrics, flushAndCommitStrategy);
        manager.interrupt();
        streamLoader.close();
    }

    /**
     * Tells whether a savepoint started by {@code flush()} is complete.
     *
     * @return {@code true} when the cached-byte counter is exactly zero and either auto-commit
     *         is disabled or all regions have been reported committed
     */
    private boolean isSavepointFinished() {
        // compareAndSet(0, 0) succeeds only when the counter currently equals zero.
        if (!currentCacheBytes.compareAndSet(0L, 0L)) {
            return false;
        }
        if (!enableAutoCommit) {
            return true;
        }
        return allRegionsCommitted;
    }

    /**
     * Fails fast when a load failure has been recorded.
     *
     * <p>If a failure is stored, it is logged, the loader is asked to roll back a fresh snapshot,
     * the manager is closed, and the failure is rethrown wrapped in a {@link RuntimeException}.
     * Otherwise the method returns normally.
     *
     * @throws RuntimeException wrapping the recorded failure
     */
    private void AssertNotException() {
        if (e == null) {
            return;
        }
        LOG.error("catch exception, wait rollback ", e);
        streamLoader.rollback(snapshot());
        close();
        throw new RuntimeException(e);
    }

    /**
     * Returns the cached region for a table, creating and registering it on first use.
     *
     * <p>A new region is stored in the region map and appended to the flush queue. Creation is
     * guarded by a monitor on the region map, and the map is checked again inside it, so only
     * one region is created per key.
     *
     * @param uniqueKey key of the region; when {@code null} it is derived from
     *                  {@code database} and {@code table}
     * @param database  target database
     * @param table     target table
     * @return the existing or newly created region
     */
    protected TableRegion getCacheRegion(String uniqueKey, String database, String table) {
        final String key = uniqueKey != null ? uniqueKey : StreamLoadUtils.getTableUniqueKey(database, table);
        TableRegion cached = regions.get(key);
        if (cached != null) {
            return cached;
        }
        synchronized (regions) {
            TableRegion existing = regions.get(key);
            if (existing != null) {
                return existing;
            }
            StreamLoadTableProperties tableProperties = properties.getTableProperties(key, database, table);
            LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table);
            TransactionTableRegion created = new TransactionTableRegion(key, database, table, this, tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs);
            regions.put(key, created);
            flushQ.offer(created);
            return created;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Could not resolve type clashes
     * Unable to fully structure code
     */
    private /* synthetic */ void lambda$init$0() {
        lastPrintTimestamp = -1L;
        StreamLoadManagerV2.LOG.info("manager running, scanningFrequency : {}", (Object)this.scanningFrequency);
        block5: while (true) {
            this.lock.lock();
            try {
                this.flushable.await(this.scanningFrequency, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                if (!this.savepoint) break;
                this.savepoint = false;
                LockSupport.unpark(this.current);
                break;
            }
            finally {
                this.lock.unlock();
            }
            if (lastPrintTimestamp == -1L || System.currentTimeMillis() - lastPrintTimestamp > 10000L) {
                lastPrintTimestamp = System.currentTimeMillis();
                StreamLoadManagerV2.LOG.debug("Audit information: {}, {}", (Object)this.loadMetrics, (Object)this.flushAndCommitStrategy);
            }
            if (this.savepoint) {
                for (Object region : this.flushQ) {
                    flush = region.flush(FlushReason.FORCE);
                    StreamLoadManagerV2.LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}", new Object[]{region.getUniqueKey(), region.getCacheBytes(), flush});
                }
                if (this.enableAutoCommit) {
                    committedRegions = 0;
                    region = this.flushQ.iterator();
                    while (region.hasNext()) {
                        region = (TransactionTableRegion)region.next();
                        success = region.commit();
                        if (success && region.getCacheBytes() == 0L) {
                            ++committedRegions;
                            region.resetAge();
                        }
                        StreamLoadManagerV2.LOG.debug("Commit region {} for savepoint, success: {}", (Object)region.getUniqueKey(), (Object)success);
                    }
                    if (committedRegions == this.flushQ.size()) {
                        this.allRegionsCommitted = true;
                        StreamLoadManagerV2.LOG.info("All regions committed for savepoint, number of regions: {}", (Object)committedRegions);
                    } else {
                        StreamLoadManagerV2.LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}", (Object)this.flushQ.size(), (Object)committedRegions);
                    }
                }
                LockSupport.unpark(this.current);
                continue;
            }
            for (Object region : this.flushQ) {
                region.getAndIncrementAge();
                if (!this.flushAndCommitStrategy.shouldCommit((TableRegion)region)) continue;
                success = region.commit();
                if (success) {
                    region.resetAge();
                }
                StreamLoadManagerV2.LOG.debug("Commit region {} for normal, success: {}", (Object)region.getUniqueKey(), (Object)success);
            }
            var3_6 = this.flushAndCommitStrategy.selectFlushRegions(this.flushQ, this.currentCacheBytes.get()).iterator();
            while (true) {
                if (var3_6.hasNext()) ** break;
                continue block5;
                result = (FlushAndCommitStrategy.SelectFlushResult)var3_6.next();
                region = result.getRegion();
                flush = region.flush(result.getReason());
                StreamLoadManagerV2.LOG.debug("Trigger flush table region {} because of selection, region cache bytes: {}, flush: {}", new Object[]{region.getUniqueKey(), region.getCacheBytes(), flush});
            }
            break;
        }
    }

    /**
     * Lifecycle state of the manager: {@code init()} moves it from {@code INACTIVE} to
     * {@code ACTIVE}, and {@code close()} moves it back.
     */
    enum State {
        /** The manager thread and the stream loader have been started. */
        ACTIVE,
        /** Initial state, and the state after {@code close()}. */
        INACTIVE
    }
}

