/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class RollbackToWatermarkProcedureITCase
extends CatalogITCaseBase {
    @Test
    public void testCreateTagsFromSnapshotsWatermark() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k1', '2024-12-02')", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k2', '2024-12-02')", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '3000') */ values('k3', '2024-12-02')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        long watermark1 = table.snapshotManager().snapshot(1L).watermark();
        long watermark2 = table.snapshotManager().snapshot(2L).watermark();
        long watermark3 = table.snapshotManager().snapshot(3L).watermark();
        Assertions.assertThat((watermark1 == 1000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((watermark2 == 2000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((watermark3 == 3000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat(this.sql("select * from T", new Object[0]).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{"+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 2024-12-02]"});
        this.sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 2001)", new Object[0]);
        Assertions.assertThat(this.sql("select * from T", new Object[0]).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{"+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]"});
        this.sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 1001)", new Object[0]);
        Assertions.assertThat(this.sql("select * from T", new Object[0]).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{"+I[k1, 2024-12-02]"});
    }
}

