/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRecordParser
implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordParser.class);
    protected static final String FIELD_TABLE = "table";
    protected static final String FIELD_DATABASE = "database";
    protected final TypeMapping typeMapping;
    protected final List<ComputedColumn> computedColumns;

    public AbstractRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        this.typeMapping = typeMapping;
        this.computedColumns = computedColumns;
    }

    @Nullable
    public Schema buildSchema(CdcSourceRecord record) {
        try {
            this.setRoot(record);
            if (this.isDDL()) {
                return null;
            }
            Optional recordOpt = this.extractRecords().stream().findFirst();
            return recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null);
        }
        catch (Exception e) {
            this.logInvalidSourceRecord(record);
            throw e;
        }
    }

    public void flatMap(CdcSourceRecord value, Collector<RichCdcMultiplexRecord> out) {
        try {
            this.setRoot(value);
            this.extractRecords().forEach(arg_0 -> out.collect(arg_0));
        }
        catch (Exception e) {
            this.logInvalidSourceRecord(value);
            throw e;
        }
    }

    protected abstract void setRoot(CdcSourceRecord var1);

    protected abstract List<RichCdcMultiplexRecord> extractRecords();

    protected boolean isDDL() {
        return false;
    }

    protected abstract List<String> extractPrimaryKeys();

    protected void evalComputedColumns(Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
        this.computedColumns.forEach(computedColumn -> {
            rowData.put(computedColumn.columnName(), computedColumn.eval((String)rowData.get(computedColumn.fieldReference())));
            schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType());
        });
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data, CdcSchema.Builder schemaBuilder) {
        schemaBuilder.primaryKey(this.extractPrimaryKeys());
        return new RichCdcMultiplexRecord(this.getDatabaseName(), this.getTableName(), schemaBuilder.build(), new CdcRecord(rowKind, data));
    }

    @Nullable
    protected abstract String getTableName();

    @Nullable
    protected abstract String getDatabaseName();

    private void logInvalidSourceRecord(CdcSourceRecord record) {
        LOG.error("Invalid source record:\n{}", (Object)record.toString());
    }

    protected abstract String format();
}

