/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.HttpHelper;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksFlushTuple;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends buffered rows to StarRocks through the HTTP {@code _stream_load} endpoint.
 *
 * <p>For every flush the visitor picks a reachable node from the configured node URLs, joins the
 * serialized rows into a single CSV or JSON payload, issues an HTTP PUT and interprets the
 * {@code Status} field of the response.
 *
 * <p>{@link #pos} is a plain mutable field that is updated without synchronization.
 */
public class StarRocksStreamLoadVisitor {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);

    // NOTE(review): MAX_SLEEP_TIME, LABEL_STATE_* and RESULT_LABEL_PREPARE/ABORTED/UNKNOWN are not
    // referenced anywhere in this file; presumably they were used by checkLabelState before its
    // body was lost during decompilation. They are retained for when that method is restored.
    private static final int MAX_SLEEP_TIME = 5;
    private static final String RESULT_FAILED = "Fail";
    private static final String RESULT_SUCCESS = "Success";
    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    private static final String LABEL_STATE_VISIBLE = "VISIBLE";
    private static final String LABEL_STATE_COMMITTED = "COMMITTED";
    private static final String RESULT_LABEL_PREPARE = "PREPARE";
    private static final String RESULT_LABEL_ABORTED = "ABORTED";
    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

    /** Keys read from the stream load response map. */
    private static final String KEY_STATUS = "Status";
    private static final String KEY_MESSAGE = "Message";
    private static final String KEY_ERROR_URL = "ErrorURL";

    /** Stream load property holding the CSV row delimiter, and the fallback passed to the parser. */
    private static final String PROP_ROW_DELIMITER = "row_delimiter";
    private static final String DEFAULT_ROW_DELIMITER = "\n";

    private final HttpHelper httpHelper;
    private final SinkConfig sinkConfig;
    private final TableSchema tableSchema;

    /** Cursor into the node URL list; only advanced when the current node fails the connection probe. */
    private long pos;

    /**
     * Creates a visitor and validates the configured batch limits up front.
     *
     * @param sinkConfig sink configuration (node URLs, credentials, format, stream load properties)
     * @param tableSchema schema whose column names are sent in the {@code columns} header for CSV
     * @throws StarRocksConnectorException if the configured batch size cannot fit into a single
     *     payload buffer, or the load format is neither CSV nor JSON
     */
    public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, TableSchema tableSchema) {
        this.sinkConfig = sinkConfig;
        this.tableSchema = tableSchema;
        this.httpHelper = new HttpHelper(sinkConfig);
        this.checkBatchMaxBytes(sinkConfig.getBatchMaxBytes(), sinkConfig.getBatchMaxSize());
    }

    /**
     * Flushes one batch to StarRocks.
     *
     * @param flushData rows, their total byte count and the load label of the batch
     * @return {@code true} only when the response {@code Status} equals {@code "Success"};
     *     {@code false} for any other non-failing status
     * @throws StarRocksConnectorException if no node is reachable, the response carries no
     *     {@code Status}, or the status is {@code "Fail"}
     * @throws IOException propagated from the underlying HTTP calls
     */
    public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
        String host = this.getAvailableHost();
        if (null == host) {
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.HOST_IS_NULL, "None of the host in `load_url` could be connected.");
        }
        String loadUrl = host + "/api/" + this.sinkConfig.getDatabase() + "/" + this.sinkConfig.getTable() + "/_stream_load";
        // Parameterized logging: nothing is formatted unless debug is enabled.
        LOG.debug("Start to join batch data: rows[{}] bytes[{}] label[{}].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel());

        byte[] payload = this.joinRows(flushData.getRows(), flushData.getBytes());
        Map<String, Object> loadResult = this.httpHelper.doHttpPut(loadUrl, payload, this.getStreamLoadHttpHeader(flushData.getLabel()));
        if (null == loadResult || !loadResult.containsKey(KEY_STATUS)) {
            LOG.error("unknown result status. {}", loadResult);
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, "Unable to flush data to StarRocks: unknown result status. " + loadResult);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("StreamLoad response:\n" + JsonUtils.toJsonString(loadResult));
        }

        Object status = loadResult.get(KEY_STATUS);
        if (RESULT_FAILED.equals(status)) {
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, this.buildFailureMessage(loadResult));
        }
        if (RESULT_LABEL_EXISTED.equals(status)) {
            // Guarded so the response is not serialized to JSON when debug logging is off.
            if (LOG.isDebugEnabled()) {
                LOG.debug("StreamLoad response:\n" + JsonUtils.toJsonString(loadResult));
            }
            this.checkLabelState(host, flushData.getLabel());
        }
        return RESULT_SUCCESS.equals(status);
    }

    /**
     * Builds the exception message for a response whose status is {@code "Fail"}.
     *
     * <p>When the response carries an {@code ErrorURL} its content is fetched and appended; a
     * failure to fetch it is only logged so that the original load failure is still reported.
     */
    private String buildFailureMessage(Map<String, Object> loadResult) {
        StringBuilder errorBuilder = new StringBuilder("Failed to flush data to StarRocks \n");
        errorBuilder.append(this.sinkConfig.getDatabase()).append("/").append(this.sinkConfig.getTable()).append("\n");
        if (loadResult.containsKey(KEY_MESSAGE)) {
            errorBuilder.append(loadResult.get(KEY_MESSAGE)).append('\n');
        }
        if (loadResult.containsKey(KEY_ERROR_URL)) {
            LOG.error("StreamLoad response: {}", loadResult);
            try {
                errorBuilder.append(this.httpHelper.doHttpGet(loadResult.get(KEY_ERROR_URL).toString()));
                errorBuilder.append('\n');
            }
            catch (IOException e) {
                // Best effort: the trailing throwable is logged with its stack trace by SLF4J.
                LOG.warn("Get Error URL failed. {} ", loadResult.get(KEY_ERROR_URL), e);
            }
        } else {
            errorBuilder.append(JsonUtils.toJsonString(loadResult)).append('\n');
        }
        return errorBuilder.toString();
    }

    /**
     * Probes the configured nodes, starting at the current cursor, and returns the first one that
     * accepts a connection.
     *
     * @return the node URL prefixed with {@code http://}, or {@code null} when every node was
     *     tried once without success (or the node list is empty)
     */
    private String getAvailableHost() {
        List<String> hostList = this.sinkConfig.getNodeUrls();
        int hostCount = hostList.size();
        long limit = this.pos + (long)hostCount;
        while (this.pos < limit) {
            String host = "http://" + hostList.get((int)(this.pos % (long)hostCount));
            if (this.httpHelper.tryHttpConnection(host)) {
                // The cursor is left on the working node so the next call tries it first.
                return host;
            }
            ++this.pos;
        }
        return null;
    }

    /**
     * Concatenates the serialized rows into one request body.
     *
     * <p>CSV: every row is followed by the row delimiter. JSON: rows are comma separated and
     * wrapped in {@code [} and {@code ]}.
     *
     * @param rows serialized rows
     * @param totalBytes sum of the row lengths; used to size the buffer, so it must match the
     *     actual row sizes
     * @return the backing array of the payload buffer
     * @throws StarRocksConnectorException if the payload would exceed the buffer limit or the load
     *     format is neither CSV nor JSON
     */
    private byte[] joinRows(List<byte[]> rows, Long totalBytes) {
        // Rejects batches whose joined size would not fit into an int-sized buffer.
        this.checkBatchMaxBytes(totalBytes, rows.size());
        if (SinkConfig.StreamLoadFormat.CSV.equals(this.sinkConfig.getLoadFormat())) {
            byte[] lineDelimiter = this.resolveRowDelimiter();
            ByteBuffer buffer = ByteBuffer.allocate(totalBytes.intValue() + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                buffer.put(row);
                buffer.put(lineDelimiter);
            }
            return buffer.array();
        }
        if (SinkConfig.StreamLoadFormat.JSON.equals(this.sinkConfig.getLoadFormat())) {
            // Two brackets plus (n - 1) commas: n + 1 extra bytes, or 2 for an empty batch.
            ByteBuffer buffer = ByteBuffer.allocate(totalBytes.intValue() + (rows.isEmpty() ? 2 : rows.size() + 1));
            buffer.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    buffer.put(jsonDelimiter);
                }
                buffer.put(row);
                isFirstElement = false;
            }
            buffer.put("]".getBytes(StandardCharsets.UTF_8));
            return buffer.array();
        }
        throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, "Failed to join rows data, unsupported `format` from stream load properties: " + this.sinkConfig.getLoadFormat());
    }

    /**
     * Returns the UTF-8 bytes of the CSV row delimiter.
     *
     * <p>The {@code row_delimiter} stream load property is handed to
     * {@link StarRocksDelimiterParser#parse} with {@code "\n"} as the fallback argument. Absent
     * stream load properties are treated like an absent {@code row_delimiter} entry instead of
     * causing a {@link NullPointerException}.
     */
    private byte[] resolveRowDelimiter() {
        Map<String, Object> props = this.sinkConfig.getStreamLoadProps();
        Object configured = props == null ? null : props.get(PROP_ROW_DELIMITER);
        return StarRocksDelimiterParser.parse((String)configured, DEFAULT_ROW_DELIMITER).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Placeholder for the label state check that runs after a {@code "Label Already Exists"}
     * response.
     *
     * <p>The original body could not be recovered: CFR 0.152 failed to decompile this method with
     * {@code ConfusedCFRException: Tried to end blocks [16[UNCONDITIONALDOLOOP]], but top level
     * block is 1[TRYBLOCK]}. Until the real implementation is restored from the original sources,
     * every call throws.
     *
     * @param host node that answered the stream load request
     * @param label load label whose state should be checked
     * @throws IllegalStateException always
     * @throws IOException declared by the original signature; never thrown by this placeholder
     */
    private void checkLabelState(String host, String label) throws IOException {
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Builds the value of an HTTP Basic {@code Authorization} header.
     *
     * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
     */
    private String getBasicAuthHeader(String username, String password) {
        String auth = username + ":" + password;
        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
        // Explicit charset: the single-argument String(byte[]) constructor uses the platform default.
        return "Basic " + new String(encodedAuth, StandardCharsets.UTF_8);
    }

    /**
     * Builds the headers of the stream load PUT request.
     *
     * <p>User supplied stream load properties are copied first, so the fixed headers written
     * afterwards ({@code strip_outer_array}, {@code Expect}, {@code label}, {@code Content-Type},
     * {@code format}, {@code Authorization}) take precedence over entries with the same key.
     *
     * @param label load label of the batch
     * @return a new mutable header map
     */
    private Map<String, String> getStreamLoadHttpHeader(String label) {
        Map<String, String> headerMap = new HashMap<>();
        // Collected into an ArrayList explicitly because "__op" may be appended below and
        // Collectors.toList() gives no mutability guarantee.
        List<String> fieldNames = this.tableSchema.getColumns().stream().map(Column::getName).collect(Collectors.toCollection(ArrayList::new));
        if (this.sinkConfig.isEnableUpsertDelete()) {
            fieldNames.add("__op");
        }
        if (!fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(this.sinkConfig.getLoadFormat())) {
            headerMap.put("columns", fieldNames.stream().map(f -> String.format("`%s`", f)).collect(Collectors.joining(",")));
        }
        Map<String, Object> props = this.sinkConfig.getStreamLoadProps();
        if (null != props) {
            for (Map.Entry<String, Object> entry : props.entrySet()) {
                headerMap.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
        headerMap.put("strip_outer_array", "true");
        headerMap.put("Expect", "100-continue");
        headerMap.put("label", label);
        headerMap.put("Content-Type", "application/x-www-form-urlencoded");
        headerMap.put("format", this.sinkConfig.getLoadFormat().name().toUpperCase());
        headerMap.put("Authorization", this.getBasicAuthHeader(this.sinkConfig.getUsername(), this.sinkConfig.getPassword()));
        return headerMap;
    }

    /**
     * Builds the headers for a load state query: Basic authentication plus
     * {@code Connection: close}.
     *
     * @param label unused by the current implementation
     * @return a new mutable header map
     */
    private Map<String, String> getLoadStateHttpHeader(String label) {
        Map<String, String> headerMap = new HashMap<>();
        headerMap.put("Authorization", this.getBasicAuthHeader(this.sinkConfig.getUsername(), this.sinkConfig.getPassword()));
        headerMap.put("Connection", "close");
        return headerMap;
    }

    /**
     * Verifies that a batch of the given size still fits into one payload buffer once the
     * per-format overhead (row delimiters for CSV, brackets and commas for JSON) is added.
     *
     * @param batchMaxBytes number of row bytes in the batch
     * @param batchMaxRows number of rows in the batch
     * @throws StarRocksConnectorException if the load format is neither CSV nor JSON, or
     *     {@code batchMaxBytes} exceeds {@code Integer.MAX_VALUE} minus the overhead
     */
    void checkBatchMaxBytes(long batchMaxBytes, long batchMaxRows) {
        long batchMaxBytesLimit;
        if (SinkConfig.StreamLoadFormat.CSV.equals(this.sinkConfig.getLoadFormat())) {
            byte[] lineDelimiter = this.resolveRowDelimiter();
            batchMaxBytesLimit = Integer.MAX_VALUE - batchMaxRows * (long)lineDelimiter.length;
        } else if (SinkConfig.StreamLoadFormat.JSON.equals(this.sinkConfig.getLoadFormat())) {
            batchMaxBytesLimit = Integer.MAX_VALUE - (batchMaxRows == 0L ? 2L : batchMaxRows + 1L);
        } else {
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, "Failed to join rows data, unsupported `format` from stream load properties: " + this.sinkConfig.getLoadFormat());
        }
        if (batchMaxBytes > batchMaxBytesLimit) {
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, String.format("The batch_max_bytes[%d] of the data exceeds the maximum limit[%d], please reset the batch_max_bytes.", batchMaxBytes, batchMaxBytesLimit));
        }
    }
}

