/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTableUpdatedDataFieldsProcessFunction
extends UpdatedDataFieldsProcessFunctionBase<Tuple2<Identifier, List<DataField>>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(MultiTableUpdatedDataFieldsProcessFunction.class);
    private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<Identifier, SchemaManager>();

    public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
        super(catalogLoader);
    }

    public void processElement(Tuple2<Identifier, List<DataField>> updatedDataFields, ProcessFunction.Context context, Collector<Void> collector) throws Exception {
        Identifier tableId = (Identifier)updatedDataFields.f0;
        SchemaManager schemaManager = this.schemaManagers.computeIfAbsent(tableId, id -> {
            FileStoreTable table;
            try {
                table = (FileStoreTable)this.catalog.getTable(tableId);
            }
            catch (Catalog.TableNotExistException e) {
                return null;
            }
            return new SchemaManager(table.fileIO(), table.location());
        });
        if (Objects.isNull(schemaManager)) {
            LOG.error("Failed to get schema manager for table " + tableId);
        } else {
            for (SchemaChange schemaChange : this.extractSchemaChanges(schemaManager, (List)updatedDataFields.f1)) {
                this.applySchemaChange(schemaManager, schemaChange, tableId);
            }
        }
    }
}

