/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceReader;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceState;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchSource
implements SeaTunnelSource<SeaTunnelRow, ElasticsearchSourceSplit, ElasticsearchSourceState>,
SupportParallelism,
SupportColumnProjection {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSource.class);
    private final List<ElasticsearchConfig> elasticsearchConfigList;
    private final ReadonlyConfig connectionConfig;

    public ElasticsearchSource(ReadonlyConfig config) {
        this.connectionConfig = config;
        boolean multiSource = config.getOptional(ElasticsearchSourceOptions.INDEX_LIST).isPresent();
        boolean singleSource = config.getOptional(ElasticsearchSourceOptions.INDEX).isPresent();
        boolean sqlQuery = config.getOptional(ElasticsearchSourceOptions.SQL_QUERY).isPresent();
        if (SearchTypeEnum.SQL.equals(config.get(ElasticsearchSourceOptions.SEARCH_TYPE)) && !sqlQuery) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_02, ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_02.getDescription());
        }
        if (multiSource && singleSource) {
            log.warn("Elasticsearch Source config warn: when both 'index' and 'index_list' are present in the configuration, only the 'index_list' configuration will take effect");
        }
        if (!multiSource && !singleSource) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01, ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription());
        }
        this.elasticsearchConfigList = multiSource ? this.createMultiSource(config) : Collections.singletonList(this.parseOneIndexQueryConfig(config));
    }

    private List<ElasticsearchConfig> createMultiSource(ReadonlyConfig config) {
        List configMaps = (List)config.get(ElasticsearchSourceOptions.INDEX_LIST);
        List configList = configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
        ArrayList<ElasticsearchConfig> elasticsearchConfigList = new ArrayList<ElasticsearchConfig>(configList.size());
        for (ReadonlyConfig readonlyConfig : configList) {
            ElasticsearchConfig elasticsearchConfig = this.parseOneIndexQueryConfig(readonlyConfig);
            elasticsearchConfigList.add(elasticsearchConfig);
        }
        return elasticsearchConfigList;
    }

    private ElasticsearchConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConfig) {
        List<String> source;
        CatalogTable catalogTable;
        Map query = (Map)readonlyConfig.get(ElasticsearchSourceOptions.QUERY);
        String index = (String)readonlyConfig.get(ElasticsearchSourceOptions.INDEX);
        if (readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
            log.warn("The schema config in ElasticSearch source/sink is deprecated, please use source config instead!");
            catalogTable = CatalogTableUtil.buildWithConfig((ReadonlyConfig)readonlyConfig);
            source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
        } else {
            source = (List<String>)readonlyConfig.get(ElasticsearchSourceOptions.SOURCE);
            Map arrayColumn = (Map)readonlyConfig.get(ElasticsearchSourceOptions.ARRAY_COLUMN);
            Map<String, BasicTypeDefine<EsType>> esFieldType = SearchTypeEnum.SQL.equals(readonlyConfig.get(ElasticsearchSourceOptions.SEARCH_TYPE)) ? this.getSqlFieldTypeMapping((String)readonlyConfig.get(ElasticsearchSourceOptions.SQL_QUERY), source) : this.getFieldTypeMapping(index, source);
            if (CollectionUtils.isEmpty(source)) {
                source = new ArrayList<String>(esFieldType.keySet());
            }
            SeaTunnelDataType[] fieldTypes = ElasticsearchSource.getSeaTunnelDataType(esFieldType, source);
            TableSchema.Builder builder = TableSchema.builder();
            for (int i = 0; i < source.size(); ++i) {
                String key = source.get(i);
                String sourceType = esFieldType.get(key).getDataType();
                if (arrayColumn.containsKey(key)) {
                    String value = (String)arrayColumn.get(key);
                    SeaTunnelDataType dataType = SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType((String)key, (String)value);
                    builder.column((Column)PhysicalColumn.of((String)key, (SeaTunnelDataType)dataType, (Long)0L, (boolean)true, null, null, (String)sourceType, null));
                    continue;
                }
                builder.column((Column)PhysicalColumn.of((String)source.get(i), (SeaTunnelDataType)fieldTypes[i], (Long)0L, (boolean)true, null, null, (String)sourceType, null));
            }
            catalogTable = CatalogTable.of((TableIdentifier)TableIdentifier.of((String)"elasticsearch", null, (String)index), (TableSchema)builder.build(), Collections.emptyMap(), Collections.emptyList(), (String)"");
        }
        SearchTypeEnum searchType = (SearchTypeEnum)((Object)readonlyConfig.get(ElasticsearchSourceOptions.SEARCH_TYPE));
        String sqlQuery = (String)readonlyConfig.get(ElasticsearchSourceOptions.SQL_QUERY);
        String scrollTime = (String)readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
        int scrollSize = (Integer)readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
        ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
        elasticsearchConfig.setSource(source);
        elasticsearchConfig.setCatalogTable(catalogTable);
        elasticsearchConfig.setQuery(query);
        elasticsearchConfig.setScrollTime(scrollTime);
        elasticsearchConfig.setScrollSize(scrollSize);
        elasticsearchConfig.setIndex(index);
        elasticsearchConfig.setCatalogTable(catalogTable);
        elasticsearchConfig.setSqlQuery(sqlQuery);
        elasticsearchConfig.setSearchType(searchType);
        return elasticsearchConfig;
    }

    public String getPluginName() {
        return "Elasticsearch";
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public List<CatalogTable> getProducedCatalogTables() {
        return this.elasticsearchConfigList.stream().map(ElasticsearchConfig::getCatalogTable).collect(Collectors.toList());
    }

    public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(SourceReader.Context readerContext) {
        return new ElasticsearchSourceReader(readerContext, this.connectionConfig);
    }

    public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> createEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext) {
        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, this.connectionConfig, this.elasticsearchConfigList);
    }

    public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> restoreEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext, ElasticsearchSourceState sourceState) {
        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, sourceState, this.connectionConfig, this.elasticsearchConfigList);
    }

    @VisibleForTesting
    public static SeaTunnelDataType[] getSeaTunnelDataType(Map<String, BasicTypeDefine<EsType>> esFieldType, List<String> source) {
        SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[source.size()];
        for (int i = 0; i < source.size(); ++i) {
            SeaTunnelDataType seaTunnelDataType;
            BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
            fieldTypes[i] = seaTunnelDataType = ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
        }
        return fieldTypes;
    }

    private Map<String, BasicTypeDefine<EsType>> getSqlFieldTypeMapping(String query, List<String> source) {
        try (EsRestClient esRestClient = EsRestClient.createInstance(this.connectionConfig);){
            Map<String, BasicTypeDefine<EsType>> map = esRestClient.getSqlMapping(query, source);
            return map;
        }
    }

    private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(String index, List<String> source) {
        try (EsRestClient esRestClient = EsRestClient.createInstance(this.connectionConfig);){
            Map<String, BasicTypeDefine<EsType>> map = esRestClient.getFieldTypeMapping(index, source);
            return map;
        }
    }
}

