/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.debezium.row;

import com.google.common.base.Preconditions;
import io.debezium.data.Envelope;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SeaTunnelRowDebeziumDeserializeSchema
implements DebeziumDeserializationSchema<SeaTunnelRow> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelRowDebeziumDeserializeSchema.class);
    /** Serialization version identifier. */
    private static final long serialVersionUID = 1L;
    /**
     * Key under which the single converter is stored in {@link #tableRowConverters} when the
     * produced type is not a {@code MultipleRowType}. The key is {@code null}, so the backing
     * map has to be one that accepts a null key.
     */
    private static final String DEFAULT_TABLE_NAME_KEY = null;
    /** Metadata converters handed to every row converter that gets created. */
    private final MetadataConverter[] metadataConverters;
    /** Time zone handed to every row converter that gets created. */
    private final ZoneId serverTimeZone;
    /** Converter factory handed to every row converter that gets created. */
    private final DebeziumDeserializationConverterFactory userDefinedConverterFactory;
    /** Resolves schema-change records into events; {@code restoreCheckpointProducedType} tolerates {@code null} here. */
    private final SchemaChangeResolver schemaChangeResolver;
    /** Applies a schema-change event to a row type and yields the updated row type. */
    private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
    /** Currently produced type; not final because schema changes and checkpoint restores replace it. */
    private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
    /** Row converters keyed by table id; rebuilt every time {@link #resultTypeInfo} is replaced. */
    private Map<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters;

    /**
     * Creates the schema and eagerly builds the row converters for the produced type.
     *
     * @param physicalDataType accepted as part of the signature; this constructor does not read it
     * @param metadataConverters metadata converters passed on to each row converter
     * @param resultType type produced by this schema; must not be {@code null}
     * @param serverTimeZone time zone passed on to each row converter
     * @param userDefinedConverterFactory converter factory passed on to each row converter
     * @param schemaChangeResolver resolver used for schema-change records
     * @throws NullPointerException if {@code resultType} is {@code null}
     */
    SeaTunnelRowDebeziumDeserializeSchema(SeaTunnelDataType<SeaTunnelRow> physicalDataType, MetadataConverter[] metadataConverters, SeaTunnelDataType<SeaTunnelRow> resultType, ZoneId serverTimeZone, DebeziumDeserializationConverterFactory userDefinedConverterFactory, SchemaChangeResolver schemaChangeResolver) {
        // Validate up front so nothing is assigned when the produced type is missing.
        SeaTunnelDataType<SeaTunnelRow> checkedResultType = Preconditions.checkNotNull(resultType);

        this.resultTypeInfo = checkedResultType;
        this.metadataConverters = metadataConverters;
        this.serverTimeZone = serverTimeZone;
        this.userDefinedConverterFactory = userDefinedConverterFactory;
        this.schemaChangeResolver = schemaChangeResolver;
        this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
        this.tableRowConverters = createTableRowConverters(checkedResultType, metadataConverters, serverTimeZone, userDefinedConverterFactory);
    }

    /**
     * Routes one source record to the matching handler.
     *
     * <p>Schema-change watermark events only mark the collector. Schema-change records and
     * data-change records are delegated to their dedicated methods. Any other record is logged
     * at debug level and dropped.
     *
     * @param record the record to deserialize
     * @param collector receives the produced rows, schema-change events and checkpoint marks
     * @throws Exception if handling a schema-change or data-change record fails
     */
    @Override
    public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector) throws Exception {
        if (WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(record)) {
            collector.markSchemaChangeBeforeCheckpoint();
        } else if (WatermarkEvent.isSchemaChangeAfterWatermarkEvent(record)) {
            collector.markSchemaChangeAfterCheckpoint();
        } else if (SourceRecordUtils.isSchemaChangeEvent(record)) {
            deserializeSchemaChangeRecord(record, collector);
        } else if (SourceRecordUtils.isDataChangeRecord(record)) {
            deserializeDataChangeRecord(record, collector);
        } else {
            log.debug("Unsupported record {}, just skip.", record);
        }
    }

    /**
     * Applies a schema-change record to the produced type, rebuilds the row converters and
     * forwards the resolved event to the collector.
     *
     * <p>The record is skipped with a warning when no resolver is configured or when the
     * resolver returns {@code null}. For a {@link MultipleRowType} only the entry whose key
     * equals the event's table path is updated; every other entry is copied unchanged.
     *
     * @param record the schema-change record
     * @param collector receives the resolved {@link SchemaChangeEvent}
     */
    private void deserializeSchemaChangeRecord(SourceRecord record, Collector<SeaTunnelRow> collector) {
        // A null resolver is tolerated by restoreCheckpointProducedType, so it must not
        // cause a NullPointerException here either.
        if (this.schemaChangeResolver == null) {
            log.warn("No schema change resolver configured, skip schema change record {}.", record);
            return;
        }
        SchemaChangeEvent schemaChangeEvent = this.schemaChangeResolver.resolve(record, this.resultTypeInfo);
        if (schemaChangeEvent == null) {
            log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record);
            return;
        }

        if (this.resultTypeInfo instanceof MultipleRowType) {
            // The changed table is the same for every entry, so compute it once.
            String changedTableId = schemaChangeEvent.tablePath().toString();
            Map<String, SeaTunnelRowType> newRowTypeMap = new HashMap<>();
            for (Map.Entry<String, SeaTunnelRowType> entry : (MultipleRowType) this.resultTypeInfo) {
                String tableId = entry.getKey();
                SeaTunnelRowType currentRowType = entry.getValue();
                if (!tableId.equals(changedTableId)) {
                    newRowTypeMap.put(tableId, currentRowType);
                    continue;
                }
                log.debug("Table[{}] datatype change before: {}", tableId, currentRowType);
                SeaTunnelRowType newRowType = this.dataTypeChangeEventHandler.reset(currentRowType).apply(schemaChangeEvent);
                newRowTypeMap.put(tableId, newRowType);
                log.debug("Table[{}] datatype change after: {}", tableId, newRowType);
            }
            this.resultTypeInfo = new MultipleRowType(newRowTypeMap);
        } else {
            log.debug("Table datatype change before: {}", this.resultTypeInfo);
            this.resultTypeInfo = this.dataTypeChangeEventHandler.reset((SeaTunnelRowType) this.resultTypeInfo).apply(schemaChangeEvent);
            log.debug("table datatype change after: {}", this.resultTypeInfo);
        }

        // Converters are bound to a row type, so they are rebuilt after the type changes.
        this.tableRowConverters = createTableRowConverters(this.resultTypeInfo, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
        collector.collect(schemaChangeEvent);
    }

    /**
     * Converts a data-change record into one or two rows and hands them to the collector.
     *
     * <ul>
     *   <li>{@code CREATE} and {@code READ} emit one {@code INSERT} row built from the "after" image.
     *   <li>{@code DELETE} emits one {@code DELETE} row built from the "before" image.
     *   <li>{@code UPDATE} emits an {@code UPDATE_BEFORE} row followed by an {@code UPDATE_AFTER} row.
     *   <li>Any other operation is logged at warn level and skipped.
     * </ul>
     *
     * <p>When the produced type is a {@link MultipleRowType} and no converter is registered for
     * the record's table, the record is ignored.
     *
     * @param record the data-change record
     * @param collector receives the produced rows
     * @throws Exception if converting the before/after image fails
     */
    private void deserializeDataChangeRecord(SourceRecord record, Collector<SeaTunnelRow> collector) throws Exception {
        Envelope.Operation operation = Envelope.operationFor(record);
        Struct messageStruct = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        TablePath tablePath = SourceRecordUtils.getTablePath(record);
        String tableId = tablePath.toString();

        SeaTunnelRowDebeziumDeserializationConverters converters;
        if (this.resultTypeInfo instanceof MultipleRowType) {
            converters = this.tableRowConverters.get(tableId);
            if (converters == null) {
                log.debug("Ignore newly added table {}", tableId);
                return;
            }
        } else {
            // Single-table mode keeps its only converter under the default key.
            converters = this.tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
        }

        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
            collectRow(collector, this.extractAfterRow(converters, record, messageStruct, valueSchema), RowKind.INSERT, tableId);
        } else if (operation == Envelope.Operation.DELETE) {
            collectRow(collector, this.extractBeforeRow(converters, record, messageStruct, valueSchema), RowKind.DELETE, tableId);
        } else if (operation == Envelope.Operation.UPDATE) {
            // The before image is emitted before the after image is even extracted.
            collectRow(collector, this.extractBeforeRow(converters, record, messageStruct, valueSchema), RowKind.UPDATE_BEFORE, tableId);
            collectRow(collector, this.extractAfterRow(converters, record, messageStruct, valueSchema), RowKind.UPDATE_AFTER, tableId);
        } else {
            log.warn("Received {} operation, skip", operation);
        }
    }

    /** Stamps the row with its kind and table id, then passes it to the collector as a typed row. */
    private static void collectRow(Collector<SeaTunnelRow> collector, SeaTunnelRow row, RowKind rowKind, String tableId) {
        row.setRowKind(rowKind);
        row.setTableId(tableId);
        // No (Object) cast: the argument must stay a SeaTunnelRow to match the collector's typed collect method.
        collector.collect(row);
    }

    /**
     * Converts the {@code "after"} field of a change-event value into a row.
     *
     * @param runtimeConverter converter for the record's table
     * @param record the originating source record
     * @param value the change-event value struct
     * @param valueSchema schema of {@code value}
     * @return the row produced by the converter
     * @throws Exception if the converter fails
     */
    private SeaTunnelRow extractAfterRow(SeaTunnelRowDebeziumDeserializationConverters runtimeConverter, SourceRecord record, Struct value, Schema valueSchema) throws Exception {
        final Schema rowSchema = valueSchema.field("after").schema();
        final Struct rowPayload = value.getStruct("after");
        return runtimeConverter.convert(record, rowPayload, rowSchema);
    }

    /**
     * Converts the {@code "before"} field of a change-event value into a row.
     *
     * @param runtimeConverter converter for the record's table
     * @param record the originating source record
     * @param value the change-event value struct
     * @param valueSchema schema of {@code value}
     * @return the row produced by the converter
     * @throws Exception if the converter fails
     */
    private SeaTunnelRow extractBeforeRow(SeaTunnelRowDebeziumDeserializationConverters runtimeConverter, SourceRecord record, Struct value, Schema valueSchema) throws Exception {
        final Schema rowSchema = valueSchema.field("before").schema();
        final Struct rowPayload = value.getStruct("before");
        return runtimeConverter.convert(record, rowPayload, rowSchema);
    }

    /**
     * Returns the type currently produced by this schema.
     *
     * @return the current produced type, reflecting any schema change or checkpoint restore applied so far
     */
    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return resultTypeInfo;
    }

    /**
     * Returns the resolver supplied at construction time.
     *
     * @return the schema-change resolver, exactly as it was passed to the constructor
     */
    @Override
    public SchemaChangeResolver getSchemaChangeResolver() {
        return schemaChangeResolver;
    }

    /**
     * Replaces the produced type with the one stored in a checkpoint and rebuilds the row
     * converters.
     *
     * <p>Nothing happens when no schema-change resolver is configured. The restore is also
     * skipped, with a warning, when the checkpoint type and the current type disagree on being
     * single-table versus multi-table. For multi-table types the result starts from the current
     * tables; each table also present in the checkpoint takes the checkpoint's row type, and
     * checkpoint tables that no longer exist are ignored.
     *
     * @param checkpointDataType the produced type recorded in the checkpoint
     */
    @Override
    public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> checkpointDataType) {
        if (this.schemaChangeResolver == null) {
            return;
        }
        if (SqlType.ROW.equals(checkpointDataType.getSqlType()) && SqlType.MULTIPLE_ROW.equals(this.resultTypeInfo.getSqlType())) {
            log.warn("Skip incompatible restore type. produced type: {}, checkpoint type: {}", this.resultTypeInfo, checkpointDataType);
            return;
        }
        // Opposite mismatch: a multi-table checkpoint cannot be merged into a current type that
        // is not a MultipleRowType; casting it below would throw ClassCastException.
        if (checkpointDataType instanceof MultipleRowType && !(this.resultTypeInfo instanceof MultipleRowType)) {
            log.warn("Skip incompatible restore type. produced type: {}, checkpoint type: {}", this.resultTypeInfo, checkpointDataType);
            return;
        }

        if (checkpointDataType instanceof MultipleRowType) {
            MultipleRowType latestDataType = (MultipleRowType) this.resultTypeInfo;
            Map<String, SeaTunnelRowType> newRowTypeMap = new HashMap<>();
            for (Map.Entry<String, SeaTunnelRowType> entry : latestDataType) {
                newRowTypeMap.put(entry.getKey(), entry.getValue());
            }
            for (Map.Entry<String, SeaTunnelRowType> entry : (MultipleRowType) checkpointDataType) {
                String tableId = entry.getKey();
                SeaTunnelRowType oldDataType = latestDataType.getRowType(tableId);
                if (oldDataType == null) {
                    log.info("Ignore restore table[{}] datatype has been deleted.", tableId);
                    continue;
                }
                log.info("Table[{}] datatype restore before: {}", tableId, oldDataType);
                newRowTypeMap.put(tableId, entry.getValue());
                log.info("Table[{}] datatype restore after: {}", tableId, entry.getValue());
            }
            this.resultTypeInfo = new MultipleRowType(newRowTypeMap);
        } else {
            log.info("Table datatype restore before: {}", this.resultTypeInfo);
            this.resultTypeInfo = checkpointDataType;
            log.info("Table datatype restore after: {}", checkpointDataType);
        }

        // Converters are bound to a row type, so they are rebuilt after the type changes.
        this.tableRowConverters = createTableRowConverters(this.resultTypeInfo, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
    }

    /**
     * Builds the row converters for a produced type.
     *
     * <p>A {@link MultipleRowType} yields one converter per table, keyed by table id. Any other
     * type is treated as a single {@link SeaTunnelRowType} and yields one converter stored under
     * {@code DEFAULT_TABLE_NAME_KEY}.
     *
     * @param inputDataType the produced type to build converters for
     * @param metadataConverters metadata converters passed to each row converter
     * @param serverTimeZone time zone passed to each row converter
     * @param userDefinedConverterFactory converter factory passed to each row converter
     * @return a new mutable map from table id to row converter
     */
    private static Map<String, SeaTunnelRowDebeziumDeserializationConverters> createTableRowConverters(SeaTunnelDataType<SeaTunnelRow> inputDataType, MetadataConverter[] metadataConverters, ZoneId serverTimeZone, DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
        // Must remain a HashMap: the single-table key is null, which HashMap accepts.
        Map<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters = new HashMap<>();
        if (inputDataType instanceof MultipleRowType) {
            for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) inputDataType) {
                tableRowConverters.put(item.getKey(), new SeaTunnelRowDebeziumDeserializationConverters(item.getValue(), metadataConverters, serverTimeZone, userDefinedConverterFactory));
            }
        } else {
            tableRowConverters.put(DEFAULT_TABLE_NAME_KEY, new SeaTunnelRowDebeziumDeserializationConverters((SeaTunnelRowType) inputDataType, metadataConverters, serverTimeZone, userDefinedConverterFactory));
        }
        return tableRowConverters;
    }

    /**
     * Starts building a {@link SeaTunnelRowDebeziumDeserializeSchema}.
     *
     * @return a fresh builder holding only its default settings
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Fluent builder for {@link SeaTunnelRowDebeziumDeserializeSchema}.
     *
     * <p>Defaults: no metadata converters, the system default time zone and
     * {@code DebeziumDeserializationConverterFactory.DEFAULT}. The result type has no default and
     * must be set before calling {@link #build()}.
     */
    public static class Builder {
        private SeaTunnelDataType<SeaTunnelRow> physicalRowType;
        private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
        private ZoneId serverTimeZone = ZoneId.systemDefault();
        private DebeziumDeserializationConverterFactory userDefinedConverterFactory = DebeziumDeserializationConverterFactory.DEFAULT;
        private SchemaChangeResolver schemaChangeResolver;

        /** Instances are obtained through the enclosing class's {@code builder()} method. */
        private Builder() {
        }

        /** Sets the physical row type, which is forwarded to the schema constructor. */
        public Builder setPhysicalRowType(SeaTunnelDataType<SeaTunnelRow> physicalRowType) {
            this.physicalRowType = physicalRowType;
            return this;
        }

        /** Sets the type the schema will produce. */
        public Builder setResultTypeInfo(SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
            this.resultTypeInfo = resultTypeInfo;
            return this;
        }

        /** Replaces the default empty array of metadata converters. */
        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
            this.metadataConverters = metadataConverters;
            return this;
        }

        /** Replaces the default system time zone. */
        public Builder setServerTimeZone(ZoneId serverTimeZone) {
            this.serverTimeZone = serverTimeZone;
            return this;
        }

        /** Replaces the default converter factory. */
        public Builder setUserDefinedConverterFactory(DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
            this.userDefinedConverterFactory = userDefinedConverterFactory;
            return this;
        }

        /** Sets the resolver used for schema-change records. */
        public Builder setSchemaChangeResolver(SchemaChangeResolver schemaChangeResolver) {
            this.schemaChangeResolver = schemaChangeResolver;
            return this;
        }

        /**
         * Creates the schema from the values collected so far.
         *
         * @return the configured schema
         */
        public SeaTunnelRowDebeziumDeserializeSchema build() {
            return new SeaTunnelRowDebeziumDeserializeSchema(
                    physicalRowType,
                    metadataConverters,
                    resultTypeInfo,
                    serverTimeZone,
                    userDefinedConverterFactory,
                    schemaChangeResolver);
        }
    }
}

