/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.utils.RoaringBitmap64;
import org.apache.paimon.utils.ThetaSketch;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class PreAggregationITCase {

    public static class RoaringBitmapAggAggregationITCase
    extends CatalogITCaseBase {
        @Test
        public void testRoaring32BitmapAgg() throws IOException {
            this.sql("CREATE TABLE test_rbm64(  id INT PRIMARY KEY NOT ENFORCED,  f0 VARBINARY) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'rbm32')", new Object[0]);
            byte[] v1Bytes = RoaringBitmap32.bitmapOf((int[])new int[]{1}).serialize();
            byte[] v2Bytes = RoaringBitmap32.bitmapOf((int[])new int[]{2}).serialize();
            byte[] v3Bytes = RoaringBitmap32.bitmapOf((int[])new int[]{3}).serialize();
            byte[] v4Bytes = RoaringBitmap32.bitmapOf((int[])new int[]{1, 2}).serialize();
            byte[] v5Bytes = RoaringBitmap32.bitmapOf((int[])new int[]{2, 3}).serialize();
            String v1 = Hex.encodeHexString((byte[])v1Bytes).toUpperCase();
            String v2 = Hex.encodeHexString((byte[])v2Bytes).toUpperCase();
            String v3 = Hex.encodeHexString((byte[])v3Bytes).toUpperCase();
            this.sql("INSERT INTO test_rbm64 VALUES (1, CAST (NULL AS VARBINARY)), (2, CAST (x'" + v1 + "' AS VARBINARY)), (3, CAST (x'" + v2 + "' AS VARBINARY))", new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_rbm64");
            this.checkOneRecord(result.get(0), 1, null);
            this.checkOneRecord(result.get(1), 2, v1Bytes);
            this.checkOneRecord(result.get(2), 3, v2Bytes);
            this.sql("INSERT INTO test_rbm64 VALUES (1, CAST (x'" + v1 + "' AS VARBINARY)), (2, CAST (x'" + v2 + "' AS VARBINARY)), (2, CAST (x'" + v2 + "' AS VARBINARY)), (3, CAST (x'" + v3 + "' AS VARBINARY))", new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_rbm64");
            this.checkOneRecord(result.get(0), 1, v1Bytes);
            this.checkOneRecord(result.get(1), 2, v4Bytes);
            this.checkOneRecord(result.get(2), 3, v5Bytes);
        }

        @Test
        public void testRoaring64BitmapAgg() throws IOException {
            this.sql("CREATE TABLE test_rbm64(  id INT PRIMARY KEY NOT ENFORCED,  f0 VARBINARY) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'rbm64')", new Object[0]);
            byte[] v1Bytes = RoaringBitmap64.bitmapOf((long[])new long[]{1L}).serialize();
            byte[] v2Bytes = RoaringBitmap64.bitmapOf((long[])new long[]{2L}).serialize();
            byte[] v3Bytes = RoaringBitmap64.bitmapOf((long[])new long[]{3L}).serialize();
            byte[] v4Bytes = RoaringBitmap64.bitmapOf((long[])new long[]{1L, 2L}).serialize();
            byte[] v5Bytes = RoaringBitmap64.bitmapOf((long[])new long[]{2L, 3L}).serialize();
            String v1 = Hex.encodeHexString((byte[])v1Bytes).toUpperCase();
            String v2 = Hex.encodeHexString((byte[])v2Bytes).toUpperCase();
            String v3 = Hex.encodeHexString((byte[])v3Bytes).toUpperCase();
            this.sql("INSERT INTO test_rbm64 VALUES (1, CAST (NULL AS VARBINARY)), (2, CAST (x'" + v1 + "' AS VARBINARY)), (3, CAST (x'" + v2 + "' AS VARBINARY))", new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_rbm64");
            this.checkOneRecord(result.get(0), 1, null);
            this.checkOneRecord(result.get(1), 2, v1Bytes);
            this.checkOneRecord(result.get(2), 3, v2Bytes);
            this.sql("INSERT INTO test_rbm64 VALUES (1, CAST (x'" + v1 + "' AS VARBINARY)), (2, CAST (x'" + v2 + "' AS VARBINARY)), (2, CAST (x'" + v2 + "' AS VARBINARY)), (3, CAST (x'" + v3 + "' AS VARBINARY))", new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_rbm64");
            this.checkOneRecord(result.get(0), 1, v1Bytes);
            this.checkOneRecord(result.get(1), 2, v4Bytes);
            this.checkOneRecord(result.get(2), 3, v5Bytes);
        }

        private void checkOneRecord(Row row, int id, byte[] expected) {
            Assertions.assertThat((Object)row.getField(0)).isEqualTo((Object)id);
            Assertions.assertThat((Object)row.getField(1)).isEqualTo((Object)expected);
        }
    }

    public static class ThetaSketchAggAggregationITCase
    extends CatalogITCaseBase {
        @Test
        public void testThetaSketchAgg() {
            this.sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 VARBINARY) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'theta_sketch')", new Object[0]);
            String str1 = Hex.encodeHexString((byte[])ThetaSketch.sketchOf((int[])new int[]{1})).toUpperCase();
            String str2 = Hex.encodeHexString((byte[])ThetaSketch.sketchOf((int[])new int[]{2})).toUpperCase();
            String str3 = Hex.encodeHexString((byte[])ThetaSketch.sketchOf((int[])new int[]{3})).toUpperCase();
            this.sql(String.format("INSERT INTO test_collect VALUES (1, CAST (NULL AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))", str1, str2), new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, null);
            this.checkOneRecord(result.get(1), 2, ThetaSketch.sketchOf((int[])new int[]{1}));
            this.checkOneRecord(result.get(2), 3, ThetaSketch.sketchOf((int[])new int[]{2}));
            this.sql(String.format("INSERT INTO test_collect VALUES (1, CAST (x'%s' AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))", str1, str2, str2, str3), new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, ThetaSketch.sketchOf((int[])new int[]{1}));
            this.checkOneRecord(result.get(1), 2, ThetaSketch.sketchOf((int[])new int[]{1, 2}));
            this.checkOneRecord(result.get(2), 3, ThetaSketch.sketchOf((int[])new int[]{2, 3}));
        }

        private void checkOneRecord(Row row, int id, byte[] expected) {
            Assertions.assertThat((Object)row.getField(0)).isEqualTo((Object)id);
            Assertions.assertThat((Object)row.getField(1)).isEqualTo((Object)expected);
        }
    }

    public static class FieldsDefaultAggregationITCase
    extends CatalogITCaseBase {
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS test_default_agg_func (j INT, k INT, a INT, b INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.default-aggregate-function'='first_non_null_value', 'fields.i.aggregate-function'='last_non_null_value');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("CREATE TABLE myTable AS SELECT b, c, d, e, f FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), 4, CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST(NULL as INT), CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, 5, CAST(NULL AS DATE))) AS V(a, b, c, d, e, f) ORDER BY a", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func SELECT * FROM myTable", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM test_default_agg_func", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, 4, LocalDate.of(2020, 1, 2)})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 3, 5, CAST(NULL AS DATE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM test_default_agg_func", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, 3, LocalDate.of(2020, 1, 2)})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE test_default_agg_func SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 3, 5, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 3, 3, 4, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 3, 2, 6, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO test_default_agg_func VALUES (1, 3, CAST(NULL AS INT), CAST(NULL AS INT), CAST('2022-01-02' AS DATE))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM test_default_agg_func", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, 3, LocalDate.of(2020, 1, 2)}), Row.of((Object[])new Object[]{1, 3, 3, 4, LocalDate.of(2022, 1, 2)})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("test_default_agg_func").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class MergeMapAggregationITCase
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE test_merge_map(  id INT PRIMARY KEY NOT ENFORCED,  f0 MAP<INT, STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'merge_map')");
        }

        @Test
        public void testMergeMap() {
            this.sql("INSERT INTO test_merge_map VALUES (1, CAST (NULL AS MAP<INT, STRING>)), (2, MAP[1, 'A']), (3, MAP[1, 'A', 2, 'B'])", new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_merge_map");
            this.checkOneRecord(result.get(0), 1, null);
            this.checkOneRecord(result.get(1), 2, this.toMap(1, "A"));
            this.checkOneRecord(result.get(2), 3, this.toMap(1, "A", 2, "B"));
            this.sql("INSERT INTO test_merge_map VALUES (1, MAP[1, 'A']), (2, MAP[1, 'B']), (3, MAP[1, 'a', 2, 'b', 3, 'c'])", new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_merge_map");
            this.checkOneRecord(result.get(0), 1, this.toMap(1, "A"));
            this.checkOneRecord(result.get(1), 2, this.toMap(1, "B"));
            this.checkOneRecord(result.get(2), 3, this.toMap(1, "a", 2, "b", 3, "c"));
        }

        @Test
        public void testRetractInputNull() throws Exception {
            this.sql("CREATE TABLE test_merge_map1 (  id INT PRIMARY KEY NOT ENFORCED,  f0 MAP<INT, STRING>,  f1 INT) WITH (  'changelog-producer' = 'lookup',  'merge-engine' = 'partial-update',  'fields.f0.aggregate-function' = 'merge_map',  'fields.f1.sequence-group' = 'f0')", new Object[0]);
            List<Row> input = Arrays.asList(Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, null, 1}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, Collections.singletonMap(1, "A"), 2}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, null, 1}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, Collections.singletonMap(2, "B"), 3}));
            this.sEnv.executeSql(String.format("CREATE TEMPORARY TABLE input (  id INT PRIMARY KEY NOT ENFORCED,  f0 MAP<INT, STRING>,  f1 INT) WITH (  'connector' = 'values',  'data-id' = '%s',  'bounded' = 'true',  'changelog-mode' = 'UB,UA')", TestValuesTableFactory.registerData(input))).await();
            this.sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM input").await();
            Assertions.assertThat(this.sql("SELECT * FROM test_merge_map1", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, this.toMap(1, "A", 2, "B"), 3})});
        }

        private Map<Object, Object> toMap(Object ... kvs) {
            HashMap<Object, Object> result = new HashMap<Object, Object>();
            for (int i = 0; i < kvs.length; i += 2) {
                result.put(kvs[i], kvs[i + 1]);
            }
            return result;
        }

        private void checkOneRecord(Row row, int id, Map<Object, Object> map) {
            Assertions.assertThat((Object)row.getField(0)).isEqualTo((Object)id);
            if (map == null || map.isEmpty()) {
                Assertions.assertThat((Object)row.getField(1)).isNull();
            } else {
                Assertions.assertThat((Map)((Map)row.getField(1))).containsExactlyInAnyOrderEntriesOf(map);
            }
        }
    }

    public static class CollectAggregationITCase
    extends CatalogITCaseBase {
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        @Test
        public void testAggWithDistinct() {
            this.sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'collect',  'fields.f0.distinct' = 'true')", new Object[0]);
            this.sql("INSERT INTO test_collect VALUES (1, CAST (NULL AS ARRAY<STRING>)), (2, ARRAY['A', 'B']), (3, ARRAY['car', 'watch'])", new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, new String[0]);
            this.checkOneRecord(result.get(1), 2, "A", "B");
            this.checkOneRecord(result.get(2), 3, "car", "watch");
            this.sql("INSERT INTO test_collect VALUES (1, ARRAY['paimon', 'paimon']), (2, ARRAY['A', 'B', 'C']), (3, CAST (NULL AS ARRAY<STRING>))", new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, "paimon");
            this.checkOneRecord(result.get(1), 2, "A", "B", "C");
            this.checkOneRecord(result.get(2), 3, "car", "watch");
        }

        @Test
        public void testAggWithoutDistinct() {
            this.sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>) WITH (  'merge-engine' = 'aggregation',  'fields.f0.aggregate-function' = 'collect')", new Object[0]);
            this.sql("INSERT INTO test_collect VALUES (1, CAST (NULL AS ARRAY<STRING>)), (2, ARRAY['A', 'B', 'B']), (3, ARRAY['car', 'watch'])", new Object[0]);
            List<Row> result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, new String[0]);
            this.checkOneRecord(result.get(1), 2, "A", "B", "B");
            this.checkOneRecord(result.get(2), 3, "car", "watch");
            this.sql("INSERT INTO test_collect VALUES (1, ARRAY['paimon', 'paimon']), (2, ARRAY['A', 'B', 'C']), (3, CAST (NULL AS ARRAY<STRING>))", new Object[0]);
            result = this.queryAndSort("SELECT * FROM test_collect");
            this.checkOneRecord(result.get(0), 1, "paimon", "paimon");
            this.checkOneRecord(result.get(1), 2, "A", "A", "B", "B", "B", "C");
            this.checkOneRecord(result.get(2), 3, "car", "watch");
        }

        private static List<Arguments> retractArguments() {
            return Arrays.asList(Arguments.arguments((Object[])new Object[]{"lookup", "aggregation"}), Arguments.arguments((Object[])new Object[]{"lookup", "partial-update"}), Arguments.arguments((Object[])new Object[]{"full-compaction", "aggregation"}), Arguments.arguments((Object[])new Object[]{"full-compaction", "partial-update"}));
        }

        @ParameterizedTest(name="changelog-producer = {0}, merge-engine = {1}")
        @MethodSource(value={"retractArguments"})
        public void testRetract(String changelogProducer, String mergeEngine) throws Exception {
            String sequenceGroup = "";
            if (mergeEngine.equals("partial-update")) {
                sequenceGroup = ", 'fields.f1.sequence-group' = 'f0'";
            }
            this.sql("CREATE TABLE test_collect(  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'changelog-producer' = '%s',  'merge-engine' = '%s',  'fields.f0.aggregate-function' = 'collect'  %s)", changelogProducer, mergeEngine, sequenceGroup);
            BlockingIterator<Row, Row> select = this.streamSqlBlockIter("SELECT * FROM test_collect", new Object[0]);
            String temporaryTableTemplate = "CREATE TEMPORARY TABLE %s (  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'connector' = 'values',  'data-id' = '%s',  'bounded' = 'true',  'changelog-mode' = '%s')";
            this.sql("INSERT INTO test_collect VALUES (1, ARRAY['A', 'B'], 1)", new Object[0]);
            List result = select.collect(1);
            this.checkOneRecord((Row)result.get(0), 1, "A", "B");
            List<Row> inputRecords = Arrays.asList(Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, new String[]{"A", "B"}, 2}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, new String[]{"C", "D"}, 3}));
            this.sEnv.executeSql(String.format(temporaryTableTemplate, "INPUT11", TestValuesTableFactory.registerData(inputRecords), "UB,UA")).await();
            this.sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT11").await();
            result = select.collect(2);
            Assertions.assertThat((Comparable)((Row)result.get(0)).getKind()).isEqualTo((Object)RowKind.UPDATE_BEFORE);
            this.checkOneRecord((Row)result.get(0), 1, "A", "B");
            Assertions.assertThat((Comparable)((Row)result.get(1)).getKind()).isEqualTo((Object)RowKind.UPDATE_AFTER);
            this.checkOneRecord((Row)result.get(1), 1, "A", "B", "C", "D");
            inputRecords = Collections.singletonList(Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{1, new String[]{"C", "D"}, 4}));
            this.sEnv.executeSql(String.format(temporaryTableTemplate, "INPUT12", TestValuesTableFactory.registerData(inputRecords), "D")).await();
            this.sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT12").await();
            result = select.collect(2);
            Assertions.assertThat((Comparable)((Row)result.get(0)).getKind()).isEqualTo((Object)RowKind.UPDATE_BEFORE);
            this.checkOneRecord((Row)result.get(0), 1, "A", "B", "C", "D");
            Assertions.assertThat((Comparable)((Row)result.get(1)).getKind()).isEqualTo((Object)RowKind.UPDATE_AFTER);
            this.checkOneRecord((Row)result.get(1), 1, "A", "B");
            this.sql("INSERT INTO test_collect VALUES (2, ARRAY['A', 'B'], 5), (3, ARRAY['A', 'B'], 6)", new Object[0]);
            result = select.collect(2);
            this.checkOneRecord((Row)result.get(0), 2, "A", "B");
            this.checkOneRecord((Row)result.get(1), 3, "A", "B");
            inputRecords = Arrays.asList(Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, new String[]{"A", "B"}, 7}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, new String[]{"C", "D"}, 8}));
            this.sEnv.executeSql(String.format(temporaryTableTemplate, "INPUT21", TestValuesTableFactory.registerData(inputRecords), "UB,UA")).await();
            this.sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT21").await();
            result = select.collect(2);
            Assertions.assertThat((Comparable)((Row)result.get(0)).getKind()).isEqualTo((Object)RowKind.UPDATE_BEFORE);
            this.checkOneRecord((Row)result.get(0), 2, "A", "B");
            Assertions.assertThat((Comparable)((Row)result.get(1)).getKind()).isEqualTo((Object)RowKind.UPDATE_AFTER);
            this.checkOneRecord((Row)result.get(1), 2, "A", "B", "C", "D");
            inputRecords = Collections.singletonList(Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{3, new String[]{"A"}, 9}));
            this.sEnv.executeSql(String.format(temporaryTableTemplate, "INPUT22", TestValuesTableFactory.registerData(inputRecords), "D")).await();
            this.sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT22").await();
            result = select.collect(2);
            Assertions.assertThat((Comparable)((Row)result.get(0)).getKind()).isEqualTo((Object)RowKind.UPDATE_BEFORE);
            this.checkOneRecord((Row)result.get(0), 3, "A", "B");
            Assertions.assertThat((Comparable)((Row)result.get(1)).getKind()).isEqualTo((Object)RowKind.UPDATE_AFTER);
            this.checkOneRecord((Row)result.get(1), 3, "B");
            select.close();
        }

        @Test
        public void testRetractInputNull() throws Exception {
            this.sql("CREATE TABLE test_collect (  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'changelog-producer' = 'lookup',  'merge-engine' = 'partial-update',  'fields.f0.aggregate-function' = 'collect',  'fields.f1.sequence-group' = 'f0')", new Object[0]);
            List<Row> input = Arrays.asList(Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, null, 1}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, new String[]{"A"}, 2}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, null, 1}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, new String[]{"B"}, 3}));
            this.sEnv.executeSql(String.format("CREATE TEMPORARY TABLE input (  id INT PRIMARY KEY NOT ENFORCED,  f0 ARRAY<STRING>,  f1 INT) WITH (  'connector' = 'values',  'data-id' = '%s',  'bounded' = 'true',  'changelog-mode' = 'UB,UA')", TestValuesTableFactory.registerData(input))).await();
            this.sEnv.executeSql("INSERT INTO test_collect SELECT * FROM input").await();
            Assertions.assertThat(this.sql("SELECT * FROM test_collect", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, new String[]{"A", "B"}, 3})});
        }

        private void checkOneRecord(Row row, int id, String ... elements) {
            Assertions.assertThat((Object)row.getField(0)).isEqualTo((Object)id);
            if (elements == null || elements.length == 0) {
                Assertions.assertThat((Object)row.getField(1)).isNull();
            } else {
                Assertions.assertThat((Object[])((String[])row.getField(1))).containsExactlyInAnyOrder((Object[])elements);
            }
        }
    }

    public static class NestedUpdateAggregationITCase
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            String ordersTable = "CREATE TABLE orders (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING\n);";
            String subordersTable = "CREATE TABLE sub_orders (\n  order_id INT,\n  daily_id INT,\n  today STRING,\n  product_name STRING,\n  price BIGINT,\n  PRIMARY KEY (order_id, daily_id, today) NOT ENFORCED\n);";
            String wideTable = "CREATE TABLE order_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.nested-key' = 'daily_id,today',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')";
            String wideAppendTable = "CREATE TABLE order_append_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')";
            return Arrays.asList(ordersTable, subordersTable, wideTable, wideAppendTable);
        }

        @Test
        public void testUseCase() {
            this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
            this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000),(2, 1, '12-20', 'Tea', 40),(2, 2, '12-20', 'Pot', 60),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
            this.sql(this.widenSql(), new Object[0]);
            List result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
            Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Tesla", 400000L}))).isTrue();
            Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(1), 2, "Zhao", "ChengDu", Row.of((Object[])new Object[]{1, "12-20", "Tea", 40L}), Row.of((Object[])new Object[]{2, "12-20", "Pot", 60L}))).isTrue();
            Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(2), 3, "Liu", "NanJing", Row.of((Object[])new Object[]{1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{1, "12-26", "Cup", 30L}))).isTrue();
            List<Row> unnested = this.sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]);
            Assertions.assertThat(unnested).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 2, "12-20", "Tesla", 400000L}), Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 1, "12-20", "Tea", 40L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 2, "12-20", "Pot", 60L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-26", "Cup", 30L})});
        }

        @Test
        public void testUseCaseWithNullValue() {
            this.sql("INSERT INTO order_wide\nSELECT 6, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[cast(null as ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>)]", new Object[0]);
            List result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
            Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 6, null, null, new Row[]{null})).isTrue();
            this.sql("INSERT INTO order_wide\nSELECT 6, 'Sun', CAST (NULL AS STRING), ARRAY[ROW(1, '01-01','Apple', 6999)]", new Object[0]);
            result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
            Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 6, "Sun", null, Row.of((Object[])new Object[]{1, "01-01", "Apple", 6999L}))).isTrue();
        }

        @Test
        public void testUseCaseAppend() {
            this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
            this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(2, 1, '12-20', 'Tesla', 400000),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
            this.sql(this.widenAppendSql(), new Object[0]);
            List<Row> unnested = this.sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_append_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]);
            Assertions.assertThat(unnested).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 1, "12-20", "Tesla", 400000L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-26", "Cup", 30L})});
        }

        @Test
        @Timeout(value=60L)
        public void testUpdateWithIgnoreRetract() throws Exception {
            List<Row> result;
            boolean checkResult;
            this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, (Object)ExecutionConfigOptions.UpsertMaterialize.NONE);
            this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou')", new Object[0]);
            this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000)", new Object[0]);
            this.sEnv.executeSql(this.widenSql());
            do {
                Thread.sleep(500L);
            } while (!(checkResult = !(result = this.sql("SELECT * FROM order_wide", new Object[0])).isEmpty() && this.checkOneRecord(result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Tesla", 400000L}))));
            this.sql("INSERT INTO sub_orders VALUES (1, 2, '12-20', 'Benz', 380000)", new Object[0]);
            do {
                Thread.sleep(500L);
            } while (!(checkResult = !(result = this.sql("SELECT * FROM order_wide", new Object[0])).isEmpty() && this.checkOneRecord(result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Benz", 380000L}))));
        }

        private String widenSql() {
            return "INSERT INTO order_wide\nSELECT order_id, user_name, address, CAST (NULL AS ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>) FROM orders\nUNION ALL\nSELECT order_id, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[ROW(daily_id, today, product_name, price)] FROM sub_orders";
        }

        private String widenAppendSql() {
            return "INSERT INTO order_append_wide\nSELECT order_id, user_name, address, CAST (NULL AS ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>) FROM orders\nUNION ALL\nSELECT order_id, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[ROW(daily_id, today, product_name, price)] FROM sub_orders";
        }

        private boolean checkOneRecord(Row record, int orderId, String userName, String address, Row ... subOrders) {
            if ((Integer)record.getField(0) != orderId) {
                return false;
            }
            if (!Objects.equals(record.getFieldAs(1), userName)) {
                return false;
            }
            if (!Objects.equals(record.getFieldAs(2), address)) {
                return false;
            }
            return this.checkNestedTable((Row[])record.getFieldAs(3), subOrders);
        }

        private boolean checkNestedTable(Row[] nestedTable, Row ... subOrders) {
            if (nestedTable.length != subOrders.length) {
                return false;
            }
            Comparator<Object> comparator = Comparator.comparingInt(r -> (Integer)((Row)r).getFieldAs(0)).thenComparing(r -> (String)((Row)r).getField(1));
            List sortedActual = Arrays.stream(nestedTable).sorted(comparator).collect(Collectors.toList());
            List sortedExpected = Arrays.stream(subOrders).sorted(comparator).collect(Collectors.toList());
            for (int i = 0; i < sortedActual.size(); ++i) {
                if (Objects.equals(sortedActual.get(i), sortedExpected.get(i))) continue;
                return false;
            }
            return true;
        }
    }

    public static class BasicAggregateITCase
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE T (k INT,v INT,d INT,PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d)  WITH ('merge-engine'='aggregation', 'fields.v.aggregate-function'='sum','local-merge-buffer-size'='5m');");
        }

        @Test
        public void testLocalMerge() {
            this.sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1), (1, 2, 1)", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 3, 1}), Row.of((Object[])new Object[]{2, 1, 1})});
        }

        @Test
        public void testMergeRead() {
            this.sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1)", new Object[0]);
            this.sql("INSERT INTO T VALUES(1, 2, 1)", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 3, 1}), Row.of((Object[])new Object[]{2, 1, 1})});
            Assertions.assertThat(this.batchSql("SELECT * FROM T where v = 3", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 3, 1})});
            Assertions.assertThat(this.batchSql("SELECT * FROM T where v = 1", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{2, 1, 1})});
        }

        @Test
        public void testSequenceFieldWithDefaultAgg() {
            this.sql("CREATE TABLE seq_default_agg ( pk INT PRIMARY KEY NOT ENFORCED, seq INT, v INT) WITH ( 'merge-engine'='aggregation', 'sequence.field'='seq', 'fields.default-aggregate-function'='sum')", new Object[0]);
            this.sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)", new Object[0]);
            this.sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)", new Object[0]);
            Assertions.assertThat(this.sql("SELECT * FROM seq_default_agg", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{0, 2, 3})});
        }
    }

    public static class FirstValueAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Arrays.asList("CREATE TABLE T (k INT,a INT,b VARCHAR,c VARCHAR,d VARCHAR,PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='first_value','fields.c.aggregate-function'='first_non_null_value','fields.d.aggregate-function'='first_not_null_value','sequence.field'='a');", "CREATE TABLE T2 (k INT,v STRING,PRIMARY KEY (k) NOT ENFORCED)WITH ('merge-engine' = 'aggregation','fields.v.aggregate-function' = 'first_value','fields.v.ignore-retract' = 'true');");
        }

        @Test
        public void tesInMemoryMerge() {
            this.batchSql("INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1', '1'), (2, 2, '2', '2', '2'),(2, 3, '22', '22', '22')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, null, "1", "1"}), Row.of((Object[])new Object[]{2, 3, "2", "2", "2"})});
        }

        @Test
        public void tesUnOrderInput() {
            this.batchSql("INSERT INTO T VALUES (1, 0, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR)),(1, 1, '1', '1', '1'), (2, 3, '2', '2', '2'),(2, 2, '22', '22', '22')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, null, "1", "1"}), Row.of((Object[])new Object[]{2, 3, "22", "22", "22"})});
            this.batchSql("INSERT INTO T VALUES (2, 1, '1', '1', '1')", new Object[0]);
            result = this.batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, null, "1", "1"}), Row.of((Object[])new Object[]{2, 3, "1", "1", "1"})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T VALUES (1, 1, CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR), CAST(NULL AS VARCHAR))", new Object[0]);
            this.batchSql("INSERT INTO T VALUES (1, 2, '1', '1', '1')", new Object[0]);
            this.batchSql("INSERT INTO T VALUES (2, 1, '2', '2', '2')", new Object[0]);
            this.batchSql("INSERT INTO T VALUES (2, 2, '22', '22', '22')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, null, "1", "1"}), Row.of((Object[])new Object[]{2, 2, "2", "2", "2"})});
        }

        @Test
        public void testAggregatorResetWhenIgnoringRetract() {
            int numRows = 100;
            this.batchSql("INSERT INTO T2 VALUES " + IntStream.range(0, numRows).mapToObj(i -> String.format("(%d, '%d')", i, i)).collect(Collectors.joining(", ")), new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES " + IntStream.range(numRows / 2, numRows).mapToObj(i -> String.format("(%d, '%d')", i, i + numRows)).collect(Collectors.joining(", ")), new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T2", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])IntStream.range(0, numRows).mapToObj(i -> Row.of((Object[])new Object[]{i, String.valueOf(i)})).toArray(Row[]::new));
        }
    }

    public static class SumRetractionAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE T (k INT,b Decimal(12, 2),PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'changelog-producer' = 'full-compaction','fields.b.aggregate-function'='sum');");
        }

        @Test
        public void testRetraction() throws Exception {
            this.sql("CREATE TABLE INPUT (k INT,b INT,PRIMARY KEY (k) NOT ENFORCED);", new Object[0]);
            CloseableIterator<Row> insert = this.streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT GROUP BY k;", new Object[0]);
            BlockingIterator<Row, Row> select = this.streamSqlBlockIter("SELECT * FROM T", new Object[0]);
            this.sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)", new Object[0]);
            Assertions.assertThat((List)select.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, BigDecimal.valueOf(100L, 2)}), Row.of((Object[])new Object[]{2, BigDecimal.valueOf(200L, 2)})});
            this.sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)", new Object[0]);
            Assertions.assertThat((List)select.collect(4)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, BigDecimal.valueOf(100L, 2)}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, BigDecimal.valueOf(300L, 2)}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, BigDecimal.valueOf(200L, 2)}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, BigDecimal.valueOf(400L, 2)})});
            select.close();
            insert.close();
        }
    }

    public static class ProductAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T1 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='product', 'fields.b.aggregate-function'='product', 'fields.c.aggregate-function'='product', 'fields.d.aggregate-function'='product', 'fields.e.aggregate-function'='product', 'fields.f.aggregate-function'='product','fields.h.aggregate-function'='product');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("ALTER TABLE T1 MODIFY b DECIMAL(5, 3)", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T1", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 6, new BigDecimal("11.110"), (byte)2, (short)-2, 1000000000000000L, Float.valueOf(-0.0f), -1.3676310000000003})});
            Assertions.assertThat(this.batchSql("SELECT f,e FROM T1", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{Float.valueOf(-0.0f), 1000000000000000L})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T1", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 6, new BigDecimal("11.10"), (byte)2, (short)-2, 1000000000000000L, Float.valueOf(-1.2321f), -1.3676310000000003})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T1", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 6, new BigDecimal("11.10"), (byte)2, (short)-2, 1000000000000000L, Float.valueOf(-1.2321f), -1.3676310000000003}), Row.of((Object[])new Object[]{1, 3, 12, new BigDecimal("11.10"), (byte)2, (short)-2, 1000000000000000L, Float.valueOf(-1.2321f), -1.3676310000000003})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T1").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class SumAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T1 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='sum', 'fields.b.aggregate-function'='sum', 'fields.c.aggregate-function'='sum', 'fields.d.aggregate-function'='sum', 'fields.e.aggregate-function'='sum', 'fields.f.aggregate-function'='sum','fields.h.aggregate-function'='sum');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("INSERT INTO T1 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T1", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 5, new BigDecimal("12.11"), (byte)4, (short)2, 10101000L, Float.valueOf(0.0f), 1.11})});
            Assertions.assertThat(this.batchSql("SELECT f,e FROM T1", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{Float.valueOf(0.0f), 10101000L})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T1", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 6, new BigDecimal("12.11"), (byte)4, (short)2, 10101000L, Float.valueOf(0.0f), 1.11})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))", new Object[0]);
            this.batchSql("INSERT INTO T1 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), -1.11, CAST(-1.11 AS DOUBLE))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T1", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 6, new BigDecimal("12.11"), (byte)4, (short)2, 10101000L, Float.valueOf(0.0f), 1.11}), Row.of((Object[])new Object[]{1, 3, 7, new BigDecimal("12.11"), (byte)4, (short)2, 10101000L, Float.valueOf(0.0f), 1.11})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T1").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class MaxAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T2 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR,n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='max', 'fields.b.aggregate-function'='max', 'fields.c.aggregate-function'='max', 'fields.d.aggregate-function'='max', 'fields.e.aggregate-function'='max', 'fields.f.aggregate-function'='max','fields.h.aggregate-function'='max','fields.i.aggregate-function'='max','fields.l.aggregate-function'='max','fields.m.aggregate-function'='max','fields.n.aggregate-function'='max');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T2", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, new BigDecimal("10.00"), (byte)2, (short)2, 10000000L, Float.valueOf(1.11f), 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc"})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T2", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, new BigDecimal("10.00"), (byte)2, (short)2, 10000000L, Float.valueOf(1.11f), 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc"})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T2 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            this.batchSql("INSERT INTO T2 VALUES (1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T2", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, new BigDecimal("10.00"), (byte)2, (short)2, 10000000L, Float.valueOf(1.11f), 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc"}), Row.of((Object[])new Object[]{1, 3, 6, new BigDecimal("10.00"), (byte)2, (short)2, 10000000L, Float.valueOf(1.11f), 1.21, LocalDate.of(2022, 1, 2), LocalDateTime.of(2022, 1, 1, 2, 0, 0), "c", "ccc"})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T2").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class MinAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (j INT, k INT, a INT, b Decimal(4,2), c TINYINT,d SMALLINT,e BIGINT,f FLOAT,h DOUBLE,i DATE,l TIMESTAMP,m CHAR(1),n VARCHAR,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='min', 'fields.b.aggregate-function'='min', 'fields.c.aggregate-function'='min', 'fields.d.aggregate-function'='min', 'fields.e.aggregate-function'='min', 'fields.f.aggregate-function'='min','fields.h.aggregate-function'='min','fields.i.aggregate-function'='min','fields.l.aggregate-function'='min','fields.m.aggregate-function'='min','fields.n.aggregate-function'='min');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), (1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T3", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, new BigDecimal("1.01"), (byte)-1, (short)-1, 1000L, Float.valueOf(-1.11f), -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa"})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T3", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, new BigDecimal("1.01"), (byte)-1, (short)-1, 1000L, Float.valueOf(-1.11f), -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa"})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T3 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')", new Object[0]);
            this.batchSql("INSERT INTO T3 VALUES (1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T3", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 2, new BigDecimal("1.01"), (byte)-1, (short)-1, 1000L, Float.valueOf(-1.11f), -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa"}), Row.of((Object[])new Object[]{1, 3, 3, new BigDecimal("1.01"), (byte)-1, (short)-1, 1000L, Float.valueOf(-1.11f), -1.11, LocalDate.of(2020, 1, 1), LocalDateTime.of(2021, 1, 1, 1, 1, 1), "a", "aaa"})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T3").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class LastNonNullValueAggregation
    extends CatalogITCaseBase {
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T4 (j INT, k INT, a INT, b INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_non_null_value', 'fields.i.aggregate-function'='last_non_null_value');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("CREATE TABLE myTable AS SELECT b, c, d, e, f FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), 4, CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST(NULL as INT), CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, 5, CAST(NULL AS DATE))) AS V(a, b, c, d, e, f) ORDER BY a", new Object[0]);
            this.batchSql("INSERT INTO T4 SELECT * FROM myTable", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T4", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, 5, LocalDate.of(2020, 1, 2)})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T4", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, 5, LocalDate.of(2020, 1, 2)})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T4 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 3, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST(NULL AS INT), CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 2, 3, 5, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 3, 3, 4, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 3, 2, 6, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T4 VALUES (1, 3, CAST(NULL AS INT), CAST(NULL AS INT), CAST('2022-01-02' AS DATE))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T4", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, 5, LocalDate.of(2020, 1, 2)}), Row.of((Object[])new Object[]{1, 3, 2, 6, LocalDate.of(2022, 1, 2)})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T4").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class LastValueAggregation
    extends CatalogITCaseBase {
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T5 (j INT, k INT, a INT, i DATE,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='last_value', 'fields.i.aggregate-function'='last_value');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("CREATE TABLE myTable AS SELECT b, c, d, e FROM (VALUES   (1, 1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE)),  (2, 1, 2, 2, CAST('2020-01-02' AS DATE)),  (3, 1, 2, 3, CAST(NULL AS DATE))) AS V(a, b, c, d, e) ORDER BY a", new Object[0]);
            this.batchSql("INSERT INTO T5 SELECT * FROM myTable", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T5", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, null})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T5", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, null})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T5 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 3, 3, CAST('2020-01-01' AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 3, 2, CAST(NULL AS DATE))", new Object[0]);
            this.batchSql("INSERT INTO T5 VALUES (1, 3, CAST(NULL AS INT), CAST('2022-01-02' AS DATE))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T5", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, 3, null}), Row.of((Object[])new Object[]{1, 3, null, LocalDate.of(2022, 1, 2)})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T5").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class ListAggAggregation
    extends CatalogITCaseBase {
        @Override
        protected int defaultParallelism() {
            return 1;
        }

        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T6 (j INT, k INT, a STRING, PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='listagg');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("CREATE TABLE myTable AS SELECT b, c, d FROM (VALUES   (1, 1, 2, 'first line'),  (2, 1, 2, CAST(NULL AS STRING)),  (3, 1, 2, 'second line')) AS V(a, b, c, d) ORDER BY a", new Object[0]);
            this.batchSql("INSERT INTO T6 SELECT * FROM myTable", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T6", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, "first line,second line"})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T6", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, "first line,second line"})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T6 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))", new Object[0]);
            this.batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T6", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, "first line,second line"}), Row.of((Object[])new Object[]{1, 3, null})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T6").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }

    public static class BoolOrAndAggregation
    extends CatalogITCaseBase {
        @Override
        protected List<String> ddl() {
            return Collections.singletonList("CREATE TABLE IF NOT EXISTS T7 (j INT, k INT, a BOOLEAN, b BOOLEAN,PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='aggregation', 'fields.a.aggregate-function'='bool_or','fields.b.aggregate-function'='bool_and');");
        }

        @Test
        public void testMergeInMemory() {
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN)),(1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN)), (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T7", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, true, false})});
        }

        @Test
        public void testMergeRead() {
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))", new Object[0]);
            List<Row> result = this.batchSql("SELECT * FROM T7", new Object[0]);
            Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, true, false})});
        }

        @Test
        public void testMergeCompaction() {
            this.batchSql("ALTER TABLE T7 SET ('commit.force-compact'='true')", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS  BOOLEAN), CAST('TRUE' AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 3, CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN))", new Object[0]);
            this.batchSql("INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN), CAST('TRUE' AS BOOLEAN))", new Object[0]);
            Assertions.assertThat(this.batchSql("SELECT * FROM T7", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2, true, false}), Row.of((Object[])new Object[]{1, 3, false, true})});
        }

        @Test
        public void testStreamingRead() {
            Assertions.assertThatThrownBy(() -> this.sEnv.from("T7").execute().print(), (String)"Pre-aggregate continuous reading is not supported", (Object[])new Object[0]);
        }
    }
}

