/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.upgrade;

import com.alibaba.ververica.connectors.common.table.TableUpgradeUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.connector.OptionSnapshot;
import org.apache.flink.table.factories.CannotMigrateException;
import org.apache.flink.table.factories.CannotSnapshotException;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.OptionUpgradableTableFactory;
import org.apache.flink.table.factories.SerializationFormatFactory;

/**
 * Static helpers that snapshot the options of a Kafka table and migrate a previously taken
 * snapshot against the options currently supplied for that table.
 *
 * <p>Connector-level options and format options are handled separately: the format-selecting
 * options ({@code format}, {@code key.format}, {@code value.format}) are stripped from the
 * connector options through the {@link FactoryUtil.OptionMigrateHelper} and the formats are then
 * snapshotted/migrated through the same helper.
 */
public abstract class KafkaOptionUpgradableHelper {
    /** Version written into every snapshot and required of every snapshot that is migrated. */
    private static final int CURRENT_SNAPSHOT_VERSION = 1;

    /** Option key that is never written into a snapshot (presumably because it holds credentials). */
    private static final String SASL_JAAS_CONFIG_KEY = "properties.sasl.jaas.config";

    /**
     * Builds a snapshot of the catalog table's options.
     *
     * <p>The snapshot contains the table options with the format options removed, the current
     * snapshot version under {@link FactoryUtil#PROPERTY_VERSION}, and the format snapshot(s)
     * produced by the migrate helper. The {@code properties.sasl.jaas.config} entry is dropped.
     *
     * @param context upgrade context supplying the catalog table and the table type
     * @param factory table factory the migrate helper is created for
     * @return the snapshot wrapping the collected options
     * @throws CannotSnapshotException if the options or a format cannot be snapshotted
     */
    public static OptionSnapshot snapshotOptions(OptionUpgradableTableFactory.UpgradeContext context, OptionUpgradableTableFactory factory) throws CannotSnapshotException {
        FactoryUtil.OptionMigrateHelper helper = FactoryUtil.createOptionMigrateHelper((DynamicTableFactory) factory, context);
        Map<String, String> snapshot = helper.removeFormatOptions(
                context.getCatalogTable().getOptions(),
                new ConfigOption[] {FactoryUtil.FORMAT, KafkaConnectorOptions.KEY_FORMAT, KafkaConnectorOptions.VALUE_FORMAT});
        snapshot.put(FactoryUtil.PROPERTY_VERSION.key(), String.valueOf(CURRENT_SNAPSHOT_VERSION));
        snapshot.remove(SASL_JAAS_CONFIG_KEY);
        // Sources read with decoding formats; every other table type is treated as a sink.
        if (context.getTableType() == OptionUpgradableTableFactory.TableType.SOURCE) {
            snapshot.putAll(snapshotDecodingFormat(helper));
        } else {
            snapshot.putAll(snapshotEncodingFormat(helper));
        }
        return new OptionSnapshot(snapshot);
    }

    /**
     * Migrates a snapshot taken by {@link #snapshotOptions} against the current table options.
     *
     * <p>The snapshot must carry the current snapshot version. Its connector options are checked
     * and merged with the enrichment options via {@link TableUpgradeUtils#checkAndMigrateOptions},
     * passing the set of options that are not allowed to change; the format options are migrated
     * separately and added to the result.
     *
     * @param context upgrade context supplying the table type
     * @param snapshot snapshot to migrate
     * @param factory table factory the migrate helper is created for
     * @return the merged options
     * @throws CannotMigrateException if the snapshot version does not match the current version,
     *     or if the options or a format cannot be migrated
     */
    public static Map<String, String> migrateOptions(OptionUpgradableTableFactory.UpgradeContext context, OptionSnapshot snapshot, OptionUpgradableTableFactory factory) throws CannotMigrateException {
        FactoryUtil.OptionMigrateHelper helper = FactoryUtil.createOptionMigrateHelper((DynamicTableFactory) factory, context);
        String versionKey = FactoryUtil.PROPERTY_VERSION.key();
        Map<String, String> mergedConfig = snapshot.getOptions();
        String snapshotVersion = mergedConfig.get(versionKey);
        if (!String.valueOf(CURRENT_SNAPSHOT_VERSION).equals(snapshotVersion)) {
            throw new CannotMigrateException(String.format("Can not migrate connector because its snapshot version is %s and current version is %s.", snapshotVersion, CURRENT_SNAPSHOT_VERSION));
        }
        // NOTE(review): this removes the version entry from the map returned by
        // snapshot.getOptions() itself; if that is the snapshot's backing map, the snapshot
        // later handed to migrateFormat no longer contains it. Kept as-is to preserve behavior.
        mergedConfig.remove(versionKey);
        Map<String, String> newOptions = helper.getEnrichmentOptions().toMap();
        mergedConfig = helper.removeFormatOptions(
                mergedConfig,
                new ConfigOption[] {FactoryUtil.FORMAT, KafkaConnectorOptions.KEY_FORMAT, KafkaConnectorOptions.VALUE_FORMAT});
        mergedConfig = TableUpgradeUtils.checkAndMigrateOptions(notAllowedChangedOptions(), mergedConfig, newOptions);
        if (context.getTableType() == OptionUpgradableTableFactory.TableType.SOURCE) {
            mergedConfig.putAll(migrateDecodingFormat(helper, snapshot));
        } else {
            mergedConfig.putAll(migrateEncodingFormat(helper, snapshot));
        }
        return mergedConfig;
    }

