/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for JDBC sink writers that can react to column-level schema change events.
 *
 * <p>On every supported column event the in-memory {@link TableSchema} is updated, the dialect
 * is asked to refresh the physical table, and the {@link JdbcOutputFormat} is rebuilt against
 * the new schema.
 *
 * @param <ResourceT> resource type forwarded to {@link SupportMultiTableSinkWriter}
 */
public abstract class AbstractJdbcSinkWriter<ResourceT>
        implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState>,
                SupportMultiTableSinkWriter<ResourceT> {
    private static final Logger log = LoggerFactory.getLogger(AbstractJdbcSinkWriter.class);

    /** Dialect used to obtain connection providers and to refresh the physical table schema. */
    protected JdbcDialect dialect;
    /** Path of the sink table handed to the dialect when a schema change is applied. */
    protected TablePath sinkTablePath;
    /** Current logical schema; replaced by an updated copy on each processed column event. */
    protected TableSchema tableSchema;
    /** Not read or written in this class; presumably maintained by subclasses - confirm. */
    protected transient boolean isOpen;
    /** Connection provider passed to the output format builder when the format is rebuilt. */
    protected JdbcConnectionProvider connectionProvider;
    /** Sink configuration; supplies the JDBC connection config and output format settings. */
    protected JdbcSinkConfig jdbcSinkConfig;
    /** Output format currently in use; rebuilt and reopened after every schema change. */
    protected JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;

    /**
     * Applies a schema change event to this writer.
     *
     * <p>Only {@link AlterTableColumnsEvent} is handled: each contained column event is processed
     * in order. Any other event type is logged as a warning and otherwise ignored.
     *
     * @param event the schema change event to apply
     * @throws IOException if processing one of the contained column events fails with an I/O error
     * @throws SeaTunnelException if the event carries a blank source dialect name
     */
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        if (event instanceof AlterTableColumnsEvent) {
            AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent) event;
            String sourceDialectName = alterTableColumnsEvent.getSourceDialectName();
            if (StringUtils.isBlank(sourceDialectName)) {
                throw new SeaTunnelException("The sourceDialectName in AlterTableColumnEvent can not be empty");
            }
            // Typed list: a raw List would make the enhanced-for element type Object.
            List<AlterTableColumnEvent> events = alterTableColumnsEvent.getEvents();
            for (AlterTableColumnEvent alterTableColumnEvent : events) {
                this.processSchemaChangeEvent(alterTableColumnEvent, sourceDialectName);
            }
        } else {
            log.warn("We only support AlterTableColumnsEvent, but actual event is {}", event);
        }
    }

    /**
     * Applies a single column event to a copy of the current schema, installs that copy as the
     * current schema and then reopens the output format.
     *
     * @param event the column-level change (add, drop, modify or change/rename)
     * @param sourceDialectName dialect name of the source that produced the event
     * @throws IOException if reopening the output format fails with an I/O error
     * @throws SeaTunnelException if the event type is not one of the four supported kinds
     */
    protected void processSchemaChangeEvent(AlterTableColumnEvent event, String sourceDialectName) throws IOException {
        // Mutate a copy so an unsupported event type leaves this.tableSchema untouched.
        TableSchema newTableSchema = this.tableSchema.copy();
        List<Column> columns = newTableSchema.getColumns();
        switch (event.getEventType()) {
            case SCHEMA_CHANGE_ADD_COLUMN: {
                Column addColumn = ((AlterTableAddColumnEvent) event).getColumn();
                columns.add(addColumn);
                break;
            }
            case SCHEMA_CHANGE_DROP_COLUMN: {
                String dropColumn = ((AlterTableDropColumnEvent) event).getColumn();
                // Column names are matched case-insensitively.
                columns.removeIf(column -> column.getName().equalsIgnoreCase(dropColumn));
                break;
            }
            case SCHEMA_CHANGE_MODIFY_COLUMN: {
                Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn();
                // Same name, new definition: replace in place to keep the column position.
                this.replaceColumnByIndex(columns, modifyColumn.getName(), modifyColumn);
                break;
            }
            case SCHEMA_CHANGE_CHANGE_COLUMN: {
                AlterTableChangeColumnEvent alterTableChangeColumnEvent = (AlterTableChangeColumnEvent) event;
                Column changeColumn = alterTableChangeColumnEvent.getColumn();
                String oldColumnName = alterTableChangeColumnEvent.getOldColumn();
                // Looked up by the old name, replaced by the new column definition.
                this.replaceColumnByIndex(columns, oldColumnName, changeColumn);
                break;
            }
            default: {
                throw new SeaTunnelException("Unsupported schemaChangeEvent for event type: " + event.getEventType());
            }
        }
        this.tableSchema = newTableSchema;
        this.reOpenOutputFormat(event, sourceDialectName);
    }

    /**
     * Refreshes the physical table through the dialect and rebuilds the output format against
     * the current {@link #tableSchema}.
     *
     * @param event the column event forwarded to the dialect
     * @param sourceDialectName dialect name of the source that produced the event
     * @throws IOException if {@code prepareCommit()} or opening the new output format fails
     * @throws JdbcConnectorException if the dialect fails to refresh the physical table schema
     */
    protected void reOpenOutputFormat(AlterTableColumnEvent event, String sourceDialectName) throws IOException {
        // NOTE(review): prepareCommit() is declared outside this class; presumably it flushes rows
        // buffered under the previous schema before the format is replaced - confirm.
        this.prepareCommit();
        try {
            // A separate connection provider is obtained for the schema refresh instead of
            // reusing this.connectionProvider.
            JdbcConnectionProvider refreshTableSchemaConnectionProvider = this.dialect.getJdbcConnectionProvider(this.jdbcSinkConfig.getJdbcConnectionConfig());
            this.dialect.refreshTableSchemaBySchemaChangeEvent(sourceDialectName, event, refreshTableSchemaConnectionProvider, this.sinkTablePath);
        }
        catch (Throwable e) {
            // Any failure is wrapped, with its cause, under a dedicated connector error code.
            throw new JdbcConnectorException((SeaTunnelErrorCode) JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
        }
        this.outputFormat = new JdbcOutputFormatBuilder(this.dialect, this.connectionProvider, this.jdbcSinkConfig, this.tableSchema).build();
        this.outputFormat.open();
    }

    /**
     * Replaces, in place, every column whose name equals {@code oldColumnName} (ignoring case)
     * with {@code newColumn}. The list is left unchanged when no column matches.
     *
     * @param columns the column list to modify
     * @param oldColumnName name of the column(s) to replace
     * @param newColumn the replacement column
     */
    protected void replaceColumnByIndex(List<Column> columns, String oldColumnName, Column newColumn) {
        for (int i = 0; i < columns.size(); ++i) {
            if (columns.get(i).getName().equalsIgnoreCase(oldColumnName)) {
                columns.set(i, newColumn);
            }
        }
    }
}

