/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.FlinkCatalogFactory;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.thrift.TException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestIcebergConnector
extends FlinkTestBase {
    // Name of the Flink table that the tests create and that after() drops again.
    private static final String TABLE_NAME = "test_table";
    // Class-wide temporary folder; createWarehouse() creates a new sub-folder in it on every call.
    @ClassRule
    public static final TemporaryFolder WAREHOUSE = new TemporaryFolder();
    // The three values of the current parameter set, assigned once in the constructor.
    private final String catalogName;
    private final Map<String, String> properties;
    private final boolean isStreaming;
    // Created lazily by getTableEnv(); volatile because that method first reads it outside its synchronized block.
    private volatile TableEnvironment tEnv;

    /**
     * Supplies the parameter sets for the parameterized runner.
     *
     * <p>Each entry is {@code {catalogName, connector table properties, isStreaming}}. For both the
     * hadoop and the hive catalog type the same six combinations are listed: with and without an
     * explicit {@code catalog-database}, with and without an explicit {@code catalog-table}, and
     * streaming versus batch execution.
     *
     * @return the parameter sets, one {@code Object[]} per test instance
     */
    @Parameterized.Parameters(name="catalogName = {0}, properties = {1}, isStreaming={2}")
    public static Iterable<Object[]> parameters() {
        // One Object[] per varargs element so that the list element type is Object[]; the maps are
        // left to infer as ImmutableMap<String, String>, matching the constructor's parameter type.
        return Lists.newArrayList(
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop"),
                true
            },
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-table", "not_existing_table"),
                true
            },
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop"),
                false
            },
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db"),
                true
            },
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db", "catalog-table", "not_existing_table"),
                true
            },
            new Object[] {
                "testhadoop",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hadoop", "catalog-database", "not_existing_db"),
                false
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive"),
                true
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-table", "not_existing_table"),
                true
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive"),
                false
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db"),
                true
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db", "catalog-table", "not_existing_table"),
                true
            },
            new Object[] {
                "testhive",
                ImmutableMap.of("connector", "iceberg", "catalog-type", "hive", "catalog-database", "not_existing_db"),
                false
            });
    }

    /**
     * Creates one test instance for a single parameter set.
     *
     * @param catalogName name of the Iceberg catalog the connector table points at
     * @param properties connector table properties used in the {@code WITH} clause
     * @param isStreaming whether the table environment runs in streaming mode (otherwise batch)
     */
    public TestIcebergConnector(String catalogName, Map<String, String> properties, boolean isStreaming) {
        this.isStreaming = isStreaming;
        this.properties = properties;
        this.catalogName = catalogName;
    }

    /**
     * Returns the table environment for this test instance, creating it on first use.
     *
     * <p>In streaming mode the environment is backed by a {@link StreamExecutionEnvironment} with
     * checkpointing enabled (interval 400) and parallelism 2; otherwise a batch environment is
     * created. In both cases the default parallelism is set to 1 and Iceberg source parallelism
     * inference is switched off.
     *
     * <p>The environment is fully configured before it is stored in the volatile field, so a caller
     * that passes the unsynchronized null check never observes a half-configured instance.
     *
     * @return the lazily created, cached table environment
     */
    @Override
    protected TableEnvironment getTableEnv() {
        if (this.tEnv == null) {
            synchronized (this) {
                if (this.tEnv == null) {
                    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
                    TableEnvironment created;
                    if (this.isStreaming) {
                        settingsBuilder.inStreamingMode();
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                        env.enableCheckpointing(400L);
                        env.setMaxParallelism(2);
                        env.setParallelism(2);
                        created = StreamTableEnvironment.create(env, settingsBuilder.build());
                    } else {
                        settingsBuilder.inBatchMode();
                        created = TableEnvironment.create(settingsBuilder.build());
                    }
                    // Values are passed with their natural types so that set(ConfigOption<T>, T) can infer T.
                    created.getConfig()
                        .getConfiguration()
                        .set(CoreOptions.DEFAULT_PARALLELISM, 1)
                        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
                    // Publish only after configuration is complete.
                    this.tEnv = created;
                }
            }
        }
        return this.tEnv;
    }

    /**
     * Cleans up after each test: drops the Flink table and, for the hive catalog, also removes the
     * table and (unless it is the default database) the database directly from the Hive metastore.
     *
     * @throws TException if the metastore client cannot be created or the table drop fails
     */
    @After
    public void after() throws TException {
        this.sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
        if (!this.isHiveCatalog()) {
            return;
        }
        try (HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf)) {
            client.dropTable(this.databaseName(), this.tableName());
            if (this.isDefaultDatabaseName()) {
                return;
            }
            try {
                client.dropDatabase(this.databaseName());
            } catch (Exception ignored) {
                // Best-effort cleanup: a failure to drop the database is deliberately ignored.
            }
        }
    }

    /**
     * Shared scenario: creates the connector table, inserts three rows, checks that the rows are
     * readable and that the backing database and table exist in the Iceberg catalog, then drops and
     * re-creates the Flink table and checks that the same rows are still readable.
     */
    private void testCreateConnectorTable() {
        Map<String, String> tableProps = this.createTableProps();

        this.sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, this.toWithClause(tableProps));
        this.sql("INSERT INTO %s VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC')", TABLE_NAME);
        this.assertTableHasInsertedRows();

        // Look the table up through an independently created catalog built from the same properties.
        Catalog flinkCatalog = new FlinkCatalogFactory().createCatalog(this.catalogName, tableProps, new org.apache.hadoop.conf.Configuration());
        boolean databaseExists = flinkCatalog.databaseExists(this.databaseName());
        Assert.assertTrue("Should have created the expected database", databaseExists);
        ObjectPath tablePath = new ObjectPath(this.databaseName(), this.tableName());
        boolean tableExists = flinkCatalog.tableExists(tablePath);
        Assert.assertTrue("Should have created the expected table", tableExists);

        // Drop and re-create the Flink table, then read the rows again.
        this.sql("DROP TABLE %s", TABLE_NAME);
        this.sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, this.toWithClause(tableProps));
        this.assertTableHasInsertedRows();
    }

    /** Asserts that selecting everything from the test table yields exactly the three inserted rows. */
    private void assertTableHasInsertedRows() {
        Assert.assertEquals(
            "Should have expected rows",
            Sets.newHashSet(Row.of(1L, "AAA"), Row.of(2L, "BBB"), Row.of(3L, "CCC")),
            Sets.newHashSet(this.sql("SELECT * FROM %s", TABLE_NAME)));
    }

    /** Runs the shared create/insert/verify scenario without switching the current Flink database. */
    @Test
    public void testCreateTableUnderDefaultDatabase() {
        testCreateConnectorTable();
    }

    /**
     * Runs the shared scenario while the current Flink database has the same name as the Iceberg
     * catalog database, then checks that creating the table again under that Flink database fails.
     *
     * <p>The Flink table and, unless it is the default database, the Flink database are dropped in a
     * {@code finally} block so that cleanup runs whether or not the assertions pass.
     */
    @Test
    public void testCatalogDatabaseConflictWithFlinkDatabase() {
        this.sql("CREATE DATABASE IF NOT EXISTS `%s`", this.databaseName());
        this.sql("USE `%s`", this.databaseName());
        try {
            this.testCreateConnectorTable();
            // The table must already exist under this database, so a second CREATE TABLE has to fail.
            Assertions.assertThatThrownBy(() -> this.sql("CREATE TABLE `default_catalog`.`%s`.`%s`", this.databaseName(), TABLE_NAME))
                .isInstanceOf(TableException.class)
                .hasMessageStartingWith("Could not execute CreateTable in path");
        } finally {
            this.sql("DROP TABLE IF EXISTS `%s`.`%s`", this.databaseName(), TABLE_NAME);
            if (!this.isDefaultDatabaseName()) {
                this.sql("DROP DATABASE `%s`", this.databaseName());
            }
        }
    }

    /**
     * Verifies that a table carrying {@code 'connector'='iceberg'} cannot be created inside an
     * Iceberg catalog: the statement must fail with an {@link IllegalArgumentException} cause and
     * the exact explanatory message.
     *
     * <p>The temporary catalog {@code test_catalog} is always dropped afterwards.
     */
    @Test
    public void testConnectorTableInIcebergCatalog() {
        Map<String, String> catalogProps = Maps.newHashMap();
        catalogProps.put("type", "iceberg");
        if (this.isHiveCatalog()) {
            catalogProps.put("catalog-type", "hive");
            catalogProps.put("uri", CatalogTestBase.getURI(hiveConf));
        } else {
            catalogProps.put("catalog-type", "hadoop");
        }
        catalogProps.put("warehouse", TestIcebergConnector.createWarehouse());

        Map<String, String> tableProps = this.createTableProps();
        this.sql("CREATE CATALOG `test_catalog` WITH %s", this.toWithClause(catalogProps));
        try {
            Assertions.assertThatThrownBy(() -> this.sql("CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s", "default", TABLE_NAME, this.toWithClause(tableProps)))
                .cause()
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or create table without 'connector'='iceberg' related properties in an iceberg table.");
        } finally {
            this.sql("DROP CATALOG IF EXISTS `test_catalog`");
        }
    }

    /**
     * Builds the connector table properties for the current parameter set.
     *
     * <p>Starts from a mutable copy of the parameterized properties and adds the catalog name, a
     * newly created warehouse location and, for the hive catalog, the metastore URI.
     *
     * @return a new mutable map of table properties
     */
    private Map<String, String> createTableProps() {
        Map<String, String> tableProps = Maps.newHashMap(this.properties);
        tableProps.put("catalog-name", this.catalogName);
        tableProps.put("warehouse", TestIcebergConnector.createWarehouse());
        if (this.isHiveCatalog()) {
            tableProps.put("uri", CatalogTestBase.getURI(hiveConf));
        }
        return tableProps;
    }

    /** Returns whether this parameter set targets the hive catalog (catalog name {@code testhive}, ignoring case). */
    private boolean isHiveCatalog() {
        final String hiveCatalogName = "testhive";
        return hiveCatalogName.equalsIgnoreCase(catalogName);
    }

    /** Returns whether {@link #databaseName()} equals {@code default}, ignoring case. */
    private boolean isDefaultDatabaseName() {
        final String defaultName = "default";
        return defaultName.equalsIgnoreCase(databaseName());
    }

    /** Returns the {@code catalog-table} property, falling back to the Flink table name when it is absent. */
    private String tableName() {
        final String key = "catalog-table";
        return properties.getOrDefault(key, TABLE_NAME);
    }

    /** Returns the {@code catalog-database} property, falling back to {@code default_database} when it is absent. */
    private String databaseName() {
        final String key = "catalog-database";
        final String fallback = "default_database";
        return properties.getOrDefault(key, fallback);
    }

    /**
     * Renders the given properties for use after {@code WITH} in a SQL statement by delegating to
     * {@code CatalogTestBase.toWithClause}.
     *
     * @param props properties to render
     * @return the rendered clause text
     */
    private String toWithClause(Map<String, String> props) {
        String withClause = CatalogTestBase.toWithClause(props);
        return withClause;
    }

    /**
     * Creates a new folder under the class-level temporary folder and returns it as a
     * {@code file://} location string.
     *
     * @return the warehouse location
     * @throws UncheckedIOException if the folder cannot be created
     */
    private static String createWarehouse() {
        final String absolutePath;
        try {
            absolutePath = WAREHOUSE.newFolder().getAbsolutePath();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return String.format("file://%s", absolutePath);
    }
}

