/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.EventParser;

public class CdcMultiTableParsingProcessFunction<T>
extends ProcessFunction<T, Void> {
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;
    private transient Map<String, OutputTag<CdcSchema>> schemaChangeOutputTags;
    private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;

    public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration parameters) throws Exception {
        this.parser = this.parserFactory.create();
        this.schemaChangeOutputTags = new HashMap<String, OutputTag<CdcSchema>>();
        this.recordOutputTags = new HashMap<String, OutputTag<CdcRecord>>();
    }

    public void processElement(T raw, ProcessFunction.Context context, Collector<Void> collector) throws Exception {
        this.parser.setRawEvent(raw);
        String tableName = this.parser.parseTableName();
        CdcSchema schemaChange = this.parser.parseSchemaChange();
        if (schemaChange != null) {
            context.output(this.getUpdatedDataFieldsOutputTag(tableName), (Object)schemaChange);
        }
        this.parser.parseRecords().forEach(record -> context.output(this.getRecordOutputTag(tableName), record));
    }

    private OutputTag<CdcSchema> getUpdatedDataFieldsOutputTag(String tableName) {
        return this.schemaChangeOutputTags.computeIfAbsent(tableName, CdcMultiTableParsingProcessFunction::createSchameChangeOutputTag);
    }

    public static OutputTag<CdcSchema> createSchameChangeOutputTag(String tableName) {
        return new OutputTag("table-schema-change-" + tableName, TypeInformation.of(CdcSchema.class));
    }

    private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
        return this.recordOutputTags.computeIfAbsent(tableName, CdcMultiTableParsingProcessFunction::createRecordOutputTag);
    }

    public static OutputTag<CdcRecord> createRecordOutputTag(String tableName) {
        return new OutputTag("record-" + tableName, TypeInformation.of(CdcRecord.class));
    }
}

