/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class FileSystemCatalogITCase
extends AbstractTestBase {
    private static final String DB_NAME = "default";
    private String path;
    private StreamTableEnvironment tEnv;

    @BeforeEach
    public void setup() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setParallelism(1);
        this.tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
        this.tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, (Object)false);
        this.path = this.getTempDirPath();
        this.tEnv.executeSql(String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", this.path));
    }

    @Test
    public void testWriteRead() throws Exception {
        this.tEnv.useCatalog("fs");
        this.tEnv.executeSql("CREATE TABLE T (a STRING, b STRING, c STRING)");
        this.innerTestWriteRead();
    }

    @Test
    public void testRenameTable() throws Exception {
        this.tEnv.useCatalog("fs");
        this.tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
        this.tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t1 VALUES(1),(2)").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE t3 RENAME TO t4")).hasMessage("Table `fs`.`default`.`t3` doesn't exist or is a temporary table.");
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t2")).hasMessage("Could not execute ALTER TABLE fs.default.t1 RENAME TO fs.default.t2");
        this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
        Assertions.assertThat(this.collect("SHOW TABLES")).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"t2"}), Row.of((Object[])new Object[]{"t3"})});
        Identifier identifier = new Identifier(DB_NAME, "t3");
        Catalog catalog = ((FlinkCatalog)this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog();
        Path tablePath = ((AbstractCatalog)catalog).getDataTableLocation(identifier);
        Assertions.assertThat((String)tablePath.toString()).isEqualTo(new File(this.path, "default.db" + File.separator + "t3").toString());
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.tEnv.from("t3").execute().collect());
        List result = iterator.collectAndClose(2);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2})});
    }

    @Test
    public void testCatalogOptionsInheritAndOverride() throws Exception {
        this.tEnv.executeSql(String.format("CREATE CATALOG fs_with_options WITH ('type'='paimon', 'warehouse'='%s', 'table-default.opt1'='value1', 'table-default.opt2'='value2', 'table-default.opt3'='value3', 'fs.allow-hadoop-fallback'='false','lock.enabled'='true')", this.path));
        this.tEnv.useCatalog("fs_with_options");
        this.tEnv.executeSql("CREATE TABLE t1_options (a STRING, b STRING, c STRING)");
        Identifier identifier = new Identifier(DB_NAME, "t1_options");
        Catalog catalog = ((FlinkCatalog)this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog();
        Map tableOptions = catalog.getTable(identifier).options();
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt1", (Object)"value1");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt2", (Object)"value2");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt3", (Object)"value3");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"fs.allow-hadoop-fallback");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"lock.enabled");
        this.tEnv.executeSql("CREATE TABLE t2_options (a STRING, b STRING, c STRING) WITH ('opt3'='value4')");
        identifier = new Identifier(DB_NAME, "t2_options");
        tableOptions = catalog.getTable(identifier).options();
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt1", (Object)"value1");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt2", (Object)"value2");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt3", (Object)"value4");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"fs.allow-hadoop-fallback");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"lock.enabled");
    }

    private void innerTestWriteRead() throws Exception {
        this.tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.tEnv.from("T").execute().collect());
        List result = iterator.collectAndClose(2);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
    }

    private List<Row> collect(String sql) throws Exception {
        ArrayList<Row> result = new ArrayList<Row>();
        try (CloseableIterator it = this.tEnv.executeSql(sql).collect();){
            while (it.hasNext()) {
                result.add((Row)it.next());
            }
        }
        return result;
    }
}

