/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsRestClient
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(EsRestClient.class);
    private static final int CONNECTION_REQUEST_TIMEOUT = 10000;
    private static final int SOCKET_TIMEOUT = 300000;
    private final RestClient restClient;

    private EsRestClient(RestClient restClient) {
        this.restClient = restClient;
    }

    public static EsRestClient createInstance(ReadonlyConfig config) {
        List hosts = (List)config.get(EsClusterConnectionConfig.HOSTS);
        Optional username = config.getOptional(EsClusterConnectionConfig.USERNAME);
        Optional password = config.getOptional(EsClusterConnectionConfig.PASSWORD);
        Optional keystorePath = Optional.empty();
        Optional keystorePassword = Optional.empty();
        Optional truststorePath = Optional.empty();
        Optional truststorePassword = Optional.empty();
        boolean tlsVerifyCertificate = (Boolean)config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
        if (tlsVerifyCertificate) {
            keystorePath = config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
            keystorePassword = config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
            truststorePath = config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
            truststorePassword = config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
        }
        boolean tlsVerifyHostnames = (Boolean)config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
        return EsRestClient.createInstance(hosts, username, password, tlsVerifyCertificate, tlsVerifyHostnames, keystorePath, keystorePassword, truststorePath, truststorePassword);
    }

    public static EsRestClient createInstance(List<String> hosts, Optional<String> username, Optional<String> password, boolean tlsVerifyCertificate, boolean tlsVerifyHostnames, Optional<String> keystorePath, Optional<String> keystorePassword, Optional<String> truststorePath, Optional<String> truststorePassword) {
        RestClientBuilder restClientBuilder = EsRestClient.getRestClientBuilder(hosts, username, password, tlsVerifyCertificate, tlsVerifyHostnames, keystorePath, keystorePassword, truststorePath, truststorePassword);
        return new EsRestClient(restClientBuilder.build());
    }

    private static RestClientBuilder getRestClientBuilder(List<String> hosts, Optional<String> username, Optional<String> password, boolean tlsVerifyCertificate, boolean tlsVerifyHostnames, Optional<String> keystorePath, Optional<String> keystorePassword, Optional<String> truststorePath, Optional<String> truststorePassword) {
        HttpHost[] httpHosts = new HttpHost[hosts.size()];
        for (int i = 0; i < hosts.size(); ++i) {
            httpHosts[i] = HttpHost.create(hosts.get(i));
        }
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(10000).setSocketTimeout(300000));
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            if (username.isPresent()) {
                BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials((String)username.get(), (String)password.get()));
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            try {
                Object sslContext;
                if (tlsVerifyCertificate) {
                    sslContext = SSLUtils.buildSSLContext(keystorePath, keystorePassword, truststorePath, truststorePassword);
                    ((Optional)sslContext).ifPresent(httpClientBuilder::setSSLContext);
                } else {
                    sslContext = SSLContexts.custom().loadTrustMaterial(new TrustAllStrategy()).build();
                    httpClientBuilder.setSSLContext((SSLContext)sslContext);
                }
                if (!tlsVerifyHostnames) {
                    httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return httpClientBuilder;
        });
        return restClientBuilder;
    }

    public BulkResponse bulk(String requestBody) {
        Request request = new Request("POST", "/_bulk");
        request.setJsonEntity(requestBody);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk es Response is null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                ObjectMapper objectMapper = new ObjectMapper();
                String entity = EntityUtils.toString(response.getEntity());
                JsonNode json = objectMapper.readTree(entity);
                int took = json.get("took").asInt();
                boolean errors = json.get("errors").asBoolean();
                return new BulkResponse(errors, took, entity);
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
        }
        catch (IOException e) {
            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, String.format("bulk es error,request boy=%s", requestBody), e);
        }
    }

    public ElasticsearchClusterInfo getClusterInfo() {
        Request request = new Request("GET", "/");
        try {
            Response response = this.restClient.performRequest(request);
            String result = EntityUtils.toString(response.getEntity());
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode jsonNode = objectMapper.readTree(result);
            JsonNode versionNode = jsonNode.get("version");
            return ElasticsearchClusterInfo.builder().clusterVersion(versionNode.get("number").asText()).distribution(Optional.ofNullable(versionNode.get("distribution")).map(JsonNode::asText).orElse(null)).build();
        }
        catch (IOException e) {
            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED, "fail to get elasticsearch version.", e);
        }
    }

    @Override
    public void close() {
        try {
            this.restClient.close();
        }
        catch (IOException e) {
            log.warn("close elasticsearch connection error", (Throwable)e);
        }
    }

    public ScrollResult searchByScroll(String index, List<String> source, Map<String, Object> query, String scrollTime, int scrollSize) {
        HashMap<String, Object> param = new HashMap<String, Object>();
        param.put("query", query);
        param.put("_source", source);
        param.put("sort", new String[]{"_doc"});
        param.put("size", scrollSize);
        String endpoint = "/" + index + "/_search?scroll=" + scrollTime;
        return this.getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
    }

    public ScrollResult searchWithScrollId(String scrollId, String scrollTime) {
        HashMap<String, String> param = new HashMap<String, String>();
        param.put("scroll_id", scrollId);
        param.put("scroll", scrollTime);
        return this.getDocsFromScrollRequest("/_search/scroll", JsonUtils.toJsonString(param));
    }

    private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) {
        Request request = new Request("POST", endpoint);
        request.setJsonEntity(requestBody);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR, "POST " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                int successful;
                String entity = EntityUtils.toString(response.getEntity());
                ObjectNode responseJson = JsonUtils.parseObject((String)entity);
                JsonNode shards = responseJson.get("_shards");
                int totalShards = shards.get("total").intValue();
                Asserts.check(totalShards == (successful = shards.get("successful").intValue()), String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful));
                return this.getDocsFromScrollResponse(responseJson);
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR, String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
        }
        catch (IOException e) {
            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR, String.format("POST %s error,request boy=%s", endpoint, requestBody), e);
        }
    }

    private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
        ScrollResult scrollResult = new ScrollResult();
        String scrollId = responseJson.get("_scroll_id").asText();
        scrollResult.setScrollId(scrollId);
        JsonNode hitsNode = responseJson.get("hits").get("hits");
        ArrayList<Map<String, Object>> docs = new ArrayList<Map<String, Object>>(hitsNode.size());
        scrollResult.setDocs(docs);
        for (JsonNode jsonNode : hitsNode) {
            HashMap<String, String> doc = new HashMap<String, String>();
            doc.put("_index", jsonNode.get("_index").textValue());
            doc.put("_id", jsonNode.get("_id").textValue());
            JsonNode source = jsonNode.get("_source");
            Iterator iterator = source.fields();
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                String fieldName = (String)entry.getKey();
                if (entry.getValue() instanceof TextNode) {
                    doc.put(fieldName, ((JsonNode)entry.getValue()).textValue());
                    continue;
                }
                doc.put(fieldName, (String)entry.getValue());
            }
            docs.add(doc);
        }
        return scrollResult;
    }

    public boolean checkIndexExist(String index) {
        Request request = new Request("HEAD", "/" + index);
        try {
            Response response = this.restClient.performRequest(request);
            int statusCode = response.getStatusLine().getStatusCode();
            return statusCode == 200;
        }
        catch (Exception ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CHECK_INDEX_FAILED, ex);
        }
    }

    public List<IndexDocsCount> getIndexDocsCount(String index) {
        String endpoint = String.format("/_cat/indices/%s?h=index,docsCount&format=json", index);
        Request request = new Request("GET", endpoint);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, "GET " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                String entity = EntityUtils.toString(response.getEntity());
                return JsonUtils.toList((String)entity, IndexDocsCount.class);
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, ex);
        }
    }

    public List<String> listIndex() {
        String endpoint = "/_cat/indices?format=json";
        Request request = new Request("GET", endpoint);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED, "GET " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                String entity = EntityUtils.toString(response.getEntity());
                return JsonUtils.toList((String)entity, Map.class).stream().map(map -> map.get("index").toString()).collect(Collectors.toList());
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED, String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED, ex);
        }
    }

    public void createIndex(String indexName) {
        this.createIndex(indexName, null);
    }

    public void createIndex(String indexName, String mapping) {
        String endpoint = String.format("/%s", indexName);
        Request request = new Request("PUT", endpoint);
        if (StringUtils.isNotEmpty((CharSequence)mapping)) {
            request.setJsonEntity(mapping);
        }
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED, "PUT " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() != 200) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED, String.format("PUT %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
            }
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED, ex);
        }
    }

    public void dropIndex(String tableName) {
        String endpoint = String.format("/%s", tableName);
        Request request = new Request("DELETE", endpoint);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED, "DELETE " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                return;
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED, String.format("DELETE %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED, ex);
        }
    }

    public void clearIndexData(String indexName) {
        String endpoint = String.format("/%s/_delete_by_query", indexName);
        Request request = new Request("POST", endpoint);
        String jsonString = "{ \"query\": { \"match_all\": {} } }";
        request.setJsonEntity(jsonString);
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, "POST " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() == 200) {
                return;
            }
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, String.format("POST %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, ex);
        }
    }

    public Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(String index, List<String> source) {
        String endpoint = String.format("/%s/_mappings", index);
        Request request = new Request("GET", endpoint);
        Map<String, BasicTypeDefine<EsType>> mapping = new HashMap<String, BasicTypeDefine<EsType>>();
        try {
            Response response = this.restClient.performRequest(request);
            if (response == null) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, "GET " + endpoint + " response null");
            }
            if (response.getStatusLine().getStatusCode() != 200) {
                throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
            }
            String entity = EntityUtils.toString(response.getEntity());
            log.info(String.format("GET %s respnse=%s", endpoint, entity));
            ObjectNode responseJson = JsonUtils.parseObject((String)entity);
            Iterator it = responseJson.elements();
            while (it.hasNext()) {
                JsonNode indexProperty = (JsonNode)it.next();
                JsonNode mappingsProperty = indexProperty.get("mappings");
                if (mappingsProperty.has("mappingsProperty")) {
                    JsonNode properties = mappingsProperty.get("properties");
                    mapping = EsRestClient.getFieldTypeMappingFromProperties(properties, source);
                    continue;
                }
                for (JsonNode typeNode : mappingsProperty) {
                    JsonNode properties = typeNode.has("properties") ? typeNode.get("properties") : typeNode;
                    mapping.putAll(EsRestClient.getFieldTypeMappingFromProperties(properties, source));
                }
            }
        }
        catch (IOException ex) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, ex);
        }
        return mapping;
    }

    private static Map<String, BasicTypeDefine<EsType>> getFieldTypeMappingFromProperties(JsonNode properties, List<String> source) {
        HashMap<String, BasicTypeDefine<EsType>> allElasticSearchFieldTypeInfoMap = new HashMap<String, BasicTypeDefine<EsType>>();
        properties.fields().forEachRemaining(entry -> {
            String fieldName = (String)entry.getKey();
            JsonNode fieldProperty = (JsonNode)entry.getValue();
            if (fieldProperty.has("type")) {
                String type = fieldProperty.get("type").asText();
                BasicTypeDefine.BasicTypeDefineBuilder typeDefine = BasicTypeDefine.builder().name(fieldName).columnType(type).dataType(type);
                if (type.equalsIgnoreCase("aggregate_metric_double")) {
                    ArrayNode metrics = (ArrayNode)fieldProperty.get("metrics");
                    ArrayList<String> metricsList = new ArrayList<String>();
                    for (JsonNode node : metrics) {
                        metricsList.add(node.asText());
                    }
                    HashMap<String, Object> options = new HashMap<String, Object>();
                    options.put("metrics", metricsList);
                    typeDefine.nativeType((Object)new EsType(type, options));
                } else if (type.equalsIgnoreCase("alias")) {
                    String path = fieldProperty.get("path").asText();
                    HashMap<String, Object> options = new HashMap<String, Object>();
                    options.put("path", path);
                    typeDefine.nativeType((Object)new EsType(type, options));
                } else if (type.equalsIgnoreCase("dense_vector")) {
                    String elementType = fieldProperty.get("element_type") == null ? "float" : fieldProperty.get("element_type").asText();
                    HashMap<String, Object> options = new HashMap<String, Object>();
                    options.put("element_type", elementType);
                    typeDefine.nativeType((Object)new EsType(type, options));
                } else if (type.equalsIgnoreCase("date") || type.equalsIgnoreCase("date_nanos")) {
                    String format = fieldProperty.get("format") != null ? fieldProperty.get("format").asText() : "strict_date_optional_time_nanos||epoch_millis";
                    HashMap<String, Object> options = new HashMap<String, Object>();
                    options.put("format", format);
                    typeDefine.nativeType((Object)new EsType(type, options));
                } else {
                    typeDefine.nativeType((Object)new EsType(type, new HashMap<String, Object>()));
                }
                allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
            } else if (fieldProperty.has("properties")) {
                JsonNode propertiesNode = fieldProperty.get("properties");
                ArrayList<String> fields = new ArrayList<String>();
                propertiesNode.fieldNames().forEachRemaining(fields::add);
                Map<String, BasicTypeDefine<EsType>> subFieldTypeInfoMap = EsRestClient.getFieldTypeMappingFromProperties(propertiesNode, fields);
                BasicTypeDefine.BasicTypeDefineBuilder typeDefine = BasicTypeDefine.builder().name(fieldName).columnType("object").dataType("object");
                typeDefine.nativeType((Object)new EsType("object", subFieldTypeInfoMap));
                allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
            }
        });
        if (CollectionUtils.isEmpty(source)) {
            return allElasticSearchFieldTypeInfoMap;
        }
        allElasticSearchFieldTypeInfoMap.forEach((fieldName, fieldType) -> {
            BasicTypeDefine type;
            if (fieldType.getDataType().equalsIgnoreCase("alias") && (type = (BasicTypeDefine)allElasticSearchFieldTypeInfoMap.get(((EsType)fieldType.getNativeType()).getOptions().get("path"))) != null) {
                allElasticSearchFieldTypeInfoMap.put((String)fieldName, (BasicTypeDefine<EsType>)type);
            }
        });
        return source.stream().collect(Collectors.toMap(Function.identity(), fieldName -> {
            BasicTypeDefine fieldType = (BasicTypeDefine)allElasticSearchFieldTypeInfoMap.get(fieldName);
            if (fieldType == null) {
                log.warn("fail to get elasticsearch field {} mapping type,so give a default type text", fieldName);
                return BasicTypeDefine.builder().name(fieldName).columnType("text").dataType("text").build();
            }
            return fieldType;
        }));
    }
}

