/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka.formats;

import java.util.List;
import java.util.Locale;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParserFactory;
import org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;

/**
 * Enumerates the Kafka message value formats for which a {@link RecordParser} can be created.
 *
 * <p>Each constant is bound to a {@link RecordParserFactory} (a constructor reference to the
 * matching parser class) that is used by {@link #createParser} to build the parser instance.
 */
public enum DataFormat {
    /** Format whose records are parsed by {@link CanalRecordParser}. */
    CANAL_JSON(CanalRecordParser::new),
    /** Format whose records are parsed by {@link OggRecordParser}. */
    OGG_JSON(OggRecordParser::new);

    /** Factory that produces the parser for this format. */
    private final RecordParserFactory parser;

    DataFormat(RecordParserFactory parser) {
        this.parser = parser;
    }

    /**
     * Creates a new parser for this format by delegating to the factory bound to the constant.
     *
     * <p>All three arguments are forwarded to the factory unchanged.
     *
     * @param caseSensitive forwarded to the parser factory
     * @param tableNameConverter forwarded to the parser factory
     * @param computedColumns forwarded to the parser factory
     * @return the parser produced by this format's factory
     */
    public RecordParser createParser(boolean caseSensitive, TableNameConverter tableNameConverter, List<ComputedColumn> computedColumns) {
        return this.parser.createParser(caseSensitive, tableNameConverter, computedColumns);
    }

    /**
     * Resolves the {@link DataFormat} named by the {@link KafkaConnectorOptions#VALUE_FORMAT}
     * option of the given configuration.
     *
     * <p>The configured string is mapped to a constant name by replacing every {@code '-'} with
     * {@code '_'} and upper-casing the result, e.g. {@code "canal-json"} resolves to
     * {@link #CANAL_JSON}.
     *
     * @param kafkaConfig configuration from which the value format option is read
     * @return the matching format constant
     * @throws UnsupportedOperationException if the option is not set or does not correspond to
     *     any constant of this enum
     */
    public static DataFormat getDataFormat(Configuration kafkaConfig) {
        String formatStr = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
        if (formatStr == null) {
            // Reject a missing option explicitly instead of provoking and catching an NPE.
            throw new UnsupportedOperationException(String.format("This format: %s is not supported.", formatStr));
        }
        try {
            // Locale.ROOT keeps the mapping independent of the JVM default locale
            // (e.g. Turkish would upper-case 'i' to a dotted capital and break the lookup).
            return DataFormat.valueOf(formatStr.replace("-", "_").toUpperCase(Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            // Keep the original exception type and message for callers, but preserve the cause.
            throw new UnsupportedOperationException(String.format("This format: %s is not supported.", formatStr), e);
        }
    }
}

