/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.Buffer;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.RecordBuffer;
import io.debezium.connector.jdbc.RecordWriter;
import io.debezium.connector.jdbc.ReducedRecordBuffer;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.naming.TableNamingStrategy;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.pipeline.spi.ChangeEventSink;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.dialect.DatabaseVersion;
import org.hibernate.query.NativeQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ChangeEventSink} that writes Kafka Connect {@link SinkRecord}s to a relational
 * database through a Hibernate {@link StatelessSession}.
 *
 * <p>Records are grouped per destination table into two sets of buffers, one for
 * inserts/updates and one for deletes. Before a record of one kind is buffered for a table, any
 * pending records of the other kind for that same table are flushed, so the relative order of
 * writes and deletes per table is kept. Flushes are retried when the failure (or one of its
 * causes) is one of the dialect's communication exceptions.
 *
 * <p>NOTE(review): no synchronization is visible in this class; it is presumably used by a
 * single task thread - confirm with the caller.
 */
public class JdbcChangeEventSink implements ChangeEventSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChangeEventSink.class);

    /** Fragment of a value-schema name that marks a record as a schema change event. */
    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";

    /** Message logged and thrown when a schema change record is received. */
    public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";

    private final JdbcSinkConnectorConfig config;
    private final DatabaseDialect dialect;
    private final StatelessSession session;
    private final TableNamingStrategy tableNamingStrategy;
    private final RecordWriter recordWriter;
    private final int flushMaxRetries;
    private final Duration flushRetryDelay;

    /**
     * Creates the sink and logs the database version reported by the dialect.
     *
     * @param config connector configuration; also supplies the table naming strategy and the
     *            flush retry settings
     * @param session session used for DDL, truncate statements and metadata lookups
     * @param dialect dialect used to build SQL statements and inspect tables
     * @param recordWriter writer that receives the buffered records together with their SQL
     */
    public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) {
        this.config = config;
        this.tableNamingStrategy = config.getTableNamingStrategy();
        this.dialect = dialect;
        this.session = session;
        this.recordWriter = recordWriter;
        this.flushMaxRetries = config.getFlushMaxRetries();
        this.flushRetryDelay = Duration.of(config.getFlushRetryDelayMs(), ChronoUnit.MILLIS);
        DatabaseVersion version = this.dialect.getVersion();
        LOGGER.info("Database version {}.{}.{}", version.getMajor(), version.getMinor(), version.getMicro());
    }

    /**
     * Processes a batch of sink records and flushes whatever remains buffered at the end.
     *
     * <p>Per record: schema change records are rejected, records without a resolvable table
     * name and tombstones are skipped, truncates and deletes are skipped when disabled in the
     * configuration, and everything else is buffered per table and flushed whenever the buffer
     * hands back records.
     *
     * @param records the records to write
     * @throws DataException if a record is a schema change record
     * @throws ConnectException if a record cannot be converted or written
     */
    public void execute(Collection<SinkRecord> records) {
        // Insertion-ordered so the final flush visits tables in first-seen order.
        final Map<TableId, Buffer> updateBufferByTable = new LinkedHashMap<>();
        final Map<TableId, Buffer> deleteBufferByTable = new LinkedHashMap<>();

        for (SinkRecord record : records) {
            LOGGER.trace("Processing {}", record);
            this.validate(record);

            Optional<TableId> optionalTableId = this.getTableId(record);
            if (optionalTableId.isEmpty()) {
                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", record.topic(), record.kafkaPartition(), record.kafkaOffset());
                continue;
            }

            SinkRecordDescriptor sinkRecordDescriptor = this.buildRecordSinkDescriptor(record);
            TableId tableId = optionalTableId.get();

            if (sinkRecordDescriptor.isTombstone()) {
                LOGGER.debug("Skipping tombstone record {}", sinkRecordDescriptor);
                continue;
            }

            if (sinkRecordDescriptor.isTruncate()) {
                if (!this.config.isTruncateEnabled()) {
                    LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", sinkRecordDescriptor.getTopicName());
                    continue;
                }
                // Everything buffered so far must hit the database before the truncate does.
                this.flushBuffers(updateBufferByTable);
                this.flushBuffers(deleteBufferByTable);
                this.truncateTable(tableId, sinkRecordDescriptor);
                continue;
            }

            if (sinkRecordDescriptor.isDelete()) {
                if (!this.config.isDeleteEnabled()) {
                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
                    continue;
                }
                // Pending writes for this table go first so the delete is applied after them.
                this.flushPending(updateBufferByTable, tableId);
                Buffer deleteBuffer = this.resolveBuffer(deleteBufferByTable, tableId, sinkRecordDescriptor);
                this.flushBufferWithRetries(tableId, deleteBuffer.add(sinkRecordDescriptor));
                continue;
            }

            // Pending deletes for this table go first so the write is applied after them.
            this.flushPending(deleteBufferByTable, tableId);

            Stopwatch updateBufferStopwatch = Stopwatch.reusable();
            updateBufferStopwatch.start();
            Buffer updateBuffer = this.resolveBuffer(updateBufferByTable, tableId, sinkRecordDescriptor);
            List<SinkRecordDescriptor> toFlush = updateBuffer.add(sinkRecordDescriptor);
            updateBufferStopwatch.stop();
            LOGGER.trace("[PERF] Update buffer execution time {}", updateBufferStopwatch.durations());
            this.flushBufferWithRetries(tableId, toFlush);
        }

        this.flushBuffers(updateBufferByTable);
        this.flushBuffers(deleteBufferByTable);
    }

    /**
     * Flushes the buffer registered for {@code tableId}, if there is one and it is not empty.
     */
    private void flushPending(Map<TableId, Buffer> bufferByTable, TableId tableId) {
        Buffer pending = bufferByTable.get(tableId);
        if (pending != null && !pending.isEmpty()) {
            this.flushBufferWithRetries(tableId, pending.flush());
        }
    }

    /**
     * Makes sure the destination table is in place and issues the dialect's truncate statement.
     *
     * @throws ConnectException wrapping any {@link SQLException} raised on the way
     */
    private void truncateTable(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) {
        try {
            TableDescriptor table = this.checkAndApplyTableChangesIfNeeded(tableId, sinkRecordDescriptor);
            this.writeTruncate(this.dialect.getTruncateStatement(table));
        }
        catch (SQLException e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /**
     * Rejects schema change records.
     *
     * @throws DataException if the record is a schema change record
     */
    private void validate(SinkRecord record) {
        if (JdbcChangeEventSink.isSchemaChange(record)) {
            LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
            throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
        }
    }

    /**
     * Returns {@code true} when the record has a value schema whose name contains
     * {@link #SCHEMA_CHANGE_VALUE}.
     */
    private static boolean isSchemaChange(SinkRecord record) {
        if (record.valueSchema() == null) {
            return false;
        }
        String schemaName = record.valueSchema().name();
        return !Strings.isNullOrEmpty(schemaName) && schemaName.contains(SCHEMA_CHANGE_VALUE);
    }

    /**
     * Returns the buffer registered for {@code tableId} in {@code bufferMap}, creating it on
     * first use. A {@link ReducedRecordBuffer} is created when the reduction buffer is enabled
     * and the record has key fields, a {@link RecordBuffer} otherwise. An existing buffer is
     * returned as is, whatever its type.
     */
    private Buffer resolveBuffer(Map<TableId, Buffer> bufferMap, TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) {
        boolean reduce = this.config.isUseReductionBuffer() && !sinkRecordDescriptor.getKeyFieldNames().isEmpty();
        if (reduce) {
            return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(this.config));
        }
        return bufferMap.computeIfAbsent(tableId, k -> new RecordBuffer(this.config));
    }

    /**
     * Builds the descriptor for a sink record from the configured primary key mode, primary key
     * fields and field filters, and the dialect.
     *
     * @throws ConnectException wrapping any exception raised while building the descriptor
     */
    private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord record) {
        try {
            return SinkRecordDescriptor.builder()
                    .withPrimaryKeyMode(this.config.getPrimaryKeyMode())
                    .withPrimaryKeyFields(this.config.getPrimaryKeyFields())
                    .withFieldFilters(this.config.getFieldsFilter())
                    .withSinkRecord(record)
                    .withDialect(this.dialect)
                    .build();
        }
        catch (Exception e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /**
     * Flushes every buffer in the map, in the map's iteration order.
     */
    private void flushBuffers(Map<TableId, Buffer> bufferByTable) {
        bufferByTable.forEach((tableId, recordBuffer) -> this.flushBufferWithRetries(tableId, recordBuffer.flush()));
    }

    /**
     * Writes the given records, retrying up to {@code flushMaxRetries} times with a pause of
     * {@code flushRetryDelay} before each retry when the failure is retriable.
     *
     * @throws ConnectException if the failure is not retriable, the retries are exhausted, or
     *             the thread is interrupted while waiting to retry
     */
    private void flushBufferWithRetries(TableId tableId, List<SinkRecordDescriptor> toFlush) {
        Exception lastException = null;
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", tableId.getTableName());
        for (int retries = 0; retries <= this.flushMaxRetries; ++retries) {
            if (retries > 0) {
                LOGGER.warn("Retry to flush records for table '{}'. Retry {}/{} with delay {} ms", tableId.getTableName(), retries, this.flushMaxRetries, this.flushRetryDelay.toMillis());
                // Deliberately outside the try below: an interruption must abort the retry
                // loop instead of being classified as a flush failure.
                this.pauseBeforeRetry();
            }
            try {
                this.flushBuffer(tableId, toFlush);
                return;
            }
            catch (Exception e) {
                lastException = e;
                if (!this.isRetriable(e)) {
                    throw new ConnectException("Failed to process a sink record", e);
                }
            }
        }
        throw new ConnectException("Exceeded max retries " + this.flushMaxRetries + " times, failed to process sink records", lastException);
    }

    /**
     * Waits for {@code flushRetryDelay}.
     *
     * @throws ConnectException if the thread is interrupted while waiting; the interrupt flag is
     *             restored first so callers further up can still observe the interruption
     */
    private void pauseBeforeRetry() {
        try {
            Metronome.parker(this.flushRetryDelay, Clock.SYSTEM).pause();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting to retry flush records", e);
        }
    }

    /**
     * Applies any required table changes and hands the records to the record writer. Does
     * nothing when {@code toFlush} is empty.
     *
     * <p>The first record of the list is the one used to evolve the table and to build the SQL
     * statement for the whole list.
     *
     * @throws SQLException if the table cannot be created or altered
     */
    private void flushBuffer(TableId tableId, List<SinkRecordDescriptor> toFlush) throws SQLException {
        if (toFlush.isEmpty()) {
            return;
        }
        LOGGER.debug("Starting flushing in JDBC Writer for table: {}", tableId.getTableName());
        SinkRecordDescriptor firstRecord = toFlush.get(0);

        Stopwatch tableChangesStopwatch = Stopwatch.reusable();
        tableChangesStopwatch.start();
        TableDescriptor table = this.checkAndApplyTableChangesIfNeeded(tableId, firstRecord);
        tableChangesStopwatch.stop();

        String sqlStatement = this.getSqlStatement(table, firstRecord);

        Stopwatch flushBufferStopwatch = Stopwatch.reusable();
        flushBufferStopwatch.start();
        this.recordWriter.write(toFlush, sqlStatement);
        flushBufferStopwatch.stop();

        LOGGER.trace("[PERF] Flush buffer execution time {}", flushBufferStopwatch.durations());
        LOGGER.trace("[PERF] Table changes execution time {}", tableChangesStopwatch.durations());
    }

    /**
     * Resolves the destination table of a record through the table naming strategy.
     *
     * @return the table id, or an empty optional when the strategy yields no table name
     */
    private Optional<TableId> getTableId(SinkRecord record) {
        String tableName = this.tableNamingStrategy.resolveTableName(this.config, record);
        if (tableName == null) {
            return Optional.empty();
        }
        return Optional.of(this.dialect.getTableId(tableName));
    }

    /**
     * Closes the session if it is still open.
     */
    public void close() {
        if (this.session != null && this.session.isOpen()) {
            LOGGER.info("Closing session.");
            this.session.close();
        }
        else {
            LOGGER.info("Session already closed.");
        }
    }

    /**
     * Creates the table when it does not exist, otherwise alters it when fields are missing.
     *
     * <p>When creation fails with a {@link SQLException}, an alter is attempted instead.
     * NOTE(review): this presumably covers a table created concurrently between the existence
     * check and the create - confirm.
     *
     * @return the descriptor of the table as read back from the database
     * @throws SQLException if the alter fails
     */
    private TableDescriptor checkAndApplyTableChangesIfNeeded(TableId tableId, SinkRecordDescriptor descriptor) throws SQLException {
        if (!this.hasTable(tableId)) {
            try {
                return this.createTable(tableId, descriptor);
            }
            catch (SQLException ce) {
                // Not fatal yet: fall through and try to alter the table.
                LOGGER.warn("Table creation failed for '{}', attempting to alter the table", tableId.toFullIdentiferString(), ce);
            }
        }
        try {
            return this.alterTableIfNeeded(tableId, descriptor);
        }
        catch (SQLException ae) {
            LOGGER.error("Failed to alter the table '{}'.", tableId.toFullIdentiferString(), ae);
            throw ae;
        }
    }

    /**
     * Asks the dialect whether the table exists, using the session's connection.
     */
    private boolean hasTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.tableExists(connection, tableId));
    }

    /**
     * Reads the table's descriptor through the dialect, using the session's connection.
     */
    private TableDescriptor readTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.readTable(connection, tableId));
    }

    /**
     * Creates the table for the given record inside its own transaction.
     *
     * @return the descriptor of the newly created table
     * @throws SQLException if schema evolution is disabled
     */
    private TableDescriptor createTable(TableId tableId, SinkRecordDescriptor record) throws SQLException {
        LOGGER.debug("Attempting to create table '{}'.", tableId.toFullIdentiferString());
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be created because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot create table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = this.session.beginTransaction();
        try {
            String createSql = this.dialect.getCreateTableStatement(record, tableId);
            LOGGER.trace("SQL: {}", createSql);
            this.session.createNativeQuery(createSql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            JdbcChangeEventSink.rollbackPreservingFailure(transaction, e);
            throw e;
        }
        return this.readTable(tableId);
    }

    /**
     * Adds the record's fields that are missing from the table, inside its own transaction.
     *
     * @return the current table descriptor when nothing is missing, otherwise the descriptor
     *         read back after the alter
     * @throws SQLException if the table does not exist, a missing field is neither optional nor
     *             has a default value, or schema evolution is disabled
     */
    private TableDescriptor alterTableIfNeeded(TableId tableId, SinkRecordDescriptor record) throws SQLException {
        LOGGER.debug("Attempting to alter table '{}'.", tableId.toFullIdentiferString());
        if (!this.hasTable(tableId)) {
            LOGGER.error("Table '{}' does not exist and cannot be altered.", tableId.toFullIdentiferString());
            throw new SQLException("Could not find table: " + tableId.toFullIdentiferString());
        }
        TableDescriptor table = this.readTable(tableId);
        Set<String> missingFields = this.dialect.resolveMissingFields(record, table);
        if (missingFields.isEmpty()) {
            return table;
        }
        LOGGER.debug("The follow fields are missing in the table: {}", missingFields);
        // This check runs before the schema evolution check, so it is reported first.
        for (String missingFieldName : missingFields) {
            SinkRecordDescriptor.FieldDescriptor fieldDescriptor = record.getFields().get(missingFieldName);
            boolean addable = fieldDescriptor.getSchema().isOptional() || fieldDescriptor.getSchema().defaultValue() != null;
            if (!addable) {
                throw new SQLException(String.format("Cannot ALTER table '%s' because field '%s' is not optional but has no default value", tableId.toFullIdentiferString(), fieldDescriptor.getName()));
            }
        }
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be altered because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot alter table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = this.session.beginTransaction();
        try {
            String alterSql = this.dialect.getAlterTableStatement(table, record, missingFields);
            LOGGER.trace("SQL: {}", alterSql);
            this.session.createNativeQuery(alterSql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            JdbcChangeEventSink.rollbackPreservingFailure(transaction, e);
            throw e;
        }
        return this.readTable(tableId);
    }

    /**
     * Picks the SQL statement for a record: the delete statement for deletes, otherwise the
     * statement matching the configured insert mode.
     *
     * @throws ConnectException if the insert mode is upsert and the record has no key fields
     * @throws DataException if the insert mode matches none of the handled cases
     */
    private String getSqlStatement(TableDescriptor table, SinkRecordDescriptor record) {
        if (record.isDelete()) {
            return this.dialect.getDeleteStatement(table, record);
        }
        switch (this.config.getInsertMode()) {
            case INSERT: {
                return this.dialect.getInsertStatement(table, record);
            }
            case UPSERT: {
                if (record.getKeyFieldNames().isEmpty()) {
                    throw new ConnectException("Cannot write to table " + table.getId().getTableName() + " with no key fields defined.");
                }
                return this.dialect.getUpsertStatement(table, record);
            }
            case UPDATE: {
                return this.dialect.getUpdateStatement(table, record);
            }
        }
        throw new DataException(String.format("Unable to get SQL statement for %s", record));
    }

    /**
     * Executes the given truncate statement inside its own transaction.
     */
    private void writeTruncate(String sql) throws SQLException {
        Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", sql);
            this.session.createNativeQuery(sql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            JdbcChangeEventSink.rollbackPreservingFailure(transaction, e);
            throw e;
        }
    }

    /**
     * Rolls the transaction back without letting a rollback failure replace {@code failure}.
     * A rollback failure is logged and attached to {@code failure} as a suppressed exception,
     * so the caller can rethrow the original problem with its cause chain intact.
     */
    private static void rollbackPreservingFailure(Transaction transaction, Exception failure) {
        try {
            transaction.rollback();
        }
        catch (RuntimeException rollbackFailure) {
            // addSuppressed rejects self-suppression, hence the identity check.
            if (rollbackFailure != failure) {
                failure.addSuppressed(rollbackFailure);
            }
            LOGGER.warn("Failed to roll back transaction", rollbackFailure);
        }
    }

    /**
     * Returns {@code true} when the throwable, or any throwable in its cause chain, is an
     * instance of one of the dialect's communication exception types.
     */
    private boolean isRetriable(Throwable throwable) {
        if (throwable == null) {
            return false;
        }
        for (Class<? extends Exception> communicationException : this.dialect.getCommunicationExceptions()) {
            if (communicationException.isInstance(throwable)) {
                return true;
            }
        }
        return this.isRetriable(throwable.getCause());
    }
}

