/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;
import org.apache.iceberg.flink.IcebergTableSink;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergTableSource;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class FlinkDynamicTableFactory
implements DynamicTableSinkFactory,
DynamicTableSourceFactory {
    static final String FACTORY_IDENTIFIER = "iceberg";
    private static final ConfigOption<String> CATALOG_NAME = ConfigOptions.key((String)"catalog-name").stringType().noDefaultValue().withDescription("Catalog name");
    private static final ConfigOption<String> CATALOG_TYPE = ConfigOptions.key((String)"catalog-type").stringType().noDefaultValue().withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
    private static final ConfigOption<String> CATALOG_DATABASE = ConfigOptions.key((String)"catalog-database").stringType().defaultValue((Object)"default").withDescription("Database name managed in the iceberg catalog.");
    private static final ConfigOption<String> CATALOG_TABLE = ConfigOptions.key((String)"catalog-table").stringType().noDefaultValue().withDescription("Table name managed in the underlying iceberg catalog and database.");
    private final FlinkCatalog catalog;

    public FlinkDynamicTableFactory() {
        this.catalog = null;
    }

    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
        this.catalog = catalog;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        Map tableProps = catalogTable.getOptions();
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)catalogTable.getSchema());
        TableLoader tableLoader = this.catalog != null ? FlinkDynamicTableFactory.createTableLoader(this.catalog, objectIdentifier.toObjectPath()) : FlinkDynamicTableFactory.createTableLoader((CatalogBaseTable)catalogTable, tableProps, objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
        return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        Map writeProps = catalogTable.getOptions();
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)catalogTable.getSchema());
        TableLoader tableLoader = this.catalog != null ? FlinkDynamicTableFactory.createTableLoader(this.catalog, objectIdentifier.toObjectPath()) : FlinkDynamicTableFactory.createTableLoader((CatalogBaseTable)catalogTable, writeProps, objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
        return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = Sets.newHashSet();
        options.add(CATALOG_TYPE);
        options.add(CATALOG_NAME);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = Sets.newHashSet();
        options.add(CATALOG_DATABASE);
        options.add(CATALOG_TABLE);
        return options;
    }

    public String factoryIdentifier() {
        return FACTORY_IDENTIFIER;
    }

    private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable, Map<String, String> tableProps, String databaseName, String tableName) {
        Configuration flinkConf = new Configuration();
        tableProps.forEach((arg_0, arg_1) -> ((Configuration)flinkConf).setString(arg_0, arg_1));
        String catalogName = flinkConf.getString(CATALOG_NAME);
        Preconditions.checkNotNull((Object)catalogName, (String)"Table property '%s' cannot be null", (Object[])new Object[]{CATALOG_NAME.key()});
        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
        Preconditions.checkNotNull((Object)catalogDatabase, (String)"The iceberg database name cannot be null");
        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
        Preconditions.checkNotNull((Object)catalogTable, (String)"The iceberg table name cannot be null");
        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
        FlinkCatalogFactory factory = new FlinkCatalogFactory();
        FlinkCatalog flinkCatalog = (FlinkCatalog)factory.createCatalog(catalogName, tableProps, hadoopConf);
        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
        if (!flinkCatalog.databaseExists(catalogDatabase)) {
            try {
                flinkCatalog.createDatabase(catalogDatabase, (CatalogDatabase)new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
            }
            catch (DatabaseAlreadyExistException e) {
                throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.", catalogName, catalogDatabase);
            }
        }
        if (!flinkCatalog.tableExists(objectPath)) {
            try {
                flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
            }
            catch (TableAlreadyExistException e) {
                throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s", catalogTable, catalogDatabase, catalogName);
            }
        }
        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
    }

    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
        Preconditions.checkNotNull((Object)((Object)catalog), (String)"Flink catalog cannot be null");
        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
    }
}

