/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import org.apache.flink.types.Row;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class CreateTagFromWatermarkProcedureITCase
extends CatalogITCaseBase {
    @Test
    public void testCreateTagsFromSnapshotsWatermark() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')", new Object[0]);
        this.sql("insert into T values('k1', '2024-01-02')", new Object[0]);
        Assertions.assertThatException().isThrownBy(() -> this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag1',`watermark` => %s )", 1000)).withRootCauseInstanceOf(SnapshotNotExistException.class).withMessageContaining("Could not find any snapshot whose watermark later than %s.", new Object[]{1000});
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k3', '2024-01-02')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        long watermark1 = table.snapshotManager().snapshot(1L).watermark();
        Snapshot snapshot2 = table.snapshotManager().snapshot(2L);
        long commitTime2 = snapshot2.timeMillis();
        long watermark2 = snapshot2.watermark();
        Snapshot snapshot3 = table.snapshotManager().snapshot(3L);
        long commitTime3 = snapshot3.timeMillis();
        long watermark3 = snapshot3.watermark();
        Assertions.assertThat((watermark1 == Long.MIN_VALUE ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((watermark2 == 1000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((watermark3 == 2000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat(this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag2',`watermark` => %s)", watermark2 - 1L).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2)});
        Assertions.assertThat(this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag3',`watermark` => %s)", watermark2 + 1L).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3)});
        Assertions.assertThatException().isThrownBy(() -> this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag4',`watermark` => %s )", watermark3 + 1L)).withRootCauseInstanceOf(SnapshotNotExistException.class).withMessageContaining("Could not find any snapshot whose watermark later than %s.", new Object[]{watermark3 + 1L});
    }

    @Test
    public void testCreateTagsFromTagsWatermark() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("insert into T/*+ OPTIONS('end-input.watermark'= '2000', 'snapshot.num-retained.max' = '1', 'snapshot.num-retained.min' = '1') */ values('k2', '2024-01-02')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Assertions.assertThat((boolean)table.snapshotManager().snapshotExists(1L)).isFalse();
        Tag tagSnapshot1 = table.tagManager().getOrThrow("tag1");
        long tagsCommitTime = tagSnapshot1.timeMillis();
        long tagsWatermark = tagSnapshot1.watermark();
        Snapshot snapshot2 = table.snapshotManager().snapshot(2L);
        long commitTime2 = snapshot2.timeMillis();
        long watermark2 = snapshot2.watermark();
        Assertions.assertThat((tagsWatermark == 1000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((watermark2 == 2000L ? 1 : 0) != 0).isTrue();
        Assertions.assertThat(this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag2',`watermark` => %s)", tagsWatermark - 1L).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark)});
        Assertions.assertThat(this.sql("CALL sys.create_tag_from_watermark(`table` => 'default.T',`tag` => 'tag3',`watermark` => %s)", watermark2 - 1L).stream().map(Row::toString)).containsExactlyInAnyOrder((Object[])new String[]{String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2)});
    }
}

