/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
import com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.StarRocksSinkSemantic;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers serialized rows in memory and hands them, batch by batch, to a background
 * "starrocks-flush" thread that performs the stream load.
 *
 * <p>Rows are appended through {@link #writeRecord(String)}. Unless the configured semantic is
 * {@code EXACTLY_ONCE}, a batch is handed over when the row-count or byte-size limit is reached,
 * or when the interval timer armed by {@link #startScheduler()} fires. With {@code EXACTLY_ONCE}
 * neither trigger is active and batches are only handed over by an explicit
 * {@link #flush(String, boolean)} or {@link #close()}.
 *
 * <p>The hand-over goes through {@link #flushQueue}, a deque of capacity one. Failures raised on
 * the flush thread are stored in {@code flushException} and rethrown (wrapped in a
 * {@link RuntimeException}) by the next call that checks it.
 *
 * <p>{@link #writeRecord(String)}, {@link #flush(String, boolean)} and {@link #close()} are
 * {@code synchronized} on this instance; the interval task takes the same monitor.
 */
public class StarRocksSinkManager
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkManager.class);

    private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
    private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES = "totalFlushTimeNsWithoutRetries";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeNs";
    private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES = "totalFlushSucceededTimes";
    private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES = "totalFlushFailedTimes";
    private static final String HISTOGRAM_FLUSH_TIME = "flushTimeNs";
    private static final String HISTOGRAM_OFFER_TIME_NS = "offerTimeNs";
    private static final String COUNTER_NUMBER_FILTERED_ROWS = "totalFilteredRows";
    private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = "commitAndPublishTimeMs";
    private static final String HISTOGRAM_STREAM_LOAD_PUT_TIME_MS = "streamLoadPutTimeMs";
    private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
    private static final String HISTOGRAM_WRITE_DATA_TIME_MS = "writeDataTimeMs";
    private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";

    /** Label of the control tuple that tells the flush thread to exit. */
    static final String EOF = "EOF";

    private final StarRocksJdbcConnectionProvider jdbcConnProvider;
    private final StarRocksQueryVisitor starrocksQueryVisitor;
    private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
    private final StarRocksSinkOptions sinkOptions;

    /** StarRocks column type (lower case) to the Flink logical type roots accepted for it. */
    private final Map<String, List<LogicalTypeRoot>> typesMap;

    /**
     * Hand-over queue between writers and the flush thread. Each element is
     * (label, batch size in bytes, rows). An empty label is a no-op marker, {@link #EOF} stops
     * the flush thread. Capacity is one, so an offer blocks while a previous element is pending.
     */
    final LinkedBlockingDeque<Tuple3<String, Long, ArrayList<byte[]>>> flushQueue = new LinkedBlockingDeque<>(1);

    // Metrics; all of them stay null until setRuntimeContext() has been called.
    private transient Counter totalFlushBytes;
    private transient Counter totalFlushRows;
    private transient Counter totalFlushTime;
    private transient Counter totalFlushTimeWithoutRetries;
    private transient Counter totalFlushSucceededTimes;
    private transient Counter totalFlushFailedTimes;
    private transient Histogram flushTimeNs;
    private transient Histogram offerTimeNs;
    private transient Counter totalFilteredRows;
    private transient Histogram commitAndPublishTimeMs;
    private transient Histogram streamLoadPutTimeMs;
    private transient Histogram readDataTimeMs;
    private transient Histogram writeDataTimeMs;
    private transient Histogram loadTimeMs;

    /** Rows (UTF-8 bytes) collected since the last hand-over. */
    private final ArrayList<byte[]> buffer = new ArrayList<>();
    /** Number of rows currently in {@link #buffer}. */
    private int batchCount = 0;
    /** Total byte length of the rows currently in {@link #buffer}. */
    private long batchSize = 0L;

    volatile boolean closed = false;
    volatile boolean flushThreadAlive = false;
    /** Last failure seen on the flush thread or in the interval task; rethrown by checkFlushException(). */
    private volatile Throwable flushException;

    // Runtime-only handles: executors and futures are not serializable, so keep them out of the
    // serialized form of this Serializable class.
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;

    /**
     * Creates the manager, validates the Flink schema against the StarRocks table and prepares
     * the stream load visitor.
     *
     * @param sinkOptions sink configuration (JDBC endpoint, credentials, target table, limits)
     * @param flinkSchema schema of the incoming rows; when {@code null} validation is skipped
     *                    and the stream load visitor is created with an empty field-name array
     * @throws IllegalArgumentException if the schema does not match the StarRocks table
     */
    public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkSchema) {
        this.sinkOptions = sinkOptions;
        StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword());
        this.jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
        this.starrocksQueryVisitor = new StarRocksQueryVisitor(this.jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());

        this.typesMap = new HashMap<>();
        this.typesMap.put("bigint", Lists.newArrayList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        this.typesMap.put("largeint", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        this.typesMap.put("char", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
        this.typesMap.put("date", Lists.newArrayList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
        this.typesMap.put("datetime", Lists.newArrayList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.VARCHAR));
        this.typesMap.put("decimal", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
        this.typesMap.put("double", Lists.newArrayList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
        this.typesMap.put("float", Lists.newArrayList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
        this.typesMap.put("int", Lists.newArrayList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        this.typesMap.put("tinyint", Lists.newArrayList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
        this.typesMap.put("smallint", Lists.newArrayList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        this.typesMap.put("varchar", Lists.newArrayList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
        this.typesMap.put("string", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));

        // Validation runs before the visitor is built because it may call
        // sinkOptions.enableUpsertDelete(), and the visitor is constructed from sinkOptions.
        this.validateTableStructure(flinkSchema);
        this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkOptions, null == flinkSchema ? new String[]{} : flinkSchema.getFieldNames());
    }

    /**
     * Registers all counters and histograms of this sink on the metric group of the given
     * runtime context. Until this is called no metrics are recorded.
     *
     * @param runtimeCtx runtime context whose metric group receives the metrics
     */
    public void setRuntimeContext(RuntimeContext runtimeCtx) {
        this.totalFlushBytes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
        this.totalFlushRows = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
        this.totalFlushTime = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
        this.totalFlushTimeWithoutRetries = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
        this.totalFlushSucceededTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
        this.totalFlushFailedTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
        this.flushTimeNs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, this.newHistogram());
        this.offerTimeNs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, this.newHistogram());
        this.totalFilteredRows = runtimeCtx.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
        this.commitAndPublishTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, this.newHistogram());
        this.streamLoadPutTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PUT_TIME_MS, this.newHistogram());
        this.readDataTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, this.newHistogram());
        this.writeDataTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, this.newHistogram());
        this.loadTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, this.newHistogram());
    }

    /** Builds a fresh histogram sized by the configured histogram window. */
    private Histogram newHistogram() {
        return new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize());
    }

    /**
     * Starts the daemon thread "starrocks-flush" that repeatedly calls {@link #asyncFlush()}.
     *
     * <p>An exception thrown by {@code asyncFlush()} is recorded in {@code flushException} and
     * the loop keeps running; the thread only leaves the loop when {@code asyncFlush()} returns
     * {@code false}. Anything that escapes the loop is recorded by the uncaught-exception
     * handler, which also marks the thread as not alive.
     */
    public void startAsyncFlushing() {
        Thread flushThread = new Thread(() -> {
            while (true) {
                try {
                    if (!this.asyncFlush()) {
                        LOG.info("StarRocks flush thread is about to exit.");
                        this.flushThreadAlive = false;
                        break;
                    }
                }
                catch (Exception e) {
                    // Remember the failure for the writer side and keep serving the queue.
                    this.flushException = e;
                }
            }
        });
        flushThread.setUncaughtExceptionHandler((t, e) -> {
            LOG.error("StarRocks flush thread uncaught exception occurred: " + e.getMessage(), e);
            this.flushException = e;
            this.flushThreadAlive = false;
        });
        flushThread.setName("starrocks-flush");
        flushThread.setDaemon(true);
        // Publish "alive" before the thread runs: setting it after start() could overwrite a
        // "false" written by a thread that has already terminated.
        this.flushThreadAlive = true;
        try {
            flushThread.start();
        }
        catch (RuntimeException | Error e) {
            this.flushThreadAlive = false;
            throw e;
        }
    }

    /**
     * Arms a one-shot timer that hands the current buffer to the flush thread after the
     * configured maximum flush interval. Does nothing when the semantic is {@code EXACTLY_ONCE}.
     *
     * <p>Any previously armed timer is stopped first. The timer is armed again by
     * {@link #asyncFlush()} after a successful load, or by the timer task itself when the buffer
     * was empty at the time it fired.
     *
     * @throws IOException declared for callers; not thrown by the visible code path
     */
    public void startScheduler() throws IOException {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            return;
        }
        this.stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("starrocks-interval-sink"));
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (StarRocksSinkManager.this) {
                if (this.closed) {
                    return;
                }
                try {
                    String label = this.createBatchLabel();
                    LOG.info(String.format("StarRocks interval Sinking triggered: label[%s].", label));
                    if (this.batchCount == 0) {
                        // Nothing will reach the flush thread, so nobody else would re-arm the timer.
                        this.startScheduler();
                    }
                    this.flush(label, false);
                }
                catch (Exception e) {
                    this.flushException = e;
                }
            }
        }, this.sinkOptions.getSinkMaxFlushInterval(), TimeUnit.MILLISECONDS);
    }

    /** Cancels the pending interval task (without interrupting it) and shuts its executor down. */
    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    /**
     * Appends one row to the buffer and, unless the semantic is {@code EXACTLY_ONCE}, hands the
     * buffer to the flush thread once the row-count or byte-size limit is reached.
     *
     * @param record the serialized row; stored as UTF-8 bytes
     * @throws IOException      if buffering or the hand-over fails (the cause is attached)
     * @throws RuntimeException if an earlier asynchronous flush has failed
     */
    public final synchronized void writeRecord(String record) throws IOException {
        this.checkFlushException();
        try {
            byte[] bts = record.getBytes(StandardCharsets.UTF_8);
            this.buffer.add(bts);
            ++this.batchCount;
            this.batchSize += (long)bts.length;
            if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
                return;
            }
            if ((long)this.batchCount >= this.sinkOptions.getSinkMaxRows() || this.batchSize >= this.sinkOptions.getSinkMaxBytes()) {
                String label = this.createBatchLabel();
                LOG.info(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", this.batchCount, label));
                this.flush(label, false);
            }
        }
        catch (Exception e) {
            throw new IOException("Writing records to StarRocks failed.", e);
        }
    }

    /**
     * Hands a copy of the buffered rows to the flush thread under the given label and then
     * resets the buffer. With an empty buffer nothing is handed over.
     *
     * @param label        label attached to the batch
     * @param waitUtilDone when {@code true}, block until the flush thread has taken everything
     *                     offered before this call returned (see waitAsyncFlushingDone)
     * @throws Exception if an earlier flush failed, the offer timed out, or the thread was interrupted
     */
    public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
        this.checkFlushException();
        if (this.batchCount == 0) {
            if (waitUtilDone) {
                this.waitAsyncFlushingDone();
            }
            return;
        }
        Tuple3<String, Long, ArrayList<byte[]>> batch = new Tuple3<>(label, this.batchSize, new ArrayList<>(this.buffer));
        this.offer(batch);
        if (waitUtilDone) {
            this.waitAsyncFlushingDone();
        }
        // Reset only after the hand-over succeeded; a failed offer leaves the rows buffered.
        this.buffer.clear();
        this.batchCount = 0;
        this.batchSize = 0L;
    }

    /**
     * Closes the sink once: stops the timer, closes the JDBC connection provider, flushes the
     * remaining rows and waits for them, then asks the flush thread to exit. Repeated calls only
     * re-check the recorded flush failure.
     *
     * @throws RuntimeException if a flush failed, either earlier or during this call
     */
    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            this.stopScheduler();
            if (this.jdbcConnProvider != null) {
                this.jdbcConnProvider.close();
            }
            if (this.flushException != null) {
                // A flush already failed: do not attempt another one, just stop the thread and report.
                this.offerEOF();
                this.checkFlushException();
                return;
            }
            try {
                String label = this.createBatchLabel();
                if (this.batchCount > 0) {
                    LOG.info(String.format("StarRocks Sink is about to close: label[%s].", label));
                }
                this.flush(label, true);
            }
            catch (Exception e) {
                throw new RuntimeException("Writing records to StarRocks failed.", e);
            }
            finally {
                this.offerEOF();
            }
        }
        this.checkFlushException();
    }

    /**
     * Creates a label for a new batch.
     *
     * @return a random UUID in its string form
     */
    public String createBatchLabel() {
        return UUID.randomUUID().toString();
    }

    /**
     * Returns the rows buffered so far.
     *
     * @return the internal buffer itself (not a copy)
     */
    public List<byte[]> getBufferedBatchList() {
        return this.buffer;
    }

    /**
     * Replaces the buffered rows with the given ones. Only has an effect when the semantic is
     * {@code EXACTLY_ONCE}; otherwise the call is ignored.
     *
     * @param list rows as UTF-8 bytes, re-added through {@link #writeRecord(String)}
     * @throws IOException if re-adding a row fails
     */
    public void setBufferedBatchList(List<byte[]> list) throws IOException {
        if (!StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            return;
        }
        this.buffer.clear();
        this.batchCount = 0;
        this.batchSize = 0L;
        for (byte[] row : list) {
            this.writeRecord(new String(row, StandardCharsets.UTF_8));
        }
    }

    /**
     * Takes at most one element from {@link #flushQueue} (waiting up to three seconds) and, if
     * it is a data batch, stream-loads it with up to {@code getSinkMaxRetries()} retries.
     *
     * <p>The interval timer is stopped while a batch is being loaded and re-armed after a
     * successful load. Between attempts the thread sleeps {@code 1000 ms * attempt number}.
     *
     * @return {@code false} when the {@link #EOF} marker was received, {@code true} otherwise
     * @throws Exception the failure of the last attempt, or an {@link IOException} if the thread
     *                   was interrupted while waiting for the next attempt
     */
    public boolean asyncFlush() throws Exception {
        Tuple3<String, Long, ArrayList<byte[]>> flushData = this.flushQueue.poll(3L, TimeUnit.SECONDS);
        if (flushData == null || Strings.isNullOrEmpty(flushData.f0)) {
            // Timed out, or a no-op marker used by waitAsyncFlushingDone().
            return true;
        }
        if (EOF.equals(flushData.f0)) {
            return false;
        }
        this.stopScheduler();
        LOG.info(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.f2.size(), flushData.f1, flushData.f0));
        long startWithRetries = System.nanoTime();
        for (int i = 0; i <= this.sinkOptions.getSinkMaxRetries(); ++i) {
            try {
                long start = System.nanoTime();
                Map<String, Object> result = this.starrocksStreamLoadVisitor.doStreamLoad(flushData);
                LOG.info(String.format("Async stream load finished: label[%s].", flushData.f0));
                if (null != this.totalFlushBytes) {
                    this.totalFlushBytes.inc(flushData.f1.longValue());
                    this.totalFlushRows.inc((long)flushData.f2.size());
                    this.totalFlushTime.inc(System.nanoTime() - startWithRetries);
                    this.totalFlushTimeWithoutRetries.inc(System.nanoTime() - start);
                    this.totalFlushSucceededTimes.inc();
                    this.flushTimeNs.update(System.nanoTime() - start);
                    this.updateMetricsFromStreamLoadResult(result);
                }
                this.startScheduler();
                break;
            }
            catch (Exception e) {
                if (this.totalFlushFailedTimes != null) {
                    this.totalFlushFailedTimes.inc();
                }
                LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", (Object)i, (Object)e);
                if (i >= this.sinkOptions.getSinkMaxRetries()) {
                    throw e;
                }
                try {
                    Thread.sleep(1000L * (long)(i + 1));
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    // The load failure, not the interruption, is reported as the cause.
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
        return true;
    }

    /**
     * Blocks until the flush thread has taken every element offered before this call.
     *
     * <p>Two no-op markers are offered: because the queue holds a single element, the second
     * offer can only succeed after the first marker was taken, and the flush thread takes the
     * next element only once it is done with the previous one.
     */
    private void waitAsyncFlushingDone() throws InterruptedException {
        this.offer(controlTuple(""));
        this.offer(controlTuple(""));
        this.checkFlushException();
    }

    /**
     * Puts an element on {@link #flushQueue}, waiting up to the configured offer timeout. The
     * request is dropped (and logged) when the flush thread is no longer alive.
     *
     * @param tuple3 (label, batch size in bytes, rows)
     * @throws InterruptedException if interrupted while waiting for space in the queue
     * @throws RuntimeException     if the queue did not accept the element within the timeout
     */
    void offer(Tuple3<String, Long, ArrayList<byte[]>> tuple3) throws InterruptedException {
        if (!this.flushThreadAlive) {
            LOG.info(String.format("Flush thread already exit, ignore offer request for label[%s]", tuple3.f0));
            return;
        }
        long start = System.nanoTime();
        if (!this.flushQueue.offer(tuple3, this.sinkOptions.getSinkOfferTimeout(), TimeUnit.MILLISECONDS)) {
            throw new RuntimeException("Timeout while offering data to flushQueue, exceed " + this.sinkOptions.getSinkOfferTimeout() + " ms, see " + StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT.key());
        }
        if (this.offerTimeNs != null) {
            this.offerTimeNs.update(System.nanoTime() - start);
        }
    }

    /** Asks the flush thread to exit; a failure to enqueue the marker is only logged. */
    private void offerEOF() {
        try {
            this.offer(controlTuple(EOF));
        }
        catch (Exception e) {
            LOG.warn("Writing EOF failed.", (Throwable)e);
        }
    }

    /** Builds a data-less queue element that carries only a label. */
    private static Tuple3<String, Long, ArrayList<byte[]>> controlTuple(String label) {
        return new Tuple3<>(label, 0L, null);
    }

    /**
     * Rethrows a recorded flush failure, wrapped in a {@link RuntimeException}. Before throwing,
     * the current call stack is written to the log at INFO level, one frame per line.
     */
    private void checkFlushException() {
        if (this.flushException == null) {
            return;
        }
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            LOG.info(frame.getClassName() + "." + frame.getMethodName() + " line:" + frame.getLineNumber());
        }
        throw new RuntimeException("Writing records to StarRocks failed.", this.flushException);
    }

    /**
     * Checks the Flink schema against the column metadata of the StarRocks table.
     *
     * <ul>
     *   <li>If the table has primary-key columns, the schema must declare the same set of
     *       primary-key columns (case-insensitive); upsert/delete is then enabled on the sink
     *       options.</li>
     *   <li>Unless a column mapping is configured, the field count must equal the column count
     *       and every StarRocks column must have a Flink column of the same name
     *       (case-insensitive) whose type root is accepted by {@link #typesMap}. StarRocks types
     *       without an entry in the map are not type-checked.</li>
     * </ul>
     *
     * <p>Case folding uses {@link Locale#ROOT} so the comparison does not depend on the JVM's
     * default locale, and metadata values are read null-safely.
     *
     * @param flinkSchema schema to validate; {@code null} skips validation
     * @throws IllegalArgumentException on any mismatch, or if no column metadata is available
     */
    private void validateTableStructure(TableSchema flinkSchema) {
        if (null == flinkSchema) {
            return;
        }
        Optional<UniqueConstraint> constraint = flinkSchema.getPrimaryKey();
        List<Map<String, Object>> rows = this.starrocksQueryVisitor.getTableColumnsMetaData();
        if (rows == null || rows.isEmpty()) {
            throw new IllegalArgumentException("Couldn't get the sink table's column info.");
        }

        List<String> primaryKeys = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            if ("PRI".equals(String.valueOf(row.get("COLUMN_KEY")))) {
                primaryKeys.add(String.valueOf(row.get("COLUMN_NAME")).toLowerCase(Locale.ROOT));
            }
        }
        if (!primaryKeys.isEmpty()) {
            if (!constraint.isPresent()) {
                throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
            }
            List<String> declaredKeys = constraint.get().getColumns();
            boolean sameKeys = declaredKeys.size() == primaryKeys.size()
                    && declaredKeys.stream().allMatch(col -> primaryKeys.contains(col.toLowerCase(Locale.ROOT)));
            if (!sameKeys) {
                throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
            }
            this.sinkOptions.enableUpsertDelete();
        }

        if (this.sinkOptions.hasColumnMappingProperty()) {
            // With an explicit column mapping the columns are not compared one by one.
            return;
        }
        if (flinkSchema.getFieldCount() != rows.size()) {
            throw new IllegalArgumentException("Fields count of " + this.sinkOptions.getTableName() + " mismatch. \nflinkSchema[" + flinkSchema.getFieldNames().length + "]:" + String.join(",", flinkSchema.getFieldNames()) + "\n realTab[" + rows.size() + "]:" + rows.stream().map(r -> String.valueOf(r.get("COLUMN_NAME"))).collect(Collectors.joining(",")));
        }
        for (Map<String, Object> row : rows) {
            String starrocksField = String.valueOf(row.get("COLUMN_NAME")).toLowerCase(Locale.ROOT);
            String starrocksType = String.valueOf(row.get("DATA_TYPE")).toLowerCase(Locale.ROOT);
            // null means the StarRocks type is unknown to typesMap and only the name is compared.
            List<LogicalTypeRoot> acceptedRoots = this.typesMap.get(starrocksType);
            boolean matched = flinkSchema.getTableColumns().stream().anyMatch(col ->
                    col.getName().toLowerCase(Locale.ROOT).equals(starrocksField)
                            && (acceptedRoots == null || acceptedRoots.contains(col.getType().getLogicalType().getTypeRoot())));
            if (!matched) {
                throw new IllegalArgumentException("Fields name or type mismatch for:" + starrocksField);
            }
        }
    }

    /**
     * Copies the timing and filtered-row figures of a stream load response into the metrics.
     *
     * @param result response of the stream load as a key/value map; {@code null} is ignored
     */
    private void updateMetricsFromStreamLoadResult(Map<String, Object> result) {
        if (result == null) {
            return;
        }
        this.updateHisto(result, "CommitAndPublishTimeMs", this.commitAndPublishTimeMs);
        this.updateHisto(result, "StreamLoadPutTimeMs", this.streamLoadPutTimeMs);
        this.updateHisto(result, "ReadDataTimeMs", this.readDataTimeMs);
        this.updateHisto(result, "WriteDataTimeMs", this.writeDataTimeMs);
        this.updateHisto(result, "LoadTimeMs", this.loadTimeMs);
        this.updateCounter(result, "NumberFilteredRows", this.totalFilteredRows);
    }

    /**
     * Adds the numeric value stored under {@code key} to the counter. A missing or null value is
     * skipped; any failure while parsing or updating is logged and otherwise ignored.
     */
    private void updateCounter(Map<String, Object> result, String key, Counter counter) {
        Object val = result.get(key);
        if (val == null) {
            return;
        }
        try {
            counter.inc(Long.parseLong(val.toString()));
        }
        catch (Exception e) {
            LOG.warn("Parse stream load result metric error", (Throwable)e);
        }
    }

    /**
     * Records the numeric value stored under {@code key} in the histogram. A missing or null
     * value is skipped; any failure while parsing or updating is logged and otherwise ignored.
     */
    private void updateHisto(Map<String, Object> result, String key, Histogram histogram) {
        Object val = result.get(key);
        if (val == null) {
            return;
        }
        try {
            histogram.update(Long.parseLong(val.toString()));
        }
        catch (Exception e) {
            LOG.warn("Parse stream load result metric error", (Throwable)e);
        }
    }
}

