/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Optional;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.PartitionParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={Factory.class})
public class JdbcSourceFactory
implements TableSourceFactory {
    private static final Logger log = LoggerFactory.getLogger(JdbcSourceFactory.class);

    public String factoryIdentifier() {
        return "Jdbc";
    }

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
        CatalogTable catalogTable = context.getCatalogTable();
        JdbcSourceConfig config = JdbcSourceConfig.of(context.getOptions());
        SimpleJdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
        String querySql = config.getQuery();
        JdbcDialect dialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl(), config.getJdbcConnectionConfig().getCompatibleMode());
        TableSchema tableSchema = catalogTable.getTableSchema();
        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
        Optional<PartitionParameter> partitionParameter = JdbcSourceFactory.createPartitionParameter(config, tableSchema, connectionProvider);
        JdbcInputFormat inputFormat = new JdbcInputFormat(connectionProvider, dialect, rowType, querySql, config.getFetchSize(), config.getJdbcConnectionConfig().isAutoCommit());
        return () -> new JdbcSource(config, rowType, dialect, inputFormat, partitionParameter.orElse(null), connectionProvider, partitionParameter.isPresent() ? JdbcSourceFactory.obtainPartitionSql(dialect, (PartitionParameter)partitionParameter.get(), querySql) : querySql);
    }

    static String obtainPartitionSql(JdbcDialect dialect, PartitionParameter partitionParameter, String nativeSql) {
        if (JdbcSourceFactory.isStringType(partitionParameter.getDataType())) {
            return String.format("SELECT * FROM (%s) tt where %s = ?", nativeSql, dialect.hashModForField(partitionParameter.getPartitionColumnName(), partitionParameter.getPartitionNumber()));
        }
        return String.format("SELECT * FROM (%s) tt where %s >= ? AND %s <= ?", nativeSql, partitionParameter.getPartitionColumnName(), partitionParameter.getPartitionColumnName());
    }

    public static Optional<PartitionParameter> createPartitionParameter(JdbcSourceConfig config, TableSchema tableSchema, JdbcConnectionProvider connectionProvider) {
        Optional<String> partitionColumnOptional = JdbcSourceFactory.getPartitionColumn(config, tableSchema);
        if (partitionColumnOptional.isPresent()) {
            String partitionColumn = partitionColumnOptional.get();
            SeaTunnelDataType<?> dataType = JdbcSourceFactory.validationPartitionColumn(partitionColumn, tableSchema.toPhysicalRowDataType());
            return Optional.of(JdbcSourceFactory.createPartitionParameter(config, partitionColumn, dataType, connectionProvider.getConnection()));
        }
        log.info("The partition_column parameter is not configured, and the source parallelism is set to 1");
        return Optional.empty();
    }

    static PartitionParameter createPartitionParameter(JdbcSourceConfig config, String columnName, SeaTunnelDataType<?> dataType, Connection connection) {
        BigDecimal max = null;
        BigDecimal min = null;
        if (dataType.equals((Object)BasicType.STRING_TYPE)) {
            return new PartitionParameter(columnName, dataType, null, null, config.getPartitionNumber().orElse(null));
        }
        if (config.getPartitionLowerBound().isPresent() && config.getPartitionUpperBound().isPresent()) {
            max = config.getPartitionUpperBound().get();
            min = config.getPartitionLowerBound().get();
            return new PartitionParameter(columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
        }
        try (ResultSet rs = connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) FROM (%s) tt", columnName, columnName, config.getQuery()));){
            if (rs.next()) {
                max = config.getPartitionUpperBound().isPresent() ? config.getPartitionUpperBound().get() : rs.getBigDecimal(1);
                min = config.getPartitionLowerBound().isPresent() ? config.getPartitionLowerBound().get() : rs.getBigDecimal(2);
            }
        }
        catch (SQLException e) {
            throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString());
        }
        return new PartitionParameter(columnName, dataType, min, max, config.getPartitionNumber().orElse(null));
    }

    private static Optional<String> getPartitionColumn(JdbcSourceConfig config, TableSchema tableSchema) {
        if (config.getPartitionColumn().isPresent()) {
            return config.getPartitionColumn();
        }
        if (tableSchema.getPrimaryKey() != null) {
            PrimaryKey primaryKey = tableSchema.getPrimaryKey();
            return Optional.of(primaryKey.getColumnNames().get(0));
        }
        return Optional.empty();
    }

    static SeaTunnelDataType<?> validationPartitionColumn(String partitionColumn, SeaTunnelRowType rowType) {
        HashMap<String, SeaTunnelDataType> fieldTypes = new HashMap<String, SeaTunnelDataType>();
        for (int i = 0; i < rowType.getFieldNames().length; ++i) {
            fieldTypes.put(rowType.getFieldName(i), rowType.getFieldType(i));
        }
        if (!fieldTypes.containsKey(partitionColumn)) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Partitioned column(%s) don't exist in the table columns", partitionColumn));
        }
        SeaTunnelDataType partitionColumnType = (SeaTunnelDataType)fieldTypes.get(partitionColumn);
        if (!JdbcSourceFactory.isNumericType(partitionColumnType) && !JdbcSourceFactory.isStringType(partitionColumnType)) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.ILLEGAL_ARGUMENT, String.format("%s is not numeric/string type", partitionColumn));
        }
        return partitionColumnType;
    }

    private static boolean isNumericType(SeaTunnelDataType<?> type) {
        int scale = 1;
        if (type instanceof DecimalType) {
            int n = scale = ((DecimalType)type).getScale() == 0 ? 0 : ((DecimalType)type).getScale();
            if (scale != 0) {
                throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.ILLEGAL_ARGUMENT, String.format("The current field is DecimalType containing decimals: %d Unable to support", scale));
            }
        }
        return type.equals((Object)BasicType.INT_TYPE) || type.equals((Object)BasicType.LONG_TYPE) || scale == 0;
    }

    private static boolean isStringType(SeaTunnelDataType<?> type) {
        return type.equals((Object)BasicType.STRING_TYPE);
    }

    public OptionRule optionRule() {
        return OptionRule.builder().required(new Option[]{JdbcOptions.URL, JdbcOptions.DRIVER, JdbcOptions.QUERY}).optional(new Option[]{JdbcOptions.USER, JdbcOptions.PASSWORD, JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC, JdbcOptions.FETCH_SIZE, JdbcOptions.PARTITION_COLUMN, JdbcOptions.PARTITION_UPPER_BOUND, JdbcOptions.PARTITION_LOWER_BOUND, JdbcOptions.PARTITION_NUM, JdbcOptions.COMPATIBLE_MODE}).build();
    }

    public Class<? extends SeaTunnelSource> getSourceClass() {
        return JdbcSource.class;
    }
}

