/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.aggregation;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Integration tests for tables whose {@code f0 ARRAY<STRING>} column is configured with the
 * {@code collect} aggregate function.
 *
 * <p>Covers batch reads with and without {@code fields.f0.distinct}, the changelog emitted by a
 * streaming read when update/delete records are written, and retract records that carry a
 * {@code NULL} array.
 */
public class CollectAggregationITCase extends CatalogITCaseBase {

    /**
     * DDL template for a bounded {@code values} source table with the same columns as the tables
     * under test. Format arguments, in order: table name, registered data id, changelog mode.
     */
    private static final String TEMPORARY_TABLE_TEMPLATE =
            "CREATE TEMPORARY TABLE %s ("
                    + "  id INT PRIMARY KEY NOT ENFORCED,"
                    + "  f0 ARRAY<STRING>,"
                    + "  f1 INT"
                    + ") WITH ("
                    + "  'connector' = 'values',"
                    + "  'data-id' = '%s',"
                    + "  'bounded' = 'true',"
                    + "  'changelog-mode' = '%s'"
                    + ")";

    /**
     * Returns the parallelism used by this test case.
     *
     * <p>NOTE(review): presumably consumed by {@code CatalogITCaseBase} when it sets up the
     * environments; confirm against the base class.
     */
    @Override
    protected int defaultParallelism() {
        return 1;
    }

    /** With {@code fields.f0.distinct = true}, duplicate elements are expected to be dropped. */
    @Test
    public void testAggWithDistinct() {
        sql(
                "CREATE TABLE test_collect("
                        + "  id INT PRIMARY KEY NOT ENFORCED,"
                        + "  f0 ARRAY<STRING>"
                        + ") WITH ("
                        + "  'merge-engine' = 'aggregation',"
                        + "  'fields.f0.aggregate-function' = 'collect',"
                        + "  'fields.f0.distinct' = 'true'"
                        + ")");

        sql(
                "INSERT INTO test_collect VALUES "
                        + "(1, CAST (NULL AS ARRAY<STRING>)), "
                        + "(2, ARRAY['A', 'B']), "
                        + "(3, ARRAY['car', 'watch'])");

        List<Row> result = queryAndSort("SELECT * FROM test_collect");
        // Guard the positional lookups below: exactly one row per primary key.
        Assertions.assertThat(result).hasSize(3);
        // No expected elements means the array column must be NULL (see checkOneRecord).
        checkOneRecord(result.get(0), 1);
        checkOneRecord(result.get(1), 2, "A", "B");
        checkOneRecord(result.get(2), 3, "car", "watch");

        sql(
                "INSERT INTO test_collect VALUES "
                        + "(1, ARRAY['paimon', 'paimon']), "
                        + "(2, ARRAY['A', 'B', 'C']), "
                        + "(3, CAST (NULL AS ARRAY<STRING>))");

        result = queryAndSort("SELECT * FROM test_collect");
        Assertions.assertThat(result).hasSize(3);
        checkOneRecord(result.get(0), 1, "paimon");
        checkOneRecord(result.get(1), 2, "A", "B", "C");
        // A NULL input must leave the previously collected elements untouched.
        checkOneRecord(result.get(2), 3, "car", "watch");
    }

    /** Without the distinct option, every written element is expected to be kept. */
    @Test
    public void testAggWithoutDistinct() {
        sql(
                "CREATE TABLE test_collect("
                        + "  id INT PRIMARY KEY NOT ENFORCED,"
                        + "  f0 ARRAY<STRING>"
                        + ") WITH ("
                        + "  'merge-engine' = 'aggregation',"
                        + "  'fields.f0.aggregate-function' = 'collect'"
                        + ")");

        sql(
                "INSERT INTO test_collect VALUES "
                        + "(1, CAST (NULL AS ARRAY<STRING>)), "
                        + "(2, ARRAY['A', 'B', 'B']), "
                        + "(3, ARRAY['car', 'watch'])");

        List<Row> result = queryAndSort("SELECT * FROM test_collect");
        // Guard the positional lookups below: exactly one row per primary key.
        Assertions.assertThat(result).hasSize(3);
        checkOneRecord(result.get(0), 1);
        checkOneRecord(result.get(1), 2, "A", "B", "B");
        checkOneRecord(result.get(2), 3, "car", "watch");

        sql(
                "INSERT INTO test_collect VALUES "
                        + "(1, ARRAY['paimon', 'paimon']), "
                        + "(2, ARRAY['A', 'B', 'C']), "
                        + "(3, CAST (NULL AS ARRAY<STRING>))");

        result = queryAndSort("SELECT * FROM test_collect");
        Assertions.assertThat(result).hasSize(3);
        checkOneRecord(result.get(0), 1, "paimon", "paimon");
        // ['A', 'B', 'B'] from the first write plus ['A', 'B', 'C'] from the second.
        checkOneRecord(result.get(1), 2, "A", "A", "B", "B", "B", "C");
        checkOneRecord(result.get(2), 3, "car", "watch");
    }

