/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assembles a {@link JdbcOutputFormat} from a dialect, a connection provider, the sink
 * configuration and the row type of the records to be written.
 *
 * <p>The statement executor handed to the output format is selected in {@link #build()}:
 * a non-blank user-supplied SQL statement takes precedence, then a plain insert when no primary
 * keys are configured, and otherwise one of the primary-key aware strategies.
 */
public class JdbcOutputFormatBuilder {
    private static final Logger log = LoggerFactory.getLogger(JdbcOutputFormatBuilder.class);

    @NonNull
    private final JdbcDialect dialect;
    @NonNull
    private final JdbcConnectionProvider connectionProvider;
    @NonNull
    private final JdbcSinkConfig jdbcSinkConfig;
    @NonNull
    private final SeaTunnelRowType seaTunnelRowType;

    /**
     * Creates a builder; every argument is mandatory.
     *
     * @param dialect dialect used to generate SQL and to obtain the row converter
     * @param connectionProvider provider passed through to the resulting output format
     * @param jdbcSinkConfig sink configuration (database, table, primary keys, flags, ...)
     * @param seaTunnelRowType row type of the records to be written
     * @throws NullPointerException if any argument is {@code null}
     */
    public JdbcOutputFormatBuilder(
            @NonNull JdbcDialect dialect,
            @NonNull JdbcConnectionProvider connectionProvider,
            @NonNull JdbcSinkConfig jdbcSinkConfig,
            @NonNull SeaTunnelRowType seaTunnelRowType) {
        this.dialect =
                java.util.Objects.requireNonNull(dialect, "dialect is marked non-null but is null");
        this.connectionProvider =
                java.util.Objects.requireNonNull(
                        connectionProvider, "connectionProvider is marked non-null but is null");
        this.jdbcSinkConfig =
                java.util.Objects.requireNonNull(
                        jdbcSinkConfig, "jdbcSinkConfig is marked non-null but is null");
        this.seaTunnelRowType =
                java.util.Objects.requireNonNull(
                        seaTunnelRowType, "seaTunnelRowType is marked non-null but is null");
    }

    /**
     * Builds the output format, wiring in a factory for the statement executor.
     *
     * <p>Selection order:
     *
     * <ol>
     *   <li>the configured simple SQL, when it is not blank;
     *   <li>a generated insert statement, when the primary key list is null or empty;
     *   <li>the primary-key aware executor otherwise.
     * </ol>
     *
     * <p>The factory itself is a lambda, so the executor is only created when the factory is
     * invoked.
     *
     * @return a new {@link JdbcOutputFormat}
     */
    public JdbcOutputFormat build() {
        final String database = jdbcSinkConfig.getDatabase();
        final String qualifiedName = jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable();
        final String table = dialect.extractTableName(TablePath.of(qualifiedName));
        final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();

        final JdbcOutputFormat.StatementExecutorFactory executorFactory;
        if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
            executorFactory =
                    () ->
                            createSimpleBufferedExecutor(
                                    jdbcSinkConfig.getSimpleSql(),
                                    seaTunnelRowType,
                                    dialect.getRowConverter());
        } else if (primaryKeys == null || primaryKeys.isEmpty()) {
            executorFactory =
                    () -> createSimpleBufferedExecutor(dialect, database, table, seaTunnelRowType);
        } else {
            executorFactory =
                    () ->
                            createUpsertBufferedExecutor(
                                    dialect,
                                    database,
                                    table,
                                    seaTunnelRowType,
                                    primaryKeys.toArray(new String[0]),
                                    jdbcSinkConfig.isEnableUpsert(),
                                    jdbcSinkConfig.isPrimaryKeyUpdated(),
                                    jdbcSinkConfig.isSupportUpsertByInsertOnly());
        }

        return new JdbcOutputFormat(
                connectionProvider, jdbcSinkConfig.getJdbcConnectionConfig(), executorFactory);
    }

    /**
     * Creates a buffered executor around the dialect's insert statement for all fields of
     * {@code rowType}.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
            JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
        return createSimpleBufferedExecutor(
                dialect.getInsertIntoStatement(database, table, rowType.getFieldNames()),
                rowType,
                dialect.getRowConverter());
    }

    /**
     * Wraps a simple executor for {@code sql} in a {@link BufferedBatchStatementExecutor}, using
     * the identity function as the value transform.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
            String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
        JdbcBatchStatementExecutor<SeaTunnelRow> delegate =
                createSimpleExecutor(sql, rowType, rowConverter);
        return new BufferedBatchStatementExecutor(delegate, Function.identity());
    }

    /**
     * Creates the primary-key aware executor: a {@link BufferReducedBatchStatementExecutor}
     * composed of an upsert executor, a delete executor and a key extractor derived from
     * {@code pkNames}.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertBufferedExecutor(
            JdbcDialect dialect,
            String database,
            String table,
            SeaTunnelRowType rowType,
            String[] pkNames,
            boolean enableUpsert,
            boolean isPrimaryKeyUpdated,
            boolean supportUpsertByInsertOnly) {
        // Resolve each primary key name to its position in the row type, then to its data type.
        int[] pkFields =
                Arrays.stream(pkNames).mapToInt(pkName -> rowType.indexOf(pkName)).toArray();
        SeaTunnelDataType[] pkTypes =
                Arrays.stream(pkFields)
                        .mapToObj(pkIndex -> rowType.getFieldType(pkIndex))
                        .toArray(SeaTunnelDataType[]::new);

        Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = createKeyExtractor(pkFields);
        JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
                createDeleteExecutor(dialect, database, table, pkNames, pkTypes);
        JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
                createUpsertExecutor(
                        dialect,
                        database,
                        table,
                        rowType,
                        pkNames,
                        pkTypes,
                        keyExtractor,
                        enableUpsert,
                        isPrimaryKeyUpdated,
                        supportUpsertByInsertOnly);

        return new BufferReducedBatchStatementExecutor(
                upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
    }

    /**
     * Chooses the executor used for writes when primary keys are configured.
     *
     * <ul>
     *   <li>{@code supportUpsertByInsertOnly}: insert-only executor;
     *   <li>{@code enableUpsert} is false: insert-or-update executor without an existence query;
     *   <li>{@code enableUpsert} is true: the dialect's upsert statement when it provides one,
     *       otherwise the insert-or-update executor backed by a row-exists query.
     * </ul>
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(
            JdbcDialect dialect,
            String database,
            String table,
            SeaTunnelRowType rowType,
            String[] pkNames,
            SeaTunnelDataType[] pkTypes,
            Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
            boolean enableUpsert,
            boolean isPrimaryKeyUpdated,
            boolean supportUpsertByInsertOnly) {
        if (supportUpsertByInsertOnly) {
            return createInsertOnlyExecutor(dialect, database, table, rowType);
        }
        if (!enableUpsert) {
            return createInsertOrUpdateExecutor(
                    dialect, database, table, rowType, pkNames, isPrimaryKeyUpdated);
        }
        return dialect.getUpsertStatement(database, table, rowType.getFieldNames(), pkNames)
                .map(upsertSql -> createSimpleExecutor(upsertSql, rowType, dialect.getRowConverter()))
                .orElseGet(
                        () ->
                                createInsertOrUpdateByQueryExecutor(
                                        dialect,
                                        database,
                                        table,
                                        rowType,
                                        pkNames,
                                        pkTypes,
                                        keyExtractor,
                                        isPrimaryKeyUpdated));
    }

    /**
     * Creates an unbuffered executor that prepares the dialect's insert statement. The SQL is
     * generated inside the statement factory, i.e. each time the factory is invoked.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
            JdbcDialect dialect, String database, String table, SeaTunnelRowType rowType) {
        return new SimpleBatchStatementExecutor(
                connection -> {
                    String[] fieldNames = rowType.getFieldNames();
                    String insertSql = dialect.getInsertIntoStatement(database, table, fieldNames);
                    return FieldNamedPreparedStatement.prepareStatement(
                            connection, insertSql, rowType.getFieldNames());
                },
                rowType,
                dialect.getRowConverter());
    }

    /**
     * Creates an {@link InsertOrUpdateBatchStatementExecutor} from two statement factories: one
     * for the dialect's insert statement and one for its update statement.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateExecutor(
            JdbcDialect dialect,
            String database,
            String table,
            SeaTunnelRowType rowType,
            String[] pkNames,
            boolean isPrimaryKeyUpdated) {
        return new InsertOrUpdateBatchStatementExecutor(
                // insert statement factory
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection,
                                dialect.getInsertIntoStatement(
                                        database, table, rowType.getFieldNames()),
                                rowType.getFieldNames()),
                // update statement factory
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection,
                                dialect.getUpdateStatement(
                                        database,
                                        table,
                                        rowType.getFieldNames(),
                                        pkNames,
                                        isPrimaryKeyUpdated),
                                rowType.getFieldNames()),
                rowType,
                dialect.getRowConverter());
    }

    /**
     * Creates an {@link InsertOrUpdateBatchStatementExecutor} from three statement factories:
     * the dialect's row-exists query (bound by primary key), its insert statement and its update
     * statement. A key row type built from {@code pkNames}/{@code pkTypes} and the key extractor
     * are passed along as well.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateByQueryExecutor(
            JdbcDialect dialect,
            String database,
            String table,
            SeaTunnelRowType rowType,
            String[] pkNames,
            SeaTunnelDataType[] pkTypes,
            Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
            boolean isPrimaryKeyUpdated) {
        SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
        return new InsertOrUpdateBatchStatementExecutor(
                // row-exists query factory
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection,
                                dialect.getRowExistsStatement(database, table, pkNames),
                                pkNames),
                // insert statement factory
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection,
                                dialect.getInsertIntoStatement(
                                        database, table, rowType.getFieldNames()),
                                rowType.getFieldNames()),
                // update statement factory
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection,
                                dialect.getUpdateStatement(
                                        database,
                                        table,
                                        rowType.getFieldNames(),
                                        pkNames,
                                        isPrimaryKeyUpdated),
                                rowType.getFieldNames()),
                keyRowType,
                keyExtractor,
                rowType,
                dialect.getRowConverter());
    }

    /**
     * Creates a simple executor for the dialect's delete statement, whose parameters are the
     * primary key fields only.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createDeleteExecutor(
            JdbcDialect dialect,
            String database,
            String table,
            String[] pkNames,
            SeaTunnelDataType[] pkTypes) {
        return createSimpleExecutor(
                dialect.getDeleteStatement(database, table, pkNames),
                pkNames,
                pkTypes,
                dialect.getRowConverter());
    }

    /**
     * Convenience overload that first builds a {@link SeaTunnelRowType} from the given field
     * names and types.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
            String sql,
            String[] fieldNames,
            SeaTunnelDataType[] fieldTypes,
            JdbcRowConverter rowConverter) {
        return createSimpleExecutor(sql, new SeaTunnelRowType(fieldNames, fieldTypes), rowConverter);
    }

    /**
     * Creates a {@link SimpleBatchStatementExecutor} that prepares {@code sql} as a
     * field-named statement using the field names of {@code rowType}.
     */
    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
            String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
        return new SimpleBatchStatementExecutor(
                connection ->
                        FieldNamedPreparedStatement.prepareStatement(
                                connection, sql, rowType.getFieldNames()),
                rowType,
                rowConverter);
    }

    /**
     * Returns a function that projects a row onto the fields at positions {@code pkFields}
     * (in that order), copying the source row's table id and row kind onto the new row.
     */
    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
        return row -> {
            Object[] keyValues =
                    Arrays.stream(pkFields).mapToObj(pkIndex -> row.getField(pkIndex)).toArray();
            SeaTunnelRow keyRow = new SeaTunnelRow(keyValues);
            keyRow.setTableId(row.getTableId());
            keyRow.setRowKind(row.getRowKind());
            return keyRow;
        };
    }
}

