/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.spark.SparkCatalog;
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests for writing to Paimon tables through Spark SQL.
 *
 * <p>One {@link SparkSession} is created for the whole class (PER_CLASS lifecycle) and is shared
 * by every test. Each test creates a table named {@code T} in {@code paimon.db}; the table is
 * dropped again in {@link #afterEach()}.
 *
 * <p>Because the session is shared, a test that changes a session configuration must restore it
 * in a {@code finally} block. Otherwise the setting leaks into whichever test runs next and makes
 * the outcome depend on execution order.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkWriteITCase {
    private static final String DATA_FILE_PREFIX_CONF = "spark.paimon.data-file.prefix";
    private static final String CHANGELOG_FILE_PREFIX_CONF = "spark.paimon.changelog-file.prefix";
    private static final String SUFFIX_INCLUDE_COMPRESSION_CONF =
            "spark.paimon.file.suffix.include.compression";

    /** Session shared by all tests of this class; {@code null} before start and after stop. */
    protected SparkSession spark = null;

    /** Root of the Paimon warehouse used by the {@code paimon} catalog. */
    protected static Path warehousePath = null;

    /**
     * Starts the shared Spark session, registers the {@code paimon} catalog on a temporary
     * warehouse directory and makes {@code paimon.db} the current database.
     *
     * @param tempDir temporary directory supplied by JUnit that hosts the warehouse
     */
    @BeforeAll
    public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
        warehousePath = new Path("file:///" + tempDir.toString());
        this.spark =
                SparkSession.builder()
                        .master("local[2]")
                        .config("spark.sql.extensions", PaimonSparkSessionExtensions.class.getName())
                        .getOrCreate();
        this.spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
        this.spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
        this.spark.sql("CREATE DATABASE paimon.db");
        this.spark.sql("USE paimon.db");
    }

    /** Stops the shared Spark session if it was started. */
    @AfterAll
    public void stopMetastoreAndSpark() {
        if (this.spark != null) {
            this.spark.stop();
            this.spark = null;
        }
    }

    /**
     * Drops the per-test table. {@code IF EXISTS} keeps the cleanup from throwing, and thereby
     * hiding the real failure, when a test failed before the table was created.
     */
    @AfterEach
    public void afterEach() {
        this.spark.sql("DROP TABLE IF EXISTS T");
    }

    /** Insert, upsert and delete on a fixed-bucket primary-key table. */
    @Test
    public void testWrite() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
        this.innerSimpleWrite();
    }

    /** Same scenario as {@link #testWrite()} on a table partitioned by {@code a}. */
    @Test
    public void testWritePartitionTable() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('primary-key'='a,b', 'bucket'='4', 'file.format'='avro')");
        this.innerSimpleWrite();
    }

    /** Same scenario as {@link #testWrite()} with {@code sort-spill-threshold} set to 2. */
    @Test
    public void testSortSpill() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='4', 'file.format'='avro', 'sort-spill-threshold'='2')");
        this.innerSimpleWrite();
    }

    /**
     * Shared write scenario: inserts a row, overwrites keys with later inserts, deletes one key
     * and finally deletes everything, checking the table content after each step.
     */
    private void innerSimpleWrite() {
        this.spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
        this.assertQueryResult("SELECT * FROM T", "[[1,2,3]]");

        this.spark.sql("INSERT INTO T VALUES (4, 5, '6')").collectAsList();
        this.spark.sql("INSERT INTO T VALUES (1, 2, '7')").collectAsList();
        this.spark.sql("INSERT INTO T VALUES (4, 5, '8')").collectAsList();
        List<Row> rows = this.spark.sql("SELECT * FROM T").collectAsList();
        // The query has no ORDER BY, so sort on the key column before comparing.
        rows.sort(Comparator.comparingInt(row -> row.getInt(0)));
        Assertions.assertThat(rows.toString()).isEqualTo("[[1,2,7], [4,5,8]]");

        this.spark.sql("DELETE FROM T WHERE a=1").collectAsList();
        this.assertQueryResult("SELECT * FROM T", "[[4,5,8]]");

        this.spark.sql("DELETE FROM T").collectAsList();
        this.assertQueryResult("SELECT * FROM T", "[]");
    }

    /** Deletes by a predicate on a column that is not part of the primary key. */
    @Test
    public void testDeleteWhereNonePk() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'file.format'='avro')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')").collectAsList();
        this.spark.sql("DELETE FROM T WHERE b=11").collectAsList();
        this.assertQueryResult("SELECT * FROM T", "[[2,22,222]]");
    }

    /** {@code TRUNCATE TABLE} removes every row. */
    @Test
    public void testTruncateTable() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'file.format'='avro')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')");
        this.spark.sql("TRUNCATE TABLE T");
        this.assertQueryResult("SELECT * FROM T", "[]");
    }

    /** Truncates one partition of a single-level partitioned table. */
    @Test
    public void testTruncatePartition1() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c LONG) PARTITIONED BY (c) TBLPROPERTIES ('primary-key'='a,c')");
        this.spark.sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        this.spark.sql("TRUNCATE TABLE T PARTITION (c = 111)");
        this.assertQueryResult("SELECT * FROM T", "[[2,22,222]]");
    }

    /** Truncates by a partial partition spec ({@code d} only) on a two-level partitioned table. */
    @Test
    public void testTruncatePartition() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c LONG, d STRING) PARTITIONED BY (c,d) TBLPROPERTIES ('primary-key'='a,c,d')");
        this.spark.sql(
                "INSERT INTO T VALUES (1, 11, 111, 'a'), (2, 22, 222, 'b'), (3, 33, 333, 'b'), (4, 44, 444, 'a')");
        this.spark.sql("TRUNCATE TABLE T PARTITION (d = 'a')");
        this.assertQueryResult("SELECT * FROM T ORDER BY a", "[[2,22,222,b], [3,33,333,b]]");
    }

    /**
     * Writes to a dynamic-bucket ({@code bucket = -1}) partitioned primary-key table with a
     * target of 3 rows per bucket and checks the highest bucket id after each batch.
     */
    @Test
    public void testWriteDynamicBucketPartitionedTable() {
        final String maxBucket = "SELECT max(bucket) FROM `T$FILES`";
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('primary-key'='a,b', 'bucket'='-1', 'dynamic-bucket.target-row-num'='3', 'dynamic-bucket.initial-buckets'='1')");

        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        this.assertQueryResult(maxBucket, "[[0]]");

        this.spark.sql("INSERT INTO T VALUES (1, 2, '22'), (1, 3, '3')");
        this.assertQueryResult(maxBucket, "[[0]]");

        this.spark.sql("INSERT INTO T VALUES (1, 4, '4'), (1, 5, '5')").collectAsList();
        this.assertQueryResult(maxBucket, "[[1]]");

        this.spark.sql("INSERT INTO T VALUES (1, 2, '222'), (1, 6, '6'), (1, 7, '7')").collectAsList();
        this.assertQueryResult(maxBucket, "[[2]]");
        this.assertQueryResult("SELECT count(1) FROM T", "[[7]]");
        this.assertQueryResult("SELECT * FROM T WHERE b = 2", "[[1,2,222]]");

        this.spark.sql("INSERT INTO T VALUES (2, 1, '11'), (2, 3, '33'), (1, 8, '8')").collectAsList();
        this.assertQueryResult("SELECT count(1) FROM T", "[[10]]");
        this.assertQueryResult(
                "SELECT partition, max(bucket) FROM `T$FILES` GROUP BY partition ORDER BY partition",
                "[[[1],2], [[2],0]]");
    }

    /** Reads and writes a partitioned table without primary key and with {@code bucket = -1}. */
    @Test
    public void testReadWriteUnawareBucketTable() {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES ('bucket'='-1')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (2, 1, '1'), (2, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (2, 1, '1'), (2, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (3, 1, '1'), (3, 2, '2')");
        this.spark.sql("INSERT INTO T VALUES (3, 1, '1'), (3, 2, '2')");

        this.assertQueryResult("SELECT count(1) FROM T", "[[12]]");
        this.assertQueryResult("SELECT * FROM T WHERE b = 2 AND a = 1", "[[1,2,2], [1,2,2]]");
        this.assertQueryResult("SELECT max(bucket) FROM `T$FILES`", "[[0]]");
    }

    /** Without any prefix configuration, data files are reported with the {@code data-} prefix. */
    @Test
    public void testDefaultDataFilePrefix() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");
        this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
        this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
        this.spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
        // ORDER BY keeps the comparison independent of the order in which files are scanned.
        this.assertQueryResult("SELECT * FROM T ORDER BY a", "[[1,1,aa], [2,2,bb], [3,3,cc]]");
        this.assertDataFilePrefix(3, "data-");
    }

    /** A session-level data file prefix is applied to writes into a table without primary key. */
    @Test
    public void testDataFilePrefixForAppendOnlyTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");
        this.spark.conf().set(DATA_FILE_PREFIX_CONF, "test-");
        try {
            this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
            this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
            this.spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
            this.assertQueryResult("SELECT * FROM T ORDER BY a", "[[1,1,aa], [2,2,bb], [3,3,cc]]");
            this.assertDataFilePrefix(3, "test-");
        } finally {
            // The session is shared: restore the default so other tests see "data-" again.
            this.spark.conf().unset(DATA_FILE_PREFIX_CONF);
        }
    }

    /** A session-level data file prefix is applied to writes into a primary-key table. */
    @Test
    public void testDataFilePrefixForPKTable() {
        this.spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a')");
        this.spark.conf().set(DATA_FILE_PREFIX_CONF, "test-");
        try {
            this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
            this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
            this.spark.sql("INSERT INTO T VALUES (1, 3, 'cc')");
            this.assertQueryResult("SELECT * FROM T order by a", "[[1,3,cc], [2,2,bb]]");
            this.assertDataFilePrefix(3, "test-");
        } finally {
            this.spark.conf().unset(DATA_FILE_PREFIX_CONF);
        }
    }

    /**
     * Changelog files use the {@code changelog-} prefix by default and the configured prefix once
     * the session-level changelog prefix is set.
     */
    @Test
    public void testChangelogFilePrefixForPkTable() throws Exception {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket' = '1', 'changelog-producer' = 'lookup')");
        FileStoreTable table = SparkWriteITCase.getTable("T");
        Path bucketDir = new Path(table.location(), "bucket-0");
        FileIO fileIO = table.fileIO();

        this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
        Assertions.assertThat(this.dataFileCount(fileIO.listStatus(bucketDir), "changelog-"))
                .isEqualTo(1L);

        this.spark.conf().set(CHANGELOG_FILE_PREFIX_CONF, "test-changelog-");
        try {
            this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
            Assertions.assertThat(
                            this.dataFileCount(fileIO.listStatus(bucketDir), "test-changelog-"))
                    .isEqualTo(1L);
        } finally {
            this.spark.conf().unset(CHANGELOG_FILE_PREFIX_CONF);
        }
    }

    /** With the success-file mark-done action, a write leaves {@code _SUCCESS} in the partition. */
    @Test
    public void testMarkDone() throws IOException {
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (c) TBLPROPERTIES ('partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");

        FileStoreTable table = SparkWriteITCase.getTable("T");
        Path successFile = new Path(table.location(), "c=aa/_SUCCESS");
        Assertions.assertThat(table.fileIO().exists(successFile)).isTrue();
    }

    /**
     * Data files written after enabling the compression suffix option end in
     * {@code .zstd.parquet}; files written before keep the plain {@code .parquet} suffix.
     */
    @Test
    public void testDataFileSuffixName() {
        final String defaultExtension = ".parquet";
        final String newExtension = ".zstd.parquet";
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('bucket' = '1', 'primary-key'='a', 'write-only' = 'true', 'file.format' = 'parquet', 'file.compression' = 'zstd')");
        this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
        this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");

        this.spark.conf().set(SUFFIX_INCLUDE_COMPRESSION_CONF, true);
        try {
            this.spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
            this.spark.sql("INSERT INTO T VALUES (4, 4, 'dd')");
            this.assertQueryResult(
                    "SELECT * FROM T order by a", "[[1,1,aa], [2,2,bb], [3,3,cc], [4,4,dd]]");

            List<String> files = this.dataFilePaths();
            Assertions.assertThat(files).hasSize(4);
            long plainCount =
                    files.stream()
                            .filter(name -> name.endsWith(defaultExtension) && !name.endsWith(newExtension))
                            .count();
            long compressedCount = files.stream().filter(name -> name.endsWith(newExtension)).count();
            Assertions.assertThat(plainCount).isEqualTo(2L);
            Assertions.assertThat(compressedCount).isEqualTo(2L);
        } finally {
            this.spark.conf().unset(SUFFIX_INCLUDE_COMPRESSION_CONF);
        }
    }

    /**
     * Changelog files written after enabling the compression suffix option end in
     * {@code .zstd.parquet}; the one written before keeps the plain {@code .parquet} suffix.
     */
    @Test
    public void testChangelogFileSuffixName() throws Exception {
        final String defaultExtension = ".parquet";
        final String newExtension = ".zstd.parquet";
        this.spark.sql(
                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket' = '1', 'changelog-producer' = 'lookup', 'file.format' = 'parquet', 'file.compression' = 'zstd')");
        FileStoreTable table = SparkWriteITCase.getTable("T");
        Path bucketDir = new Path(table.location(), "bucket-0");
        FileIO fileIO = table.fileIO();
        this.spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");

        this.spark.conf().set(SUFFIX_INCLUDE_COMPRESSION_CONF, true);
        try {
            this.spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
            List<String> files =
                    Arrays.stream(fileIO.listStatus(bucketDir))
                            .map(status -> status.getPath().getName())
                            .filter(name -> name.startsWith("changelog-"))
                            .collect(Collectors.toList());
            long plainCount =
                    files.stream()
                            .filter(name -> name.endsWith(defaultExtension) && !name.endsWith(newExtension))
                            .count();
            long compressedCount = files.stream().filter(name -> name.endsWith(newExtension)).count();
            Assertions.assertThat(plainCount).isEqualTo(1L);
            Assertions.assertThat(compressedCount).isEqualTo(1L);
        } finally {
            this.spark.conf().unset(SUFFIX_INCLUDE_COMPRESSION_CONF);
        }
    }

    /**
     * Loads a table of database {@code db} directly from the warehouse directory.
     *
     * @param tableName name of the table below {@code db.db} in the warehouse
     * @return the table opened through a local file IO
     */
    protected static FileStoreTable getTable(String tableName) {
        Path tablePath = new Path(warehousePath, String.format("db.db/%s", tableName));
        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
    }

    /**
     * Counts the entries whose file name starts with the given prefix.
     *
     * @param files directory listing to inspect
     * @param filePrefix file name prefix to match
     * @return number of matching entries
     */
    private long dataFileCount(FileStatus[] files, String filePrefix) {
        return Arrays.stream(files)
                .filter(status -> status.getPath().getName().startsWith(filePrefix))
                .count();
    }

    /** Runs the query and compares the string form of the collected rows with {@code expected}. */
    private void assertQueryResult(String sql, String expected) {
        List<Row> rows = this.spark.sql(sql).collectAsList();
        Assertions.assertThat(rows.toString()).isEqualTo(expected);
    }

    /** Returns the {@code file_path} column of the {@code T$files} system table. */
    private List<String> dataFilePaths() {
        return this.spark.sql("select file_path from `T$files`").collectAsList().stream()
                .map(row -> row.getString(0))
                .collect(Collectors.toList());
    }

    /** Asserts that table {@code T} has exactly {@code expectedFiles} files, all with the prefix. */
    private void assertDataFilePrefix(int expectedFiles, String prefix) {
        List<String> fileNames = this.dataFilePaths();
        Assertions.assertThat(fileNames).hasSize(expectedFiles);
        for (String fileName : fileNames) {
            Assertions.assertThat(fileName).startsWith(prefix);
        }
    }
}

