/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.EventParser;

public class CdcParsingProcessFunction<T>
extends ProcessFunction<T, CdcRecord> {
    public static final OutputTag<CdcSchema> SCHEMA_CHANGE_OUTPUT_TAG = new OutputTag("table-schema-change", TypeInformation.of(CdcSchema.class));
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;

    public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration parameters) throws Exception {
        this.parser = this.parserFactory.create();
    }

    public void processElement(T raw, ProcessFunction.Context context, Collector<CdcRecord> collector) throws Exception {
        this.parser.setRawEvent(raw);
        CdcSchema schemaChange = this.parser.parseSchemaChange();
        if (schemaChange != null) {
            context.output(SCHEMA_CHANGE_OUTPUT_TAG, (Object)schemaChange);
        }
        this.parser.parseRecords().forEach(arg_0 -> collector.collect(arg_0));
    }
}

