/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the {@code sys.create_tag_from_timestamp} procedure.
 *
 * <p>Each procedure call is expected to return a single row rendered as
 * {@code +I[<tag>, <snapshot id>, <commit time>, <fourth column>]}. The fourth column is always
 * asserted to be {@link Long#MIN_VALUE} here (presumably the snapshot watermark - confirm against
 * the procedure's result schema).
 */
public class CreateTagFromTimestampProcedureITCase
extends CatalogITCaseBase {
    /** DDL of the partitioned primary-key table shared by all tests. */
    private static final String CREATE_TABLE_SQL =
            "CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')";

    /** Procedure call template; arguments are the tag name and the timestamp in milliseconds. */
    private static final String CREATE_TAG_FROM_TIMESTAMP_SQL =
            "CALL sys.create_tag_from_timestamp(`table` => 'default.T',`tag` => '%s',`timestamp` => %s)";

    /**
     * Tags are resolved purely from live snapshots: the procedure must pick the first snapshot
     * whose commit time is not earlier than the requested timestamp, and fail when there is none.
     */
    @Test
    public void testCreateTagsFromSnapshotsCommitTime() throws Exception {
        sql(CREATE_TABLE_SQL);
        for (int i = 1; i <= 4; ++i) {
            sql("insert into T values('%s', '2024-01-01')", i);
            // Space the commits out so that consecutive snapshots get distinct commit times.
            Thread.sleep(100L);
        }

        FileStoreTable table = paimonTable("T");
        long earliestCommitTime = table.snapshotManager().earliestSnapshot().timeMillis();
        long commitTime3 = table.snapshotManager().snapshot(3L).timeMillis();
        long commitTime4 = table.snapshotManager().snapshot(4L).timeMillis();

        // A timestamp before the earliest snapshot resolves to snapshot 1.
        assertTagCreated("tag1", earliestCommitTime - 1L, 1L, earliestCommitTime);
        // A timestamp equal to a snapshot's commit time resolves to that snapshot.
        assertTagCreated("tag2", commitTime3, 3L, commitTime3);
        // A timestamp just after snapshot 3 resolves to the next snapshot.
        assertTagCreated("tag3", commitTime3 + 1L, 4L, commitTime4);

        // No snapshot can be committed at or after Long.MAX_VALUE, so the call must fail.
        Assertions.assertThatException()
                .isThrownBy(() -> sql(CREATE_TAG_FROM_TIMESTAMP_SQL, "tag4", Long.MAX_VALUE))
                .withRootCauseInstanceOf(SnapshotNotExistException.class)
                .withMessageContaining(
                        "Could not find any snapshot whose commit-time later than %s.",
                        Long.MAX_VALUE);
    }

    /**
     * When an existing tag carries a commit time earlier than the earliest remaining snapshot,
     * the procedure must be able to resolve timestamps against that tag as well.
     */
    @Test
    public void testCreateTagsFromTagsCommitTime() throws Exception {
        sql(CREATE_TABLE_SQL);
        sql("insert into T values('1', '2024-01-01')");
        Thread.sleep(100L);
        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
        // Retaining a single snapshot presumably expires snapshot 1 on this commit, leaving it
        // reachable only through tag1; the isLessThan assertion below verifies that precondition.
        sql("insert into T/*+ OPTIONS( 'snapshot.num-retained.max' = '1', 'snapshot.num-retained.min' = '1') */ values('2', '2024-01-01')");

        FileStoreTable table = paimonTable("T");
        long earliestCommitTime = table.snapshotManager().earliestSnapshot().timeMillis();
        long tagSnapshotCommitTime = table.tagManager().getOrThrow("tag1").timeMillis();
        Assertions.assertThat(tagSnapshotCommitTime).isLessThan(earliestCommitTime);

        // A timestamp before the tagged snapshot resolves to snapshot 1 via tag1.
        assertTagCreated("tag2", tagSnapshotCommitTime - 1L, 1L, tagSnapshotCommitTime);
        // A timestamp just before the earliest live snapshot resolves to snapshot 2.
        assertTagCreated("tag3", earliestCommitTime - 1L, 2L, earliestCommitTime);
    }

    /**
     * Calls {@code sys.create_tag_from_timestamp} on {@code default.T} and asserts that it returns
     * exactly one row describing the created tag.
     *
     * @param tagName name of the tag to create
     * @param timestamp timestamp in milliseconds passed to the procedure
     * @param expectedSnapshotId snapshot id the new tag is expected to point at
     * @param expectedCommitTime commit time expected to be reported for that snapshot
     */
    private void assertTagCreated(
            String tagName, long timestamp, long expectedSnapshotId, long expectedCommitTime) {
        String expectedRow =
                String.format(
                        "+I[%s, %s, %s, %s]",
                        tagName, expectedSnapshotId, expectedCommitTime, Long.MIN_VALUE);
        Assertions.assertThat(
                        sql(CREATE_TAG_FROM_TIMESTAMP_SQL, tagName, timestamp).stream()
                                .map(Row::toString))
                .containsExactlyInAnyOrder(expectedRow);
    }
}

