/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class PreAggregationITCase {

    /**
     * Exercises the {@code sum} aggregate function on a partitioned table that is created with
     * {@code 'local-merge-buffer-size'='1m'}.
     */
    public static class BasicAggregateITCase extends CatalogITCaseBase {

        /**
         * Writes three rows in one statement, two of which share the primary key
         * {@code (k=1, d=1)}, and expects a single row per key with {@code v} summed.
         */
        @Test
        public void testLocalMerge() {
            sql(
                    "CREATE TABLE T (k INT,v INT,d INT,PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d)  WITH ('merge-engine'='aggregation', 'fields.v.aggregate-function'='sum','local-merge-buffer-size'='1m');",
                    new Object[0]);
            sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1), (1, 2, 1)", new Object[0]);

            List<Row> actual = batchSql("SELECT * FROM T", new Object[0]);

            // k=1: v = 1 + 2 = 3; k=2: v = 1.
            Assertions.assertThat(actual)
                    .containsExactlyInAnyOrder(Row.of(1, 3, 1), Row.of(2, 1, 1));
        }
    }

    /**
     * Tests for the {@code first_value} (column {@code b}) and {@code first_not_null_value}
     * (column {@code c}) aggregate functions on a table whose {@code sequence.field} is
     * {@code a}.
     */
    public static class FirstValueAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T} used by every test here. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE T (k INT,a INT,b VARCHAR,c VARCHAR,PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='first_value','fields.c.aggregate-function'='first_not_null_value','sequence.field'='a');";
            return Collections.singletonList(createTable);
        }

        /** All rows for both keys arrive in one INSERT, ordered by ascending sequence value. */
        @Test
        public void tesInMemoryMerge() {
            batchSql(
                    "INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1'), (2, 2, '2', '2'),(2, 3, '22', '22')",
                    new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T", new Object[0]);

            // Key 1: b stays null (first value), c becomes '1' (first non-null value).
            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 1, null, "1"), Row.of(2, 3, "2", "2"));
        }

        /**
         * Rows for key 2 are written with descending sequence values, and a later INSERT adds an
         * even smaller sequence value; the expected {@code b}/{@code c} follow the smallest
         * sequence value written so far.
         */
        @Test
        public void tesUnOrderInput() {
            batchSql(
                    "INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1'), (2, 3, '2', '2'),(2, 2, '22', '22')",
                    new Object[0]);
            List<Row> afterFirstInsert = batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(afterFirstInsert)
                    .containsExactlyInAnyOrder(Row.of(1, 1, null, "1"), Row.of(2, 3, "22", "22"));

            batchSql("INSERT INTO T VALUES (2, 1, '1', '1')", new Object[0]);
            List<Row> afterSecondInsert = batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(afterSecondInsert)
                    .containsExactlyInAnyOrder(Row.of(1, 1, null, "1"), Row.of(2, 3, "1", "1"));
        }

        /** Each row is written by its own INSERT statement before the table is read back. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T VALUES (1, 1, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR))",
                "INSERT INTO T VALUES (1, 2, '1', '1')",
                "INSERT INTO T VALUES (2, 1, '2', '2')",
                "INSERT INTO T VALUES (2, 2, '22', '22')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 2, null, "1"), Row.of(2, 2, "2", "2"));
        }
    }

    /**
     * Tests the {@code sum} aggregate function when the table is fed by a streaming
     * {@code GROUP BY} query, whose result updates arrive as retractions.
     */
    public static class SumRetractionAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates the aggregated table {@code T}. */
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE T (k INT,b Decimal(12, 2),PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='sum');");
        }

        /**
         * Streams {@code SUM(b) GROUP BY k} from table {@code INPUT} into {@code T} and checks the
         * changelog read from {@code T}: two initial rows, then an UPDATE_BEFORE/UPDATE_AFTER
         * pair per key once {@code INPUT} is overwritten with new values.
         *
         * <p>Both streaming iterators are closed in {@code finally} blocks so they are released
         * even when a statement or an assertion fails. They are closed in the same order as
         * before: the select iterator first, then the insert iterator.
         *
         * @throws Exception if collecting results or closing an iterator fails
         */
        @Test
        public void testRetraction() throws Exception {
            sql("CREATE TABLE INPUT (k INT,b INT,PRIMARY KEY (k) NOT ENFORCED);", new Object[0]);
            CloseableIterator<Row> insert = streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT GROUP BY k;", new Object[0]);
            try {
                BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * FROM T", new Object[0]);
                try {
                    sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)", new Object[0]);
                    // BigDecimal.valueOf(100L, 2) is 1.00, matching the Decimal(12, 2) column.
                    Assertions.assertThat(select.collect(2))
                            .containsExactlyInAnyOrder(
                                    Row.of(1, BigDecimal.valueOf(100L, 2)),
                                    Row.of(2, BigDecimal.valueOf(200L, 2)));

                    sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)", new Object[0]);
                    Assertions.assertThat(select.collect(4))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, 1, BigDecimal.valueOf(100L, 2)),
                                    Row.ofKind(RowKind.UPDATE_AFTER, 1, BigDecimal.valueOf(300L, 2)),
                                    Row.ofKind(RowKind.UPDATE_BEFORE, 2, BigDecimal.valueOf(200L, 2)),
                                    Row.ofKind(RowKind.UPDATE_AFTER, 2, BigDecimal.valueOf(400L, 2)));
                } finally {
                    select.close();
                }
            } finally {
                insert.close();
            }
        }
    }

    /**
     * Tests the {@code sum} aggregate function on INT, DECIMAL, TINYINT, SMALLINT, BIGINT, FLOAT
     * and DOUBLE columns of table {@code T1}.
     */
    public static class SumAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T1}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T1 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='sum', 'fields.b.aggregate-function'='sum', 'fields.c.aggregate-function'='sum', 'fields.d.aggregate-function'='sum', 'fields.e.aggregate-function'='sum', 'fields.f.aggregate-function'='sum','fields.h.aggregate-function'='sum');";
            return Collections.singletonList(createTable);
        }

        /**
         * Writes three rows for the same key in one INSERT (the first with a NULL {@code a}) and
         * checks the summed row, both for a full read and for a projection on {@code f, e}.
         */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "INSERT INTO T1 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))",
                    new Object[0]);

            List<Row> allColumns = batchSql("SELECT * FROM T1", new Object[0]);
            Assertions.assertThat(allColumns)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, 5, new BigDecimal("12.11"), (byte) 4, (short) 2, 10101000L, 0.0f, 1.11));

            List<Row> projected = batchSql("SELECT f,e FROM T1", new Object[0]);
            Assertions.assertThat(projected).containsExactlyInAnyOrder(Row.of(0.0f, 10101000L));
        }

        /** Writes three rows for the same key with separate INSERTs, then reads the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T1", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, 6, new BigDecimal("12.11"), (byte) 4, (short) 2, 10101000L, 0.0f, 1.11));
        }

        /**
         * Same as {@link #testMergeRead()} but with {@code 'commit.force-compact'='true'} set on
         * the table and a second key {@code (1, 3)} written alongside the first.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))",
                "INSERT INTO T1 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T1", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, 6, new BigDecimal("12.11"), (byte) 4, (short) 2, 10101000L, 0.0f, 1.11),
                            Row.of(1, 3, 7, new BigDecimal("12.11"), (byte) 4, (short) 2, 10101000L, 0.0f, 1.11));
        }

        /**
         * Expects reading {@code T1} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T1").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code max} aggregate function on numeric, DATE, TIMESTAMP, CHAR and VARCHAR
     * columns of table {@code T2}.
     */
    public static class MaxAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T2}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T2 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR,n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='max', 'fields.b.aggregate-function'='max', 'fields.c.aggregate-function'='max', 'fields.d.aggregate-function'='max', 'fields.e.aggregate-function'='max', 'fields.f.aggregate-function'='max','fields.h.aggregate-function'='max','fields.i.aggregate-function'='max','fields.l.aggregate-function'='max','fields.m.aggregate-function'='max','fields.n.aggregate-function'='max');";
            return Collections.singletonList(createTable);
        }

        /** Writes three rows for one key in a single INSERT and expects the per-column maximum. */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')",
                    new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T2", new Object[0]);

            Row expected =
                    Row.of(1, 2, 3, new BigDecimal("10.00"), (byte) 2, (short) 2, 10000000L, 1.11f, 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expected);
        }

        /** Writes the same three rows with separate INSERTs and expects the per-column maximum. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T2", new Object[0]);

            Row expected =
                    Row.of(1, 2, 3, new BigDecimal("10.00"), (byte) 2, (short) 2, 10000000L, 1.11f, 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expected);
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, writes three rows for each of the keys
         * {@code (1, 2)} and {@code (1, 3)} with separate INSERTs, and expects one maximum row
         * per key.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T2 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')",
                "INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T2 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T2 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')",
                "INSERT INTO T2 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T2", new Object[0]);

            Row expectedKey2 =
                    Row.of(1, 2, 3, new BigDecimal("10.00"), (byte) 2, (short) 2, 10000000L, 1.11f, 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc");
            Row expectedKey3 =
                    Row.of(1, 3, 6, new BigDecimal("10.00"), (byte) 2, (short) 2, 10000000L, 1.11f, 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expectedKey2, expectedKey3);
        }

        /**
         * Expects reading {@code T2} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T2").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code min} aggregate function on numeric, DATE, TIMESTAMP, CHAR and VARCHAR
     * columns of table {@code T3}.
     */
    public static class MinAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T3}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T3 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR(1),n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='min', 'fields.b.aggregate-function'='min', 'fields.c.aggregate-function'='min', 'fields.d.aggregate-function'='min', 'fields.e.aggregate-function'='min', 'fields.f.aggregate-function'='min','fields.h.aggregate-function'='min','fields.i.aggregate-function'='min','fields.l.aggregate-function'='min','fields.m.aggregate-function'='min','fields.n.aggregate-function'='min');";
            return Collections.singletonList(createTable);
        }

        /** Writes three rows for one key in a single INSERT and expects the per-column minimum. */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')",
                    new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T3", new Object[0]);

            Row expected =
                    Row.of(1, 2, 2, new BigDecimal("1.01"), (byte) -1, (short) -1, 1000L, -1.11f, -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expected);
        }

        /** Writes three rows for one key with separate INSERTs and expects the per-column minimum. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T3", new Object[0]);

            Row expected =
                    Row.of(1, 2, 2, new BigDecimal("1.01"), (byte) -1, (short) -1, 1000L, -1.11f, -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expected);
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, writes three rows for each of the keys
         * {@code (1, 2)} and {@code (1, 3)} with separate INSERTs, and expects one minimum row
         * per key.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T3 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')",
                "INSERT INTO T3 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')",
                "INSERT INTO T3 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')",
                "INSERT INTO T3 VALUES (1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T3", new Object[0]);

            Row expectedKey2 =
                    Row.of(1, 2, 2, new BigDecimal("1.01"), (byte) -1, (short) -1, 1000L, -1.11f, -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa");
            Row expectedKey3 =
                    Row.of(1, 3, 3, new BigDecimal("1.01"), (byte) -1, (short) -1, 1000L, -1.11f, -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa");
            Assertions.assertThat(rows).containsExactlyInAnyOrder(expectedKey2, expectedKey3);
        }

        /**
         * Expects reading {@code T3} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T3").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code last_non_null_value} aggregate function, configured for columns
     * {@code a} and {@code i} of table {@code T4}; column {@code b} has no function configured.
     */
    public static class LastNonNullValueAggregation extends CatalogITCaseBase {

        /**
         * Runs these tests with parallelism 1.
         *
         * <p>NOTE(review): presumably so that rows are written in a deterministic order, which
         * "last value" semantics depend on; confirm against {@code CatalogITCaseBase}.
         */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T4}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T4 (j INT, k INT, a INT, b INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_non_null_value', 'fields.i.aggregate-function'='last_non_null_value');";
            return Collections.singletonList(createTable);
        }

        /**
         * Stages three rows for one key in {@code myTable} (ordered by the helper column
         * {@code a}), copies them into {@code T4} with one INSERT, and checks the merged row.
         */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "CREATE TABLE myTable AS SELECT b, c, d, e, f FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), 4, CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST(NULL as INT), CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, 5, CAST(NULL AS DATE))) AS V(a, b, c, d, e, f) ORDER BY a",
                    new Object[0]);
            batchSql("INSERT INTO T4 SELECT * FROM myTable", new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T4", new Object[0]);

            // The date of the second row survives because the third row's date is NULL.
            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)));
        }

        /** Writes three rows for one key with separate INSERTs, then reads the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))",
                "INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))",
                "INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T4", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, writes three rows for each of the keys
         * {@code (1, 2)} and {@code (1, 3)} with separate INSERTs, and checks one merged row per
         * key.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T4 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))",
                "INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))",
                "INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))",
                "INSERT INTO T4 VALUES (1, 3, 3, 4, CAST('2020-01-01' AS DATE))",
                "INSERT INTO T4 VALUES (1, 3, 2, 6, CAST(NULL AS DATE))",
                "INSERT INTO T4 VALUES (1, 3, CAST(NULL AS INT), CAST(NULL AS INT), CAST('2022-01-02' AS DATE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T4", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, 3, 5, LocalDate.of(2020, 1, 2)),
                            Row.of(1, 3, 2, 6, LocalDate.of(2022, 1, 2)));
        }

        /**
         * Expects reading {@code T4} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T4").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code last_value} aggregate function, configured for columns {@code a} and
     * {@code i} of table {@code T5}. The expected rows show that a trailing NULL is kept.
     */
    public static class LastValueAggregation extends CatalogITCaseBase {

        /**
         * Runs these tests with parallelism 1.
         *
         * <p>NOTE(review): presumably so that rows are written in a deterministic order, which
         * "last value" semantics depend on; confirm against {@code CatalogITCaseBase}.
         */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T5}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T5 (j INT, k INT, a INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_value', 'fields.i.aggregate-function'='last_value');";
            return Collections.singletonList(createTable);
        }

        /**
         * Stages three rows for one key in {@code myTable} (ordered by the helper column
         * {@code a}), copies them into {@code T5} with one INSERT, and checks the merged row.
         */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "CREATE TABLE myTable AS SELECT b, c, d, e FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, CAST(NULL AS DATE))) AS V(a, b, c, d, e) ORDER BY a",
                    new Object[0]);
            batchSql("INSERT INTO T5 SELECT * FROM myTable", new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T5", new Object[0]);

            // The last staged row has a NULL date, and that NULL is what is expected back.
            Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, 2, 3, null));
        }

        /** Writes three rows for one key with separate INSERTs, then reads the merged row. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))",
                "INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))",
                "INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T5", new Object[0]);

            Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, 2, 3, null));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, writes three rows for each of the keys
         * {@code (1, 2)} and {@code (1, 3)} with separate INSERTs, and checks one merged row per
         * key.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T5 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))",
                "INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))",
                "INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))",
                "INSERT INTO T5 VALUES (1, 3, 3, CAST('2020-01-01' AS DATE))",
                "INSERT INTO T5 VALUES (1, 3, 2, CAST(NULL AS DATE))",
                "INSERT INTO T5 VALUES (1, 3, CAST(NULL AS INT), CAST('2022-01-02' AS DATE))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T5", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, 3, null),
                            Row.of(1, 3, null, LocalDate.of(2022, 1, 2)));
        }

        /**
         * Expects reading {@code T5} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T5").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code listagg} aggregate function on the STRING column {@code a} of table
     * {@code T6}. The expected rows join non-null values with a comma.
     */
    public static class ListAggAggregation extends CatalogITCaseBase {

        /**
         * Runs these tests with parallelism 1.
         *
         * <p>NOTE(review): presumably so that rows are written in a deterministic order, which
         * the concatenation order depends on; confirm against {@code CatalogITCaseBase}.
         */
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        /** Returns the single DDL statement that creates table {@code T6}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T6 (j INT, k INT, a STRING, PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='listagg');";
            return Collections.singletonList(createTable);
        }

        /**
         * Stages three rows for one key in {@code myTable} (ordered by the helper column
         * {@code a}), copies them into {@code T6} with one INSERT, and checks the joined string.
         */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "CREATE TABLE myTable AS SELECT b, c, d FROM (VALUES   (1, 1, 2, 'first line'),  (2, 1, 2, CAST(NULL AS STRING)),  (3, 1, 2, 'second line')) AS V(a, b, c, d) ORDER BY a",
                    new Object[0]);
            batchSql("INSERT INTO T6 SELECT * FROM myTable", new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T6", new Object[0]);

            // The NULL in the middle contributes nothing to the joined value.
            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 2, "first line,second line"));
        }

        /** Writes three rows for one key with separate INSERTs, then reads the joined string. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T6 VALUES (1, 2, 'first line')",
                "INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))",
                "INSERT INTO T6 VALUES (1, 2, 'second line')"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T6", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(Row.of(1, 2, "first line,second line"));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, then writes key {@code (1, 2)} as in
         * {@link #testMergeRead()} and key {@code (1, 3)} with three NULL values; the second key
         * is expected to stay NULL.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T6 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T6 VALUES (1, 2, 'first line')",
                "INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))",
                "INSERT INTO T6 VALUES (1, 2, 'second line')",
                "INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))",
                "INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))",
                "INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T6", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, "first line,second line"),
                            Row.of(1, 3, null));
        }

        /**
         * Expects reading {@code T6} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T6").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }

    /**
     * Tests the {@code bool_or} (column {@code a}) and {@code bool_and} (column {@code b})
     * aggregate functions of table {@code T7}, including NULL inputs.
     */
    public static class BoolOrAndAggregation extends CatalogITCaseBase {

        /** Returns the single DDL statement that creates table {@code T7}. */
        @Override
        protected List<String> ddl() {
            String createTable =
                    "CREATE TABLE IF NOT EXISTS T7 (j INT, k INT, a BOOLEAN, b BOOLEAN,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='bool_or','fields.b.aggregate-function'='bool_and');";
            return Collections.singletonList(createTable);
        }

        /** Writes TRUE, NULL and FALSE for one key in a single INSERT and checks the result. */
        @Test
        public void testMergeInMemory() {
            batchSql(
                    "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN)),(1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN)), (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))",
                    new Object[0]);

            List<Row> rows = batchSql("SELECT * FROM T7", new Object[0]);

            // bool_or over {TRUE, NULL, FALSE} is expected TRUE; bool_and is expected FALSE.
            Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, 2, true, false));
        }

        /** Writes TRUE, NULL and FALSE for one key with separate INSERTs and checks the result. */
        @Test
        public void testMergeRead() {
            String[] inserts = {
                "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T7", new Object[0]);

            Assertions.assertThat(rows).containsExactlyInAnyOrder(Row.of(1, 2, true, false));
        }

        /**
         * Sets {@code 'commit.force-compact'='true'}, then writes key {@code (1, 2)} as in
         * {@link #testMergeRead()} and key {@code (1, 3)} with only FALSE/NULL for {@code a} and
         * only TRUE/NULL for {@code b}.
         */
        @Test
        public void testMergeCompaction() {
            batchSql("ALTER TABLE T7 SET ('commit.force-compact'='true')", new Object[0]);

            String[] inserts = {
                "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 3, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))",
                "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN), CAST('TRUE' AS BOOLEAN))"
            };
            for (String statement : inserts) {
                batchSql(statement, new Object[0]);
            }

            List<Row> rows = batchSql("SELECT * FROM T7", new Object[0]);

            Assertions.assertThat(rows)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 2, true, false),
                            Row.of(1, 3, false, true));
        }

        /**
         * Expects reading {@code T7} through the streaming environment to throw.
         *
         * <p>NOTE(review): the String passed to {@code assertThatThrownBy} is AssertJ's assertion
         * description, not an expected exception message, so this only verifies that some
         * {@link Throwable} is thrown.
         */
        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(
                    () -> sEnv.from("T7").execute().print(),
                    "Pre-aggregate continuous reading is not supported");
        }
    }
}

