/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;
import org.apache.iceberg.flink.FlinkCreateTableOptions;
import org.apache.iceberg.flink.IcebergTableSink;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergTableSource;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class FlinkDynamicTableFactory
implements DynamicTableSinkFactory,
DynamicTableSourceFactory {
    static final String FACTORY_IDENTIFIER = "iceberg";
    private final FlinkCatalog catalog;

    public FlinkDynamicTableFactory() {
        this.catalog = null;
    }

    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
        this.catalog = catalog;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
        ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
        Map tableProps = resolvedCatalogTable.getOptions();
        ResolvedSchema resolvedSchema = ResolvedSchema.of(resolvedCatalogTable.getResolvedSchema().getColumns().stream().filter(Column::isPhysical).collect(Collectors.toList()));
        TableLoader tableLoader = this.catalog != null ? FlinkDynamicTableFactory.createTableLoader(this.catalog, objectIdentifier.toObjectPath()) : FlinkDynamicTableFactory.createTableLoader(resolvedCatalogTable, tableProps, objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
        return new IcebergTableSource(tableLoader, resolvedSchema, tableProps, context.getConfiguration());
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
        ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
        Map writeProps = resolvedCatalogTable.getOptions();
        ResolvedSchema resolvedSchema = ResolvedSchema.of(resolvedCatalogTable.getResolvedSchema().getColumns().stream().filter(Column::isPhysical).collect(Collectors.toList()));
        TableLoader tableLoader = this.catalog != null ? FlinkDynamicTableFactory.createTableLoader(this.catalog, objectIdentifier.toObjectPath()) : FlinkDynamicTableFactory.createTableLoader(resolvedCatalogTable, writeProps, objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
        return new IcebergTableSink(tableLoader, resolvedSchema, context.getConfiguration(), (Map<String, String>)writeProps);
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = Sets.newHashSet();
        options.add(FlinkCreateTableOptions.CATALOG_TYPE);
        options.add(FlinkCreateTableOptions.CATALOG_NAME);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = Sets.newHashSet();
        options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
        options.add(FlinkCreateTableOptions.CATALOG_TABLE);
        return options;
    }

    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    private static TableLoader createTableLoader(ResolvedCatalogTable resolvedCatalogTable, Map<String, String> tableProps, String databaseName, String tableName) {
        Configuration flinkConf = new Configuration();
        Map<String, String> mergedProps = FlinkDynamicTableFactory.mergeSrcCatalogProps(tableProps);
        mergedProps.forEach((arg_0, arg_1) -> ((Configuration)flinkConf).setString(arg_0, arg_1));
        String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", (Object)FlinkCreateTableOptions.CATALOG_NAME.key());
        String catalogDatabase = flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
        String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
        Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
        FlinkCatalogFactory factory = new FlinkCatalogFactory();
        FlinkCatalog flinkCatalog = (FlinkCatalog)factory.createCatalog(catalogName, mergedProps, hadoopConf);
        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
        if (!flinkCatalog.databaseExists(catalogDatabase)) {
            try {
                flinkCatalog.createDatabase(catalogDatabase, (CatalogDatabase)new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
            }
            catch (DatabaseAlreadyExistException e) {
                throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.", catalogName, catalogDatabase);
            }
        }
        if (!flinkCatalog.tableExists(objectPath)) {
            try {
                flinkCatalog.createIcebergTable(objectPath, resolvedCatalogTable, true);
            }
            catch (TableAlreadyExistException e) {
                throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s", catalogTable, catalogDatabase, catalogName);
            }
        }
        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
    }

    private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tableProps) {
        String srcCatalogProps = tableProps.get("src-catalog");
        if (srcCatalogProps != null) {
            HashMap<String, String> mergedProps = Maps.newHashMap();
            FlinkCreateTableOptions createTableOptions = FlinkCreateTableOptions.fromJson(srcCatalogProps);
            mergedProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), createTableOptions.catalogName());
            mergedProps.put(FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb());
            mergedProps.put(FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable());
            mergedProps.putAll(createTableOptions.catalogProps());
            tableProps.forEach((k, v) -> {
                if (!"src-catalog".equals(k)) {
                    mergedProps.put((String)k, (String)v);
                }
            });
            return Collections.unmodifiableMap(mergedProps);
        }
        return tableProps;
    }

    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
    }
}

