/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DataConverter;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.converter.DefaultDataConverter;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.AbstractSeaTunnelRowDeserializer;
import org.tikv.common.codec.TableCodec;
import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiTableInfo;
import org.tikv.kvproto.Cdcpb;

public class SeaTunnelRowStreamingRecordDeserializer
extends AbstractSeaTunnelRowDeserializer<Cdcpb.Event.Row> {
    private final DataConverter converter = new DefaultDataConverter();

    public SeaTunnelRowStreamingRecordDeserializer(TiTableInfo tableInfo, CatalogTable catalogTable) {
        super(tableInfo, catalogTable);
    }

    @Override
    public void deserialize(Cdcpb.Event.Row row, Collector<SeaTunnelRow> output) throws Exception {
        RowKey rowKey = RowKey.decode(row.getKey().toByteArray());
        long handle = rowKey.getHandle();
        switch (row.getOpType()) {
            case DELETE: {
                Object[] values = TableCodec.decodeObjects(row.getOldValue().toByteArray(), handle, this.tableInfo);
                SeaTunnelRow record = this.converter.convert(values, this.tableInfo, this.rowType);
                record.setRowKind(RowKind.DELETE);
                output.collect((Object)record);
                break;
            }
            case PUT: {
                try {
                    Object[] values = TableCodec.decodeObjects(row.getValue().toByteArray(), RowKey.decode(row.getKey().toByteArray()).getHandle(), this.tableInfo);
                    if (row.getOldValue() == null || row.getOldValue().isEmpty()) {
                        SeaTunnelRow insert = this.converter.convert(values, this.tableInfo, this.rowType);
                        insert.setRowKind(RowKind.INSERT);
                        output.collect((Object)insert);
                        break;
                    }
                    SeaTunnelRow update = this.converter.convert(values, this.tableInfo, this.rowType);
                    update.setRowKind(RowKind.UPDATE_AFTER);
                    output.collect((Object)update);
                    break;
                }
                catch (RuntimeException e) {
                    throw new RuntimeException(String.format("Fail to deserialize row: %s, table: %s", row, this.tableInfo.getId()), e);
                }
            }
            default: {
                throw new IllegalArgumentException("Unknown Row Op Type: " + row.getOpType());
            }
        }
    }
}

