/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.spark.rest;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.ConnectedFailedException;
import com.starrocks.connector.spark.exception.IllegalArgumentException;
import com.starrocks.connector.spark.exception.ShouldNeverHappenException;
import com.starrocks.connector.spark.exception.StarrocksException;
import com.starrocks.connector.spark.rest.PartitionDefinition;
import com.starrocks.connector.spark.rest.models.QueryPlan;
import com.starrocks.connector.spark.rest.models.Schema;
import com.starrocks.connector.spark.rest.models.Tablet;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpRequest;
import org.apache.http.auth.AuthenticationException;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;

public class RestService
implements Serializable {
    public static final int REST_RESPONSE_STATUS_OK = 200;
    private static final String API_PREFIX = "/api";
    private static final String SCHEMA = "_schema";
    private static final String QUERY_PLAN = "_query_plan";

    private static String send(Settings cfg, HttpRequestBase request, Logger logger) throws ConnectedFailedException {
        int connectTimeout = cfg.getIntegerProperty("starrocks.request.connect.timeout.ms", 30000);
        int socketTimeout = cfg.getIntegerProperty("starrocks.request.read.timeout.ms", 30000);
        int retries = cfg.getIntegerProperty("starrocks.request.retries", 3);
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", new Object[]{connectTimeout, socketTimeout, retries});
        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout).build();
        request.setConfig(requestConfig);
        String user = cfg.getProperty("starrocks.request.auth.user", "");
        String password = cfg.getProperty("starrocks.request.auth.password", "");
        UsernamePasswordCredentials creds = new UsernamePasswordCredentials(user, password);
        HttpClientContext context = HttpClientContext.create();
        try {
            request.addHeader(new BasicScheme().authenticate((Credentials)creds, (HttpRequest)request, (HttpContext)context));
        }
        catch (AuthenticationException e) {
            logger.error("Connect to StarRocks {} failed.", (Object)request.getURI(), (Object)e);
            throw new ConnectedFailedException(request.getURI().toString(), e);
        }
        logger.info("Send request to StarRocks FE '{}' with user '{}'.", (Object)request.getURI(), (Object)user);
        IOException ex = null;
        String status = null;
        String responseEntity = null;
        for (int attempt = 0; attempt < retries; ++attempt) {
            CloseableHttpClient httpClient = HttpClients.createDefault();
            logger.debug("Attempt {} to request {}.", (Object)attempt, (Object)request.getURI());
            try {
                CloseableHttpResponse response = httpClient.execute((HttpUriRequest)request, (HttpContext)context);
                status = response.getStatusLine().toString();
                String string = responseEntity = response.getEntity() == null ? null : EntityUtils.toString((HttpEntity)response.getEntity(), (Charset)StandardCharsets.UTF_8);
                if (response.getStatusLine().getStatusCode() == 200) {
                    logger.trace("Success get response from StarRocks FE: {}, response is: {}.", (Object)request.getURI(), (Object)responseEntity);
                    return responseEntity;
                }
                logger.warn("Failed to get response from StarRocks FE {}, http status is {}, entity: {}", new Object[]{request.getURI(), status, responseEntity});
                continue;
            }
            catch (IOException e) {
                ex = e;
                logger.warn("Connect to StarRocks {} failed.", (Object)request.getURI(), (Object)e);
            }
        }
        logger.error("Connect to StarRocks {} failed.", (Object)request.getURI(), ex);
        throw new ConnectedFailedException(request.getURI().toString(), status, responseEntity, ex);
    }

    @VisibleForTesting
    public static String[] parseIdentifier(String tableIdentifier, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse identifier '{}'.", (Object)tableIdentifier);
        if (StringUtils.isEmpty((CharSequence)tableIdentifier)) {
            logger.error("argument '{}' is illegal, value is '{}'.", (Object)"table.identifier", (Object)tableIdentifier);
            throw new IllegalArgumentException("table.identifier", tableIdentifier);
        }
        String[] identifier = tableIdentifier.split("\\.");
        if (identifier.length != 2) {
            logger.error("argument '{}' is illegal, value is '{}'.", (Object)"table.identifier", (Object)tableIdentifier);
            throw new IllegalArgumentException("table.identifier", tableIdentifier);
        }
        return identifier;
    }

    @VisibleForTesting
    static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse fenodes '{}'.", (Object)feNodes);
        if (StringUtils.isEmpty((CharSequence)feNodes)) {
            logger.error("argument '{}' is illegal, value is '{}'.", (Object)"fenodes", (Object)feNodes);
            throw new IllegalArgumentException("fenodes", feNodes);
        }
        List<String> nodes = Arrays.asList(feNodes.split(","));
        Collections.shuffle(nodes);
        return nodes.get(0).trim();
    }

    @VisibleForTesting
    static String getUriStr(Settings cfg, Logger logger) throws IllegalArgumentException {
        String[] identifier = RestService.parseIdentifier(cfg.getProperty("starrocks.table.identifier"), logger);
        String endPoint = RestService.randomEndpoint(cfg.getProperty("starrocks.fenodes"), logger);
        if (!endPoint.startsWith("http")) {
            endPoint = "http://" + endPoint;
        }
        return endPoint + API_PREFIX + "/" + identifier[0] + "/" + identifier[1] + "/";
    }

    public static Schema getSchema(Settings cfg, Logger logger) throws StarrocksException {
        logger.trace("Finding schema.");
        HttpGet httpGet = new HttpGet(RestService.getUriStr(cfg, logger) + SCHEMA);
        String response = RestService.send(cfg, (HttpRequestBase)httpGet, logger);
        logger.debug("Find schema response is '{}'.", (Object)response);
        return RestService.parseSchema(response, logger);
    }

    @VisibleForTesting
    public static Schema parseSchema(String response, Logger logger) throws StarrocksException {
        Schema schema;
        logger.trace("Parse response '{}' to schema.", (Object)response);
        ObjectMapper mapper = new ObjectMapper();
        try {
            schema = (Schema)mapper.readValue(response, Schema.class);
        }
        catch (JsonParseException e) {
            String errMsg = "StarRocks FE's response is not a json. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        catch (JsonMappingException e) {
            String errMsg = "StarRocks FE's response cannot map to schema. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        catch (IOException e) {
            String errMsg = "Parse StarRocks FE's response to json failed. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        if (schema == null) {
            logger.error("Should not come here.");
            throw new ShouldNeverHappenException();
        }
        if (schema.getStatus() != 200) {
            String errMsg = "StarRocks FE's response is not OK, status is " + schema.getStatus();
            logger.error(errMsg);
            throw new StarrocksException(errMsg);
        }
        logger.debug("Parsing schema result is '{}'.", (Object)schema);
        return schema;
    }

    public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws StarrocksException {
        String[] tableIdentifiers = RestService.parseIdentifier(cfg.getProperty("starrocks.table.identifier"), logger);
        String sql = "select " + cfg.getProperty("starrocks.read.field", "*") + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
        if (!StringUtils.isEmpty((CharSequence)cfg.getProperty("starrocks.filter.query"))) {
            sql = sql + " where " + cfg.getProperty("starrocks.filter.query");
        }
        logger.debug("Query SQL Sending to StarRocks FE is: '{}'.", (Object)sql);
        HttpPost httpPost = new HttpPost(RestService.getUriStr(cfg, logger) + QUERY_PLAN);
        String entity = "{\"sql\": \"" + sql + "\"}";
        logger.debug("Post body Sending to StarRocks FE is: '{}'.", (Object)entity);
        StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
        stringEntity.setContentEncoding("UTF-8");
        stringEntity.setContentType("application/json");
        httpPost.setEntity((HttpEntity)stringEntity);
        String resStr = RestService.send(cfg, (HttpRequestBase)httpPost, logger);
        logger.debug("Find partition response is '{}'.", (Object)resStr);
        QueryPlan queryPlan = RestService.getQueryPlan(resStr, logger);
        Map<String, List<Long>> be2Tablets = RestService.selectBeForTablet(queryPlan, logger);
        return RestService.tabletsMapToPartition(cfg, be2Tablets, queryPlan.getOpaqued_query_plan(), tableIdentifiers[0], tableIdentifiers[1], logger);
    }

    @VisibleForTesting
    static QueryPlan getQueryPlan(String response, Logger logger) throws StarrocksException {
        QueryPlan queryPlan;
        ObjectMapper mapper = new ObjectMapper();
        try {
            queryPlan = (QueryPlan)mapper.readValue(response, QueryPlan.class);
        }
        catch (JsonParseException e) {
            String errMsg = "StarRocks FE's response is not a json. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        catch (JsonMappingException e) {
            String errMsg = "StarRocks FE's response cannot map to schema. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        catch (IOException e) {
            String errMsg = "Parse StarRocks FE's response to json failed. res: " + response;
            logger.error(errMsg, (Throwable)e);
            throw new StarrocksException(errMsg, e);
        }
        if (queryPlan == null) {
            logger.error("Should not come here.");
            throw new ShouldNeverHappenException();
        }
        if (queryPlan.getStatus() != 200) {
            String errMsg = "StarRocks FE's response is not OK, status is " + queryPlan.getStatus();
            logger.error(errMsg);
            throw new StarrocksException(errMsg);
        }
        logger.debug("Parsing partition result is '{}'.", (Object)queryPlan);
        return queryPlan;
    }

    @VisibleForTesting
    static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws StarrocksException {
        HashMap<String, List<Long>> be2Tablets = new HashMap<String, List<Long>>();
        for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
            long tabletId;
            logger.debug("Parse tablet info: '{}'.", part);
            try {
                tabletId = Long.parseLong(part.getKey());
            }
            catch (NumberFormatException e) {
                String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed.";
                logger.error(errMsg, (Throwable)e);
                throw new StarrocksException(errMsg, e);
            }
            String target = null;
            int tabletCount = Integer.MAX_VALUE;
            for (String candidate : part.getValue().getRoutings()) {
                logger.trace("Evaluate StarRocks BE '{}' to tablet '{}'.", (Object)candidate, (Object)tabletId);
                if (!be2Tablets.containsKey(candidate)) {
                    logger.debug("Choice a new StarRocks BE '{}' for tablet '{}'.", (Object)candidate, (Object)tabletId);
                    ArrayList tablets = new ArrayList();
                    be2Tablets.put(candidate, tablets);
                    target = candidate;
                    break;
                }
                if (((List)be2Tablets.get(candidate)).size() >= tabletCount) continue;
                target = candidate;
                tabletCount = ((List)be2Tablets.get(candidate)).size();
                logger.debug("Current candidate StarRocks BE to tablet '{}' is '{}' with tablet count {}.", new Object[]{tabletId, target, tabletCount});
            }
            if (target == null) {
                String errMsg = "Cannot choice StarRocks BE for tablet " + tabletId;
                logger.error(errMsg);
                throw new StarrocksException(errMsg);
            }
            logger.debug("Choice StarRocks BE '{}' for tablet '{}'.", target, (Object)tabletId);
            ((List)be2Tablets.get(target)).add(tabletId);
        }
        return be2Tablets;
    }

    @VisibleForTesting
    static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
        int tabletsSize = Integer.MAX_VALUE;
        if (cfg.getProperty("starrocks.request.tablet.size") != null) {
            try {
                tabletsSize = Integer.parseInt(cfg.getProperty("starrocks.request.tablet.size"));
            }
            catch (NumberFormatException e) {
                logger.warn("Parse '{}' to number failed. Original string is '{}'.", (Object)"starrocks.request.tablet.size", (Object)cfg.getProperty("starrocks.request.tablet.size"));
            }
        }
        if (tabletsSize < 1) {
            logger.warn("{} is less than {}, set to default value {}.", new Object[]{"starrocks.request.tablet.size", 1, 1});
            tabletsSize = 1;
        }
        logger.debug("Tablet size is set to {}.", (Object)tabletsSize);
        return tabletsSize;
    }

    @VisibleForTesting
    static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets, String opaquedQueryPlan, String database, String table, Logger logger) throws IllegalArgumentException {
        int tabletsSize = RestService.tabletCountLimitForOnePartition(cfg, logger);
        ArrayList<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>();
        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
            logger.debug("Generate partition with beInfo: '{}'.", beInfo);
            HashSet tabletSet = new HashSet(beInfo.getValue());
            beInfo.getValue().clear();
            beInfo.getValue().addAll(tabletSet);
            for (int first = 0; first < beInfo.getValue().size(); first += tabletsSize) {
                HashSet<Long> partitionTablets = new HashSet<Long>(beInfo.getValue().subList(first, Math.min(beInfo.getValue().size(), first + tabletsSize)));
                PartitionDefinition partitionDefinition = new PartitionDefinition(database, table, cfg, beInfo.getKey(), partitionTablets, opaquedQueryPlan);
                logger.debug("Generate one PartitionDefinition '{}'.", (Object)partitionDefinition);
                partitions.add(partitionDefinition);
            }
        }
        return partitions;
    }
}

