/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException;
import com.starrocks.connector.flink.row.StarRocksDelimiterParser;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.shade.com.alibaba.fastjson.JSON;
import com.starrocks.shade.org.apache.commons.codec.binary.Base64;
import com.starrocks.shade.org.apache.http.HttpEntity;
import com.starrocks.shade.org.apache.http.client.methods.CloseableHttpResponse;
import java.io.IOException;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarRocksStreamLoadVisitor
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
    private static final int ERROR_LOG_MAX_LENGTH = 3000;
    private final StarRocksSinkOptions sinkOptions;
    private final String[] fieldNames;
    private int pos;

    public StarRocksStreamLoadVisitor(StarRocksSinkOptions sinkOptions, String[] fieldNames) {
        this.fieldNames = fieldNames;
        this.sinkOptions = sinkOptions;
    }

    public Map<String, Object> doStreamLoad(Tuple3<String, Long, ArrayList<byte[]>> labeledRows) throws IOException {
        String host = this.getAvailableHost();
        if (null == host) {
            throw new IOException("None of the hosts in `load_url` could be connected.");
        }
        String loadUrl = host + "/api/" + this.sinkOptions.getDatabaseName() + "/" + this.sinkOptions.getTableName() + "/_stream_load";
        LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", ((ArrayList)labeledRows.f2).size(), labeledRows.f1, labeledRows.f0));
        Map<String, Object> loadResult = this.doHttpPut(loadUrl, (String)labeledRows.f0, this.joinRows((List)labeledRows.f2, ((Long)labeledRows.f1).intValue()));
        String keyStatus = "Status";
        if (null == loadResult || !loadResult.containsKey("Status")) {
            throw new IOException("Unable to flush data to StarRocks: unknown result status, usually caused by: 1.authentication or permission related problems. 2.Wrong column_separator or row_delimiter. 3.Column count exceeded the limitation.");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Stream Load response: \n%s\n", JSON.toJSONString(loadResult)));
        }
        if (loadResult.get("Status").equals("Fail")) {
            LOG.error(String.format("Stream Load response: \n%s\n", JSON.toJSONString(loadResult)));
            HashMap<String, String> logMap = new HashMap<String, String>();
            if (loadResult.containsKey("ErrorURL")) {
                logMap.put("streamLoadErrorLog", this.getErrorLog((String)loadResult.get("ErrorURL")));
            }
            throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error response: \n%s\n%s\n", JSON.toJSONString(loadResult), JSON.toJSONString(logMap)), loadResult);
        }
        return loadResult;
    }

    /*
     * Exception decompiling
     */
    public String getErrorLog(String errorUrl) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private String getAvailableHost() {
        List<String> hostList = this.sinkOptions.getLoadUrlList();
        if (this.pos >= hostList.size()) {
            this.pos = 0;
        }
        while (this.pos < hostList.size()) {
            String host = "http://" + hostList.get(this.pos);
            if (this.tryHttpConnection(host)) {
                return host;
            }
            ++this.pos;
        }
        return null;
    }

    private boolean tryHttpConnection(String host) {
        try {
            URL url = new URL(host);
            HttpURLConnection co = (HttpURLConnection)url.openConnection();
            co.setConnectTimeout(this.sinkOptions.getConnectTimeout());
            co.connect();
            co.disconnect();
            return true;
        }
        catch (Exception e1) {
            LOG.warn("Failed to connect to address:{}", (Object)host, (Object)e1);
            return false;
        }
    }

    private byte[] joinRows(List<byte[]> rows, int totalBytes) throws IOException {
        if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals((Object)this.sinkOptions.getStreamLoadFormat())) {
            byte[] lineDelimiter = StarRocksDelimiterParser.parse(this.sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }
        if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals((Object)this.sinkOptions.getStreamLoadFormat())) {
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
    }

    /*
     * Exception decompiling
     */
    private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private String getBasicAuthHeader(String username, String password) {
        String auth = username + ":" + password;
        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encodedAuth);
    }

    private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
        int code = resp.getStatusLine().getStatusCode();
        if (200 != code) {
            LOG.warn("Request failed with code:{}", (Object)code);
            return null;
        }
        HttpEntity respEntity = resp.getEntity();
        if (null == respEntity) {
            LOG.warn("Request failed with empty response.");
            return null;
        }
        return respEntity;
    }

    private static /* synthetic */ String lambda$doHttpPut$0(String f) {
        return String.format("`%s`", f.trim().replace("`", ""));
    }
}