    /** Supplies the (changelog-producer, merge-engine) combinations for {@link #testRetract}. */
    private static List<Arguments> retractArguments() {
        return Arrays.asList(
                Arguments.arguments("lookup", "aggregation"),
                Arguments.arguments("lookup", "partial-update"),
                Arguments.arguments("full-compaction", "aggregation"),
                Arguments.arguments("full-compaction", "partial-update"));
    }

    /**
     * Writes insert, update and delete records and checks the changelog a streaming read of the
     * table produces for each write.
     *
     * @param changelogProducer value of the table's {@code changelog-producer} option
     * @param mergeEngine value of the table's {@code merge-engine} option
     * @throws Exception if a job fails or the streaming iterator cannot be closed
     */
    @ParameterizedTest(name = "changelog-producer = {0}, merge-engine = {1}")
    @MethodSource("retractArguments")
    public void testRetract(String changelogProducer, String mergeEngine) throws Exception {
        // For partial-update tables, f1 is additionally declared as the sequence group of f0.
        String sequenceGroup =
                "partial-update".equals(mergeEngine) ? ", 'fields.f1.sequence-group' = 'f0'" : "";

        sql(
                "CREATE TABLE test_collect("
                        + "  id INT PRIMARY KEY NOT ENFORCED,"
                        + "  f0 ARRAY<STRING>,"
                        + "  f1 INT"
                        + ") WITH ("
                        + "  'changelog-producer' = '%s',"
                        + "  'merge-engine' = '%s',"
                        + "  'fields.f0.aggregate-function' = 'collect'"
                        + "  %s"
                        + ")",
                changelogProducer,
                mergeEngine,
                sequenceGroup);

        // try-with-resources: the streaming read is closed even when an assertion fails.
        try (BlockingIterator<Row, Row> select =
                streamSqlBlockIter("SELECT * FROM test_collect")) {
            // Key 1: plain insert.
            sql("INSERT INTO test_collect VALUES (1, ARRAY['A', 'B'], 1)");
            List<Row> result = select.collect(1);
            checkOneRecord(result.get(0), 1, "A", "B");

            // Key 1: an UPDATE_BEFORE/UPDATE_AFTER pair is written in one job.
            writeChangelog(
                    "INPUT11",
                    "UB,UA",
                    Arrays.asList(
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 2),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 3)));
            checkUpdate(
                    select.collect(2),
                    1,
                    new String[] {"A", "B"},
                    new String[] {"A", "B", "C", "D"});

            // Key 1: a DELETE carrying ['C', 'D'] is expected to remove just those elements.
            writeChangelog(
                    "INPUT12",
                    "D",
                    Collections.singletonList(
                            Row.ofKind(RowKind.DELETE, 1, new String[] {"C", "D"}, 4)));
            checkUpdate(
                    select.collect(2),
                    1,
                    new String[] {"A", "B", "C", "D"},
                    new String[] {"A", "B"});

            // Keys 2 and 3: plain inserts.
            sql(
                    "INSERT INTO test_collect VALUES "
                            + "(2, ARRAY['A', 'B'], 5), "
                            + "(3, ARRAY['A', 'B'], 6)");
            result = select.collect(2);
            checkOneRecord(result.get(0), 2, "A", "B");
            checkOneRecord(result.get(1), 3, "A", "B");

