/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.Buffer;
import io.debezium.connector.jdbc.JdbcKafkaSinkRecord;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.JdbcSinkRecord;
import io.debezium.connector.jdbc.RecordBuffer;
import io.debezium.connector.jdbc.RecordWriter;
import io.debezium.connector.jdbc.ReducedRecordBuffer;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.metadata.CollectionId;
import io.debezium.sink.DebeziumSinkRecord;
import io.debezium.sink.field.FieldDescriptor;
import io.debezium.sink.spi.ChangeEventSink;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.dialect.DatabaseVersion;
import org.hibernate.query.NativeQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ChangeEventSink} implementation that writes Kafka Connect sink records to a
 * relational database through a Hibernate {@link StatelessSession}, a
 * {@link DatabaseDialect} and a {@link RecordWriter}.
 *
 * <p>Records are grouped per collection (table) into separate upsert and delete buffers.
 * Buffers are flushed with a bounded number of retries when the failure is one of the
 * dialect's communication exceptions.
 */
public class JdbcChangeEventSink
implements ChangeEventSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChangeEventSink.class);
    public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";
    private final JdbcSinkConnectorConfig config;
    private final DatabaseDialect dialect;
    private final StatelessSession session;
    private final RecordWriter recordWriter;
    private final int flushMaxRetries;
    private final Duration flushRetryDelay;

    /**
     * Creates the sink and logs the database version reported by the dialect.
     *
     * @param config connector configuration; also supplies the flush retry count and delay
     * @param session session used for DDL, truncate statements and table look-ups
     * @param dialect dialect used to build SQL statements and inspect tables
     * @param recordWriter writer that receives the buffered records together with their SQL statement
     */
    public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) {
        this.config = config;
        this.dialect = dialect;
        this.session = session;
        this.recordWriter = recordWriter;
        this.flushMaxRetries = config.getFlushMaxRetries();
        this.flushRetryDelay = Duration.of(config.getFlushRetryDelayMs(), ChronoUnit.MILLIS);
        DatabaseVersion version = this.dialect.getVersion();
        LOGGER.info("Database version {}.{}.{}", version.getMajor(), version.getMinor(), version.getMicro());
    }

    /**
     * Processes the given records in iteration order.
     *
     * <p>Each record is validated and resolved to a collection. Records without a resolvable
     * collection are skipped with a warning. Truncate records flush every buffer and then
     * execute a truncate statement; delete and tombstone records go to the delete buffer of
     * their collection; every other record goes to the upsert buffer. Truncates and deletes
     * are skipped when disabled in the configuration. All buffers are flushed once the
     * iteration is complete.
     *
     * @param records the records to write
     * @throws DataException if a record is a schema change record
     * @throws ConnectException if table changes, a truncate or a flush fails
     */
    public void execute(Collection<SinkRecord> records) {
        Map<CollectionId, Buffer> upsertBufferByTable = new LinkedHashMap<>();
        Map<CollectionId, Buffer> deleteBufferByTable = new LinkedHashMap<>();
        for (SinkRecord kafkaSinkRecord : records) {
            JdbcKafkaSinkRecord record = new JdbcKafkaSinkRecord(kafkaSinkRecord, config.getPrimaryKeyMode(), config.getPrimaryKeyFields(), config.getFieldFilter(), config.cloudEventsSchemaNamePattern(), dialect);
            LOGGER.trace("Processing {}", record);
            validate(record);
            Optional<CollectionId> optionalCollectionId = getCollectionIdFromRecord((DebeziumSinkRecord) record);
            if (optionalCollectionId.isEmpty()) {
                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", record.topicName(), record.partition(), record.offset());
                continue;
            }
            CollectionId collectionId = optionalCollectionId.get();
            if (record.isTruncate()) {
                if (!config.isTruncateEnabled()) {
                    LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", record.topicName());
                    continue;
                }
                // Everything buffered so far is written before the truncate statement runs.
                flushBuffers(upsertBufferByTable);
                flushBuffers(deleteBufferByTable);
                try {
                    TableDescriptor table = checkAndApplyTableChangesIfNeeded(collectionId, record);
                    writeTruncate(dialect.getTruncateStatement(table));
                }
                catch (SQLException e) {
                    throw new ConnectException("Failed to process a sink record", e);
                }
                continue;
            }
            if (record.isDelete() || record.isTombstone()) {
                if (!config.isDeleteEnabled()) {
                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", record.topicName());
                    continue;
                }
                bufferRecord(deleteBufferByTable, upsertBufferByTable, collectionId, record);
                continue;
            }
            bufferRecord(upsertBufferByTable, deleteBufferByTable, collectionId, record);
        }
        flushBuffers(upsertBufferByTable);
        flushBuffers(deleteBufferByTable);
    }

    /**
     * Adds a record to its target buffer after reconciling the opposing buffer of the same
     * collection: with the reduction buffer enabled the record is removed from the opposing
     * buffer, otherwise the opposing buffer is flushed first. Whatever the target buffer
     * hands back from {@code add} is flushed immediately.
     */
    private void bufferRecord(Map<CollectionId, Buffer> targetBuffers, Map<CollectionId, Buffer> opposingBuffers, CollectionId collectionId, JdbcKafkaSinkRecord record) {
        Buffer opposingBuffer = opposingBuffers.get(collectionId);
        if (opposingBuffer != null && !opposingBuffer.isEmpty()) {
            if (config.isUseReductionBuffer()) {
                opposingBuffer.remove(record);
            }
            else {
                flushBufferWithRetries(collectionId, opposingBuffer);
            }
        }
        flushBufferRecordsWithRetries(collectionId, getRecordsToFlush(targetBuffers, collectionId, record));
    }

    /**
     * Rejects schema change records.
     *
     * @throws DataException if the record is a schema change record
     */
    private void validate(JdbcSinkRecord record) {
        if (record.isSchemaChange()) {
            LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
            throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
        }
    }

    /**
     * Adds the record to the buffer of its collection and returns the buffer together with
     * the records that {@code Buffer.add} handed back for flushing. When the record carries
     * fields missing from the buffer's table descriptor, the current buffer is flushed and
     * replaced by a fresh one before the record is added.
     */
    private BufferFlushRecords getRecordsToFlush(Map<CollectionId, Buffer> bufferMap, CollectionId collectionId, JdbcSinkRecord record) {
        Stopwatch stopwatch = Stopwatch.reusable();
        stopwatch.start();
        Buffer buffer = getOrCreateBuffer(bufferMap, collectionId, record);
        if (isSchemaChanged(record, buffer.getTableDescriptor())) {
            flushBufferWithRetries(collectionId, buffer);
            // Dropping the entry forces getOrCreateBuffer to re-run the table checks.
            bufferMap.remove(collectionId);
            buffer = getOrCreateBuffer(bufferMap, collectionId, record);
        }
        List<JdbcSinkRecord> toFlush = buffer.add(record);
        stopwatch.stop();
        LOGGER.trace("[PERF] Resolve and add record execution time for collection '{}': {}", collectionId.name(), stopwatch.durations());
        return new BufferFlushRecords(buffer, toFlush);
    }

    /**
     * Returns the buffer registered for the collection, creating it (after checking and
     * applying table changes) when none exists yet.
     *
     * @throws ConnectException if checking or applying the table changes fails
     */
    private Buffer getOrCreateBuffer(Map<CollectionId, Buffer> bufferMap, CollectionId collectionId, JdbcSinkRecord record) {
        return bufferMap.computeIfAbsent(collectionId, id -> {
            TableDescriptor tableDescriptor;
            try {
                tableDescriptor = checkAndApplyTableChangesIfNeeded(collectionId, record);
            }
            catch (SQLException e) {
                throw new ConnectException("Error while checking and applying table changes for collection '" + collectionId + "'", e);
            }
            return createBuffer(config, tableDescriptor, record);
        });
    }

    /**
     * Creates a {@link ReducedRecordBuffer} when the reduction buffer is enabled and the
     * record has key fields, otherwise a plain {@link RecordBuffer}.
     */
    private Buffer createBuffer(JdbcSinkConnectorConfig config, TableDescriptor tableDescriptor, JdbcSinkRecord record) {
        if (config.isUseReductionBuffer() && !record.keyFieldNames().isEmpty()) {
            return new ReducedRecordBuffer(config, tableDescriptor);
        }
        return new RecordBuffer(config, tableDescriptor);
    }

    /**
     * Tells whether the record has fields that the dialect reports as missing from the table.
     */
    private boolean isSchemaChanged(JdbcSinkRecord record, TableDescriptor tableDescriptor) {
        Set<String> missingFields = dialect.resolveMissingFields(record, tableDescriptor);
        boolean changed = !missingFields.isEmpty();
        if (changed) {
            // Only log when there really is a change; the message would otherwise be misleading.
            LOGGER.debug("Schema change detected for '{}', missing fields: {}", tableDescriptor.getId().toFullIdentiferString(), missingFields);
        }
        return changed;
    }

    /** Flushes every buffer of the map, in the map's iteration order. */
    private void flushBuffers(Map<CollectionId, Buffer> bufferByTable) {
        bufferByTable.forEach(this::flushBufferWithRetries);
    }

    /** Flushes the records carried by the given holder against its buffer's table descriptor. */
    private void flushBufferRecordsWithRetries(CollectionId collectionId, BufferFlushRecords bufferFlushRecords) {
        flushBufferWithRetries(collectionId, bufferFlushRecords.records(), bufferFlushRecords.buffer().getTableDescriptor());
    }

    /** Flushes whatever {@code buffer.flush()} returns against the buffer's table descriptor. */
    private void flushBufferWithRetries(CollectionId collectionId, Buffer buffer) {
        flushBufferWithRetries(collectionId, buffer.flush(), buffer.getTableDescriptor());
    }

    /**
     * Writes the records, retrying up to {@code flushMaxRetries} additional times when the
     * failure is retriable, pausing {@code flushRetryDelay} before each retry.
     *
     * @throws ConnectException if the failure is not retriable, the retries are exhausted,
     *         or the thread is interrupted while waiting to retry
     */
    private void flushBufferWithRetries(CollectionId collectionId, List<JdbcSinkRecord> toFlush, TableDescriptor tableDescriptor) {
        Exception lastException = null;
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", collectionId.name());
        for (int retries = 0; retries <= flushMaxRetries; ++retries) {
            if (retries > 0) {
                LOGGER.warn("Retry to flush records for table '{}'. Retry {}/{} with delay {} ms", collectionId.name(), retries, flushMaxRetries, flushRetryDelay.toMillis());
                // The pause sits outside the try below so that an interruption aborts the
                // flush directly instead of being re-wrapped as a generic flush failure.
                pauseBeforeRetry();
            }
            try {
                flushBuffer(collectionId, toFlush, tableDescriptor);
                return;
            }
            catch (Exception e) {
                lastException = e;
                if (!isRetriable(e)) {
                    throw new ConnectException("Failed to process a sink record", e);
                }
            }
        }
        throw new ConnectException("Exceeded max retries " + flushMaxRetries + " times, failed to process sink records", lastException);
    }

    /**
     * Waits for the configured retry delay.
     *
     * @throws ConnectException if the thread is interrupted; the interrupt flag is restored first
     */
    private void pauseBeforeRetry() {
        try {
            Metronome.parker(flushRetryDelay, Clock.SYSTEM).pause();
        }
        catch (InterruptedException e) {
            // Restore the flag so that code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting to retry flush records", e);
        }
    }

    /**
     * Hands the records to the record writer together with the SQL statement derived from
     * the first record. Does nothing for an empty list.
     */
    private void flushBuffer(CollectionId collectionId, List<JdbcSinkRecord> toFlush, TableDescriptor table) throws SQLException {
        if (toFlush.isEmpty()) {
            return;
        }
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", collectionId.name());
        // The statement is built from the first record and used for the whole batch.
        String sqlStatement = getSqlStatement(table, toFlush.get(0));
        Stopwatch flushBufferStopwatch = Stopwatch.reusable();
        flushBufferStopwatch.start();
        recordWriter.write(toFlush, sqlStatement);
        flushBufferStopwatch.stop();
        LOGGER.trace("[PERF] Flush buffer execution time {}", flushBufferStopwatch.durations());
    }

    /** Closes the session if it is still open. */
    public void close() {
        if (session != null && session.isOpen()) {
            LOGGER.info("Closing session.");
            session.close();
        }
        else {
            LOGGER.info("Session already closed.");
        }
    }

    /**
     * Makes sure the table for the collection exists and contains the record's fields.
     *
     * <p>When the table does not exist it is created. If creation fails with an
     * {@link SQLException}, an alter is attempted instead. When the table exists it is
     * altered if needed.
     *
     * @return the descriptor of the resulting table
     * @throws SQLException if the alter attempt fails
     */
    private TableDescriptor checkAndApplyTableChangesIfNeeded(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
        if (!hasTable(collectionId)) {
            try {
                return createTable(collectionId, record);
            }
            catch (SQLException ce) {
                // Fall through to the alter attempt below.
                LOGGER.warn("Table creation failed for '{}', attempting to alter the table", collectionId.toFullIdentiferString(), ce);
            }
        }
        try {
            return alterTableIfNeeded(collectionId, record);
        }
        catch (SQLException ae) {
            LOGGER.error("Failed to alter the table '{}'.", collectionId.toFullIdentiferString(), ae);
            throw ae;
        }
    }

    /** Asks the dialect whether the table exists, using the session's connection. */
    private boolean hasTable(CollectionId collectionId) {
        return (Boolean)this.session.doReturningWork(connection -> this.dialect.tableExists(connection, collectionId));
    }

    /** Reads the table descriptor through the dialect, using the session's connection. */
    private TableDescriptor readTable(CollectionId collectionId) {
        return (TableDescriptor)this.session.doReturningWork(connection -> this.dialect.readTable(connection, collectionId));
    }

    /**
     * Creates the table for the collection inside a transaction and returns its descriptor.
     *
     * @throws SQLException if schema evolution is disabled
     */
    private TableDescriptor createTable(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
        LOGGER.debug("Attempting to create table '{}'.", collectionId.toFullIdentiferString());
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be created because schema evolution is disabled.", collectionId.toFullIdentiferString());
            throw new SQLException("Cannot create table " + collectionId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = session.beginTransaction();
        try {
            String createSql = dialect.getCreateTableStatement(record, collectionId);
            LOGGER.trace("SQL: {}", createSql);
            session.createNativeQuery(createSql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            rollbackPreservingFailure(transaction, e);
            throw e;
        }
        return readTable(collectionId);
    }

    /**
     * Adds the record's fields that are missing from the table, inside a transaction.
     *
     * @return the current descriptor when nothing is missing, otherwise the descriptor
     *         re-read after the alter statement
     * @throws SQLException if the table does not exist, a missing field is neither optional
     *         nor has a default value, or schema evolution is disabled
     */
    private TableDescriptor alterTableIfNeeded(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
        LOGGER.debug("Attempting to alter table '{}'.", collectionId.toFullIdentiferString());
        if (!hasTable(collectionId)) {
            LOGGER.error("Table '{}' does not exist and cannot be altered.", collectionId.toFullIdentiferString());
            throw new SQLException("Could not find table: " + collectionId.toFullIdentiferString());
        }
        TableDescriptor table = readTable(collectionId);
        Set<String> missingFields = dialect.resolveMissingFields(record, table);
        if (missingFields.isEmpty()) {
            return table;
        }
        LOGGER.debug("The follow fields are missing in the table: {}", missingFields);
        for (String missingFieldName : missingFields) {
            FieldDescriptor fieldDescriptor = (FieldDescriptor)record.allFields().get(missingFieldName);
            boolean addable = fieldDescriptor.getSchema().isOptional() || fieldDescriptor.getSchema().defaultValue() != null;
            if (!addable) {
                throw new SQLException(String.format("Cannot ALTER table '%s' because field '%s' is not optional but has no default value", collectionId.toFullIdentiferString(), fieldDescriptor.getName()));
            }
        }
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be altered because schema evolution is disabled.", collectionId.toFullIdentiferString());
            throw new SQLException("Cannot alter table " + collectionId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = session.beginTransaction();
        try {
            String alterSql = dialect.getAlterTableStatement(table, record, missingFields);
            LOGGER.trace("SQL: {}", alterSql);
            session.createNativeQuery(alterSql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            rollbackPreservingFailure(transaction, e);
            throw e;
        }
        return readTable(collectionId);
    }

    /**
     * Picks the SQL statement for the record: a delete statement for delete records,
     * otherwise the statement matching the configured insert mode.
     *
     * @throws ConnectException if the insert mode is upsert and the record has no key fields
     * @throws DataException if the insert mode is none of the handled modes
     */
    private String getSqlStatement(TableDescriptor table, JdbcSinkRecord record) {
        if (record.isDelete()) {
            return dialect.getDeleteStatement(table, record);
        }
        switch (config.getInsertMode()) {
            case INSERT:
                return dialect.getInsertStatement(table, record);
            case UPSERT:
                if (record.keyFieldNames().isEmpty()) {
                    throw new ConnectException("Cannot write to table " + table.getId().name() + " with no key fields defined.");
                }
                return dialect.getUpsertStatement(table, record);
            case UPDATE:
                return dialect.getUpdateStatement(table, record);
            default:
                throw new DataException(String.format("Unable to get SQL statement for %s", record));
        }
    }

    /** Executes the given truncate statement inside its own transaction. */
    private void writeTruncate(String sql) throws SQLException {
        Transaction transaction = session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", sql);
            session.createNativeQuery(sql, Object.class).executeUpdate();
            transaction.commit();
        }
        catch (Exception e) {
            rollbackPreservingFailure(transaction, e);
            throw e;
        }
    }

    /**
     * Rolls the transaction back without losing the failure that triggered the rollback:
     * an exception raised by the rollback itself is attached to {@code failure} as a
     * suppressed exception instead of replacing it.
     */
    private static void rollbackPreservingFailure(Transaction transaction, Exception failure) {
        try {
            transaction.rollback();
        }
        catch (RuntimeException rollbackFailure) {
            failure.addSuppressed(rollbackFailure);
        }
    }

    /**
     * Resolves a collection name to a collection id through the dialect.
     *
     * @param collectionName the collection (table) name
     * @return the resolved id, or an empty Optional if the dialect yields {@code null}
     */
    public Optional<CollectionId> getCollectionId(String collectionName) {
        return Optional.ofNullable(dialect.getCollectionId(collectionName));
    }

    /**
     * Tells whether the throwable, or any throwable in its cause chain, is an instance of
     * one of the dialect's communication exceptions.
     */
    private boolean isRetriable(Throwable throwable) {
        if (throwable == null) {
            return false;
        }
        for (Class<? extends Exception> communicationException : dialect.getCommunicationExceptions()) {
            if (communicationException.isAssignableFrom(throwable.getClass())) {
                return true;
            }
        }
        return isRetriable(throwable.getCause());
    }

    /**
     * Resolves the collection id for a record using the configured naming strategy and
     * collection name format.
     *
     * @param record the record to resolve
     * @return the resolved id, or an empty Optional if the naming strategy yields no name
     */
    public Optional<CollectionId> getCollectionIdFromRecord(DebeziumSinkRecord record) {
        String tableName = config.getCollectionNamingStrategy().resolveCollectionName(record, config.getCollectionNameFormat());
        if (tableName == null) {
            return Optional.empty();
        }
        return getCollectionId(tableName);
    }

    /** Pairs a buffer with the records it handed back for flushing. */
    private record BufferFlushRecords(Buffer buffer, List<JdbcSinkRecord> records) {
    }
}

