/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;

public abstract class MessageQueueSyncTableActionBase
extends SyncTableActionBase {
    public MessageQueueSyncTableActionBase(String warehouse, String database, String table, Map<String, String> catalogConfig, Map<String, String> mqConfig) {
        super(warehouse, database, table, catalogConfig, mqConfig);
    }

    @Override
    protected Schema retrieveSchema() throws Exception {
        String topic = this.topic();
        try (MessageQueueSchemaUtils.ConsumerWrapper consumer = this.consumer(topic);){
            Schema schema = MessageQueueSchemaUtils.getSchema(consumer, topic, this.getDataFormat(), this.typeMapping);
            return schema;
        }
    }

    @Override
    protected Schema buildPaimonSchema(Schema retrievedSchema) {
        return CdcActionCommonUtils.buildPaimonSchema(this.partitionKeys, this.primaryKeys, this.computedColumns, this.tableConfig, retrievedSchema, this.metadataConverters, false);
    }

    @Override
    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
        boolean caseSensitive = this.catalog.caseSensitive();
        DataFormat format = this.getDataFormat();
        return format.createParser(caseSensitive, this.typeMapping, this.computedColumns);
    }

    protected abstract String topic();

    protected abstract MessageQueueSchemaUtils.ConsumerWrapper consumer(String var1);

    protected abstract DataFormat getDataFormat();
}