            // Key 2: same update scenario as for key 1.
            writeChangelog(
                    "INPUT21",
                    "UB,UA",
                    Arrays.asList(
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, new String[] {"A", "B"}, 7),
                            Row.ofKind(RowKind.UPDATE_AFTER, 2, new String[] {"C", "D"}, 8)));
            checkUpdate(
                    select.collect(2),
                    2,
                    new String[] {"A", "B"},
                    new String[] {"A", "B", "C", "D"});

            // Key 3: a DELETE carrying only ['A'] is expected to leave 'B' behind.
            writeChangelog(
                    "INPUT22",
                    "D",
                    Collections.singletonList(
                            Row.ofKind(RowKind.DELETE, 3, new String[] {"A"}, 9)));
            checkUpdate(select.collect(2), 3, new String[] {"A", "B"}, new String[] {"B"});
        }
    }

    /**
     * Writes inserts and an update pair whose UPDATE_BEFORE row carries a {@code NULL} array, and
     * expects the table to end up with the elements of both non-null arrays.
     *
     * @throws Exception if one of the jobs fails
     */
    @Test
    public void testRetractInputNull() throws Exception {
        sql(
                "CREATE TABLE test_collect ("
                        + "  id INT PRIMARY KEY NOT ENFORCED,"
                        + "  f0 ARRAY<STRING>,"
                        + "  f1 INT"
                        + ") WITH ("
                        + "  'changelog-producer' = 'lookup',"
                        + "  'merge-engine' = 'partial-update',"
                        + "  'fields.f0.aggregate-function' = 'collect',"
                        + "  'fields.f1.sequence-group' = 'f0'"
                        + ")");

        List<Row> input =
                Arrays.asList(
                        Row.ofKind(RowKind.INSERT, 1, null, 1),
                        Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 2),
                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
                        Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"B"}, 3));
        writeChangelog("input", "UB,UA", input);

        Assertions.assertThat(sql("SELECT * FROM test_collect"))
                .containsExactly(Row.of(1, new String[] {"A", "B"}, 3));
    }

    /**
     * Registers {@code records} as the data of a new temporary {@code values} table and writes
     * that table into {@code test_collect}, waiting for both statements to finish.
     *
     * @param inputTable name of the temporary source table to create
     * @param changelogMode value of the source table's {@code changelog-mode} option
     * @param records rows the source table emits
     * @throws Exception if a statement fails or waiting for it is interrupted
     */
    private void writeChangelog(String inputTable, String changelogMode, List<Row> records)
            throws Exception {
        String dataId = TestValuesTableFactory.registerData(records);
        sEnv.executeSql(String.format(TEMPORARY_TABLE_TEMPLATE, inputTable, dataId, changelogMode))
                .await();
        sEnv.executeSql("INSERT INTO test_collect SELECT * FROM " + inputTable).await();
    }

    /**
     * Asserts that {@code changelog} starts with an UPDATE_BEFORE row followed by an UPDATE_AFTER
     * row for the given key.
     *
     * @param changelog rows collected from the streaming read; the first two are inspected
     * @param id expected primary key of both rows
     * @param before expected array elements of the UPDATE_BEFORE row
     * @param after expected array elements of the UPDATE_AFTER row
     */
    private void checkUpdate(List<Row> changelog, int id, String[] before, String[] after) {
        Row retracted = changelog.get(0);
        Assertions.assertThat(retracted.getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
        checkOneRecord(retracted, id, before);

        Row updated = changelog.get(1);
        Assertions.assertThat(updated.getKind()).isEqualTo(RowKind.UPDATE_AFTER);
        checkOneRecord(updated, id, after);
    }

    /**
     * Asserts the key and the collected array of one row.
     *
     * @param row row to inspect; field 0 is the key, field 1 the collected array
     * @param id expected key
     * @param elements expected array elements in any order; {@code null} or empty means the
     *     array field itself must be {@code null}
     */
    private void checkOneRecord(Row row, int id, String... elements) {
        Assertions.assertThat(row.getField(0)).isEqualTo(id);
        if (elements == null || elements.length == 0) {
            Assertions.assertThat(row.getField(1)).isNull();
        } else {
            Assertions.assertThat((String[]) row.getField(1)).containsExactlyInAnyOrder(elements);
        }
    }
}

