/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.DataCatalogTable;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.flink.FormatCatalogTable;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.SystemCatalogTable;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlinkTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkTableFactory.class);
    @Nullable
    private final FlinkCatalog flinkCatalog;

    public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
        this.flinkCatalog = flinkCatalog;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        CatalogTable origin = context.getCatalogTable().getOrigin();
        Table table = origin instanceof SystemCatalogTable ? ((SystemCatalogTable)origin).table() : this.buildPaimonTable(context);
        boolean unbounded = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        Map<String, String> options = table.options();
        if (options.containsKey(FlinkConnectorOptions.SCAN_BOUNDED.key()) && Boolean.parseBoolean(options.get(FlinkConnectorOptions.SCAN_BOUNDED.key()))) {
            unbounded = false;
        }
        if (origin instanceof SystemCatalogTable) {
            return new SystemTableSource(table, unbounded, context.getObjectIdentifier());
        }
        return new DataTableSource(context.getObjectIdentifier(), table, unbounded, context, AbstractFlinkTableFactory.createOptionalLogStoreFactory(context).orElse(null));
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        return new FlinkTableSink(context.getObjectIdentifier(), this.buildPaimonTable(context), context, AbstractFlinkTableFactory.createOptionalLogStoreFactory(context).orElse(null));
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet();
    }

    public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(DynamicTableFactory.Context context) {
        return AbstractFlinkTableFactory.createOptionalLogStoreFactory(context.getClassLoader(), context.getCatalogTable().getOptions());
    }

    static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(ClassLoader classLoader, Map<String, String> options) {
        Options configOptions = new Options();
        options.forEach(configOptions::setString);
        if (configOptions.get(FlinkConnectorOptions.LOG_SYSTEM).equalsIgnoreCase("none")) {
            AbstractFlinkTableFactory.validateFileStoreContinuous(configOptions);
            return Optional.empty();
        }
        if (configOptions.get(CoreOptions.SCAN_MODE) == CoreOptions.StartupMode.FROM_SNAPSHOT || configOptions.get(CoreOptions.SCAN_MODE) == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
            throw new ValidationException(String.format("Log system does not support %s and %s scan mode", CoreOptions.StartupMode.FROM_SNAPSHOT, CoreOptions.StartupMode.FROM_SNAPSHOT_FULL));
        }
        return Optional.of(LogStoreTableFactory.discoverLogStoreFactory(classLoader, configOptions.get(FlinkConnectorOptions.LOG_SYSTEM)));
    }

    private static void validateFileStoreContinuous(Options options) {
        CoreOptions.LogChangelogMode changelogMode = options.get(CoreOptions.LOG_CHANGELOG_MODE);
        CoreOptions.StreamingReadMode streamingReadMode = options.get(CoreOptions.STREAMING_READ_MODE);
        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
            throw new ValidationException("File store continuous reading does not support upsert changelog mode.");
        }
        CoreOptions.LogConsistency consistency = options.get(CoreOptions.LOG_CONSISTENCY);
        if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
            throw new ValidationException("File store continuous reading does not support eventual consistency mode.");
        }
        if (streamingReadMode == CoreOptions.StreamingReadMode.LOG) {
            throw new ValidationException("File store continuous reading does not support the log streaming read mode.");
        }
    }

    static CatalogContext createCatalogContext(DynamicTableFactory.Context context) {
        return CatalogContext.create(Options.fromMap(context.getCatalogTable().getOptions()), new FlinkFileIOLoader());
    }

    Table buildPaimonTable(DynamicTableFactory.Context context) {
        Table table;
        CatalogTable origin = context.getCatalogTable().getOrigin();
        Map<String, String> dynamicOptions = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
        dynamicOptions.forEach((key, newValue) -> {
            String oldValue = (String)origin.getOptions().get(key);
            if (!Objects.equals(oldValue, newValue)) {
                SchemaManager.checkAlterTableOption(key, oldValue, newValue, true);
            }
        });
        HashMap<String, String> newOptions = new HashMap<String, String>();
        newOptions.putAll(origin.getOptions());
        newOptions.putAll(dynamicOptions);
        if (origin instanceof FormatCatalogTable) {
            table = ((FormatCatalogTable)origin).table();
        } else {
            FileStoreTable fileStoreTable;
            if (origin instanceof DataCatalogTable) {
                fileStoreTable = (FileStoreTable)((DataCatalogTable)origin).table();
            } else if (this.flinkCatalog == null) {
                fileStoreTable = FileStoreTableFactory.create(AbstractFlinkTableFactory.createCatalogContext(context));
            } else {
                Identifier identifier = Identifier.create(context.getObjectIdentifier().getDatabaseName(), context.getObjectIdentifier().getObjectName());
                try {
                    fileStoreTable = (FileStoreTable)this.flinkCatalog.catalog().getTable(identifier);
                }
                catch (Catalog.TableNotExistException e) {
                    throw new RuntimeException(e);
                }
            }
            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
        }
        if (Options.fromMap(table.options()).get(FlinkConnectorOptions.FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED).booleanValue()) {
            Map<String, String> runtimeContext = AbstractFlinkTableFactory.getAllOptions(context);
            table.fileIO().setRuntimeContext(runtimeContext);
        }
        Schema schema = FlinkCatalog.fromCatalogTable((CatalogBaseTable)context.getCatalogTable());
        RowType rowType = LogicalTypeConversion.toLogicalType(schema.rowType());
        List<String> partitionKeys = schema.partitionKeys();
        List<String> primaryKeys = schema.primaryKeys();
        Preconditions.checkArgument(AbstractFlinkTableFactory.schemaEquals(LogicalTypeConversion.toLogicalType(table.rowType()), rowType), "Flink schema and store schema are not the same, store schema is %s, Flink schema is %s", table.rowType(), rowType);
        Preconditions.checkArgument(table.partitionKeys().equals(partitionKeys), "Flink partitionKeys and store partitionKeys are not the same, store partitionKeys is %s, Flink partitionKeys is %s", table.partitionKeys(), partitionKeys);
        Preconditions.checkArgument(table.primaryKeys().equals(primaryKeys), "Flink primaryKeys and store primaryKeys are not the same, store primaryKeys is %s, Flink primaryKeys is %s", table.primaryKeys(), primaryKeys);
        return table;
    }

    @VisibleForTesting
    static boolean schemaEquals(RowType rowType1, RowType rowType2) {
        List fieldList1 = rowType1.getFields();
        List fieldList2 = rowType2.getFields();
        if (fieldList1.size() != fieldList2.size()) {
            return false;
        }
        for (int i = 0; i < fieldList1.size(); ++i) {
            RowType.RowField f1 = (RowType.RowField)fieldList1.get(i);
            RowType.RowField f2 = (RowType.RowField)fieldList2.get(i);
            if (f1.getName().equals(f2.getName()) && f1.getType().equals((Object)f2.getType())) continue;
            return false;
        }
        return true;
    }

    static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {
        Map<String, String> conf = AbstractFlinkTableFactory.getAllOptions(context);
        String template = String.format("(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)", "paimon.", context.getObjectIdentifier().getCatalogName(), context.getObjectIdentifier().getDatabaseName(), context.getObjectIdentifier().getObjectName());
        Pattern pattern = Pattern.compile(template);
        Map<String, String> optionsFromTableConfig = OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);
        if (!optionsFromTableConfig.isEmpty()) {
            LOG.info("Loading dynamic table options for {} in table config: {}", (Object)context.getObjectIdentifier().getObjectName(), optionsFromTableConfig);
        }
        return optionsFromTableConfig;
    }

    static Map<String, String> getAllOptions(DynamicTableFactory.Context context) {
        ReadableConfig config = context.getConfiguration();
        if (config instanceof Configuration) {
            return ((Configuration)config).toMap();
        }
        if (config instanceof TableConfig) {
            return ((TableConfig)config).getConfiguration().toMap();
        }
        throw new IllegalArgumentException("Unexpected config: " + config.getClass());
    }
}

