/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream.v2;

import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.compress.CompressionCodec;
import com.starrocks.data.load.stream.compress.CompressionHttpEntity;
import com.starrocks.data.load.stream.exception.ErrorUtils;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import com.starrocks.data.load.stream.v2.ChunkHttpEntity;
import com.starrocks.data.load.stream.v2.FlushReason;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.HttpEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionTableRegion
implements TableRegion {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionTableRegion.class);
    private final StreamLoadManager manager;
    private final StreamLoader streamLoader;
    private final LabelGenerator labelGenerator;
    private final String uniqueKey;
    private final String database;
    private final String table;
    private final StreamLoadTableProperties properties;
    private final Map<String, String> headers = new HashMap<String, String>();
    private final Optional<CompressionCodec> compressionCodec;
    private final AtomicLong age = new AtomicLong(0L);
    private final AtomicLong cacheBytes = new AtomicLong();
    private final AtomicLong cacheRows = new AtomicLong();
    private final AtomicReference<State> state;
    private final AtomicBoolean ctl = new AtomicBoolean(false);
    private volatile Chunk activeChunk;
    private final ConcurrentLinkedQueue<Chunk> inactiveChunks = new ConcurrentLinkedQueue();
    private volatile String label;
    private volatile Future<?> responseFuture;
    private volatile long lastCommitTimeMills;
    private final int maxRetries;
    private final int retryIntervalInMs;
    private volatile int numRetries;
    private volatile long lastFailTimeMs;
    private volatile Throwable firstException;

    public TransactionTableRegion(String uniqueKey, String database, String table, StreamLoadManager manager, StreamLoadTableProperties properties, StreamLoader streamLoader, LabelGenerator labelGenerator, int maxRetries, int retryIntervalInMs) {
        this.uniqueKey = uniqueKey;
        this.database = database;
        this.table = table;
        this.manager = manager;
        this.properties = properties;
        this.streamLoader = streamLoader;
        this.labelGenerator = labelGenerator;
        this.initHeaders(properties);
        this.compressionCodec = CompressionCodec.createCompressionCodec(properties.getDataFormat(), properties.getProperty("compression"), properties.getTableProperties());
        this.state = new AtomicReference<State>(State.ACTIVE);
        this.lastCommitTimeMills = System.currentTimeMillis();
        this.activeChunk = new Chunk(properties.getDataFormat());
        this.maxRetries = maxRetries;
        this.retryIntervalInMs = retryIntervalInMs;
        this.initHeaders(properties);
    }

    private void initHeaders(StreamLoadTableProperties properties) {
        this.headers.putAll(properties.getProperties());
        Optional<String> compressionType = properties.getProperty("compression");
        if (properties.getDataFormat() instanceof StreamLoadDataFormat.CSVFormat && compressionType.isPresent()) {
            if ("LZ4_FRAME".equalsIgnoreCase(compressionType.get())) {
                this.headers.put("format", "lz4");
            } else {
                throw new UnsupportedOperationException("CSV format does not support compression type: " + compressionType.get());
            }
        }
    }

    @Override
    public StreamLoadTableProperties getProperties() {
        return this.properties;
    }

    @Override
    public Map<String, String> getHeaders() {
        return this.headers;
    }

    @Override
    public String getUniqueKey() {
        return this.uniqueKey;
    }

    @Override
    public String getDatabase() {
        return this.database;
    }

    @Override
    public String getTable() {
        return this.table;
    }

    @Override
    public LabelGenerator getLabelGenerator() {
        return this.labelGenerator;
    }

    @Override
    public void setLabel(String label) {
        if (this.numRetries > 0 && label != null) {
            return;
        }
        this.label = label;
    }

    @Override
    public String getLabel() {
        return this.label;
    }

    @Override
    public long getCacheBytes() {
        return this.cacheBytes.get();
    }

    @Override
    public void resetAge() {
        this.age.set(0L);
    }

    @Override
    public long getAndIncrementAge() {
        return this.age.getAndIncrement();
    }

    @Override
    public long getAge() {
        return this.age.get();
    }

    @Override
    public int write(byte[] row) {
        int c;
        if (row == null) {
            return 0;
        }
        if (this.ctl.compareAndSet(false, true)) {
            c = this.write0(row);
        } else {
            while (!this.ctl.compareAndSet(false, true)) {
            }
            c = this.write0(row);
        }
        this.ctl.set(false);
        return c;
    }

    private void switchChunk() {
        if (this.activeChunk == null || this.activeChunk.numRows() == 0) {
            return;
        }
        this.inactiveChunks.add(this.activeChunk);
        this.activeChunk = new Chunk(this.properties.getDataFormat());
    }

    protected int write0(byte[] row) {
        if (this.activeChunk.estimateChunkSize(row) > this.properties.getChunkLimit() || this.activeChunk.numRows() >= this.properties.getMaxBufferRows()) {
            this.switchChunk();
        }
        this.activeChunk.addRow(row);
        this.cacheBytes.addAndGet(row.length);
        this.cacheRows.incrementAndGet();
        return row.length;
    }

    @Override
    public boolean isFlushing() {
        return this.state.get() == State.FLUSHING;
    }

    public FlushReason shouldFlush() {
        if (this.state.get() != State.ACTIVE) {
            return FlushReason.NONE;
        }
        return this.cacheRows.get() >= (long)this.properties.getMaxBufferRows() ? FlushReason.BUFFER_ROWS_REACH_LIMIT : FlushReason.NONE;
    }

    public boolean flush(FlushReason reason) {
        LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows, reason});
        if (this.state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
            while (!this.ctl.compareAndSet(false, true)) {
            }
            if (reason != FlushReason.BUFFER_ROWS_REACH_LIMIT || this.activeChunk.numRows() >= this.properties.getMaxBufferRows()) {
                this.switchChunk();
            }
            this.ctl.set(false);
            if (!this.inactiveChunks.isEmpty()) {
                LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes.get(), this.cacheRows.get(), reason});
                this.streamLoad(0);
                return true;
            }
            this.state.compareAndSet(State.FLUSHING, State.ACTIVE);
            return false;
        }
        return false;
    }

    public boolean commit() {
        LOG.debug("Try to commit, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label});
        boolean commitTriggered = false;
        if (!this.state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
            if (this.state.get() != State.COMMITTING) {
                return false;
            }
            commitTriggered = true;
        }
        if (commitTriggered) {
            if (this.label == null) {
                this.state.compareAndSet(State.COMMITTING, State.ACTIVE);
                LOG.debug("Success to commit, db: {}, table: {}", (Object)this.database, (Object)this.table);
                return true;
            }
            return false;
        }
        if (this.label == null) {
            boolean commitSuccess = this.cacheBytes.get() == 0L;
            this.state.compareAndSet(State.COMMITTING, State.ACTIVE);
            if (commitSuccess) {
                LOG.debug("Success to commit, db: {}, table: {}", (Object)this.database, (Object)this.table);
            }
            return commitSuccess;
        }
        try {
            this.streamLoader.getExecutorService().submit(this::doCommit);
        }
        catch (Exception e) {
            LOG.error("Failed to submit commit task, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label, e});
            throw e;
        }
        return false;
    }

    private void doCommit() {
        StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(this.database, this.table, this.label);
        try {
            if (!this.streamLoader.prepare(transaction)) {
                String errorMsg = "Failed to prepare transaction, please check taskmanager log for details, " + transaction;
                throw new StreamLoadFailException(errorMsg);
            }
            if (!this.streamLoader.commit(transaction)) {
                String errorMsg = "Failed to commit transaction, please check taskmanager log for details, " + transaction;
                throw new StreamLoadFailException(errorMsg);
            }
        }
        catch (Throwable e) {
            LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label, e});
            this.fail(e);
        }
        long commitTime = System.currentTimeMillis();
        long commitDuration = commitTime - this.lastCommitTimeMills;
        this.lastCommitTimeMills = commitTime;
        this.label = null;
        LOG.info("Success to commit transaction: {}, duration: {} ms", (Object)transaction, (Object)commitDuration);
    }

    @Override
    public void fail(Throwable e) {
        if (this.firstException == null) {
            this.firstException = e;
        }
        if (this.numRetries >= this.maxRetries || !ErrorUtils.isRetryable(e)) {
            LOG.error("Failed to flush data for db: {}, table: {} after {} times retry, the last exception is", new Object[]{this.database, this.table, this.numRetries, e});
            this.manager.callback(this.firstException);
            return;
        }
        this.responseFuture = null;
        ++this.numRetries;
        this.lastFailTimeMs = System.currentTimeMillis();
        LOG.warn("Failed to flush data for db: {}, table: {}, and will retry for {} times after {} ms", new Object[]{this.database, this.table, this.numRetries, this.retryIntervalInMs, e});
        this.streamLoad(this.retryIntervalInMs);
    }

    @Override
    public void complete(StreamLoadResponse response) {
        Chunk chunk = (Chunk)this.inactiveChunks.remove();
        this.cacheBytes.addAndGet(-chunk.rowBytes());
        this.cacheRows.addAndGet(-chunk.numRows());
        response.setFlushBytes(chunk.rowBytes());
        response.setFlushRows(chunk.numRows());
        this.manager.callback(response);
        this.numRetries = 0;
        this.firstException = null;
        if (!this.inactiveChunks.isEmpty()) {
            LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows});
            this.streamLoad(0);
            return;
        }
        if (this.state.compareAndSet(State.FLUSHING, State.ACTIVE)) {
            LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows});
        }
    }

    @Override
    public Future<?> getResult() {
        return this.responseFuture;
    }

    protected void streamLoad(int delayMs) {
        try {
            Chunk chunk = this.inactiveChunks.peek();
            LOG.info("Stream load chunk, db: {}, table: {}, numRows: {}, rowBytes: {}, chunkBytes: {}", new Object[]{this.database, this.table, chunk.numRows(), chunk.rowBytes(), chunk.chunkBytes()});
            this.responseFuture = this.streamLoader.send(this, delayMs);
        }
        catch (Exception e) {
            this.fail(e);
        }
    }

    @Override
    public HttpEntity getHttpEntity() {
        ChunkHttpEntity entity = new ChunkHttpEntity(this.uniqueKey, this.inactiveChunks.peek());
        return this.compressionCodec.map(codec -> new CompressionHttpEntity(entity, (CompressionCodec)codec)).orElse((HttpEntity)entity);
    }

    @Override
    public long getLastWriteTimeMillis() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setResult(Future<?> result) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void callback(StreamLoadResponse response) {
        throw new UnsupportedOperationException();
    }

    @Override
    public long getFlushBytes() {
        throw new UnsupportedOperationException();
    }

    @Override
    public byte[] read() {
        throw new UnsupportedOperationException();
    }

    @Override
    public StreamLoadEntityMeta getEntityMeta() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean testPrepare() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean prepare() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean cancel() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isReadable() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean flush() {
        throw new UnsupportedOperationException();
    }

    static enum State {
        ACTIVE,
        FLUSHING,
        COMMITTING;

    }
}

