/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.data.Envelope;
import io.debezium.function.Predicates;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.jdbc.PgConnection;

/**
 * Emits change records for a single PostgreSQL logical-decoding {@link ReplicationMessage}.
 *
 * <p>Before delegating to the relational base class, the emitter compares the columns carried by
 * the message with the in-memory {@link Table} definition and refreshes the schema when they
 * disagree. Column values are laid out by the position of each column in the table schema, and
 * values read from the old tuple of an UPDATE are cached so that unchanged-TOAST placeholders in
 * the new tuple can be replaced with them.
 */
public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter {
    private final ReplicationMessage message;
    private final PostgresSchema schema;
    private final PostgresConnectorConfig connectorConfig;
    private final PostgresConnection connection;
    private final TableId tableId;
    /** True when the configured decoder plugin reports no unchanged-TOAST marker of its own. */
    private final boolean unchangedToastColumnMarkerMissing;
    /** Old-tuple values of an UPDATE, keyed by unquoted column name. */
    private final Map<String, Object> cachedOldToastedValues = new HashMap<String, Object>();

    /**
     * Creates an emitter for one replication message.
     *
     * @param offset the offset context handed to the base emitter
     * @param clock the clock handed to the base emitter
     * @param connectorConfig connector configuration (plugin, unknown-datatype and refresh settings)
     * @param schema the in-memory schema that is consulted and refreshed
     * @param connection the database connection used for schema refreshes and value conversion
     * @param message the replication message to emit
     * @throws NullPointerException if the table named in the message cannot be parsed to a table id
     */
    public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, ReplicationMessage message) {
        super(offset, clock);
        this.schema = schema;
        this.message = message;
        this.connectorConfig = connectorConfig;
        this.connection = connection;
        this.tableId = PostgresSchema.parse(message.getTable());
        this.unchangedToastColumnMarkerMissing = !connectorConfig.plugin().hasUnchangedToastColumnMarker();
        Objects.requireNonNull(this.tableId);
    }

    /**
     * Maps the operation of the replication message onto the envelope operation.
     *
     * @return CREATE for INSERT, UPDATE for UPDATE, DELETE for DELETE
     * @throws IllegalArgumentException for any other message operation
     */
    @Override
    protected Envelope.Operation getOperation() {
        switch (this.message.getOperation()) {
            case INSERT:
                return Envelope.Operation.CREATE;
            case UPDATE:
                return Envelope.Operation.UPDATE;
            case DELETE:
                return Envelope.Operation.DELETE;
            default:
                throw new IllegalArgumentException("Received event of unexpected command type: " + this.message.getOperation());
        }
    }

    /**
     * Synchronizes the table schema with the incoming message and then lets the base class emit
     * the records using the (possibly refreshed) schema.
     */
    @Override
    public void emitChangeRecords(DataCollectionSchema schema, ChangeRecordEmitter.Receiver receiver) throws InterruptedException {
        schema = this.synchronizeTableSchema(schema);
        super.emitChangeRecords(schema, receiver);
    }

    /**
     * Returns the "before" values of the row.
     *
     * @return {@code null} for CREATE; otherwise the values of the old tuple. For UPDATE the values
     *         are additionally cached for later TOAST substitution.
     * @throws ConnectException wrapping any {@link SQLException} raised while reading values
     */
    @Override
    protected Object[] getOldColumnValues() {
        try {
            switch (this.getOperation()) {
                case CREATE:
                    return null;
                case UPDATE:
                    // Only the old tuple of an UPDATE feeds the TOAST cache.
                    return this.columnValues(this.message.getOldTupleList(), this.tableId, true, this.message.hasTypeMetadata(), true);
                default:
                    return this.columnValues(this.message.getOldTupleList(), this.tableId, true, this.message.hasTypeMetadata(), false);
            }
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Returns the "after" values of the row.
     *
     * @return the values of the new tuple for CREATE and UPDATE; {@code null} otherwise
     * @throws ConnectException wrapping any {@link SQLException} raised while reading values
     */
    @Override
    protected Object[] getNewColumnValues() {
        try {
            switch (this.getOperation()) {
                case CREATE:
                case UPDATE:
                    return this.columnValues(this.message.getNewTupleList(), this.tableId, true, this.message.hasTypeMetadata(), false);
                default:
                    return null;
            }
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Refreshes the in-memory table definition when the new tuple of the message no longer matches
     * it. DELETE events, and messages that declare that no synchronization is needed, keep the
     * schema that was passed in.
     *
     * @param tableSchema the schema currently associated with the event
     * @return {@code tableSchema} when no synchronization is attempted, otherwise the schema
     *         currently registered for the table
     */
    private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) {
        boolean metadataInMessage = this.message.hasTypeMetadata();
        TableId tableId = (TableId) tableSchema.id();
        Table table = this.schema.tableFor(tableId);
        if (this.getOperation() == Envelope.Operation.DELETE || !this.message.shouldSchemaBeSynchronized()) {
            return tableSchema;
        }
        List<ReplicationMessage.Column> columns = this.message.getNewTupleList();
        if (this.schemaChanged(columns, table, metadataInMessage)) {
            this.refreshTableFromDatabase(tableId);
            if (metadataInMessage) {
                // Overlay the freshly loaded definition with the column metadata from the message.
                this.schema.refresh(this.tableFromFromMessage(columns, this.schema.tableFor(tableId)));
            }
        }
        return this.schema.schemaFor(tableId);
    }

    /**
     * Converts the columns of a tuple into a value array indexed by schema column position.
     *
     * @param columns the tuple columns from the message
     * @param tableId the table the tuple belongs to
     * @param refreshSchemaIfChanged not used by this method
     * @param metadataInMessage not used by this method
     * @param sourceOfToasted when {@code true}, every value read is stored in the TOAST cache;
     *        when {@code false}, unchanged-TOAST placeholders are replaced by cached values
     * @return the positioned values, or {@code null} when {@code columns} is null or empty
     * @throws SQLException declared for failures while reading values through the connection
     * @throws NullPointerException if no table is registered for {@code tableId}
     */
    private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged, boolean metadataInMessage, boolean sourceOfToasted) throws SQLException {
        if (columns == null || columns.isEmpty()) {
            return null;
        }
        Table table = this.schema.tableFor(tableId);
        Objects.requireNonNull(table);
        List<Column> schemaColumns = table.columns();
        List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream()
                .filter(Predicates.not(ReplicationMessage.Column::isToastedColumn))
                .collect(Collectors.toList());
        // The message may carry fewer columns than the schema, so size by the larger of the two.
        Object[] values = new Object[Math.max(schemaColumns.size(), columnsWithoutToasted.size())];
        HashSet<String> undeliveredToastableColumns = new HashSet<String>(this.schema.getToastableColumnsForTableId(table.id()));
        for (ReplicationMessage.Column column : columns) {
            // Names are unquoted before they are looked up in the table definition.
            String columnName = Strings.unquoteIdentifierPart(column.getName());
            undeliveredToastableColumns.remove(columnName);
            int position = this.getPosition(columnName, table, values);
            if (position == -1) {
                continue;
            }
            Object value = column.getValue(() -> (PgConnection) this.connection.connection(), this.connectorConfig.includeUnknownDatatypes());
            if (sourceOfToasted) {
                this.cachedOldToastedValues.put(columnName, value);
            }
            else if (value == UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE) {
                Object candidate = this.cachedOldToastedValues.get(columnName);
                if (candidate != null) {
                    value = candidate;
                }
            }
            values[position] = value;
        }
        if (this.unchangedToastColumnMarkerMissing) {
            // Toastable columns absent from the message get the cached value or the placeholder.
            for (String columnName : undeliveredToastableColumns) {
                int position = this.getPosition(columnName, table, values);
                if (position == -1) {
                    continue;
                }
                Object candidate = this.cachedOldToastedValues.get(columnName);
                values[position] = candidate != null ? candidate : UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE;
            }
        }
        return values;
    }

    /**
     * Resolves the zero-based index of a column inside the value array.
     *
     * @param columnName the (unquoted) column name
     * @param table the table definition to look the column up in
     * @param values the value array whose bounds the index must respect
     * @return the index, or {@code -1} (after logging a warning) when the column is unknown to the
     *         table or its position lies outside {@code values}
     */
    private int getPosition(String columnName, Table table, Object[] values) {
        Column tableColumn = table.columnWithName(columnName);
        if (tableColumn == null) {
            this.logger.warn("Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", columnName);
            return -1;
        }
        // Column positions are one-based; the array is zero-based.
        int position = tableColumn.position() - 1;
        if (position < 0 || position >= values.length) {
            this.logger.warn("Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", columnName);
            return -1;
        }
        return position;
    }

    /**
     * Loads the definition of a table whose schema is missing.
     *
     * @param tableId the table to load
     * @return the schema after the refresh, or an empty optional when it still cannot be found
     */
    private Optional<DataCollectionSchema> newTable(TableId tableId) {
        this.logger.debug("Schema for table '{}' is missing", tableId);
        this.refreshTableFromDatabase(tableId);
        TableSchema tableSchema = this.schema.schemaFor(tableId);
        if (tableSchema == null) {
            this.logger.warn("cannot load schema for table '{}'", tableId);
            return Optional.empty();
        }
        this.logger.debug("refreshed DB schema to include table '{}'", tableId);
        return Optional.of(tableSchema);
    }

    /**
     * Re-reads the definition of a table through the database connection.
     *
     * @param tableId the table to refresh
     * @throws ConnectException if the refresh fails; the underlying {@link SQLException} is
     *         attached as the cause
     */
    private void refreshTableFromDatabase(TableId tableId) {
        try {
            this.schema.refresh(this.connection, tableId, this.connectorConfig.skipRefreshSchemaOnMissingToastableData());
        }
        catch (SQLException e) {
            // Keep the cause so the actual database failure stays visible in the stack trace.
            throw new ConnectException("Database error while refresing table schema", e);
        }
    }

    /**
     * Static entry point that asks the given emitter to load the schema of a table.
     *
     * @param tableId the table whose schema is missing
     * @param changeRecordEmitter the emitter; it is cast to {@link PostgresChangeRecordEmitter}
     * @return the loaded schema, or an empty optional when it cannot be loaded
     */
    static Optional<DataCollectionSchema> updateSchema(TableId tableId, ChangeRecordEmitter changeRecordEmitter) {
        return ((PostgresChangeRecordEmitter) changeRecordEmitter).newTable(tableId);
    }

    /**
     * Decides whether the in-memory table definition has to be refreshed for the given columns.
     *
     * <p>A differing column count triggers a refresh. When the message has fewer columns and the
     * connector is configured to skip refreshes on missing toastable data, only missing columns
     * that are not toastable count. Otherwise each message column is compared with its schema
     * counterpart until the first difference is found.
     *
     * @param columns the columns carried by the message
     * @param table the in-memory table definition
     * @param metadataInMessage whether length, scale and optionality are compared as well
     * @return {@code true} when a refresh is required
     */
    private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table table, boolean metadataInMessage) {
        int tableColumnCount = table.columns().size();
        int replicationColumnCount = columns.size();
        boolean msgHasMissingColumns = tableColumnCount > replicationColumnCount;
        if (msgHasMissingColumns && this.connectorConfig.skipRefreshSchemaOnMissingToastableData()) {
            msgHasMissingColumns = this.hasMissingUntoastedColumns(table, columns);
        }
        boolean msgHasAdditionalColumns = tableColumnCount < replicationColumnCount;
        if (msgHasMissingColumns || msgHasAdditionalColumns) {
            this.logger.info("Different column count {} present in the server message as schema in memory contains {}; refreshing table schema", replicationColumnCount, tableColumnCount);
            return true;
        }
        // Stops at the first differing column, so at most one difference is logged.
        return columns.stream().anyMatch(incoming -> this.columnDefinitionChanged(incoming, table, metadataInMessage));
    }

    /**
     * Compares one message column with the column of the same name in the table definition and
     * logs the first difference found.
     *
     * @return {@code true} when the column is unknown to the table or differs in native type or,
     *         if {@code metadataInMessage} is set, in length, scale or optionality
     */
    private boolean columnDefinitionChanged(ReplicationMessage.Column incoming, Table table, boolean metadataInMessage) {
        String columnName = incoming.getName();
        Column column = table.columnWithName(columnName);
        if (column == null) {
            this.logger.info("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
            return true;
        }
        int localType = column.nativeType();
        int incomingType = incoming.getType().getOid();
        if (localType != incomingType) {
            this.logger.info("detected new type for column '{}', old type was {} ({}), new type is {} ({}); refreshing table schema", columnName, localType, column.typeName(), incomingType, incoming.getType().getName());
            return true;
        }
        if (!metadataInMessage) {
            return false;
        }
        int localLength = column.length();
        int incomingLength = incoming.getTypeMetadata().getLength();
        if (localLength != incomingLength) {
            this.logger.info("detected new length for column '{}', old length was {}, new length is {}; refreshing table schema", columnName, localLength, incomingLength);
            return true;
        }
        // NOTE(review): get() throws NoSuchElementException if the schema column has no scale;
        // confirm that a scale is always present on this path.
        int localScale = column.scale().get();
        int incomingScale = incoming.getTypeMetadata().getScale();
        if (localScale != incomingScale) {
            this.logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale, incomingScale);
            return true;
        }
        boolean localOptional = column.isOptional();
        boolean incomingOptional = incoming.isOptional();
        if (localOptional != incomingOptional) {
            this.logger.info("detected new optional status for column '{}', old value was {}, new value is {}; refreshing table schema", columnName, localOptional, incomingOptional);
            return true;
        }
        return false;
    }

    /**
     * Checks whether the message lacks any schema column that is not a toastable column.
     *
     * @param table the in-memory table definition
     * @param columns the columns carried by the message
     * @return {@code true} when at least one schema column missing from the message is not listed
     *         among the toastable columns of the table
     */
    private boolean hasMissingUntoastedColumns(Table table, List<ReplicationMessage.Column> columns) {
        List<String> msgColumnNames = columns.stream()
                .map(ReplicationMessage.Column::getName)
                .collect(Collectors.toList());
        List<String> missingColumnNames = table.columns().stream()
                .filter(c -> !msgColumnNames.contains(c.name()))
                .map(Column::name)
                .collect(Collectors.toList());
        List<String> toastableColumns = this.schema.getToastableColumnsForTableId(table.id());
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("msg columns: '{}' --- missing columns: '{}' --- toastableColumns: '{}", String.join(",", msgColumnNames), String.join(",", missingColumnNames), String.join(",", toastableColumns));
        }
        return !toastableColumns.containsAll(missingColumnNames);
    }

    /**
     * Builds a table definition whose columns are taken from the message while the primary key is
     * carried over from the existing table.
     *
     * @param columns the columns carried by the message, including their type metadata
     * @param table the existing table definition
     * @return the combined table definition
     */
    private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Table table) {
        List<Column> messageColumns = columns.stream().map(column -> {
            PostgresType type = column.getType();
            ColumnEditor columnEditor = Column.editor()
                    .name(column.getName())
                    .jdbcType(type.getJdbcId())
                    .type(type.getName())
                    .optional(column.isOptional())
                    .nativeType(type.getOid());
            columnEditor.length(column.getTypeMetadata().getLength());
            columnEditor.scale(Integer.valueOf(column.getTypeMetadata().getScale()));
            return columnEditor.create();
        }).collect(Collectors.toList());
        TableEditor combinedTable = table.edit().setColumns(messageColumns);
        List<String> pkCandidates = table.filterColumnNames(c -> table.isPrimaryKeyColumn(c.name()));
        Iterator<String> itPkCandidates = pkCandidates.iterator();
        while (itPkCandidates.hasNext()) {
            String candidateName = itPkCandidates.next();
            if (combinedTable.hasUniqueValues() || combinedTable.columnWithName(candidateName) != null) {
                continue;
            }
            // A key column that is absent from the message columns is dropped from the key.
            this.logger.error("Potentional inconsistency in key for message {}", columns);
            itPkCandidates.remove();
        }
        combinedTable.setPrimaryKeyNames(pkCandidates);
        return combinedTable.create();
    }

    /**
     * Hook inherited from the base emitter; this emitter always answers {@code true}.
     */
    @Override
    protected boolean skipEmptyMessages() {
        return true;
    }
}

