/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream;

import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchTableRegion
implements TableRegion {
    private static final Logger log = LoggerFactory.getLogger(BatchTableRegion.class);
    private final StreamLoadManager manager;
    private final StreamLoader streamLoader;
    private final LabelGenerator labelGenerator;
    private final String uniqueKey;
    private final String database;
    private final String table;
    private final StreamLoadTableProperties properties;
    private final StreamLoadDataFormat dataFormat;
    private final AtomicLong age = new AtomicLong(0L);
    private final AtomicLong cacheBytes = new AtomicLong();
    private final AtomicLong flushBytes = new AtomicLong();
    private final AtomicLong flushRows = new AtomicLong();
    private final AtomicReference<State> state;
    private final AtomicBoolean ctl = new AtomicBoolean(false);
    private volatile Queue<byte[]> outBuffer = new LinkedList<byte[]>();
    private volatile Queue<byte[]> inBuffer;
    private volatile StreamLoadEntityMeta entityMeta;
    private volatile String label;
    private volatile Future<?> responseFuture;
    private volatile long lastWriteTimeMillis = Long.MAX_VALUE;
    private volatile boolean flushing;

    public BatchTableRegion(String uniqueKey, String database, String table, StreamLoadManager manager, StreamLoadTableProperties properties, StreamLoader streamLoader, LabelGenerator labelGenerator) {
        this.uniqueKey = uniqueKey;
        this.database = database;
        this.table = table;
        this.manager = manager;
        this.properties = properties;
        this.dataFormat = properties.getDataFormat();
        this.streamLoader = streamLoader;
        this.labelGenerator = labelGenerator;
        this.state = new AtomicReference<State>(State.ACTIVE);
    }

    @Override
    public StreamLoadTableProperties getProperties() {
        return this.properties;
    }

    @Override
    public String getUniqueKey() {
        return this.uniqueKey;
    }

    @Override
    public String getDatabase() {
        return this.database;
    }

    @Override
    public String getTable() {
        return this.table;
    }

    @Override
    public LabelGenerator getLabelGenerator() {
        return this.labelGenerator;
    }

    @Override
    public void setLabel(String label) {
        this.label = label;
    }

    @Override
    public String getLabel() {
        return this.label;
    }

    @Override
    public long getCacheBytes() {
        return this.cacheBytes.get();
    }

    @Override
    public long getFlushBytes() {
        return this.flushBytes.get();
    }

    @Override
    public StreamLoadEntityMeta getEntityMeta() {
        return this.entityMeta;
    }

    @Override
    public long getLastWriteTimeMillis() {
        return this.lastWriteTimeMillis;
    }

    @Override
    public void resetAge() {
        this.age.set(0L);
    }

    @Override
    public long getAndIncrementAge() {
        return this.age.getAndIncrement();
    }

    @Override
    public long getAge() {
        return this.age.get();
    }

    @Override
    public int write(byte[] row) {
        int c;
        if (row == null) {
            return 0;
        }
        if (this.ctl.compareAndSet(false, true)) {
            c = this.write0(row);
        } else {
            while (!this.ctl.compareAndSet(false, true)) {
            }
            c = this.write0(row);
        }
        this.ctl.set(false);
        return c;
    }

    protected int write0(byte[] row) {
        if (this.outBuffer == null) {
            this.outBuffer = new LinkedList<byte[]>();
        }
        this.outBuffer.offer(row);
        this.cacheBytes.addAndGet(row.length);
        this.lastWriteTimeMillis = System.currentTimeMillis();
        return row.length;
    }

    @Override
    public byte[] read() {
        if (this.flushRows.get() == this.entityMeta.getRows()) {
            this.flushing = false;
            return null;
        }
        byte[] row = this.inBuffer.poll();
        if (row == null) {
            this.flushing = false;
            return null;
        }
        if (!this.flushing) {
            this.flushing = true;
        }
        this.cacheBytes.addAndGet(-row.length);
        this.flushBytes.addAndGet(row.length);
        this.flushRows.incrementAndGet();
        return row;
    }

    @Override
    public boolean testPrepare() {
        return this.state.compareAndSet(State.ACTIVE, State.PREPARE);
    }

    @Override
    public boolean prepare() {
        return this.state.get() == State.PREPARE;
    }

    @Override
    public boolean flush() {
        if (this.state.compareAndSet(State.PREPARE, State.COMMIT)) {
            while (!this.ctl.compareAndSet(false, true)) {
            }
            log.info("uk : {}, label : {}, bytes : {} commit", new Object[]{this.uniqueKey, this.label, this.cacheBytes.get()});
            this.inBuffer = this.outBuffer;
            this.outBuffer = null;
            this.ctl.set(false);
            this.resetAge();
            this.streamLoad();
            return true;
        }
        return false;
    }

    @Override
    public boolean cancel() {
        return this.state.compareAndSet(State.PREPARE, State.ACTIVE);
    }

    @Override
    public void callback(StreamLoadResponse response) {
        this.manager.callback(response);
    }

    @Override
    public void fail(Throwable e) {
        this.manager.callback(e);
    }

    @Override
    public void complete(StreamLoadResponse response) {
        response.setFlushBytes(this.flushBytes.get());
        response.setFlushRows(this.flushRows.get());
        this.callback(response);
        log.info("Stream load flushed, label : {}", (Object)this.label);
        if (!this.inBuffer.isEmpty()) {
            log.info("Stream load continue");
            this.streamLoad();
            return;
        }
        if (this.state.compareAndSet(State.COMMIT, State.ACTIVE)) {
            log.info("Stream load completed");
        }
    }

    @Override
    public void setResult(Future<?> result) {
        this.responseFuture = result;
    }

    @Override
    public Future<?> getResult() {
        return this.responseFuture;
    }

    @Override
    public boolean isReadable() {
        return this.cacheBytes.get() > 0L;
    }

    @Override
    public boolean isFlushing() {
        return this.flushing;
    }

    protected void flip() {
        StreamLoadEntityMeta chunkMeta;
        this.flushBytes.set(0L);
        this.flushRows.set(0L);
        this.responseFuture = null;
        this.entityMeta = chunkMeta = this.genEntityMeta();
        log.info("total rows : {}, part rows : {}, part bytes : {}", new Object[]{this.inBuffer.size(), chunkMeta.getRows(), chunkMeta.getBytes()});
    }

    protected void streamLoad() {
        try {
            this.flip();
            this.setResult(this.streamLoader.send(this));
        }
        catch (Exception e) {
            this.fail(e);
        }
    }

    protected StreamLoadEntityMeta genEntityMeta() {
        int delimiter;
        long chunkBytes = 0L;
        long chunkRows = 0L;
        int n = delimiter = this.dataFormat.delimiter() == null ? 0 : this.dataFormat.delimiter().length;
        if (this.dataFormat.first() != null) {
            chunkBytes += (long)this.dataFormat.first().length;
        }
        if (this.dataFormat.end() != null) {
            chunkBytes += (long)this.dataFormat.end().length;
        }
        boolean first = true;
        for (byte[] bytes : this.inBuffer) {
            int d = first ? 0 : delimiter;
            first = false;
            if (chunkBytes + (long)d + (long)bytes.length > this.properties.getChunkLimit()) break;
            chunkBytes += (long)(bytes.length + d);
            ++chunkRows;
        }
        return new StreamLoadEntityMeta(chunkBytes, chunkRows);
    }

    static enum State {
        ACTIVE,
        PREPARE,
        COMMIT;

    }
}

