/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.table;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.factories.CannotMigrateException;

public class TableUpgradeUtils {
    private TableUpgradeUtils() {
    }

    public static Map<String, String> checkAndMigrateOptions(Set<String> notAllowedChangedOptions, Map<String, String> snapshotOptions, Map<String, String> newOptions) throws CannotMigrateException {
        HashSet<String> addedOptions = new HashSet<String>(newOptions.keySet());
        addedOptions.removeAll(snapshotOptions.keySet());
        HashMap<String, String> originalOptions = new HashMap<String, String>(snapshotOptions);
        Set removedOptions = originalOptions.keySet().stream().filter(option -> !newOptions.containsKey(option)).collect(Collectors.toSet());
        for (String key : removedOptions) {
            if (notAllowedChangedOptions.contains(key)) {
                throw new CannotMigrateException(String.format("Can not migrate connector because the option %s is removed.", key));
            }
            originalOptions.remove(key);
        }
        for (String key : addedOptions) {
            if (!notAllowedChangedOptions.contains(key)) continue;
            throw new CannotMigrateException(String.format("Can not migrate connector because the option %s is added.", key));
        }
        for (String key : snapshotOptions.keySet()) {
            if (!notAllowedChangedOptions.contains(key) || snapshotOptions.get(key).equals(newOptions.get(key))) continue;
            throw new CannotMigrateException(String.format("Can not migrate connector because the option %s is changed from %s to %s.", key, snapshotOptions.get(key), newOptions.get(key)));
        }
        originalOptions.putAll(newOptions);
        return originalOptions;
    }
}

