/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ProcessFunction} that parses raw CDC events of type {@code T} with an
 * {@link EventParser} and routes the results to side outputs.
 *
 * <p>For every input element the function:
 *
 * <ol>
 *   <li>asks the parser for the table name the event belongs to;
 *   <li>if the parser reports a new table, creates it in the catalog under {@code database};
 *   <li>if the parser reports a schema change, emits it to
 *       {@link #DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG};
 *   <li>emits every parsed record, wrapped as a {@link CdcMultiplexRecord}, to
 *       {@link #DYNAMIC_OUTPUT_TAG}.
 * </ol>
 *
 * <p>The main output type is {@code Void}; nothing is ever written to the main collector.
 *
 * @param <T> type of the raw CDC event handed to the parser
 */
public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(CdcDynamicTableParsingProcessFunction.class);

    /** Side output carrying the parsed records, each tagged with its database and table name. */
    public static final OutputTag<CdcMultiplexRecord> DYNAMIC_OUTPUT_TAG =
            new OutputTag<>("paimon-dynamic-table", TypeInformation.of(CdcMultiplexRecord.class));

    /** Side output carrying schema changes together with the identifier of the affected table. */
    public static final OutputTag<Tuple2<Identifier, CdcSchema>> DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
            new OutputTag<>(
                    "paimon-dynamic-table-schema-change",
                    TypeInformation.of(new TypeHint<Tuple2<Identifier, CdcSchema>>() {}));

    private final EventParser.Factory<T> parserFactory;
    /** Database name used for every identifier and record produced by this function. */
    private final String database;
    private final CatalogLoader catalogLoader;

    // Both are created in open(...) rather than in the constructor, hence transient.
    private transient EventParser<T> parser;
    private transient Catalog catalog;

    /**
     * @param database database name used for all tables handled by this function
     * @param catalogLoader loader used in {@code open} to obtain the {@link Catalog}
     * @param parserFactory factory used in {@code open} to create the {@link EventParser}
     */
    public CdcDynamicTableParsingProcessFunction(String database, CatalogLoader catalogLoader, EventParser.Factory<T> parserFactory) {
        this.database = database;
        this.catalogLoader = catalogLoader;
        this.parserFactory = parserFactory;
    }

    /**
     * Delegates to {@link #open(Configuration)} with an empty configuration.
     *
     * <p>NOTE(review): deliberately not annotated with {@code @Override}; presumably the two
     * {@code open} overloads coexist so the class works with Flink versions that declare only
     * one of them - confirm before adding the annotation.
     */
    public void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }

    /**
     * Creates the event parser and loads the catalog. The {@code parameters} argument is unused.
     *
     * <p>NOTE(review): deliberately not annotated with {@code @Override}; see
     * {@link #open(OpenContext)}.
     */
    public void open(Configuration parameters) throws Exception {
        parser = parserFactory.create();
        catalog = catalogLoader.load();
    }

    /**
     * Parses one raw event and emits its schema change (if any) and its records to the side
     * outputs. A failure to create a newly detected table is logged and does not abort
     * processing of the element.
     *
     * @param raw the raw CDC event
     * @param context context used to emit to the side outputs
     * @param collector main-output collector; never written to
     */
    public void processElement(T raw, ProcessFunction<T, Void>.Context context, Collector<Void> collector) throws Exception {
        parser.setRawEvent(raw);
        String tableName = parser.parseTableName();

        // Create the table first if the parser reports that this event introduces a new one.
        parser.parseNewTable().ifPresent(schema -> {
            Identifier identifier = new Identifier(database, tableName);
            try {
                // NOTE(review): the trailing `true` is presumably "ignore if exists" - confirm
                // against Catalog#createTable.
                catalog.createTable(identifier, schema, true);
            } catch (Exception e) {
                // Best effort: log and keep processing the event.
                LOG.error("Cannot create newly added Paimon table {}", identifier.getFullName(), e);
            }
        });

        CdcSchema schemaChange = parser.parseSchemaChange();
        if (schemaChange != null) {
            context.output(
                    DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG,
                    Tuple2.of(Identifier.create(database, tableName), schemaChange));
        }

        parser.parseRecords().forEach(record ->
                context.output(DYNAMIC_OUTPUT_TAG, wrapRecord(database, tableName, record)));
    }

    /** Wraps {@code record} together with the database and table name it belongs to. */
    private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) {
        return CdcMultiplexRecord.fromCdcRecord(databaseName, tableName, record);
    }

    /** Closes the catalog if one was loaded and clears the reference. */
    public void close() throws Exception {
        if (catalog != null) {
            catalog.close();
            catalog = null;
        }
    }
}

