/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream;

import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamTableRegion
implements TableRegion,
Serializable {
    private static final Logger log = LoggerFactory.getLogger(StreamTableRegion.class);
    private static final byte[] END_STREAM = new byte[0];
    private final StreamLoader streamLoader;
    private final LabelGenerator labelGenerator;
    private final String uniqueKey;
    private final String database;
    private final String table;
    private final StreamLoadManager manager;
    private final StreamLoadTableProperties properties;
    private final long chunkLimit;
    private final StreamLoadDataFormat dataFormat;
    private final AtomicLong age = new AtomicLong(0L);
    private final BlockingQueue<byte[]> buffer = new LinkedTransferQueue<byte[]>();
    private final AtomicLong cacheBytes = new AtomicLong();
    private final AtomicLong flushBytes = new AtomicLong();
    private final AtomicLong flushRows = new AtomicLong();
    private final AtomicReference<State> state;
    private volatile String label;
    private volatile Future<?> responseFuture;
    private volatile long lastWriteTimeMillis = Long.MAX_VALUE;
    private volatile boolean flushing;
    private final AtomicLong totalFlushBytes = new AtomicLong();
    private volatile boolean endStream;
    private volatile byte[] next;

    public StreamTableRegion(String uniqueKey, String database, String table, StreamLoadManager manager, StreamLoadTableProperties properties, StreamLoader streamLoader, LabelGenerator labelGenerator) {
        this.uniqueKey = uniqueKey;
        this.database = database;
        this.table = table;
        this.manager = manager;
        this.properties = properties;
        this.dataFormat = properties.getDataFormat();
        this.chunkLimit = properties.getChunkLimit();
        this.streamLoader = streamLoader;
        this.labelGenerator = labelGenerator;
        this.state = new AtomicReference<State>(State.ACTIVE);
    }

    @Override
    public StreamLoadTableProperties getProperties() {
        return this.properties;
    }

    @Override
    public String getUniqueKey() {
        return this.uniqueKey;
    }

    @Override
    public String getDatabase() {
        return this.database;
    }

    @Override
    public String getTable() {
        return this.table;
    }

    @Override
    public LabelGenerator getLabelGenerator() {
        return this.labelGenerator;
    }

    @Override
    public void setLabel(String label) {
        this.label = label;
    }

    @Override
    public String getLabel() {
        return this.label;
    }

    @Override
    public long getCacheBytes() {
        return this.cacheBytes.get();
    }

    @Override
    public long getFlushBytes() {
        return this.flushBytes.get();
    }

    @Override
    public StreamLoadEntityMeta getEntityMeta() {
        return StreamLoadEntityMeta.CHUNK_ENTITY_META;
    }

    @Override
    public long getLastWriteTimeMillis() {
        return this.lastWriteTimeMillis;
    }

    @Override
    public void resetAge() {
        this.age.set(0L);
    }

    @Override
    public long getAndIncrementAge() {
        return this.age.getAndIncrement();
    }

    @Override
    public long getAge() {
        return this.age.get();
    }

    @Override
    public int write(byte[] row) {
        try {
            this.buffer.put(row);
            if (row != END_STREAM) {
                this.cacheBytes.addAndGet(row.length);
                this.lastWriteTimeMillis = System.currentTimeMillis();
            } else {
                log.info("Write EOF");
            }
            return row.length;
        }
        catch (InterruptedException interruptedException) {
            return 0;
        }
    }

    @Override
    public byte[] read() {
        if (this.state.get() != State.ACTIVE) {
            if (!this.flushing) {
                this.flushing = true;
            }
            try {
                int delimiterL;
                byte[] row = this.next == null ? this.buffer.take() : this.next;
                if (row == END_STREAM) {
                    this.endStream = true;
                    this.flushing = false;
                    log.info("Read EOF");
                    return null;
                }
                int n = delimiterL = this.dataFormat.delimiter() == null ? 0 : this.dataFormat.delimiter().length;
                if (this.totalFlushBytes.get() + (long)row.length + (long)delimiterL > this.chunkLimit) {
                    this.next = row;
                    this.flushing = false;
                    log.info("Read part EOF");
                    return null;
                }
                this.next = null;
                this.totalFlushBytes.addAndGet(row.length + delimiterL);
                this.cacheBytes.addAndGet(-row.length);
                this.flushBytes.addAndGet(row.length);
                this.flushRows.incrementAndGet();
                return row;
            }
            catch (InterruptedException e) {
                log.info("read queue interrupted, msg : {}", (Object)e.getMessage());
            }
        }
        return null;
    }

    protected void flip() {
        this.flushBytes.set(0L);
        this.flushRows.set(0L);
        this.responseFuture = null;
        int initSize = (this.dataFormat.first() == null ? 0 : this.dataFormat.first().length) + (this.dataFormat.end() == null ? 0 : this.dataFormat.end().length) - (this.dataFormat.delimiter() == null ? 0 : this.dataFormat.delimiter().length);
        this.totalFlushBytes.set(initSize);
        this.endStream = false;
    }

    @Override
    public boolean testPrepare() {
        return this.state.compareAndSet(State.ACTIVE, State.PREPARE);
    }

    @Override
    public boolean prepare() {
        if (this.streamLoad()) {
            return true;
        }
        this.cancel();
        return false;
    }

    @Override
    public boolean flush() {
        if (this.state.compareAndSet(State.PREPARE, State.COMMIT)) {
            this.write(END_STREAM);
            this.resetAge();
            log.info("uk : {}, label : {} commit", (Object)this.uniqueKey, (Object)this.label);
            return true;
        }
        return false;
    }

    @Override
    public boolean cancel() {
        return this.state.compareAndSet(State.PREPARE, State.ACTIVE);
    }

    @Override
    public void callback(StreamLoadResponse response) {
        this.manager.callback(response);
    }

    @Override
    public void fail(Throwable e) {
        this.manager.callback(e);
    }

    @Override
    public void complete(StreamLoadResponse response) {
        response.setFlushBytes(this.flushBytes.get());
        response.setFlushRows(this.flushRows.get());
        this.callback(response);
        log.info("Stream load flushed, label : {}", (Object)this.label);
        if (!this.endStream) {
            log.info("Stream load continue");
            this.streamLoad();
            return;
        }
        if (this.state.compareAndSet(State.COMMIT, State.ACTIVE)) {
            log.info("Stream load completed");
        }
    }

    @Override
    public void setResult(Future<?> result) {
        this.responseFuture = result;
    }

    @Override
    public Future<?> getResult() {
        return this.responseFuture;
    }

    @Override
    public boolean isReadable() {
        return this.cacheBytes.get() > 0L;
    }

    @Override
    public boolean isFlushing() {
        return this.flushing;
    }

    protected boolean streamLoad() {
        try {
            this.flip();
            this.setResult(this.streamLoader.send(this));
            return true;
        }
        catch (Exception e) {
            this.fail(e);
            return false;
        }
    }

    static enum State {
        ACTIVE,
        PREPARE,
        COMMIT;

    }
}

