/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for tables created with {@code 'merge-engine'='partial-update'}.
 *
 * <p>Each test writes rows through SQL and asserts on the rows read back. The tables shared by
 * several tests are declared in {@link #ddl()}; the remaining tables are created inside the test
 * that uses them.
 */
public class PartialUpdateITCase extends CatalogITCaseBase {

    /**
     * Returns the DDL statements for the tables shared by the tests in this class.
     *
     * @return the {@code CREATE TABLE} statements for {@code T}, {@code dwd_orders}, {@code
     *     ods_orders} and {@code dim_persons}
     */
    @Override
    protected List<String> ddl() {
        return Arrays.asList(
                "CREATE TABLE IF NOT EXISTS T ("
                        + "j INT, k INT, a INT, b INT, c STRING, "
                        + "PRIMARY KEY (j,k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update');",
                "CREATE TABLE IF NOT EXISTS dwd_orders ("
                        + "OrderID INT, OrderNumber INT, PersonID INT, "
                        + "LastName STRING, FirstName STRING, Age INT, "
                        + "PRIMARY KEY (OrderID) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'partial-update.ignore-delete'='true');",
                "CREATE TABLE IF NOT EXISTS ods_orders ("
                        + "OrderID INT, OrderNumber INT, PersonID INT, "
                        + "PRIMARY KEY (OrderID) NOT ENFORCED) "
                        + "WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');",
                "CREATE TABLE IF NOT EXISTS dim_persons ("
                        + "PersonID INT, LastName STRING, FirstName STRING, Age INT, "
                        + "PRIMARY KEY (PersonID) NOT ENFORCED) "
                        + "WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');");
    }

    /**
     * Writes two rows with the same primary key in a single INSERT and expects one merged row
     * that carries the non-null value of every column.
     */
    @Test
    public void testMergeInMemory() {
        batchSql(
                "INSERT INTO T VALUES "
                        + "(1, 2, 3, CAST(NULL AS INT), '5'), "
                        + "(1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))");
        List<Row> result = batchSql("SELECT * FROM T");
        Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5"));
    }

    /**
     * Writes three rows with the same primary key in separate INSERT statements and expects both
     * the full read and the projected read of column {@code a} to return the merged values.
     */
    @Test
    public void testMergeRead() {
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
        Assertions.assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4));
    }

    /**
     * Same idea as {@link #testMergeRead()} but with {@code 'commit.force-compact'='true'} set on
     * the table and two distinct primary keys, each written three times.
     */
    @Test
    public void testMergeCompaction() {
        batchSql("ALTER TABLE T SET ('commit.force-compact'='true')");

        // Key (1, 2)
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");

        // Key (1, 3)
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')");
        batchSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))");
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))");

        Assertions.assertThat(batchSql("SELECT * FROM T"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1, 3, 2, 4, "1"));
    }

    /**
     * Runs a streaming INSERT that fills {@code dwd_orders} from {@code ods_orders} and from the
     * join of {@code dim_persons} with {@code ods_orders}, then checks that the widened row
     * reflects the latest values written to both source tables.
     *
     * @throws Exception if closing the streaming job's iterator fails
     */
    @Test
    public void testForeignKeyJoin() throws Exception {
        sEnv.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.NONE);

        // try-with-resources so the streaming job's iterator is closed even when an assertion
        // below fails or times out.
        try (CloseableIterator<Row> iter =
                streamSqlIter(
                        "INSERT INTO dwd_orders "
                                + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
                                + "UNION ALL "
                                + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age "
                                + "FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;")) {
            batchSql("INSERT INTO ods_orders VALUES (1, 2, 3)");
            batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'jon', 23)");
            Awaitility.await()
                    .pollInSameThread()
                    .atMost(5L, TimeUnit.SECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertThat(
                                                    rowsToList(
                                                            batchSql("SELECT * FROM dwd_orders")))
                                            .containsExactly(
                                                    Arrays.asList(1, 2, 3, "snow", "jon", 23)));

            batchSql("INSERT INTO ods_orders VALUES (1, 4, 3)");
            batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'targaryen', 23)");
            Awaitility.await()
                    .pollInSameThread()
                    .atMost(5L, TimeUnit.SECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertThat(
                                                    rowsToList(
                                                            batchSql("SELECT * FROM dwd_orders")))
                                            .containsExactly(
                                                    Arrays.asList(
                                                            1, 4, 3, "snow", "targaryen", 23)));
        }
    }

    /**
     * Converts every row into a list of its field values.
     *
     * @param rows the rows to convert
     * @return one list of field values per row, in the same order as {@code rows}
     */
    private List<List<Object>> rowsToList(List<Row> rows) {
        return rows.stream().map(this::toList).collect(Collectors.toList());
    }

    /**
     * Converts a row into a list of its field values, asserting first that the row kind is
     * {@link RowKind#INSERT} or {@link RowKind#UPDATE_AFTER}.
     *
     * @param row the row to convert
     * @return the field values of {@code row} in positional order
     */
    private List<Object> toList(Row row) {
        Assertions.assertThat(row.getKind()).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER);
        int arity = row.getArity();
        List<Object> fields = new ArrayList<>(arity);
        for (int pos = 0; pos < arity; pos++) {
            fields.add(row.getField(pos));
        }
        return fields;
    }

    /**
     * Expects a streaming read of table {@code T} to fail.
     *
     * <p>NOTE(review): the string argument is passed to the {@code assertThatThrownBy} overload
     * that takes an assertion description, so it labels the assertion and is not compared with
     * the exception message. Consider chaining {@code hasMessageContaining(...)} once the exact
     * message has been confirmed.
     */
    @Test
    public void testStreamingRead() {
        Assertions.assertThatThrownBy(
                () -> sEnv.from("T").execute().print(),
                "Partial update continuous reading is not supported");
    }

    /**
     * Streams from a partial-update table that uses {@code 'changelog-producer'='input'} and
     * expects the rows to arrive as they were written.
     *
     * @throws Exception if collecting from or closing the streaming iterator fails
     */
    @Test
    public void testStreamingReadChangelogInput() throws Exception {
        sql(
                "CREATE TABLE INPUT_T ("
                        + "a INT, b INT, c INT, "
                        + "PRIMARY KEY (a) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', 'changelog-producer'='input');");

        // Close the iterator (and with it the streaming query) even if an assertion fails.
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM INPUT_T"))) {
            sql("INSERT INTO INPUT_T VALUES (1, CAST(NULL AS INT), 1)");
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, null, 1));

            sql("INSERT INTO INPUT_T VALUES (1, 1, CAST(NULL AS INT)), (2, 2, 2)");
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 1, null), Row.of(2, 2, 2));
        }
    }

    /**
     * Creates a table with two sequence groups ({@code g_1} over {@code a,b} and {@code g_2}
     * over {@code c,d}) and asserts the merged values after each round of writes.
     */
    @Test
    public void testSequenceGroup() {
        sql(
                "CREATE TABLE SG ("
                        + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, "
                        + "PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', "
                        + "'fields.g_1.sequence-group'='a,b', "
                        + "'fields.g_2.sequence-group'='c,d');");

        sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");

        // g_2 is null in this write: c and d are expected to keep their previous values.
        sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
        Assertions.assertThat(sql("SELECT * FROM SG"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
        Assertions.assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));

        // g_1 (1) is lower than the stored 2: a and b are expected to stay, c and d to advance.
        sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
        Assertions.assertThat(sql("SELECT * FROM SG"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));

        sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
        sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
        sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
        Assertions.assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
        Assertions.assertThat(sql("SELECT c, d FROM SG"))
                .containsExactlyInAnyOrder(Row.of(5, null));
    }

    /**
     * Expects table creation to fail with a specific root-cause message for three invalid
     * sequence-group definitions: an unknown group field, an unknown member field, and a field
     * that belongs to two groups.
     */
    @Test
    public void testInvalidSequenceGroup() {
        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG ("
                                                + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, "
                                                + "PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', "
                                                + "'fields.g_0.sequence-group'='a,b', "
                                                + "'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage(
                        "The sequence field group: g_0 can not be found in table schema.");

        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG ("
                                                + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, "
                                                + "PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', "
                                                + "'fields.g_1.sequence-group'='a1,b', "
                                                + "'fields.g_2.sequence-group'='c,d');"))
                .hasRootCauseMessage("Field a1 can not be found in table schema.");

        Assertions.assertThatThrownBy(
                        () ->
                                sql(
                                        "CREATE TABLE SG ("
                                                + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, "
                                                + "PRIMARY KEY (k) NOT ENFORCED) "
                                                + "WITH ('merge-engine'='partial-update', "
                                                + "'fields.g_1.sequence-group'='a,b', "
                                                + "'fields.g_2.sequence-group'='a,d');"))
                .hasRootCauseMessage(
                        "Field a is defined repeatedly by multiple groups: [g_1, g_2].");
    }

    /**
     * Reads a projection ({@code k, c}) from a table that combines the {@code lookup} changelog
     * producer with sequence groups and expects the projected values of the single written row.
     */
    @Test
    public void testProjectPushDownWithLookupChangelogProducer() {
        sql(
                "CREATE TABLE IF NOT EXISTS T_P ("
                        + "j INT, k INT, a INT, b INT, c STRING, "
                        + "PRIMARY KEY (j,k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', "
                        + "'changelog-producer' = 'lookup', "
                        + "'fields.a.sequence-group'='j', "
                        + "'fields.b.sequence-group'='c');");
        batchSql("INSERT INTO T_P VALUES (1, 1, 1, 1, '1')");
        Assertions.assertThat(sql("SELECT k, c FROM T_P"))
                .containsExactlyInAnyOrder(Row.of(1, "1"));
    }

    /**
     * Writes three rows (two sharing a key) into a partitioned table that has {@code
     * 'local-merge-buffer-size'} set and expects one merged row per key.
     */
    @Test
    public void testLocalMerge() {
        sql(
                "CREATE TABLE T1 ("
                        + "k INT,"
                        + "v INT,"
                        + "d INT,"
                        + "PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d) "
                        + " WITH ('merge-engine'='partial-update', "
                        + "'local-merge-buffer-size'='1m');");
        sql("INSERT INTO T1 VALUES (1, CAST(NULL AS INT), 1), (2, 1, 1), (1, 2, 1)");
        Assertions.assertThat(batchSql("SELECT * FROM T1"))
                .containsExactlyInAnyOrder(Row.of(1, 2, 1), Row.of(2, 1, 1));
    }

    /**
     * Combines sequence groups with a {@code sum} aggregate on column {@code a} and asserts the
     * merged values after each write.
     */
    @Test
    public void testPartialUpdateWithAggregation() {
        sql(
                "CREATE TABLE AGG ("
                        + "k INT, a INT, b INT, g_1 INT, c VARCHAR, g_2 INT, "
                        + "PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', "
                        + "'fields.a.aggregate-function'='sum', "
                        + "'fields.g_1.sequence-group'='a', "
                        + "'fields.g_2.sequence-group'='c');");

        sql("INSERT INTO AGG VALUES (1, 1, 1, 1, '1', 1)");
        sql("INSERT INTO AGG VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT))");
        Assertions.assertThat(sql("SELECT * FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(1, 3, 2, 2, "1", 1));
        Assertions.assertThat(sql("SELECT a, c FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(3, "1"));

        sql("INSERT INTO AGG VALUES (1, 3, 3, 1, '3', 3)");
        Assertions.assertThat(sql("SELECT * FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(1, 6, 3, 2, "3", 3));

        sql(
                "INSERT INTO AGG VALUES "
                        + "(1, CAST(NULL AS INT), CAST(NULL AS INT), 2, CAST(NULL AS VARCHAR), 4)");
        Assertions.assertThat(sql("SELECT a, b, c FROM AGG"))
                .containsExactlyInAnyOrder(Row.of(6, 3, null));
    }

    /**
     * Uses the {@code first_value} aggregate on column {@code a} inside sequence group {@code
     * g_1} and asserts the stored value after in-order and out-of-order writes.
     */
    @Test
    public void testFirstValuePartialUpdate() {
        sql(
                "CREATE TABLE AGG ("
                        + "k INT, a INT, g_1 INT, "
                        + "PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('merge-engine'='partial-update', "
                        + "'fields.g_1.sequence-group'='a', "
                        + "'fields.a.aggregate-function'='first_value');");

        sql("INSERT INTO AGG VALUES (1, 1, 1), (1, 2, 2)");
        Assertions.assertThat(sql("SELECT * FROM AGG")).containsExactlyInAnyOrder(Row.of(1, 1, 2));

        // A write with a lower sequence value (0) is expected to replace a but leave g_1 at 2.
        sql("INSERT INTO AGG VALUES (1, 0, 0)");
        Assertions.assertThat(sql("SELECT * FROM AGG")).containsExactlyInAnyOrder(Row.of(1, 0, 2));
    }
}

