/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.HistoryRecorder;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.logminer.RowMapper;
import io.debezium.connector.oracle.logminer.TransactionalBuffer;
import io.debezium.connector.oracle.logminer.parser.DmlParser;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Walks the rows of a LogMiner query {@link ResultSet} and routes each row, based on its
 * operation code, to the {@link TransactionalBuffer}, the {@link EventDispatcher} (for schema
 * changes) and the {@link HistoryRecorder}. Per-batch statistics are pushed to the streaming
 * metrics once the result set has been consumed.
 */
class LogMinerQueryResultProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);

    // Operation codes as they are handled by the switch in processResult(). The names reflect
    // what this class does with each code (see the log messages in the respective branches).
    // NOTE(review): these presumably mirror constants declared in RowMapper; confirm and reuse
    // those instead of the local copies if they are accessible.
    private static final int OP_INSERT = 1;
    private static final int OP_DELETE = 2;
    private static final int OP_UPDATE = 3;
    private static final int OP_DDL = 5;
    private static final int OP_START = 6;
    private static final int OP_COMMIT = 7;
    private static final int OP_SELECT_LOB_LOCATOR = 9;
    private static final int OP_LOB_WRITE = 10;
    // NOTE(review): Oracle's V$LOGMNR_CONTENTS reference appears to list LOB_ERASE as 28;
    // the value 29 is kept unchanged here - confirm against the mining query and RowMapper.
    private static final int OP_LOB_ERASE = 29;
    private static final int OP_MISSING_SCN = 34;
    private static final int OP_ROLLBACK = 36;

    /** Number of consecutive "offset SCN unchanged" batches after which a warning is raised. */
    private static final long STUCK_SCN_WARN_THRESHOLD = 25L;

    private final ChangeEventSource.ChangeEventSourceContext context;
    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
    private final TransactionalBuffer transactionalBuffer;
    private final DmlParser dmlParser;
    private final OracleOffsetContext offsetContext;
    private final OracleDatabaseSchema schema;
    private final EventDispatcher<TableId> dispatcher;
    private final OracleConnectorConfig connectorConfig;
    private final HistoryRecorder historyRecorder;
    private final SelectLobParser selectLobParser;
    // Offset SCN / commit SCN observed at the end of the previous non-empty batch; used by
    // warnStuckScn() to detect an offset SCN that does not advance.
    private Scn currentOffsetScn = Scn.NULL;
    private Scn currentOffsetCommitScn = Scn.NULL;
    // SCN of the last row seen whose operation code was not OP_MISSING_SCN.
    private Scn lastProcessedScn = Scn.NULL;
    private long stuckScnCounter = 0L;

    /**
     * Creates a processor. The DML parser implementation is chosen from the connector
     * configuration (see {@link #resolveParser}).
     */
    LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleStreamingChangeEventSourceMetrics streamingMetrics, TransactionalBuffer transactionalBuffer, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, HistoryRecorder historyRecorder) {
        this.context = context;
        this.streamingMetrics = streamingMetrics;
        this.transactionalBuffer = transactionalBuffer;
        this.offsetContext = offsetContext;
        this.schema = schema;
        this.dispatcher = dispatcher;
        this.historyRecorder = historyRecorder;
        this.connectorConfig = connectorConfig;
        this.dmlParser = resolveParser(connectorConfig, schema.getValueConverters());
        this.selectLobParser = new SelectLobParser();
    }

    /**
     * Returns a {@link SimpleDmlParser} when the configured parser is {@code LEGACY}, otherwise
     * a {@link LogMinerDmlParser}.
     */
    private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters) {
        // Constant-first comparison, consistent with the check in parse().
        if (OracleConnectorConfig.LogMiningDmlParser.LEGACY.equals(connectorConfig.getLogMiningDmlParser())) {
            return new SimpleDmlParser(connectorConfig.getCatalogName(), valueConverters);
        }
        return new LogMinerDmlParser();
    }

    /**
     * Consumes the given result set row by row for as long as the source context reports that
     * it is running, dispatching each row according to its operation code.
     *
     * @param resultSet the LogMiner query result to consume
     * @throws SQLException if reading from the result set (or fetching table metadata) fails
     * @throws DebeziumException if a row carries a null SCN or a schema change dispatch is interrupted
     */
    void processResult(ResultSet resultSet) throws SQLException {
        int dmlCounter = 0;
        int insertCounter = 0;
        int updateCounter = 0;
        int deleteCounter = 0;
        int commitCounter = 0;
        int rollbackCounter = 0;
        long rows = 0L;
        final Instant startTime = Instant.now();

        while (this.context.isRunning() && this.hasNext(resultSet)) {
            rows++;
            final Scn scn = RowMapper.getScn(resultSet);
            if (scn.isNull()) {
                throw new DebeziumException("Unexpected null SCN detected in LogMiner results");
            }
            final String tableName = RowMapper.getTableName(resultSet);
            final String segOwner = RowMapper.getSegOwner(resultSet);
            final int operationCode = RowMapper.getOperationCode(resultSet);
            final Timestamp changeTime = RowMapper.getChangeTime(resultSet);
            final String txId = RowMapper.getTransactionId(resultSet);
            final String operation = RowMapper.getOperation(resultSet);
            final String userName = RowMapper.getUsername(resultSet);
            final String rowId = RowMapper.getRowId(resultSet);
            final int rollbackFlag = RowMapper.getRollbackFlag(resultSet);
            final Object rsId = RowMapper.getRsId(resultSet);
            final boolean dml = isDmlOperation(operationCode);
            final String redoSql = RowMapper.getSqlRedo(resultSet, dml, this.historyRecorder, scn, tableName, segOwner, operationCode, changeTime, txId);

            LOGGER.trace("scn={}, operationCode={}, operation={}, table={}, segOwner={}, userName={}, rowId={}, rollbackFlag={}", scn, operationCode, operation, tableName, segOwner, userName, rowId, rollbackFlag);
            final String logMessage = String.format("transactionId=%s, SCN=%s, table_name=%s, segOwner=%s, operationCode=%s, offsetSCN=%s,  commitOffsetSCN=%s", txId, scn, tableName, segOwner, operationCode, this.offsetContext.getScn(), this.offsetContext.getCommitScn());

            // Rows flagged as "missing SCN" never advance the last processed SCN.
            if (operationCode != OP_MISSING_SCN) {
                this.lastProcessedScn = scn;
            }

            switch (operationCode) {
                case OP_START: {
                    this.transactionalBuffer.registerTransaction(txId, scn);
                    break;
                }
                case OP_COMMIT: {
                    // Commits for transactions the buffer does not know about are ignored.
                    if (!this.transactionalBuffer.isTransactionRegistered(txId)) {
                        break;
                    }
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                    if (this.transactionalBuffer.commit(txId, scn, this.offsetContext, changeTime, this.context, logMessage, this.dispatcher)) {
                        LOGGER.trace("COMMIT, {}", logMessage);
                        commitCounter++;
                    }
                    break;
                }
                case OP_ROLLBACK: {
                    // Rollbacks for transactions the buffer does not know about are ignored.
                    if (!this.transactionalBuffer.isTransactionRegistered(txId)) {
                        break;
                    }
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                    if (this.transactionalBuffer.rollback(txId, logMessage)) {
                        LOGGER.trace("ROLLBACK, {}", logMessage);
                        rollbackCounter++;
                    }
                    break;
                }
                case OP_DDL: {
                    if (this.transactionalBuffer.isDdlOperationRegistered(scn)) {
                        LOGGER.trace("DDL: {} has already been seen, skipped.", redoSql);
                        break;
                    }
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                    LOGGER.info("DDL: {}, REDO_SQL: {}", logMessage, redoSql);
                    try {
                        // Only DDL rows that carry a table name are dispatched as schema changes.
                        if (tableName != null) {
                            final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                            this.transactionalBuffer.registerDdlOperation(scn);
                            this.dispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), redoSql, this.schema, changeTime.toInstant(), this.streamingMetrics));
                        }
                    }
                    catch (InterruptedException e) {
                        throw new DebeziumException("Failed to dispatch DDL event", e);
                    }
                    // Without this break a DDL row fell through into the missing-SCN branch,
                    // recording the row twice and emitting a bogus "Missing SCN" warning.
                    break;
                }
                case OP_MISSING_SCN: {
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                    LogMinerHelper.logWarn(this.streamingMetrics, "Missing SCN, {}", logMessage);
                    break;
                }
                case OP_SELECT_LOB_LOCATOR: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("SEL_LOB_LOCATOR operation skipped for '{}', LOB not enabled.", redoSql);
                        break;
                    }
                    LOGGER.trace("SEL_LOB_LOCATOR: {}, REDO_SQL: {}", logMessage, redoSql);
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    final Table table = this.schema.tableFor(tableId);
                    if (table == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "SEL_LOB_LOCATOR for table '{}' is not known to the connector, skipped.", tableId);
                        break;
                    }
                    // Reuse the table resolved above rather than looking it up a second time.
                    this.transactionalBuffer.registerSelectLobOperation(operationCode, txId, scn, tableId, changeTime.toInstant(), rowId, rsId, segOwner, tableName, redoSql, table, this.selectLobParser);
                    break;
                }
                case OP_LOB_WRITE: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("LOB_WRITE operation skipped, LOB not enabled.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    if (this.schema.tableFor(tableId) == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "LOB_WRITE for table '{}' is not known to the connector, skipped.", tableId);
                        break;
                    }
                    this.transactionalBuffer.registerLobWriteOperation(operationCode, txId, scn, tableId, redoSql, changeTime.toInstant(), rowId, rsId);
                    break;
                }
                case OP_LOB_ERASE: {
                    if (!this.connectorConfig.isLobEnabled()) {
                        LOGGER.trace("LOB_ERASE operation skipped, LOB not enabled.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    if (this.schema.tableFor(tableId) == null) {
                        LogMinerHelper.logWarn(this.streamingMetrics, "LOB_ERASE for table '{}' is not known to the connector, skipped.", tableId);
                        break;
                    }
                    this.transactionalBuffer.registerLobEraseOperation(operationCode, txId, scn, tableId, changeTime.toInstant(), rowId, rsId);
                    break;
                }
                case OP_INSERT:
                case OP_DELETE:
                case OP_UPDATE: {
                    LOGGER.trace("DML, {}, sql {}", logMessage, redoSql);
                    if (redoSql == null) {
                        LOGGER.trace("Redo SQL was empty, DML operation skipped.");
                        break;
                    }
                    final TableId tableId = RowMapper.getTableId(this.connectorConfig.getCatalogName(), resultSet);
                    dmlCounter++;
                    switch (operationCode) {
                        case OP_INSERT: {
                            insertCounter++;
                            break;
                        }
                        case OP_UPDATE: {
                            updateCounter++;
                            break;
                        }
                        case OP_DELETE: {
                            deleteCounter++;
                            break;
                        }
                    }
                    // May dispatch a schema change event if the table is newly captured.
                    final Table table = this.getTableForDmlEvent(tableId);
                    if (rollbackFlag == 1) {
                        // The row undoes an earlier DML of the same transaction.
                        this.transactionalBuffer.undoDmlOperation(txId, rowId, tableId);
                        break;
                    }
                    if (table == null) {
                        // getTableForDmlEvent() already logged the "skipping" warning; registering
                        // the operation would later invoke the parser with a null table.
                        break;
                    }
                    // Parsing is deferred: the buffer invokes the supplier when it needs the entry.
                    this.transactionalBuffer.registerDmlOperation(operationCode, txId, scn, tableId, () -> {
                        LogMinerDmlEntry dmlEntry = this.parse(redoSql, table, txId);
                        dmlEntry.setObjectOwner(segOwner);
                        dmlEntry.setObjectName(tableName);
                        return dmlEntry;
                    }, changeTime.toInstant(), rowId, rsId);
                    break;
                }
            }
        }

        final Duration totalTime = Duration.between(startTime, Instant.now());
        // Metrics and the stuck-SCN bookkeeping are only touched when the batch did real work.
        if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
            this.streamingMetrics.setLastCapturedDmlCount(dmlCounter);
            this.streamingMetrics.setLastDurationOfBatchProcessing(totalTime);
            this.warnStuckScn();
            this.currentOffsetScn = this.offsetContext.getScn();
            if (this.offsetContext.getCommitScn() != null) {
                this.currentOffsetCommitScn = this.offsetContext.getCommitScn();
            }
        }
        LOGGER.debug("{} Rows, {} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. Sleep time:{}", rows, dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(), this.streamingMetrics.getLagFromSourceInMilliseconds(), this.offsetContext.getScn(), this.offsetContext.getCommitScn(), this.streamingMetrics.getNumberOfActiveTransactions(), this.streamingMetrics.getMillisecondToSleepBetweenMiningQuery());
        this.streamingMetrics.addProcessedRows(rows);
        this.historyRecorder.flush();
    }

    /**
     * @return the SCN of the most recently processed row whose operation code was not the
     *         missing-SCN code, or {@code Scn.NULL} if no such row has been seen yet
     */
    Scn getLastProcessedScn() {
        return this.lastProcessedScn;
    }

    /**
     * Advances the result set and, when a row is available, reports the time spent in
     * {@link ResultSet#next()} to the streaming metrics.
     */
    private boolean hasNext(ResultSet resultSet) throws SQLException {
        final Instant rsNextStart = Instant.now();
        if (!resultSet.next()) {
            return false;
        }
        this.streamingMetrics.addCurrentResultSetNext(Duration.between(rsNextStart, Instant.now()));
        return true;
    }

    /**
     * Resolves the table a DML row refers to. If the schema does not know the table but the
     * table filter includes it, a schema change event is dispatched and the lookup repeated;
     * otherwise a warning is logged.
     *
     * @return the table, or {@code null} when it is not known to the schema
     */
    private Table getTableForDmlEvent(TableId tableId) throws SQLException {
        final Table known = this.schema.tableFor(tableId);
        if (known != null) {
            return known;
        }
        if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            return this.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(tableId);
        }
        LogMinerHelper.logWarn(this.streamingMetrics, "DML for table '{}' that is not known to this connector, skipping.", tableId);
        return null;
    }

    /**
     * Dispatches a schema change event built from the table's metadata DDL and then returns
     * whatever the schema reports for the table afterwards.
     *
     * @throws DebeziumException if the dispatch is interrupted
     */
    private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId tableId) throws SQLException {
        try {
            LOGGER.info("Table {} is new and will be captured.", tableId);
            this.offsetContext.event(tableId, Instant.now());
            this.dispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), this.getTableMetadataDdl(tableId), this.schema, Instant.now(), this.streamingMetrics));
            return this.schema.tableFor(tableId);
        }
        catch (InterruptedException e) {
            throw new DebeziumException("Failed to dispatch schema change event", e);
        }
    }

    /**
     * Fetches the table's metadata DDL over a dedicated connection that is closed before
     * returning. When a PDB name is configured the session is switched to it first.
     */
    private String getTableMetadataDdl(TableId tableId) throws SQLException {
        final String pdbName = this.connectorConfig.getPdbName();
        try (OracleConnection connection = new OracleConnection(this.connectorConfig.getJdbcConfig(), () -> this.getClass().getClassLoader())) {
            if (pdbName != null) {
                connection.setSessionToPdb(pdbName);
            }
            return connection.getTableMetadataDdl(tableId);
        }
    }

    /**
     * Counts consecutive invocations in which the offset SCN equals the one remembered from
     * the previous batch while the commit SCN differs. When the count reaches
     * {@link #STUCK_SCN_WARN_THRESHOLD} a warning is logged and the SCN freeze metric is
     * incremented; any other observation resets the count.
     */
    private void warnStuckScn() {
        if (this.offsetContext == null || this.offsetContext.getCommitScn() == null) {
            return;
        }
        final Scn scn = this.offsetContext.getScn();
        final Scn commitScn = this.offsetContext.getCommitScn();
        final boolean offsetUnchanged = this.currentOffsetScn.equals(scn);
        final boolean commitAdvanced = !this.currentOffsetCommitScn.equals(commitScn);
        if (offsetUnchanged && commitAdvanced) {
            this.stuckScnCounter++;
            // Equality (not >=) so the warning fires once per streak rather than on every batch.
            if (this.stuckScnCounter == STUCK_SCN_WARN_THRESHOLD) {
                LogMinerHelper.logWarn(this.streamingMetrics, "Offset SCN {} is not changing. It indicates long transaction(s). Offset commit SCN: {}", this.currentOffsetScn, commitScn);
                this.streamingMetrics.incrementScnFreezeCount();
            }
        }
        else {
            this.stuckScnCounter = 0L;
        }
    }

    /**
     * Parses a redo SQL statement with the configured DML parser and records the parse time.
     * A warning is logged when an update or delete entry carries no old values.
     *
     * @throws DmlParserException wrapping the parser's exception, with the statement and a
     *         workaround hint added to the message
     */
    private LogMinerDmlEntry parse(String redoSql, Table table, String txId) {
        final LogMinerDmlEntry dmlEntry;
        try {
            final Instant parseStart = Instant.now();
            dmlEntry = this.dmlParser.parse(redoSql, table, txId);
            this.streamingMetrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
        }
        catch (DmlParserException e) {
            final StringBuilder message = new StringBuilder();
            message.append("DML statement couldn't be parsed.");
            message.append(" Please open a Jira issue with the statement '").append(redoSql).append("'.");
            if (OracleConnectorConfig.LogMiningDmlParser.FAST.equals(this.connectorConfig.getLogMiningDmlParser())) {
                message.append(" You can set internal.log.mining.dml.parser='legacy' as a workaround until the parse error is fixed.");
            }
            throw new DmlParserException(message.toString(), e);
        }
        // NOTE(review): assumes LogMinerDmlEntry.getOperation() uses the same code space as the
        // row operation codes (3 = update, 2 = delete) - confirm.
        final int entryOperation = dmlEntry.getOperation();
        if (dmlEntry.getOldValues().length == 0 && (OP_UPDATE == entryOperation || OP_DELETE == entryOperation)) {
            LOGGER.warn("The DML event '{}' contained no before state.", redoSql);
            this.streamingMetrics.incrementWarningCount();
        }
        return dmlEntry;
    }

    /** @return {@code true} for the insert, delete and update operation codes */
    private static boolean isDmlOperation(int operationCode) {
        return operationCode == OP_INSERT || operationCode == OP_DELETE || operationCode == OP_UPDATE;
    }
}