    /**
     * Snapshots the deserialization formats: the key format only when {@code key.format} is set,
     * and always the value format.
     */
    private static Map<String, String> snapshotDecodingFormat(FactoryUtil.OptionMigrateHelper helper) throws CannotSnapshotException {
        ReadableConfig options = helper.getOptions();
        Map<String, String> snapshot = new HashMap<>();
        if (options.getOptional(KafkaConnectorOptions.KEY_FORMAT).isPresent()) {
            snapshot.putAll(helper.snapshotFormat(DeserializationFormatFactory.class, KafkaConnectorOptions.KEY_FORMAT));
        }
        snapshot.putAll(helper.snapshotFormat(DeserializationFormatFactory.class, valueFormatOption(options)));
        return snapshot;
    }

    /**
     * Snapshots the serialization formats: the key format only when {@code key.format} is set,
     * and always the value format.
     */
    private static Map<String, String> snapshotEncodingFormat(FactoryUtil.OptionMigrateHelper helper) throws CannotSnapshotException {
        ReadableConfig options = helper.getOptions();
        Map<String, String> snapshot = new HashMap<>();
        if (options.getOptional(KafkaConnectorOptions.KEY_FORMAT).isPresent()) {
            snapshot.putAll(helper.snapshotFormat(SerializationFormatFactory.class, KafkaConnectorOptions.KEY_FORMAT));
        }
        snapshot.putAll(helper.snapshotFormat(SerializationFormatFactory.class, valueFormatOption(options)));
        return snapshot;
    }

    /**
     * Migrates the deserialization formats: the key format only when {@code key.format} is set,
     * and always the value format.
     */
    private static Map<String, String> migrateDecodingFormat(FactoryUtil.OptionMigrateHelper helper, OptionSnapshot snapshot) throws CannotMigrateException {
        ReadableConfig options = helper.getOptions();
        Map<String, String> mergedConfig = new HashMap<>();
        // The key format is optional; guard it exactly like the snapshot path and the encoding
        // path do, so a table without key.format does not attempt a key-format migration.
        if (options.getOptional(KafkaConnectorOptions.KEY_FORMAT).isPresent()) {
            mergedConfig.putAll(helper.migrateFormat(DeserializationFormatFactory.class, KafkaConnectorOptions.KEY_FORMAT, snapshot));
        }
        mergedConfig.putAll(helper.migrateFormat(DeserializationFormatFactory.class, valueFormatOption(options), snapshot));
        return mergedConfig;
    }

    /**
     * Migrates the serialization formats: the key format only when {@code key.format} is set,
     * and always the value format.
     */
    private static Map<String, String> migrateEncodingFormat(FactoryUtil.OptionMigrateHelper helper, OptionSnapshot snapshot) throws CannotMigrateException {
        ReadableConfig options = helper.getOptions();
        Map<String, String> mergedConfig = new HashMap<>();
        if (options.getOptional(KafkaConnectorOptions.KEY_FORMAT).isPresent()) {
            mergedConfig.putAll(helper.migrateFormat(SerializationFormatFactory.class, KafkaConnectorOptions.KEY_FORMAT, snapshot));
        }
        mergedConfig.putAll(helper.migrateFormat(SerializationFormatFactory.class, valueFormatOption(options), snapshot));
        return mergedConfig;
    }

    /**
     * Picks the option that selects the value format: {@code format} when it is set, otherwise
     * {@code value.format}.
     */
    private static ConfigOption<String> valueFormatOption(ReadableConfig options) {
        return options.getOptional(FactoryUtil.FORMAT).isPresent()
                ? FactoryUtil.FORMAT
                : KafkaConnectorOptions.VALUE_FORMAT;
    }

    /** Returns the keys of the options passed to the migration check as not allowed to change. */
    private static Set<String> notAllowedChangedOptions() {
        return Stream.of(
                        KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS,
                        KafkaConnectorOptions.TOPIC,
                        KafkaConnectorOptions.TOPIC_PATTERN,
                        KafkaConnectorOptions.KEY_FIELDS,
                        KafkaConnectorOptions.KEY_FIELDS_PREFIX,
                        KafkaConnectorOptions.SCAN_HEADER_FILTER,
                        KafkaConnectorOptions.DELIVERY_GUARANTEE,
                        KafkaConnectorOptions.SCAN_BOUNDED_MODE,
                        KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS,
                        KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS,
                        KafkaConnectorOptions.SINK_PARTITIONER)
                .map(ConfigOption::key)
                .collect(Collectors.toSet());
    }
}

