/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.spark.sql.conf;

import com.starrocks.connector.spark.sql.conf.StarRocksConfigBase;
import com.starrocks.connector.spark.sql.schema.StarRocksField;
import com.starrocks.connector.spark.sql.schema.StarRocksSchema;
import com.starrocks.data.load.stream.DelimiterParser;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.Utils;

public class WriteStarRocksConfig
extends StarRocksConfigBase {
    private static final long serialVersionUID = 1L;
    public static final String WRITE_PREFIX = "starrocks.write.";
    private static final String KEY_LABEL_PREFIX = "starrocks.write.label.prefix";
    private static final String KEY_SOCKET_TIMEOUT = "starrocks.write.socket.timeout.ms";
    private static final String KEY_WAIT_FOR_CONTINUE_TIMEOUT = "starrocks.write.wait-for-continue.timeout.ms";
    private static final String KEY_CHUNK_LIMIT = "starrocks.write.chunk.limit";
    private static final String KEY_SCAN_FREQUENCY = "starrocks.write.scan-frequency.ms";
    private static final String KEY_ENABLE_TRANSACTION = "starrocks.write.enable.transaction-stream-load";
    private static final String KEY_BUFFER_SIZE = "starrocks.write.buffer.size";
    private static final String KEY_BUFFER_ROWS = "starrocks.write.buffer.rows";
    private static final String KEY_FLUSH_INTERVAL = "starrocks.write.flush.interval.ms";
    private static final String KEY_MAX_RETIES = "starrocks.write.max.retries";
    private static final String KEY_RETRY_INTERVAL_MS = "starrocks.write.retry.interval.ms";
    private static final String PROPS_PREFIX = "starrocks.write.properties.";
    private static final String KEY_PROPS_FORMAT = "starrocks.write.properties.format";
    private static final String KEY_PROPS_ROW_DELIMITER = "starrocks.write.properties.row_delimiter";
    private static final String KEY_PROPS_COLUMN_SEPARATOR = "starrocks.write.properties.column_separator";
    private static final String KEY_NUM_PARTITIONS = "starrocks.write.num.partitions";
    private static final String KEY_PARTITION_COLUMNS = "starrocks.write.partition.columns";
    private String labelPrefix = "spark";
    private int socketTimeoutMs = -1;
    private int waitForContinueTimeoutMs = 30000;
    private int ioThreadCount = 1;
    private long chunkLimit = 0xC0000000L;
    private int scanFrequencyInMs = 50;
    private boolean enableTransactionStreamLoad = true;
    private long bufferSize = 0x6400000L;
    private int bufferRows = Integer.MAX_VALUE;
    private int flushInterval = 300000;
    private int maxRetries = 0;
    private int retryIntervalInMs = 10000;
    private Map<String, String> properties;
    private String format = "CSV";
    private String rowDelimiter = "\n";
    private String columnSeparator = "\t";
    private boolean supportTransactionStreamLoad = true;
    private int numPartitions = 0;
    private String[] partitionColumns;
    private String streamLoadColumnProperty;
    private String[] streamLoadColumnNames;
    private final Set<String> starRocksJsonColumnNames;

    public WriteStarRocksConfig(Map<String, String> originOptions, StructType sparkSchema, StarRocksSchema starRocksSchema) {
        super(originOptions);
        this.load(sparkSchema);
        this.genStreamLoadColumns(sparkSchema, starRocksSchema);
        this.starRocksJsonColumnNames = new HashSet<String>();
        for (StarRocksField column : starRocksSchema.getColumns()) {
            if (!column.isJson()) continue;
            this.starRocksJsonColumnNames.add(column.getName());
        }
    }

    private void load(StructType sparkSchema) {
        this.labelPrefix = this.get(KEY_LABEL_PREFIX, "spark");
        this.socketTimeoutMs = this.getInt(KEY_SOCKET_TIMEOUT, -1);
        this.waitForContinueTimeoutMs = this.getInt(KEY_WAIT_FOR_CONTINUE_TIMEOUT, 30000);
        this.chunkLimit = Utils.byteStringAsBytes((String)this.get(KEY_CHUNK_LIMIT, "3g"));
        this.scanFrequencyInMs = this.getInt(KEY_SCAN_FREQUENCY, 50);
        this.enableTransactionStreamLoad = this.getBoolean(KEY_ENABLE_TRANSACTION, true);
        this.bufferSize = Utils.byteStringAsBytes((String)this.get(KEY_BUFFER_SIZE, "100m"));
        this.bufferRows = this.getInt(KEY_BUFFER_ROWS, Integer.MAX_VALUE);
        this.flushInterval = this.getInt(KEY_FLUSH_INTERVAL, 300000);
        this.maxRetries = this.getInt(KEY_MAX_RETIES, 3);
        this.retryIntervalInMs = this.getInt(KEY_RETRY_INTERVAL_MS, 10000);
        this.properties = this.originOptions.entrySet().stream().filter(entry -> ((String)entry.getKey()).startsWith(PROPS_PREFIX)).collect(Collectors.toMap(entry -> ((String)entry.getKey()).replaceFirst(PROPS_PREFIX, ""), Map.Entry::getValue));
        this.format = this.originOptions.getOrDefault(KEY_PROPS_FORMAT, "CSV");
        this.rowDelimiter = DelimiterParser.convertDelimiter(this.originOptions.getOrDefault(KEY_PROPS_ROW_DELIMITER, "\n"));
        this.columnSeparator = DelimiterParser.convertDelimiter(this.originOptions.getOrDefault(KEY_PROPS_COLUMN_SEPARATOR, "\t"));
        String inferedFormat = this.inferFormatFromSchema(sparkSchema);
        if (inferedFormat != null) {
            this.format = inferedFormat;
            this.properties.put("format", this.format);
        }
        if ("json".equalsIgnoreCase(this.format)) {
            if (!this.properties.containsKey("strip_outer_array")) {
                this.properties.put("strip_outer_array", "true");
            }
            if (!this.properties.containsKey("ignore_json_size")) {
                this.properties.put("ignore_json_size", "true");
            }
        }
        if (!this.properties.containsKey("timeout")) {
            int timeout = Math.max(600, this.flushInterval / 1000 + 600);
            this.properties.put("timeout", String.valueOf(timeout));
        }
        this.numPartitions = this.getInt(KEY_NUM_PARTITIONS, 0);
        this.partitionColumns = this.getArray(KEY_PARTITION_COLUMNS, null);
        this.supportTransactionStreamLoad = StreamLoadUtils.isStarRocksSupportTransactionLoad(Arrays.asList(this.getFeHttpUrls()), this.getHttpRequestConnectTimeoutMs(), this.getUsername(), this.getPassword());
    }

    private void genStreamLoadColumns(StructType sparkSchema, StarRocksSchema starRocksSchema) {
        this.streamLoadColumnNames = new String[sparkSchema.length()];
        ArrayList<String> expressions = new ArrayList<String>();
        for (int i = 0; i < sparkSchema.length(); ++i) {
            StructField field = sparkSchema.apply(i);
            StarRocksField starRocksField = starRocksSchema.getField(field.name());
            if (starRocksField.isBitmap()) {
                this.streamLoadColumnNames[i] = "__tmp" + field.name();
                expressions.add(String.format("`%s`=%s(`%s`)", field.name(), this.getBitmapFunction(field), this.streamLoadColumnNames[i]));
                continue;
            }
            if (starRocksField.isHll()) {
                this.streamLoadColumnNames[i] = "__tmp" + field.name();
                expressions.add(String.format("`%s`=hll_hash(`%s`)", field.name(), this.streamLoadColumnNames[i]));
                continue;
            }
            this.streamLoadColumnNames[i] = field.name();
        }
        if (this.properties.containsKey("columns")) {
            this.streamLoadColumnProperty = this.properties.get("columns");
        } else if (this.getColumns() != null || !expressions.isEmpty()) {
            String joinedCols = Arrays.stream(this.streamLoadColumnNames).map(f -> String.format("`%s`", f.trim().replace("`", ""))).collect(Collectors.joining(","));
            String joinedExps = String.join((CharSequence)",", expressions);
            this.streamLoadColumnProperty = joinedExps.isEmpty() ? joinedCols : joinedCols + "," + joinedExps;
        }
    }

    private String inferFormatFromSchema(StructType sparkSchema) {
        for (StructField field : sparkSchema.fields()) {
            if (!(field.dataType() instanceof ArrayType)) continue;
            return "json";
        }
        return null;
    }

    private String getBitmapFunction(StructField field) {
        DataType dataType = field.dataType();
        if (dataType instanceof ByteType || dataType instanceof ShortType || dataType instanceof IntegerType || dataType instanceof LongType) {
            return "to_bitmap";
        }
        return "bitmap_hash";
    }

    public String getFormat() {
        return this.format;
    }

    public String getColumnSeparator() {
        return this.columnSeparator;
    }

    public int getNumPartitions() {
        return this.numPartitions;
    }

    public String[] getPartitionColumns() {
        return this.partitionColumns;
    }

    public String[] getStreamLoadColumnNames() {
        return this.streamLoadColumnNames;
    }

    public Set<String> getStarRocksJsonColumnNames() {
        return this.starRocksJsonColumnNames;
    }

    public boolean isPartialUpdate() {
        String val = this.properties.get("partial_update");
        return val != null && val.equalsIgnoreCase("true");
    }

    public StreamLoadProperties toStreamLoadProperties() {
        StreamLoadDataFormat dataFormat = "json".equalsIgnoreCase(this.format) ? StreamLoadDataFormat.JSON : new StreamLoadDataFormat.CSVFormat(this.rowDelimiter);
        StreamLoadTableProperties tableProperties = StreamLoadTableProperties.builder().database(this.getDatabase()).table(this.getTable()).columns(this.streamLoadColumnProperty).streamLoadDataFormat(dataFormat).chunkLimit(this.chunkLimit).maxBufferRows(this.bufferRows).addCommonProperties(this.properties).build();
        StreamLoadProperties.Builder builder = StreamLoadProperties.builder().defaultTableProperties(tableProperties).loadUrls(this.getFeHttpUrls()).jdbcUrl(this.getFeJdbcUrl()).username(this.getUsername()).password(this.getPassword()).connectTimeout(this.getHttpRequestConnectTimeoutMs()).socketTimeout(this.socketTimeoutMs).waitForContinueTimeoutMs(this.waitForContinueTimeoutMs).ioThreadCount(this.ioThreadCount).scanningFrequency(this.scanFrequencyInMs).cacheMaxBytes(this.bufferSize).expectDelayTime(this.flushInterval).labelPrefix(this.labelPrefix).maxRetries(this.maxRetries).retryIntervalInMs(this.retryIntervalInMs).addHeaders(this.properties);
        if (this.enableTransactionStreamLoad && this.supportTransactionStreamLoad) {
            builder.enableTransaction();
        }
        return builder.build();
    }
}

