/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.vitess.connection;

import binlogdata.Binlogdata;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.VitessType;
import io.debezium.connector.vitess.connection.ColumnMetaData;
import io.debezium.connector.vitess.connection.DdlMessage;
import io.debezium.connector.vitess.connection.KeyMetaData;
import io.debezium.connector.vitess.connection.MessageDecoder;
import io.debezium.connector.vitess.connection.OtherMessage;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageColumn;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
import io.debezium.connector.vitess.connection.TransactionalMessage;
import io.debezium.connector.vitess.connection.VStreamOutputReplicationMessage;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.vitess.proto.Query;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VStreamOutputMessageDecoder
implements MessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(VStreamOutputMessageDecoder.class);
    private static final int NOT_NULL_FLAG = 1;
    private static final int PRI_KEY_FLAG = 2;
    private static final int UNIQUE_KEY_FLAG = 4;
    private Instant commitTimestamp;
    private String transactionId = null;
    private final VitessDatabaseSchema schema;

    public VStreamOutputMessageDecoder(VitessDatabaseSchema schema) {
        this.schema = schema;
    }

    @Override
    public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException {
        Binlogdata.VEventType vEventType = vEvent.getType();
        switch (vEventType) {
            case BEGIN: {
                this.handleBeginMessage(vEvent, processor, newVgtid);
                break;
            }
            case COMMIT: {
                this.handleCommitMessage(vEvent, processor, newVgtid);
                break;
            }
            case ROW: {
                this.decodeRows(vEvent, processor, newVgtid, isLastRowEventOfTransaction);
                break;
            }
            case FIELD: {
                this.handleFieldMessage(vEvent);
                break;
            }
            case DDL: {
                this.handleDdl(vEvent, processor, newVgtid);
                break;
            }
            case OTHER: {
                this.handleOther(vEvent, processor, newVgtid);
                break;
            }
            case VGTID: 
            case VERSION: {
                break;
            }
            default: {
                LOGGER.warn("vEventType {} skipped, not proccess.", (Object)vEventType);
            }
        }
    }

    private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException {
        this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
        if (newVgtid != null) {
            this.transactionId = newVgtid.toString();
        }
        processor.process(new DdlMessage(this.transactionId, this.commitTimestamp), newVgtid, false);
    }

    private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException {
        this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
        if (newVgtid != null) {
            this.transactionId = newVgtid.toString();
        }
        processor.process(new OtherMessage(this.transactionId, this.commitTimestamp), newVgtid, false);
    }

    private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException {
        this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
        if (newVgtid != null) {
            this.transactionId = newVgtid.toString();
        }
        LOGGER.trace("Commit timestamp of begin transaction: {}", (Object)this.commitTimestamp);
        processor.process(new TransactionalMessage(ReplicationMessage.Operation.BEGIN, this.transactionId, this.commitTimestamp), newVgtid, false);
    }

    private void handleCommitMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException {
        Instant commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
        LOGGER.trace("Commit timestamp of commit transaction: {}", (Object)commitTimestamp);
        processor.process(new TransactionalMessage(ReplicationMessage.Operation.COMMIT, this.transactionId, commitTimestamp), newVgtid, false);
    }

    private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException {
        Binlogdata.RowEvent rowEvent = vEvent.getRowEvent();
        String[] schemaTableTuple = rowEvent.getTableName().split("\\.");
        if (schemaTableTuple.length != 2) {
            LOGGER.error("Handling ROW vEvent. schemaTableTuple should have schema name and table name but has size {}. {} is skipped.", (Object)schemaTableTuple.length, (Object)rowEvent);
        } else {
            String schemaName = schemaTableTuple[0];
            String tableName = schemaTableTuple[1];
            int numOfRowChanges = rowEvent.getRowChangesCount();
            int numOfRowChangesEventSeen = 0;
            for (int i = 0; i < numOfRowChanges; ++i) {
                boolean isLastRowOfTransaction;
                Binlogdata.RowChange rowChange = rowEvent.getRowChanges(i);
                boolean bl = isLastRowOfTransaction = isLastRowEventOfTransaction && ++numOfRowChangesEventSeen == numOfRowChanges;
                if (rowChange.hasAfter() && !rowChange.hasBefore()) {
                    this.decodeInsert(rowChange.getAfter(), schemaName, tableName, processor, newVgtid, isLastRowOfTransaction);
                    continue;
                }
                if (rowChange.hasAfter() && rowChange.hasBefore()) {
                    this.decodeUpdate(rowChange.getBefore(), rowChange.getAfter(), schemaName, tableName, processor, newVgtid, isLastRowOfTransaction);
                    continue;
                }
                if (!rowChange.hasAfter() && rowChange.hasBefore()) {
                    this.decodeDelete(rowChange.getBefore(), schemaName, tableName, processor, newVgtid, isLastRowOfTransaction);
                    continue;
                }
                LOGGER.error("{} decodeRow skipped.", (Object)vEvent);
            }
        }
    }

    private void decodeInsert(Query.Row row, String schemaName, String tableName, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException {
        TableId tableId;
        Optional<Table> resolvedTable = this.resolveRelation(schemaName, tableName);
        List<ReplicationMessage.Column> columns = null;
        if (!resolvedTable.isPresent()) {
            LOGGER.trace("Row insert for {}.{} is filtered out", (Object)schemaName, (Object)tableName);
            tableId = new TableId(null, schemaName, tableName);
        } else {
            Table table = resolvedTable.get();
            tableId = table.id();
            columns = this.resolveColumns(row, table);
        }
        processor.process(new VStreamOutputReplicationMessage(ReplicationMessage.Operation.INSERT, this.commitTimestamp, this.transactionId, tableId.toDoubleQuotedString(), null, columns), newVgtid, isLastRowEventOfTransaction);
    }

    private void decodeUpdate(Query.Row oldRow, Query.Row newRow, String schemaName, String tableName, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException {
        TableId tableId;
        Optional<Table> resolvedTable = this.resolveRelation(schemaName, tableName);
        List<ReplicationMessage.Column> oldColumns = null;
        List<ReplicationMessage.Column> newColumns = null;
        if (!resolvedTable.isPresent()) {
            LOGGER.trace("Row update for {}.{} is filtered out", (Object)schemaName, (Object)tableName);
            tableId = new TableId(null, schemaName, tableName);
        } else {
            Table table = resolvedTable.get();
            tableId = table.id();
            oldColumns = this.resolveColumns(oldRow, table);
            newColumns = this.resolveColumns(newRow, table);
        }
        processor.process(new VStreamOutputReplicationMessage(ReplicationMessage.Operation.UPDATE, this.commitTimestamp, this.transactionId, tableId.toDoubleQuotedString(), oldColumns, newColumns), newVgtid, isLastRowEventOfTransaction);
    }

    private void decodeDelete(Query.Row row, String schemaName, String tableName, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowOfTransaction) throws InterruptedException {
        TableId tableId;
        Optional<Table> resolvedTable = this.resolveRelation(schemaName, tableName);
        List<ReplicationMessage.Column> columns = null;
        if (!resolvedTable.isPresent()) {
            LOGGER.trace("Row delete for {}.{} is filtered out", (Object)schemaName, (Object)tableName);
            tableId = new TableId(null, schemaName, tableName);
        } else {
            Table table = resolvedTable.get();
            tableId = table.id();
            columns = this.resolveColumns(row, table);
        }
        processor.process(new VStreamOutputReplicationMessage(ReplicationMessage.Operation.DELETE, this.commitTimestamp, this.transactionId, tableId.toDoubleQuotedString(), columns, null), newVgtid, isLastRowOfTransaction);
    }

    private Optional<Table> resolveRelation(String schemaName, String tableName) {
        return Optional.ofNullable(this.schema.tableFor(new TableId(null, schemaName, tableName)));
    }

    private List<ReplicationMessage.Column> resolveColumns(Query.Row row, Table table) {
        int numberOfColumns = row.getLengthsCount();
        List tableColumns = table.columns();
        if (tableColumns.size() != numberOfColumns) {
            throw new IllegalStateException(String.format("The number of columns in the ROW event {} is different from the in-memory table schema {}.", row, table));
        }
        String rawValues = row.getValues().toStringUtf8();
        int rawValueIndex = 0;
        ArrayList<ReplicationMessage.Column> columns = new ArrayList<ReplicationMessage.Column>(numberOfColumns);
        for (int i = 0; i < numberOfColumns; i = (int)((short)(i + 1))) {
            String rawValue;
            Column column = (Column)tableColumns.get(i);
            String columnName = column.name();
            VitessType vitessType = new VitessType(column.typeName(), column.jdbcType(), column.enumValues());
            boolean optional = column.isOptional();
            int rawValueLength = (int)row.getLengths(i);
            String string = rawValue = rawValueLength == -1 ? null : new String(Arrays.copyOfRange(rawValues.getBytes(StandardCharsets.UTF_8), rawValueIndex, rawValueIndex + rawValueLength));
            if (rawValueLength != -1) {
                rawValueIndex += rawValueLength;
            }
            columns.add(new ReplicationMessageColumn(columnName, vitessType, optional, rawValue));
        }
        return columns;
    }

    private void handleFieldMessage(Binlogdata.VEvent vEvent) {
        Binlogdata.FieldEvent fieldEvent = vEvent.getFieldEvent();
        if (fieldEvent == null) {
            LOGGER.error("fieldEvent is expected from {}", (Object)vEvent);
        } else {
            String[] schemaTableTuple = fieldEvent.getTableName().split("\\.");
            if (schemaTableTuple.length != 2) {
                LOGGER.error("Handling FIELD vEvent. schemaTableTuple should have schema name and table name but has size {}. {} is skipped", (Object)schemaTableTuple.length, (Object)vEvent);
            } else {
                LOGGER.debug("Handling FIELD vEvent: {}", (Object)fieldEvent);
                String schemaName = schemaTableTuple[0];
                String tableName = schemaTableTuple[1];
                int columnCount = fieldEvent.getFieldsCount();
                ArrayList<ColumnMetaData> columns = new ArrayList<ColumnMetaData>(columnCount);
                for (int i = 0; i < columnCount; i = (int)((short)(i + 1))) {
                    Query.Field field = fieldEvent.getFields(i);
                    String columnName = VStreamOutputMessageDecoder.validateColumnName(field.getName(), schemaName, tableName);
                    VitessType vitessType = VitessType.resolve(field);
                    if (vitessType.getJdbcId() == 1111) {
                        LOGGER.error("Cannot resolve JDBC type from vstream field {}", (Object)field);
                    }
                    KeyMetaData keyMetaData = KeyMetaData.NONE;
                    if ((field.getFlags() & 2) != 0) {
                        keyMetaData = KeyMetaData.IS_KEY;
                    } else if ((field.getFlags() & 4) != 0) {
                        keyMetaData = KeyMetaData.IS_UNIQUE_KEY;
                    }
                    boolean optional = (field.getFlags() & 1) == 0;
                    columns.add(new ColumnMetaData(columnName, vitessType, optional, keyMetaData));
                }
                Table table = this.resolveTable(schemaName, tableName, columns);
                LOGGER.debug("Number of columns in the resolved table: {}", (Object)table.columns().size());
                this.schema.applySchemaChangesForTable(table);
            }
        }
    }

    private Table resolveTable(String schemaName, String tableName, List<ColumnMetaData> columns) {
        ArrayList<String> pkColumnNames = new ArrayList<String>();
        String uniqueKeyColumnName = null;
        ArrayList<Column> cols = new ArrayList<Column>(columns.size());
        for (ColumnMetaData columnMetaData : columns) {
            ColumnEditor editor = Column.editor().name(columnMetaData.getColumnName()).type(columnMetaData.getVitessType().getName()).jdbcType(columnMetaData.getVitessType().getJdbcId()).optional(columnMetaData.isOptional());
            if (columnMetaData.getVitessType().isEnum()) {
                editor = editor.enumValues(columnMetaData.getVitessType().getEnumValues());
            }
            cols.add(editor.create());
            switch (columnMetaData.getKeyMetaData()) {
                case IS_KEY: {
                    pkColumnNames.add(columnMetaData.getColumnName());
                    break;
                }
                case IS_UNIQUE_KEY: {
                    if (uniqueKeyColumnName != null) break;
                    uniqueKeyColumnName = columnMetaData.getColumnName();
                    break;
                }
            }
        }
        TableEditor tableEditor = Table.editor().addColumns(cols).tableId(new TableId(null, schemaName, tableName));
        if (!pkColumnNames.isEmpty()) {
            tableEditor = tableEditor.setPrimaryKeyNames(pkColumnNames);
        } else if (uniqueKeyColumnName != null) {
            tableEditor = tableEditor.setPrimaryKeyNames(Collections.singletonList(uniqueKeyColumnName));
        }
        return tableEditor.create();
    }

    private static String validateColumnName(String columnName, String schemaName, String tableName) {
        int length = columnName.length();
        if (length == 0) {
            throw new IllegalArgumentException(String.format("Empty column name from schema: %s, table: %s", schemaName, tableName));
        }
        char first = columnName.charAt(0);
        if (first == '@') {
            throw new IllegalArgumentException(String.format("Illegal prefix '@' for column: %s, from schema: %s, table: %s", columnName, schemaName, tableName));
        }
        return columnName;
    }
}

