/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that issue batch SQL statements against Paimon tables through the
 * helpers inherited from {@link CatalogITCaseBase} ({@code batchSql}, {@code sql},
 * {@code paimonTable}, {@code tEnv}).
 *
 * <p>Covered scenarios: sink parallelism validation, overwriting with an empty result,
 * time-travel reads (by snapshot id, timestamp and tag), reading a tag after snapshot
 * expiration, merging with a small sort-spill threshold, {@code TRUNCATE TABLE}, and
 * dynamic partition pruning in joins.
 */
public class BatchFileStoreITCase extends CatalogITCaseBase {

    /**
     * Supplies the DDL for table {@code T} (three INT columns) that most tests below
     * write to and read from.
     *
     * @return a single-element list holding the {@code CREATE TABLE} statement
     */
    @Override
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
    }

    /**
     * An {@code INSERT ... SELECT ... GROUP BY} without an explicit sink parallelism must
     * fail with the adaptive-parallelism message; the same statement with a
     * {@code 'sink.parallelism'} hint must run.
     */
    @Test
    public void testAdaptiveParallelism() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");

        Assertions.assertThatThrownBy(
                        () -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
                .hasMessageContaining(
                        "Paimon Sink does not support Flink's Adaptive Parallelism mode.");

        // Same statement, but with the sink parallelism pinned through a hint.
        batchSql(
                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
    }

    /** Overwriting {@code T} with a query that yields no rows must leave the table empty. */
    @Test
    public void testOverwriteEmpty() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));

        // "1 <> 1" is never true, so the overwrite source is empty.
        batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1");
        Assertions.assertThat(batchSql("SELECT * FROM T")).isEmpty();
    }

    /**
     * Writes four batches and checks reads pinned to earlier states via
     * {@code scan.snapshot-id}, {@code scan.mode=from-snapshot-full},
     * {@code scan.timestamp-millis} and {@code scan.tag-name}, plus the error cases for
     * conflicting options and an unknown tag.
     */
    @Test
    public void testTimeTravelRead() throws Exception {
        // NOTE(review): each timestamp is taken right after an insert and followed by a
        // short sleep, presumably so the next commit is strictly later than it - confirm.
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        long afterFirstInsert = System.currentTimeMillis();
        Thread.sleep(10L);

        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");
        long afterSecondInsert = System.currentTimeMillis();
        Thread.sleep(10L);

        batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
        long afterThirdInsert = System.currentTimeMillis();
        Thread.sleep(10L);

        batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)");

        paimonTable("T").createTag("tag2", 2L);

        // Expected table contents after the first, second and third insert respectively.
        Row[] firstBatchOnly = {Row.of(1, 11, 111), Row.of(2, 22, 222)};
        Row[] firstTwoBatches = {
            Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 333), Row.of(4, 44, 444)
        };
        Row[] firstThreeBatches = {
            Row.of(1, 11, 111),
            Row.of(2, 22, 222),
            Row.of(3, 33, 333),
            Row.of(4, 44, 444),
            Row.of(5, 55, 555),
            Row.of(6, 66, 666)
        };

        // Snapshot 1.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(firstBatchOnly);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(firstBatchOnly);

        // Snapshot id 0 is expected to return nothing.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */"))
                .isEmpty();
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
                .isEmpty();

        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        afterFirstInsert)))
                .containsExactlyInAnyOrder(firstBatchOnly);

        // Snapshot 2.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(firstTwoBatches);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(firstTwoBatches);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        afterSecondInsert)))
                .containsExactlyInAnyOrder(firstTwoBatches);

        // Snapshot 3.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(firstThreeBatches);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(firstThreeBatches);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        afterThirdInsert)))
                .containsExactlyInAnyOrder(firstThreeBatches);

        // Conflicting options: timestamp together with snapshot id.
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        String.format(
                                                "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */",
                                                afterThirdInsert)))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");

        // Conflicting options: 'full' scan mode together with snapshot id.
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "%s must be null when you use latest-full for scan.mode",
                        CoreOptions.SCAN_SNAPSHOT_ID.key());

        // Tag "tag2" was created on snapshot 2 above.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */"))
                .containsExactlyInAnyOrder(firstTwoBatches);

        Assertions.assertThatThrownBy(
                        () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage("Tag 'unknown' doesn't exist.");
    }

    /**
     * Tags snapshot 1, expires snapshots down to a single retained one, and checks that
     * reading through the tag still returns the first batch.
     */
    @Test
    public void testTimeTravelReadWithSnapshotExpiration() throws Exception {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        paimonTable("T").createTag("tag1", 1L);
        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");

        // Retain exactly one snapshot (min = max = 1) when expiring.
        HashMap<String, String> retainSingleSnapshot = new HashMap<>();
        retainSingleSnapshot.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        retainSingleSnapshot.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");

        FileStoreTable fileStoreTable = (FileStoreTable) paimonTable("T");
        fileStoreTable.copy(retainSingleSnapshot).store().newExpire().expire();

        long remainingSnapshots = fileStoreTable.snapshotManager().snapshotCount();
        Assertions.assertThat(remainingSnapshots).isEqualTo(1L);

        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag1') */"))
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
    }

    /**
     * Writes the same primary key seven times into a table created with
     * {@code 'sort-spill-threshold'='2'} and expects reads to return only the last value.
     */
    @Test
    public void testSortSpillMerge() {
        sql(
                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')");

        // One INSERT statement per value, '1' through '7', all for key 1.
        for (int value = 1; value <= 7; value++) {
            sql("INSERT INTO KT VALUES (1, '" + value + "')");
        }

        Assertions.assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, "7"));
        Assertions.assertThat(sql("SELECT b FROM KT")).containsExactlyInAnyOrder(Row.of("7"));
    }

    /**
     * {@code TRUNCATE TABLE} must return a single {@code +I["OK"]} row and leave {@code T}
     * empty.
     */
    @Test
    public void testTruncateTable() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));

        List<Row> truncateOutcome = batchSql("TRUNCATE TABLE T");
        Assertions.assertThat(truncateOutcome.size()).isEqualTo(1);
        Assertions.assertThat(truncateOutcome.get(0)).isEqualTo(Row.ofKind(RowKind.INSERT, "OK"));

        Assertions.assertThat(batchSql("SELECT * FROM T").isEmpty()).isTrue();
    }

    /**
     * Joins a partitioned fact table with a filtered dimension table (in both join orders)
     * and checks that the plan contains {@code DynamicFilteringDataCollector} and that the
     * result only holds rows from the matching partitions.
     */
    @Test
    public void testDynamicPartitionPruning() {
        sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)");
        sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)");

        sql(
                "CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')");
        sql(
                "INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')");
        sql(
                "INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')");

        String factJoinDim =
                "SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1 ORDER BY a";
        String dimJoinFact =
                "SELECT a, b, c, p, x, y FROM dim INNER JOIN fact ON x = p and z = 1 ORDER BY a";

        // z = 1 selects dim rows x = 1 and x = 2, i.e. fact partitions p = 1 and p = 2.
        String expectedRows =
                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";

        Assertions.assertThat(tEnv.explainSql(factJoinDim))
                .contains("DynamicFilteringDataCollector");
        Assertions.assertThat(tEnv.explainSql(dimJoinFact))
                .contains("DynamicFilteringDataCollector");

        Assertions.assertThat(sql(factJoinDim).toString()).isEqualTo(expectedRows);
        Assertions.assertThat(sql(dimJoinFact).toString()).isEqualTo(expectedRows);
    }

    /**
     * Unions two fact-to-dimension joins. With identical dimension filters the plan must
     * contain the {@code DynamicFilteringDataCollector(fields=[x])(reuse_id=} marker exactly
     * once; with different filters it only needs to contain a collector. Both variants are
     * also checked for their result rows.
     */
    @Test
    public void testDynamicPartitionPruningOnTwoFactTables() {
        sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)");
        sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)");

        sql(
                "CREATE TABLE fact1 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact1 PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')");
        sql(
                "INSERT INTO fact1 PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')");
        sql(
                "INSERT INTO fact1 PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')");

        sql(
                "CREATE TABLE fact2 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)");
        sql(
                "INSERT INTO fact2 PARTITION (p = 1) VALUES (40, 100, 'aaa'), (41, 101, 'bbb'), (42, 102, 'ccc')");
        sql(
                "INSERT INTO fact2 PARTITION (p = 2) VALUES (50, 200, 'aaa'), (51, 201, 'bbb'), (52, 202, 'ccc')");
        sql(
                "INSERT INTO fact2 PARTITION (p = 3) VALUES (60, 300, 'aaa'), (61, 301, 'bbb'), (62, 302, 'ccc')");

        // Rows contributed by the fact1 branch (z = 1) in both queries; the trailing ", "
        // is completed by the fact2 rows appended below.
        String fact1Rows =
                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], ";

        // Case 1: both branches filter the dimension table with z = 1.
        String sameFilterUnion =
                "SELECT * FROM (\n"
                        + "SELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\n"
                        + "UNION ALL\n"
                        + "SELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 1)\n"
                        + "ORDER BY a";
        String sameFilterExpected =
                fact1Rows
                        + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
                        + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";

        Assertions.assertThat(tEnv.explainSql(sameFilterUnion))
                .containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
        Assertions.assertThat(sql(sameFilterUnion).toString()).isEqualTo(sameFilterExpected);

        // Case 2: the fact2 branch filters with z = 2, which matches dim row x = 3 only.
        String differentFilterUnion =
                "SELECT * FROM (\n"
                        + "SELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\n"
                        + "UNION ALL\n"
                        + "SELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 2)\n"
                        + "ORDER BY a";
        String differentFilterExpected =
                fact1Rows
                        + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";

        Assertions.assertThat(tEnv.explainSql(differentFilterUnion))
                .contains("DynamicFilteringDataCollector");
        Assertions.assertThat(sql(differentFilterUnion).toString())
                .isEqualTo(differentFilterExpected);
    }
}

