/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.aggregation;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class NestedUpdateAggregationITCase
extends CatalogITCaseBase {
    @Override
    protected List<String> ddl() {
        String ordersTable = "CREATE TABLE orders (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING\n);";
        String subordersTable = "CREATE TABLE sub_orders (\n  order_id INT,\n  daily_id INT,\n  today STRING,\n  product_name STRING,\n  price BIGINT,\n  PRIMARY KEY (order_id, daily_id, today) NOT ENFORCED\n);";
        String wideTable = "CREATE TABLE order_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.nested-key' = 'daily_id,today',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')";
        String wideAppendTable = "CREATE TABLE order_append_wide (\n  order_id INT PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  address STRING,\n  sub_orders ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>\n) WITH (\n  'merge-engine' = 'aggregation',\n  'fields.sub_orders.aggregate-function' = 'nested_update',\n  'fields.sub_orders.ignore-retract' = 'true',  'fields.user_name.ignore-retract' = 'true',  'fields.address.ignore-retract' = 'true')";
        return Arrays.asList(ordersTable, subordersTable, wideTable, wideAppendTable);
    }

    @Test
    public void testUseCase() {
        this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
        this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000),(2, 1, '12-20', 'Tea', 40),(2, 2, '12-20', 'Pot', 60),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
        this.sql(this.widenSql(), new Object[0]);
        List result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
        Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Tesla", 400000L}))).isTrue();
        Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(1), 2, "Zhao", "ChengDu", Row.of((Object[])new Object[]{1, "12-20", "Tea", 40L}), Row.of((Object[])new Object[]{2, "12-20", "Pot", 60L}))).isTrue();
        Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(2), 3, "Liu", "NanJing", Row.of((Object[])new Object[]{1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{1, "12-26", "Cup", 30L}))).isTrue();
        List<Row> unnested = this.sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]);
        Assertions.assertThat(unnested).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 2, "12-20", "Tesla", 400000L}), Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 1, "12-20", "Tea", 40L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 2, "12-20", "Pot", 60L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-26", "Cup", 30L})});
    }

    @Test
    public void testUseCaseWithNullValue() {
        this.sql("INSERT INTO order_wide\nSELECT 6, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[cast(null as ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>)]", new Object[0]);
        List result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
        Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 6, null, null, new Row[]{null})).isTrue();
        this.sql("INSERT INTO order_wide\nSELECT 6, 'Sun', CAST (NULL AS STRING), ARRAY[ROW(1, '01-01','Apple', 6999)]", new Object[0]);
        result = this.sql("SELECT * FROM order_wide", new Object[0]).stream().sorted(Comparator.comparingInt(r -> (Integer)r.getFieldAs(0))).collect(Collectors.toList());
        Assertions.assertThat((boolean)this.checkOneRecord((Row)result.get(0), 6, "Sun", null, Row.of((Object[])new Object[]{1, "01-01", "Apple", 6999L}))).isTrue();
    }

    @Test
    public void testUseCaseAppend() {
        this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou'),(2, 'Zhao', 'ChengDu'),(3, 'Liu', 'NanJing')", new Object[0]);
        this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(2, 1, '12-20', 'Tesla', 400000),(3, 1, '12-25', 'Bat', 15),(3, 1, '12-26', 'Cup', 30)", new Object[0]);
        this.sql(this.widenAppendSql(), new Object[0]);
        List<Row> unnested = this.sql("SELECT order_id, user_name, address, daily_id, today, product_name, price FROM order_append_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)", new Object[0]);
        Assertions.assertThat(unnested).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{2, "Zhao", "ChengDu", 1, "12-20", "Tesla", 400000L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-25", "Bat", 15L}), Row.of((Object[])new Object[]{3, "Liu", "NanJing", 1, "12-26", "Cup", 30L})});
    }

    @Test
    @Timeout(value=60L)
    public void testUpdateWithIgnoreRetract() throws Exception {
        List<Row> result;
        boolean checkResult;
        this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, (Object)ExecutionConfigOptions.UpsertMaterialize.NONE);
        this.sql("INSERT INTO orders VALUES (1, 'Wang', 'HangZhou')", new Object[0]);
        this.sql("INSERT INTO sub_orders VALUES (1, 1, '12-20', 'Apple', 8000),(1, 2, '12-20', 'Tesla', 400000),(1, 1, '12-21', 'Sangsung', 5000)", new Object[0]);
        this.sEnv.executeSql(this.widenSql());
        do {
            Thread.sleep(500L);
        } while (!(checkResult = !(result = this.sql("SELECT * FROM order_wide", new Object[0])).isEmpty() && this.checkOneRecord(result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Tesla", 400000L}))));
        this.sql("INSERT INTO sub_orders VALUES (1, 2, '12-20', 'Benz', 380000)", new Object[0]);
        do {
            Thread.sleep(500L);
        } while (!(checkResult = !(result = this.sql("SELECT * FROM order_wide", new Object[0])).isEmpty() && this.checkOneRecord(result.get(0), 1, "Wang", "HangZhou", Row.of((Object[])new Object[]{1, "12-20", "Apple", 8000L}), Row.of((Object[])new Object[]{1, "12-21", "Sangsung", 5000L}), Row.of((Object[])new Object[]{2, "12-20", "Benz", 380000L}))));
    }

    private String widenSql() {
        return "INSERT INTO order_wide\nSELECT order_id, user_name, address, CAST (NULL AS ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>) FROM orders\nUNION ALL\nSELECT order_id, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[ROW(daily_id, today, product_name, price)] FROM sub_orders";
    }

    private String widenAppendSql() {
        return "INSERT INTO order_append_wide\nSELECT order_id, user_name, address, CAST (NULL AS ARRAY<ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>>) FROM orders\nUNION ALL\nSELECT order_id, CAST (NULL AS STRING), CAST (NULL AS STRING), ARRAY[ROW(daily_id, today, product_name, price)] FROM sub_orders";
    }

    private boolean checkOneRecord(Row record, int orderId, String userName, String address, Row ... subOrders) {
        if ((Integer)record.getField(0) != orderId) {
            return false;
        }
        if (!Objects.equals(record.getFieldAs(1), userName)) {
            return false;
        }
        if (!Objects.equals(record.getFieldAs(2), address)) {
            return false;
        }
        return this.checkNestedTable((Row[])record.getFieldAs(3), subOrders);
    }

    private boolean checkNestedTable(Row[] nestedTable, Row ... subOrders) {
        if (nestedTable.length != subOrders.length) {
            return false;
        }
        Comparator<Object> comparator = Comparator.comparingInt(r -> (Integer)((Row)r).getFieldAs(0)).thenComparing(r -> (String)((Row)r).getField(1));
        List sortedActual = Arrays.stream(nestedTable).sorted(comparator).collect(Collectors.toList());
        List sortedExpected = Arrays.stream(subOrders).sorted(comparator).collect(Collectors.toList());
        for (int i = 0; i < sortedActual.size(); ++i) {
            if (Objects.equals(sortedActual.get(i), sortedExpected.get(i))) continue;
            return false;
        }
        return true;
    }
}

