/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for {@code FOR SYSTEM_TIME AS OF TIMESTAMP} queries issued through the
 * SQL helpers inherited from {@link CatalogITCaseBase}.
 *
 * <p>Most tests write a first batch of rows, capture an "anchor" timestamp string, write a
 * second batch, and then check that a query pinned to the anchor only sees the first batch.
 */
public class TimeTravelITCase
extends CatalogITCaseBase {
    /**
     * Pause inserted between two writes. The anchor produced by {@link #now()} only has
     * one-second resolution, so a multi-second gap keeps the anchor clearly separated from
     * the writes around it.
     */
    private static final long COMMIT_GAP_MILLIS = 3000L;

    /** Formatter for the timestamp literal embedded in the queries (second resolution). */
    private static final DateTimeFormatter ANCHOR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** A query pinned to a timestamp taken between two inserts returns only the first insert. */
    @Test
    public void testTravelToTimestampString() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)");
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
        Thread.sleep(COMMIT_GAP_MILLIS);
        String anchor = now();
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");

        // Unpinned read sees both batches.
        List<Row> result = sql("SELECT * FROM t");
        Assertions.assertThat(result.toString())
                .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, paimon]]");

        // Read pinned to the anchor sees the first batch only.
        result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", anchor);
        Assertions.assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    /**
     * The travel timestamp may be an expression: here the anchor is captured right after the
     * first insert and the query adds one second to it, which still falls inside the pause
     * before the second insert.
     */
    @Test
    public void testExpression() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)");
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
        // Unlike the other tests, the anchor is taken BEFORE sleeping; the query below
        // shifts it forward by one second.
        String anchor = now();
        Thread.sleep(COMMIT_GAP_MILLIS);
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");

        List<Row> result = sql("SELECT * FROM t");
        Assertions.assertThat(result.toString())
                .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, paimon]]");

        result = sql(
                "SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s' + INTERVAL '1' SECOND",
                anchor);
        Assertions.assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    /**
     * A query pinned to a timestamp taken before {@code ALTER TABLE ... ADD} returns rows
     * with the old two-column layout.
     */
    @Test
    public void testTravelToOldSchema() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)");
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
        Thread.sleep(COMMIT_GAP_MILLIS);
        String anchor = now();
        sql("ALTER TABLE t ADD dt STRING");
        sql("INSERT INTO t VALUES(1, 'flink', '2020-01-01'), (2, 'paimon', '2020-01-02')");

        // Unpinned read uses the new schema; old rows get null for the added column.
        List<Row> result = sql("SELECT * FROM t");
        Assertions.assertThat(result.toString())
                .isEqualTo("[+I[1, hello, null], +I[2, world, null], +I[1, flink, 2020-01-01], +I[2, paimon, 2020-01-02]]");

        // Pinned read has no dt column at all.
        result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", anchor);
        Assertions.assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    /** A query pinned to a timestamp far earlier than any write returns no rows. */
    @Test
    public void testTravelToNonExistedTimestamp() {
        sql("CREATE TABLE t (k INT, v STRING)");
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
        Assertions.assertThat(
                        sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '1900-01-01 00:00:00'"))
                .isEmpty();
    }

    /**
     * Time travel also applies to the {@code t$files} table: the pinned query returns one
     * row where the unpinned query returns two.
     */
    @Test
    public void testSystemTableTimeTravel() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)");
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
        Thread.sleep(COMMIT_GAP_MILLIS);
        String anchor = now();
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");

        List<Row> result = sql("SELECT * FROM t$files");
        Assertions.assertThat(result.size()).isEqualTo(2);

        result = sql("SELECT * FROM t$files FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", anchor);
        Assertions.assertThat(result.size()).isEqualTo(1);
    }

    /**
     * A streaming query pinned to the current time, started on an empty primary-key table,
     * emits the changelog of the two subsequent inserts to the same key.
     */
    @Test
    public void testStreamingTravel() throws Exception {
        sql("CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED, v STRING)");
        BlockingIterator<Row, Row> streamIter =
                streamSqlBlockIter("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", now());
        // Close the iterator even when an insert, the collect, or the assertion fails, so
        // a failing run does not leave the streaming query open.
        try {
            sql("INSERT INTO t VALUES(1, 'hello')");
            sql("INSERT INTO t VALUES(1, 'apache')");
            List<Row> result = streamIter.collect(3);
            Assertions.assertThat(result.toString())
                    .isEqualTo("[+I[1, hello], -U[1, hello], +U[1, apache]]");
        } finally {
            streamIter.close();
        }
    }

    /**
     * Returns the current local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}, ready to
     * be substituted into a {@code TIMESTAMP '...'} literal. Sub-second digits are dropped by
     * the pattern.
     */
    private String now() {
        return LocalDateTime.now().format(ANCHOR_FORMAT);
    }
}

