/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.FieldIdentifier;
import org.apache.paimon.types.RowType;

/**
 * Process function that turns batches of updated {@link DataField}s into schema changes for a
 * single table.
 *
 * <p>The {@link FieldIdentifier}s of the most recently read schema are cached in
 * {@code latestFields}, so incoming fields that already match that schema are filtered out
 * before any schema change is extracted.
 */
public class UpdatedDataFieldsProcessFunction
        extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> {
    /** Used to extract/apply schema changes and to re-read the latest schema afterwards. */
    private final SchemaManager schemaManager;
    /** Identifier passed along with every schema change that is applied. */
    private final Identifier identifier;
    /** Field identifiers of the schema as of the last refresh; empty until the first refresh. */
    private Set<FieldIdentifier> latestFields;

    /**
     * Creates the function with an empty field cache.
     *
     * @param schemaManager schema manager used to apply changes and read the latest schema
     * @param identifier identifier handed to {@code applySchemaChange} for every change
     * @param catalogLoader catalog loader forwarded to the base class
     */
    public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
        super(catalogLoader);
        this.schemaManager = schemaManager;
        this.identifier = identifier;
        this.latestFields = new HashSet<>();
    }

    /**
     * Applies the schema changes implied by the given fields, skipping fields that are already
     * present in the cached latest schema.
     *
     * @param updatedDataFields fields reported as updated
     * @param context not used by this implementation
     * @param collector not used by this implementation; nothing is emitted
     * @throws Exception propagated from extracting or applying the schema changes
     */
    public void processElement(List<DataField> updatedDataFields, ProcessFunction.Context context, Collector<Void> collector) throws Exception {
        List<DataField> actualUpdatedDataFields = updatedDataFields.stream()
                .filter(dataField -> !this.latestDataFieldContain(new FieldIdentifier(dataField)))
                .collect(Collectors.toList());
        // Collectors.toList() never yields null, so a plain emptiness check is sufficient.
        if (actualUpdatedDataFields.isEmpty()) {
            return;
        }
        for (SchemaChange schemaChange : this.extractSchemaChanges(this.schemaManager, actualUpdatedDataFields)) {
            this.applySchemaChange(this.schemaManager, schemaChange, this.identifier);
        }
        // The cache is rebuilt from the schema manager instead of being extended with
        // actualUpdatedDataFields, so it reflects whatever schema actually resulted.
        this.updateLatestFields();
    }

    /**
     * Tells whether the cached latest schema already contains an equal field identifier.
     *
     * <p>Uses a hash lookup; this relies on {@link FieldIdentifier} implementing
     * {@code equals}/{@code hashCode} consistently, which keeping it in a hash-based set
     * already presumes.
     */
    private boolean latestDataFieldContain(FieldIdentifier dataField) {
        return this.latestFields.contains(dataField);
    }

    /**
     * Replaces the cache with the field identifiers of the latest schema known to the schema
     * manager.
     *
     * @throws IllegalStateException if the schema manager reports no schema at all
     */
    private void updateLatestFields() {
        RowType latestRowType = this.schemaManager.latest()
                .orElseThrow(() -> new IllegalStateException(
                        "No schema found for table " + this.identifier + " after applying schema changes"))
                .logicalRowType();
        this.latestFields = latestRowType.getFields().stream()
                .map(field -> new FieldIdentifier(field))
                .collect(Collectors.toSet());
    }
}

