/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.factories;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.FallbackKey;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.factories.DecodingFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.EncodingFormatFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FormatFactory;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FlinkFactoryUtil {
    /** Logger for this utility class; used when a service provider cannot be loaded. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class);
    /** Option whose value is the format identifier; it has no default value. */
    public static final ConfigOption<String> FORMAT = ConfigOptions.key((String)"format").stringType().noDefaultValue().withDescription("Defines the format identifier for encoding data. The identifier is used to discover a suitable format factory.");
    /** Key suffix that marks a prefixed format option; see {@link #getFormatPrefix}. */
    public static final String FORMAT_SUFFIX = ".format";

    /**
     * Creates a {@link FlinkTableFactoryHelper} bound to the given factory and table context.
     *
     * @param factory the log store table factory the helper is created for
     * @param context the dynamic table context whose options the helper reads
     * @return a new helper instance
     */
    public static FlinkTableFactoryHelper createFlinkTableFactoryHelper(
            LogStoreTableFactory factory, DynamicTableFactory.Context context) {
        final FlinkTableFactoryHelper helper = new FlinkTableFactoryHelper(factory, context);
        return helper;
    }

    /**
     * Discovers the single factory of the requested type that reports the given identifier.
     *
     * @param classLoader class loader used to look up {@link Factory} service providers
     * @param factoryClass the factory type the result must be an instance of
     * @param factoryIdentifier the identifier the factory must report via
     *     {@link Factory#factoryIdentifier()}
     * @param <T> the requested factory type
     * @return the unique matching factory
     * @throws ValidationException if no factory of the requested type exists, if none of them
     *     reports the identifier, or if more than one does
     */
    public static <T extends Factory> T discoverFlinkFactory(
            ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
        final List<Factory> factories = FlinkFactoryUtil.discoverFlinkFactories(classLoader);

        // Narrow to T through the class token so that no unchecked cast is needed later.
        final List<T> foundFactories =
                factories.stream()
                        .filter(factoryClass::isInstance)
                        .map(factoryClass::cast)
                        .collect(Collectors.toList());
        if (foundFactories.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Could not find any factories that implement '%s' in the classpath.",
                            factoryClass.getName()));
        }

        final List<T> matchingFactories =
                foundFactories.stream()
                        .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                        .collect(Collectors.toList());
        if (matchingFactories.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\nAvailable factory identifiers are:\n\n%s",
                            factoryIdentifier,
                            factoryClass.getName(),
                            foundFactories.stream()
                                    .map(Factory::factoryIdentifier)
                                    .distinct()
                                    .sorted()
                                    .collect(Collectors.joining("\n"))));
        }
        if (matchingFactories.size() > 1) {
            throw new ValidationException(
                    String.format(
                            "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\nAmbiguous factory classes are:\n\n%s",
                            factoryIdentifier,
                            factoryClass.getName(),
                            matchingFactories.stream()
                                    .map(f -> f.getClass().getName())
                                    .sorted()
                                    .collect(Collectors.joining("\n"))));
        }
        return matchingFactories.get(0);
    }

    /**
     * Builds the option-key prefix under which a format's own options are looked up.
     *
     * <p>For the plain {@code format} key the prefix is {@code "<identifier>."}. For a key ending
     * in {@code ".format"} the part before that suffix is kept, giving
     * {@code "<keyPrefix>.<identifier>."}.
     *
     * @param formatOption the option that carries the format identifier
     * @param formatIdentifier the identifier of the selected format
     * @return the prefix, always ending with a dot
     * @throws ValidationException if the option key is neither {@code format} nor ends with
     *     {@code .format}
     */
    public static String getFormatPrefix(ConfigOption<String> formatOption, String formatIdentifier) {
        final String optionKey = formatOption.key();
        final boolean isPlainFormatKey = optionKey.equals(FORMAT.key());
        if (!isPlainFormatKey && !optionKey.endsWith(FORMAT_SUFFIX)) {
            throw new ValidationException("Format identifier key should be 'format' or suffix with '.format', don't support format identifier key '" + optionKey + "'.");
        }
        if (isPlainFormatKey) {
            return formatIdentifier + ".";
        }
        final int prefixLength = optionKey.length() - FORMAT_SUFFIX.length();
        return optionKey.substring(0, prefixLength) + "." + formatIdentifier + ".";
    }

    /**
     * Loads every {@link Factory} service provider visible to the given class loader.
     *
     * <p>A {@link NoClassDefFoundError} raised while iterating is logged at debug level and
     * iteration continues; any other throwable is wrapped in a {@link TableException}.
     *
     * @param classLoader class loader handed to {@link ServiceLoader}
     * @return the factories that could be loaded, in iteration order
     * @throws TableException if loading a provider fails for any other reason
     */
    static List<Factory> discoverFlinkFactories(ClassLoader classLoader) {
        final Iterator<Factory> providers = ServiceLoader.load(Factory.class, classLoader).iterator();
        final List<Factory> discovered = new ArrayList<>();
        boolean exhausted = false;
        while (!exhausted) {
            try {
                // hasNext() stays inside the try block because it can fail as well.
                if (providers.hasNext()) {
                    discovered.add(providers.next());
                } else {
                    exhausted = true;
                }
            } catch (NoClassDefFoundError missingClass) {
                LOG.debug("NoClassDefFoundError when loading a " + LogStoreTableFactory.class.getCanonicalName() + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.", missingClass);
            } catch (Throwable failure) {
                throw new TableException("Unexpected error when trying to load service provider.", failure);
            }
        }
        return discovered;
    }

    /**
     * Expands the keys of an option without any key prefix.
     *
     * @param option the option whose key and fallback keys are expanded
     * @param actualKeys the keys actually present in the configuration
     * @return the result of the prefixed overload called with an empty prefix
     */
    private static Set<String> allKeysExpanded(ConfigOption<?> option, Set<String> actualKeys) {
        final String noPrefix = "";
        return FlinkFactoryUtil.allKeysExpanded(noPrefix, option, actualKeys);
    }

    /**
     * Collects the prefixed key and fallback keys of an option. When
     * {@code ConfigurationUtils.canBePrefixMap} accepts the option, every actual key that
     * {@code ConfigurationUtils.filterPrefixMapKey} accepts for one of those keys is included too.
     *
     * @param prefix string prepended to the option's key and fallback keys
     * @param option the option to expand
     * @param actualKeys the keys actually present in the configuration
     * @return the set of expanded keys
     */
    private static Set<String> allKeysExpanded(String prefix, ConfigOption<?> option, Set<String> actualKeys) {
        final Set<String> prefixedKeys =
                FlinkFactoryUtil.allKeys(option)
                        .map(key -> prefix + key)
                        .collect(Collectors.toSet());
        if (!ConfigurationUtils.canBePrefixMap(option)) {
            return prefixedKeys;
        }
        // Actual keys that are accepted for at least one of the prefixed keys.
        final Stream<String> mapEntryKeys =
                actualKeys.stream()
                        .filter(candidate ->
                                prefixedKeys.stream()
                                        .anyMatch(key -> ConfigurationUtils.filterPrefixMapKey(key, candidate)));
        return Stream.concat(prefixedKeys.stream(), mapEntryKeys).collect(Collectors.toSet());
    }

    /**
     * Streams the option's own key followed by all of its fallback keys.
     *
     * @param option the option to inspect
     * @return the key and then the fallback keys
     */
    private static Stream<String> allKeys(ConfigOption<?> option) {
        final Stream<String> primaryKey = Stream.of(option.key());
        final Stream<String> fallbacks = FlinkFactoryUtil.fallbackKeys(option);
        return Stream.concat(primaryKey, fallbacks);
    }

    /**
     * Streams the key of every fallback key registered on the option.
     *
     * @param option the option to inspect
     * @return the fallback key strings
     */
    private static Stream<String> fallbackKeys(ConfigOption<?> option) {
        final Iterable<FallbackKey> fallbacks = option.fallbackKeys();
        return StreamSupport.stream(fallbacks.spliterator(), false)
                .map(fallback -> fallback.getKey());
    }

    /**
     * Streams the keys of those fallback keys that are flagged as deprecated.
     *
     * @param option the option to inspect
     * @return the deprecated fallback key strings
     */
    private static Stream<String> deprecatedKeys(ConfigOption<?> option) {
        final Stream<FallbackKey> fallbacks =
                StreamSupport.stream(option.fallbackKeys().spliterator(), false);
        return fallbacks
                .filter(fallback -> fallback.isDeprecated())
                .map(fallback -> fallback.getKey());
    }

    /** Not instantiable: this class only exposes static members and nested helper classes. */
    private FlinkFactoryUtil() {
    }

    public static class FlinkTableFactoryHelper
    extends FlinkFactoryHelper<LogStoreTableFactory> {
        private final DynamicTableFactory.Context context;
        private final Configuration enrichingOptions;

        private FlinkTableFactoryHelper(LogStoreTableFactory tableFactory, DynamicTableFactory.Context context) {
            super(tableFactory, context.getCatalogTable().getOptions(), new ConfigOption[0]);
            this.context = context;
            this.enrichingOptions = Configuration.fromMap((Map)context.getEnrichmentOptions());
        }

        @Override
        public ReadableConfig getOptions() {
            return super.getOptions();
        }

        public <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            return this.discoverOptionalDecodingFormat(formatFactoryClass, formatOption).orElseThrow(() -> new ValidationException(String.format("Could not find required scan format '%s'.", formatOption.key())));
        }

        public <I, F extends DecodingFormatFactory<I>> Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            return this.discoverOptionalFormatFactory(formatFactoryClass, formatOption).map(formatFactory -> {
                String formatPrefix = this.formatFlinkPrefix((Factory)formatFactory, formatOption);
                try {
                    return formatFactory.createDecodingFormat(this.context, this.createFormatOptions(formatPrefix, (FormatFactory)formatFactory));
                }
                catch (Throwable t) {
                    throw new ValidationException(String.format("Error creating scan format '%s' in option space '%s'.", formatFactory.factoryIdentifier(), formatPrefix), t);
                }
            });
        }

        public <I, F extends EncodingFormatFactory<I>> EncodingFormat<I> discoverEncodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            return this.discoverOptionalEncodingFormat(formatFactoryClass, formatOption).orElseThrow(() -> new ValidationException(String.format("Could not find required sink format '%s'.", formatOption.key())));
        }

        public <I, F extends EncodingFormatFactory<I>> Optional<EncodingFormat<I>> discoverOptionalEncodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            return this.discoverOptionalFormatFactory(formatFactoryClass, formatOption).map(formatFactory -> {
                String formatPrefix = this.formatFlinkPrefix((Factory)formatFactory, formatOption);
                try {
                    return formatFactory.createEncodingFormat(this.context, this.createFormatOptions(formatPrefix, (FormatFactory)formatFactory));
                }
                catch (Throwable t) {
                    throw new ValidationException(String.format("Error creating sink format '%s' in option space '%s'.", formatFactory.factoryIdentifier(), formatPrefix), t);
                }
            });
        }

        private <F extends Factory> Optional<F> discoverOptionalFormatFactory(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            String identifier = (String)this.allOptions.get(formatOption);
            this.checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, identifier);
            if (identifier == null) {
                return Optional.empty();
            }
            F factory = FlinkFactoryUtil.discoverFlinkFactory(this.context.getClassLoader(), formatFactoryClass, identifier);
            String formatPrefix = this.formatFlinkPrefix((Factory)factory, formatOption);
            ArrayList consumedOptions = new ArrayList();
            consumedOptions.addAll(factory.requiredOptions());
            consumedOptions.addAll(factory.optionalOptions());
            consumedOptions.stream().flatMap(option -> FlinkFactoryUtil.allKeysExpanded(formatPrefix, option, this.allOptions.keySet()).stream()).forEach(this.consumedOptionKeys::add);
            consumedOptions.stream().flatMap(x$0 -> FlinkFactoryUtil.deprecatedKeys(x$0)).map(k -> formatPrefix + k).forEach(this.deprecatedOptionKeys::add);
            return Optional.of(factory);
        }

        private String formatFlinkPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
            String identifier = formatFactory.factoryIdentifier();
            return FlinkFactoryUtil.getFormatPrefix(formatOption, identifier);
        }

        private ReadableConfig createFormatOptions(String formatPrefix, FormatFactory formatFactory) {
            Set forwardableConfigOptions = formatFactory.forwardOptions();
            DelegatingConfiguration formatConf = new DelegatingConfiguration(this.allOptions, formatPrefix);
            if (forwardableConfigOptions.isEmpty()) {
                return formatConf;
            }
            DelegatingConfiguration formatConfFromEnrichingOptions = new DelegatingConfiguration(this.enrichingOptions, formatPrefix);
            for (ConfigOption option : forwardableConfigOptions) {
                formatConfFromEnrichingOptions.getOptional(option).ifPresent(arg_0 -> FlinkTableFactoryHelper.lambda$createFormatOptions$7((Configuration)formatConf, option, arg_0));
            }
            return formatConf;
        }

        private void checkFormatIdentifierMatchesWithEnrichingOptions(ConfigOption<String> formatOption, String identifierFromPlan) {
            Optional identifierFromEnrichingOptions = this.enrichingOptions.getOptional(formatOption);
            if (!identifierFromEnrichingOptions.isPresent()) {
                return;
            }
            if (identifierFromPlan == null) {
                throw new ValidationException(String.format("The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. This is invalid, as either only the persisted plan table defines the format, or both the persisted plan table and the catalog table defines the same format.", formatOption, identifierFromEnrichingOptions.get()));
            }
            if (!Objects.equals(identifierFromPlan, identifierFromEnrichingOptions.get())) {
                throw new ValidationException(String.format("Both persisted plan table and catalog table define the format option '%s', but they mismatch: '%s' != '%s'.", formatOption, identifierFromPlan, identifierFromEnrichingOptions.get()));
            }
        }

        private static /* synthetic */ void lambda$createFormatOptions$7(Configuration formatConf, ConfigOption option, Object o) {
            formatConf.set(option, o);
        }
    }

    public static class FlinkFactoryHelper<F extends LogStoreTableFactory> {
        protected final F factory;
        protected final Configuration allOptions;
        protected final Set<String> consumedOptionKeys;
        protected final Set<String> deprecatedOptionKeys;

        public FlinkFactoryHelper(F factory, Map<String, String> configuration, ConfigOption<?> ... implicitOptions) {
            this.factory = factory;
            this.allOptions = Configuration.fromMap(configuration);
            ArrayList consumedOptions = new ArrayList();
            consumedOptions.addAll(Arrays.asList(implicitOptions));
            this.consumedOptionKeys = consumedOptions.stream().flatMap(option -> FlinkFactoryUtil.allKeysExpanded(option, this.allOptions.keySet()).stream()).collect(Collectors.toSet());
            this.deprecatedOptionKeys = consumedOptions.stream().flatMap(x$0 -> FlinkFactoryUtil.deprecatedKeys(x$0)).collect(Collectors.toSet());
        }

        public ReadableConfig getOptions() {
            return this.allOptions;
        }
    }
}

