/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark;

import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import org.apache.paimon.spark.SparkCatalog;
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
public class SparkWriteITCase {
    protected SparkSession spark = null;

    @BeforeAll
    public void startMetastoreAndSpark(@TempDir Path tempDir) {
        org.apache.paimon.fs.Path warehousePath = new org.apache.paimon.fs.Path("file:///" + tempDir.toString());
        this.spark = SparkSession.builder().master("local[2]").config("spark.sql.extensions", PaimonSparkSessionExtensions.class.getName()).getOrCreate();
        this.spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
        this.spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
        this.spark.sql("CREATE DATABASE paimon.db");
        this.spark.sql("USE paimon.db");
    }

    @AfterAll
    public void stopMetastoreAndSpark() {
        if (this.spark != null) {
            this.spark.stop();
            this.spark = null;
        }
    }

    @AfterEach
    public void afterEach() {
        this.spark.sql("DROP TABLE T");
    }

    @Test
    public void testWrite() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
        this.innerSimpleWrite();
    }

    @Test
    public void testWritePartitionTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('primary-key'='a,b', 'bucket'='4', 'file.format'='avro')");
        this.innerSimpleWrite();
    }

    @Test
    public void testSortSpill() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='4', 'file.format'='avro', 'sort-spill-threshold'='2')");
        this.innerSimpleWrite();
    }

    private void innerSimpleWrite() {
        this.spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
        List rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[1,2,3]]");
        this.spark.sql("INSERT INTO T VALUES (4, 5, '6')").collectAsList();
        this.spark.sql("INSERT INTO T VALUES (1, 2, '7')").collectAsList();
        this.spark.sql("INSERT INTO T VALUES (4, 5, '8')").collectAsList();
        rows = this.spark.sql("SELECT * FROM T").collectAsList();
        rows.sort(Comparator.comparingInt(o -> o.getInt(0)));
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[1,2,7], [4,5,8]]");
        this.spark.sql("DELETE FROM T WHERE a=1").collectAsList();
        rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[4,5,8]]");
        this.spark.sql("DELETE FROM T").collectAsList();
        rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[]");
    }

    @Test
    public void testDeleteWhereNonePk() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'file.format'='avro')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')").collectAsList();
        this.spark.sql("DELETE FROM T WHERE b=11").collectAsList();
        List rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[2,22,222]]");
    }

    @Test
    public void testTruncateTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'file.format'='avro')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')");
        this.spark.sql("TRUNCATE TABLE T");
        List rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[]");
    }

    @Test
    public void testTruncatePartition1() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c LONG) PARTITIONED BY (c) TBLPROPERTIES ('primary-key'='a,c')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        this.spark.sql("TRUNCATE TABLE T PARTITION (c = 111)");
        List rows = this.spark.sql("SELECT * FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[2,22,222]]");
    }

    @Test
    public void testTruncatePartition() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c LONG, d STRING) PARTITIONED BY (c,d) TBLPROPERTIES ('primary-key'='a,c,d')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, 111, 'a'), (2, 22, 222, 'b'), (3, 33, 333, 'b'), (4, 44, 444, 'a')");
        this.spark.sql("TRUNCATE TABLE T PARTITION (d = 'a')");
        List rows = this.spark.sql("SELECT * FROM T ORDER BY a").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[2,22,222,b], [3,33,333,b]]");
    }

    @Test
    public void testWriteDynamicBucketPartitionedTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('primary-key'='a,b', 'bucket'='-1', 'dynamic-bucket.target-row-num'='3', 'dynamic-bucket.initial-buckets'='1')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        List rows = this.spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[0]]");
        this.spark.sql("INSERT INTO T VALUES (1, 2, '22'), (1, 3, '3')");
        rows = this.spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[0]]");
        this.spark.sql("INSERT INTO T VALUES (1, 4, '4'), (1, 5, '5')").collectAsList();
        rows = this.spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[1]]");
        this.spark.sql("INSERT INTO T VALUES (1, 2, '222'), (1, 6, '6'), (1, 7, '7')").collectAsList();
        rows = this.spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[2]]");
        rows = this.spark.sql("SELECT count(1) FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[7]]");
        rows = this.spark.sql("SELECT * FROM T WHERE b = 2").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[1,2,222]]");
        this.spark.sql("INSERT INTO T VALUES (2, 1, '11'), (2, 3, '33'), (1, 8, '8')").collectAsList();
        rows = this.spark.sql("SELECT count(1) FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[10]]");
        rows = this.spark.sql("SELECT partition, max(bucket) FROM `T$FILES` GROUP BY partition ORDER BY partition").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[[1],2], [[2],0]]");
    }

    @Test
    public void testReadWriteUnawareBucketTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('bucket'='-1')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (2, 1, '1'), (2, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (2, 1, '1'), (2, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (3, 1, '1'), (3, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (3, 1, '1'), (3, 2, '2')");
        List rows = this.spark.sql("SELECT count(1) FROM T").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[12]]");
        rows = this.spark.sql("SELECT * FROM T WHERE b = 2 AND a = 1").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[1,2,2], [1,2,2]]");
        rows = this.spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
        Assertions.assertThat((String)rows.toString()).isEqualTo("[[0]]");
    }

    @Test
    public void testNonnull() {
        try {
            this.spark.sql("CREATE TABLE S AS SELECT 1 as a, 2 as b, 'yann' as c");
            this.spark.sql("CREATE TABLE T (a INT NOT NULL, b INT, c STRING)");
            this.spark.sql("INSERT INTO T SELECT * FROM S");
            List rows = this.spark.sql("SELECT * FROM T").collectAsList();
            Assertions.assertThat((String)rows.toString()).isEqualTo("[[1,2,yann]]");
        }
        finally {
            this.spark.sql("DROP TABLE IF EXISTS S");
        }
    }
}

