/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.spark.SparkReadTestBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

public class SparkTimeTravelITCase
extends SparkReadTestBase {
    @Test
    public void testTravelToVersion() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        Assertions.assertThat((String)spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat((String)spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToTimestampString() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        String anchor = LocalDateTime.now().toString();
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        Assertions.assertThat((String)spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat((String)spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF '%s'", anchor)).collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToTimestampNumber() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        Thread.sleep(1000L);
        long anchor = System.currentTimeMillis() / 1000L;
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        Assertions.assertThat((String)spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat((String)spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor)).collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToOldSchema() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        spark.sql("ALTER TABLE t ADD COLUMN dt STRING");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test"), BinaryString.fromString((String)"0401")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case"), BinaryString.fromString((String)"0402")}));
        Assertions.assertThat((String)spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello,null], [2,Paimon,null], [3,Test,0401], [4,Case,0402]]");
        Assertions.assertThat((String)spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToNonExistedVersion() {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThat((List)spark.sql("SELECT * FROM t VERSION AS OF 2").collectAsList()).isEmpty();
    }

    @Test
    public void testTravelToNonExistedTimestamp() {
        long anchor = System.currentTimeMillis() / 1000L;
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThat((List)spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor)).collectAsList()).isEmpty();
    }

    @Test
    public void testSystemTableTimeTravel() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        String anchor = LocalDateTime.now().toString();
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        Assertions.assertThat((int)spark.sql("SELECT * FROM `t$files`").collectAsList().size()).isEqualTo(2);
        Assertions.assertThat((int)spark.sql("SELECT * FROM `t$files` VERSION AS OF 1").collectAsList().size()).isEqualTo(1);
        Assertions.assertThat((int)spark.sql(String.format("SELECT * FROM `t$files` TIMESTAMP AS OF '%s'", anchor)).collectAsList().size()).isEqualTo(1);
    }

    @Test
    public void testTravelToTag() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{5, BinaryString.fromString((String)"Time")}), GenericRow.of((Object[])new Object[]{6, BinaryString.fromString((String)"Travel")}));
        SparkTimeTravelITCase.getTable("t").createTag("tag2", 2L);
        Assertions.assertThat((String)spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelToNonExistingTag() {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(RuntimeException.class, (String)"Cannot find a time travel version for unknown")});
    }

    @Test
    public void testTravelToTagWithSnapshotExpiration() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{5, BinaryString.fromString((String)"Time")}), GenericRow.of((Object[])new Object[]{6, BinaryString.fromString((String)"Travel")}));
        FileStoreTable table = SparkTimeTravelITCase.getTable("t");
        table.createTag("tag2", 2L);
        HashMap<String, String> expireOptions = new HashMap<String, String>();
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
        table.copy(expireOptions).newCommit("").expireSnapshots();
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat((String)spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelToTagWithDigitalName() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        SparkTimeTravelITCase.writeTable("t", GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Test")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Case")}));
        FileStoreTable table = SparkTimeTravelITCase.getTable("t");
        table.createTag("1", 2L);
        Assertions.assertThat((String)spark.sql("SELECT * FROM t VERSION AS OF '1'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelWithWatermark() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        SparkTimeTravelITCase.writeTableWithWatermark("t", 1L, GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Hello")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Paimon")}));
        SparkTimeTravelITCase.writeTableWithWatermark("t", null, GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Null")}), GenericRow.of((Object[])new Object[]{2, BinaryString.fromString((String)"Watermark")}));
        SparkTimeTravelITCase.writeTableWithWatermark("t", 10L, GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Time")}), GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Travel")}));
        Assertions.assertThat((String)spark.sql("SELECT * FROM t version as of 'watermark-1'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
        try {
            spark.sql("SELECT * FROM t version as of 'watermark-11'").collectAsList();
        }
        catch (Exception e) {
            Assertions.assertThat((boolean)e.getMessage().equals("There is currently no snapshot later than or equal to watermark[11]"));
        }
        Assertions.assertThat((String)spark.sql("SELECT * FROM t version as of 'watermark-9'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
        Assertions.assertThat((String)spark.sql("SELECT * FROM t version as of 'watermark-10'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
    }
}

