/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests for tables created with {@code 'changelog-producer' = 'full-compaction'}.
 *
 * <p>Every test starts a streaming query, writes rows with {@code INSERT INTO} statements and
 * asserts on the rows the streaming query emits. Streaming iterators are opened in
 * try-with-resources blocks so they are closed even when an assertion fails.
 */
public class FullCompactionFileStoreITCase extends CatalogITCaseBase {

    /** Name of the primary-key table created before every test. */
    private static final String TABLE = "T";

    /** Options clause (including its leading space) appended to CREATE TABLE statements. */
    private static final String OPTIONS =
            " WITH('changelog-producer'='full-compaction', "
                    + "'changelog-producer.compaction-interval' = '1s')";

    /**
     * Runs the base-class setup and then creates table {@code T} (three STRING columns, primary
     * key {@code a}) with the full-compaction changelog producer options.
     *
     * @throws IOException if the base-class setup fails
     */
    @Override
    @BeforeEach
    public void before() throws IOException {
        super.before();
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + TABLE
                        + " (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                        + OPTIONS);
    }

    /** A streaming read emits the rows of each insert as they are written. */
    @Test
    public void testStreamingRead() throws Exception {
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", TABLE))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", TABLE);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            sql("INSERT INTO %s VALUES ('7', '8', '9')", TABLE);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
        }
    }

    /** A streaming read works for a table whose value column is an ARRAY of ROWs. */
    @Test
    public void testStreamingReadOfArray() throws Exception {
        String table = "T_ARRAY";
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + table
                        + "(ID INT PRIMARY KEY NOT ENFORCED,\n"
                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n)"
                        + OPTIONS);

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
            sql(
                    "INSERT INTO %s VALUES (1, ARRAY[('c','mark1'), ('d','mark2'), ('e','mark3')]);",
                    table);
            // Rows are compared through their string form, which spells out the nested rows.
            Assertions.assertThat(toStrings(iterator.collect(1)))
                    .containsExactlyInAnyOrder(
                            "+I[1, [+I[c, mark1], +I[d, mark2], +I[e, mark3]]]");
        }
    }

    /**
     * Reads with the {@code 'scan.mode'='compacted-full'} hint, both as a streaming query and as
     * a batch query, and expects all inserted rows.
     */
    @Test
    public void testCompactedScanMode() throws Exception {
        String compactedQuery =
                "SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted-full') */";

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter(compactedQuery, TABLE))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", TABLE);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            sql("INSERT INTO %s VALUES ('7', '8', '9')", TABLE);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));

            // The batch query runs while the streaming query is still open, as before.
            Assertions.assertThat(sql(compactedQuery, TABLE))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
        }
    }

    /**
     * Checks the changelog emitted for updates, with {@code 'changelog-producer.row-deduplicate'}
     * set to each boolean value.
     *
     * <p>When the option is {@code true}, re-inserting an identical row is expected to emit
     * nothing for that key; when {@code false}, a -U/+U pair is expected even though the row is
     * unchanged.
     *
     * @param changelogRowDeduplicate value written to the row-deduplicate table option
     */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testUpdate(boolean changelogRowDeduplicate) throws Exception {
        sql(
                "ALTER TABLE %s SET ('changelog-producer.row-deduplicate' = '%s')",
                TABLE,
                changelogRowDeduplicate);

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", TABLE))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '1')", TABLE);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "1"));

            sql("INSERT INTO %s VALUES ('1', '4', '5')", TABLE);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.UPDATE_BEFORE, "1", "2", "3"),
                            Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"));

            for (int i = 1; i < 5; i++) {
                // Key '1' is rewritten with identical values; key '4' gets a new value for c.
                sql("INSERT INTO %s VALUES ('1', '4', '5'), ('4', '5', '%s')", TABLE, i + 1);

                String before = String.valueOf(i);
                String after = String.valueOf(i + 1);
                if (changelogRowDeduplicate) {
                    Assertions.assertThat(iterator.collect(2))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "4", "5", before),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "4", "5", after));
                } else {
                    Assertions.assertThat(iterator.collect(4))
                            .containsExactlyInAnyOrder(
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "1", "4", "5"),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"),
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "4", "5", before),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "4", "5", after));
                }
            }
        }
    }

    /**
     * Reads the {@code $audit_log} table of {@code T}: the streaming query is expected to emit
     * INSERT-kind rows whose first column carries the change kind as text, and a batch query run
     * after the stream is closed is expected to return only the latest row.
     */
    @Test
    public void testUpdateAuditLog() throws Exception {
        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s$audit_log", TABLE))) {
            sql("INSERT INTO %s VALUES ('1', '2', '3')", TABLE);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "2", "3"));

            sql("INSERT INTO %s VALUES ('1', '4', '5')", TABLE);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.INSERT, "-U", "1", "2", "3"),
                            Row.ofKind(RowKind.INSERT, "+U", "1", "4", "5"));
        }

        // The streaming query is closed before the batch read, matching the original ordering.
        Assertions.assertThat(sql("SELECT * FROM %s$audit_log", TABLE))
                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
    }

    /**
     * Row deduplication on a table with an ARRAY of ROWs column: a changed array is expected to
     * emit a -U/+U pair, while re-inserting the same array is expected to emit nothing for that
     * key.
     */
    @Test
    public void testRowDeduplicateWithArrayRow() throws Exception {
        String table = "T_ARRAY_ROW";
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS "
                        + table
                        + "(ID INT PRIMARY KEY NOT ENFORCED,\n"
                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
                        + ") WITH ('changelog-producer'='full-compaction',"
                        + "'changelog-producer.compaction-interval' = '1s',"
                        + "'changelog-producer.row-deduplicate' = 'true')");

        try (BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
            sql("INSERT INTO %s VALUES (1, ARRAY[('a','mark1')]);", table);
            Assertions.assertThat(toStrings(iterator.collect(1)))
                    .containsExactlyInAnyOrder("+I[1, [+I[a, mark1]]]");

            sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')])", table);
            Assertions.assertThat(toStrings(iterator.collect(2)))
                    .containsExactly("-U[1, [+I[a, mark1]]]", "+U[1, [+I[b, mark2]]]");

            // Key 1 is unchanged here, so only the new key 2 is expected in the changelog.
            sql(
                    "INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')]), (2, ARRAY[('c', 'mark3')])",
                    table);
            Assertions.assertThat(toStrings(iterator.collect(1)))
                    .containsExactly("+I[2, [+I[c, mark3]]]");
        }
    }

    /** Converts each row to its {@link Row#toString()} form, preserving order. */
    private static List<String> toStrings(List<Row> rows) {
        return rows.stream().map(Row::toString).collect(Collectors.toList());
    }
}

