/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcOutputFormatBuilder {
    private static final Logger log = LoggerFactory.getLogger(JdbcOutputFormatBuilder.class);
    @NonNull
    private final JdbcDialect dialect;
    @NonNull
    private final JdbcConnectionProvider connectionProvider;
    @NonNull
    private final JdbcSinkConfig jdbcSinkConfig;
    @NonNull
    private final TableSchema tableSchema;

    public JdbcOutputFormat build() {
        String database = this.jdbcSinkConfig.getDatabase();
        String table = this.dialect.extractTableName(TablePath.of((String)(this.jdbcSinkConfig.getDatabase() + "." + this.jdbcSinkConfig.getTable())));
        List<String> primaryKeys = this.jdbcSinkConfig.getPrimaryKeys();
        JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory = StringUtils.isNotBlank((CharSequence)this.jdbcSinkConfig.getSimpleSql()) ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.jdbcSinkConfig.getSimpleSql(), this.tableSchema, this.dialect.getRowConverter()) : (primaryKeys == null || primaryKeys.isEmpty() ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.dialect, database, table, this.tableSchema) : () -> JdbcOutputFormatBuilder.createUpsertBufferedExecutor(this.dialect, database, table, this.tableSchema, primaryKeys.toArray(new String[0]), this.jdbcSinkConfig.isEnableUpsert(), this.jdbcSinkConfig.isPrimaryKeyUpdated(), this.jdbcSinkConfig.isSupportUpsertByInsertOnly()));
        return new JdbcOutputFormat(this.connectionProvider, this.jdbcSinkConfig.getJdbcConnectionConfig(), statementExecutorFactory);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
        String insertSQL = dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames());
        return JdbcOutputFormatBuilder.createSimpleBufferedExecutor(insertSQL, tableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
        JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor = JdbcOutputFormatBuilder.createSimpleExecutor(sql, tableSchema, rowConverter);
        return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertBufferedExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, String[] pkNames, boolean enableUpsert, boolean isPrimaryKeyUpdated, boolean supportUpsertByInsertOnly) {
        int[] pkFields = Arrays.stream(pkNames).mapToInt(arg_0 -> ((SeaTunnelRowType)tableSchema.toPhysicalRowDataType()).indexOf(arg_0)).toArray();
        TableSchema pkSchema = TableSchema.builder().columns(Arrays.stream(pkFields).mapToObj(tableSchema.getColumns()::get).collect(Collectors.toList())).build();
        Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = JdbcOutputFormatBuilder.createKeyExtractor(pkFields);
        JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor = JdbcOutputFormatBuilder.createDeleteExecutor(dialect, database, table, pkNames, pkSchema);
        JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor = JdbcOutputFormatBuilder.createUpsertExecutor(dialect, database, table, tableSchema, pkNames, pkSchema, keyExtractor, enableUpsert, isPrimaryKeyUpdated, supportUpsertByInsertOnly);
        return new BufferReducedBatchStatementExecutor(upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, boolean enableUpsert, boolean isPrimaryKeyUpdated, boolean supportUpsertByInsertOnly) {
        if (supportUpsertByInsertOnly) {
            return JdbcOutputFormatBuilder.createInsertOnlyExecutor(dialect, database, table, tableSchema);
        }
        if (enableUpsert) {
            Optional<String> upsertSQL = dialect.getUpsertStatement(database, table, tableSchema.getFieldNames(), pkNames);
            if (upsertSQL.isPresent()) {
                return JdbcOutputFormatBuilder.createSimpleExecutor(upsertSQL.get(), tableSchema, dialect.getRowConverter());
            }
            return JdbcOutputFormatBuilder.createInsertOrUpdateByQueryExecutor(dialect, database, table, tableSchema, pkNames, pkTableSchema, keyExtractor, isPrimaryKeyUpdated);
        }
        return JdbcOutputFormatBuilder.createInsertOrUpdateExecutor(dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
        return new SimpleBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), tableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, String[] pkNames, boolean isPrimaryKeyUpdated) {
        return new InsertOrUpdateBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getUpdateStatement(database, table, tableSchema.getFieldNames(), pkNames, isPrimaryKeyUpdated), tableSchema.getFieldNames()), tableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateByQueryExecutor(JdbcDialect dialect, String database, String table, TableSchema tableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, boolean isPrimaryKeyUpdated) {
        return new InsertOrUpdateBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getRowExistsStatement(database, table, pkNames), pkNames), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), connection -> FieldNamedPreparedStatement.prepareStatement(connection, dialect.getUpdateStatement(database, table, tableSchema.getFieldNames(), pkNames, isPrimaryKeyUpdated), tableSchema.getFieldNames()), pkTableSchema, keyExtractor, tableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createDeleteExecutor(JdbcDialect dialect, String database, String table, String[] pkNames, TableSchema pkTableSchema) {
        String deleteSQL = dialect.getDeleteStatement(database, table, pkNames);
        return JdbcOutputFormatBuilder.createSimpleExecutor(deleteSQL, pkTableSchema, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) {
        return new SimpleBatchStatementExecutor(connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, tableSchema.getFieldNames()), tableSchema, rowConverter);
    }

    static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
        return row -> {
            Object[] fields = new Object[pkFields.length];
            for (int i = 0; i < pkFields.length; ++i) {
                fields[i] = row.getField(pkFields[i]);
            }
            SeaTunnelRow newRow = new SeaTunnelRow(fields);
            newRow.setTableId(row.getTableId());
            return newRow;
        };
    }

    public JdbcOutputFormatBuilder(@NonNull JdbcDialect dialect, @NonNull JdbcConnectionProvider connectionProvider, @NonNull JdbcSinkConfig jdbcSinkConfig, @NonNull TableSchema tableSchema) {
        if (dialect == null) {
            throw new NullPointerException("dialect is marked non-null but is null");
        }
        if (connectionProvider == null) {
            throw new NullPointerException("connectionProvider is marked non-null but is null");
        }
        if (jdbcSinkConfig == null) {
            throw new NullPointerException("jdbcSinkConfig is marked non-null but is null");
        }
        if (tableSchema == null) {
            throw new NullPointerException("tableSchema is marked non-null but is null");
        }
        this.dialect = dialect;
        this.connectionProvider = connectionProvider;
        this.jdbcSinkConfig = jdbcSinkConfig;
        this.tableSchema = tableSchema;
    }
}

