/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.ComplexRecord;
import org.apache.iceberg.spark.source.NestedRecord;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestPartitionValues {
    /**
     * Flat schema of required columns, one per primitive type referenced by the
     * partition-value tests in this class.
     */
    private static final Schema SUPPORTED_PRIMITIVES = new Schema(
            Types.NestedField.required(100, "id", Types.LongType.get()),
            Types.NestedField.required(101, "data", Types.StringType.get()),
            Types.NestedField.required(102, "b", Types.BooleanType.get()),
            Types.NestedField.required(103, "i", Types.IntegerType.get()),
            Types.NestedField.required(104, "l", Types.LongType.get()),
            Types.NestedField.required(105, "f", Types.FloatType.get()),
            Types.NestedField.required(106, "d", Types.DoubleType.get()),
            Types.NestedField.required(107, "date", Types.DateType.get()),
            Types.NestedField.required(108, "ts", Types.TimestampType.withZone()),
            Types.NestedField.required(110, "s", Types.StringType.get()),
            Types.NestedField.required(113, "bytes", Types.BinaryType.get()),
            Types.NestedField.required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
            Types.NestedField.required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
            Types.NestedField.required(116, "dec_38_10", Types.DecimalType.of(38, 10)));

    /** Two optional columns ({@code id}, {@code data}) used by the simple-record tests. */
    private static final Schema SIMPLE_SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** Identity partitioning of {@link #SIMPLE_SCHEMA} on the {@code data} column. */
    private static final PartitionSpec SPEC =
            PartitionSpec.builderFor(SIMPLE_SCHEMA).identity("data").build();

    /** Session shared by all tests; assigned in {@code startSpark} and cleared in {@code stopSpark}. */
    private static SparkSession spark = null;

    /** Temporary directory injected by JUnit; tables and data files are created underneath it. */
    @TempDir
    private Path temp;

    /** Test parameter 0: value written to the {@code write.format.default} table property. */
    @Parameter(index=0)
    private String format;

    /** Test parameter 1: value passed as the {@code vectorization-enabled} read option. */
    @Parameter(index=1)
    private boolean vectorized;

    /**
     * Supplies the parameter rows for this class: element 0 of each row is the file
     * format name and element 1 is the vectorization flag.
     *
     * @return five {format, vectorized} combinations
     */
    @Parameters(name="format = {0}, vectorized = {1}")
    public static Object[][] parameters() {
        Object[] parquetPlain = {"parquet", false};
        Object[] parquetVectorized = {"parquet", true};
        Object[] avroPlain = {"avro", false};
        Object[] orcPlain = {"orc", false};
        Object[] orcVectorized = {"orc", true};
        return new Object[][]{parquetPlain, parquetVectorized, avroPlain, orcPlain, orcVectorized};
    }

    /**
     * Obtains (or creates) a Spark session with master {@code local[2]} and stores it
     * in the shared static field before any test runs.
     */
    @BeforeAll
    public static void startSpark() {
        SparkSession.Builder localBuilder = SparkSession.builder().master("local[2]");
        TestPartitionValues.spark = localBuilder.getOrCreate();
    }

    /**
     * Clears the shared session reference and stops the session.
     *
     * <p>The static field is nulled before {@code stop()} is called so the reference is
     * released even if stopping throws. If no session was ever assigned (for example
     * because session creation failed), this method does nothing instead of throwing a
     * {@link NullPointerException}.
     */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Appends four records to a table partitioned by {@code data}, one of which has a
     * {@code null} partition value, then reads the table back ordered by {@code id} and
     * expects the same records.
     */
    @TestTemplate
    public void testNullPartitionValue() throws Exception {
        File tableDir = new File(new File(this.temp.toFile(), "null_part"), "test");
        File dataDir = new File(tableDir, "data");
        Assertions.assertThat(dataDir.mkdirs()).as("mkdirs should succeed").isTrue();
        String tablePath = tableDir.toString();

        // Create the partitioned table and select the file format under test.
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        Table table = tables.create(SIMPLE_SCHEMA, SPEC, tablePath);
        table.updateProperties().set("write.format.default", this.format).commit();

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, null));

        Dataset<Row> input = spark.createDataFrame(expected, SimpleRecord.class);
        input.select("id", "data")
                .write()
                .format("iceberg")
                .mode(SaveMode.Append)
                .save(tablePath);

        Dataset<Row> readBack = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(tablePath);
        List<SimpleRecord> actual = readBack.orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();

        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Appends a DataFrame whose columns are selected as ({@code data}, {@code id}),
     * the reverse of the table schema order, with the {@code check-ordering} write
     * option set to {@code false}, and expects the rows read back to equal the input.
     */
    @TestTemplate
    public void testReorderedColumns() throws Exception {
        File tableDir = new File(new File(this.temp.toFile(), "reorder_columns"), "test");
        File dataDir = new File(tableDir, "data");
        Assertions.assertThat(dataDir.mkdirs()).as("mkdirs should succeed").isTrue();
        String tablePath = tableDir.toString();

        // Create the partitioned table and select the file format under test.
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        Table table = tables.create(SIMPLE_SCHEMA, SPEC, tablePath);
        table.updateProperties().set("write.format.default", this.format).commit();

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"));

        Dataset<Row> input = spark.createDataFrame(expected, SimpleRecord.class);
        input.select("data", "id")
                .write()
                .format("iceberg")
                .mode(SaveMode.Append)
                .option("check-ordering", "false")
                .save(tablePath);

        Dataset<Row> readBack = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(tablePath);
        List<SimpleRecord> actual = readBack.orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();

        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Same scenario as the reordered-columns write, but with both the
     * {@code check-ordering} and {@code check-nullability} write options set to
     * {@code false}; the rows read back must equal the input.
     */
    @TestTemplate
    public void testReorderedColumnsNoNullability() throws Exception {
        File tableDir = new File(new File(this.temp.toFile(), "reorder_columns_no_nullability"), "test");
        File dataDir = new File(tableDir, "data");
        Assertions.assertThat(dataDir.mkdirs()).as("mkdirs should succeed").isTrue();
        String tablePath = tableDir.toString();

        // Create the partitioned table and select the file format under test.
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        Table table = tables.create(SIMPLE_SCHEMA, SPEC, tablePath);
        table.updateProperties().set("write.format.default", this.format).commit();

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"));

        Dataset<Row> input = spark.createDataFrame(expected, SimpleRecord.class);
        input.select("data", "id")
                .write()
                .format("iceberg")
                .mode(SaveMode.Append)
                .option("check-ordering", "false")
                .option("check-nullability", "false")
                .save(tablePath);

        Dataset<Row> readBack = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(tablePath);
        List<SimpleRecord> actual = readBack.orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();

        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * For each listed primitive column, creates a table identity-partitioned by that
     * column, copies the source rows into it, and checks the rows read back against the
     * generated records.
     *
     * <p>The source table is unpartitioned and backed by one Avro file of randomly
     * generated records. The record count registered for that file is taken from the
     * generated list, so the file metadata agrees with what was actually written.
     */
    @TestTemplate
    public void testPartitionValueTypes() throws Exception {
        String[] columnNames = {"b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"};
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());

        // Source table that wraps the generated Avro data.
        String sourceLocation = this.temp.resolve("source_table").toString();
        Table source = tables.create(SUPPORTED_PRIMITIVES, sourceLocation);

        List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
        File avroData = File.createTempFile("data", ".avro", this.temp.toFile());
        // createTempFile leaves an empty file on disk; it is removed before writing
        // (presumably because the local output will not overwrite an existing file - confirm).
        Assertions.assertThat(avroData.delete()).isTrue();
        try (FileAppender<GenericData.Record> appender =
                Avro.write(Files.localOutput(avroData)).schema(source.schema()).build()) {
            appender.addAll(expected);
        }

        // Register the Avro file with the source table, reporting the real number of rows.
        source.newAppend()
                .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
                        .withRecordCount(expected.size())
                        .withInputFile(Files.localInput(avroData))
                        .build())
                .commit();

        Dataset<Row> sourceDF = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(sourceLocation);

        for (String column : columnNames) {
            String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
            File parent = new File(this.temp.toFile(), desc);
            File location = new File(parent, "test");
            File dataFolder = new File(location, "data");
            Assertions.assertThat(dataFolder.mkdirs()).as("mkdirs should succeed").isTrue();

            PartitionSpec spec = PartitionSpec.builderFor(SUPPORTED_PRIMITIVES).identity(column).build();
            Table table = tables.create(SUPPORTED_PRIMITIVES, spec, location.toString());
            table.updateProperties().set("write.format.default", this.format).commit();

            sourceDF.write()
                    .format("iceberg")
                    .mode(SaveMode.Append)
                    .option("use-table-distribution-and-ordering", "false")
                    .option("fanout-enabled", "false")
                    .save(location.toString());

            List<Row> actual = spark.read()
                    .format("iceberg")
                    .option("vectorization-enabled", String.valueOf(this.vectorized))
                    .load(location.toString())
                    .collectAsList();

            Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
            // NOTE(review): rows are compared positionally with no explicit ordering on the read.
            for (int i = 0; i < expected.size(); ++i) {
                TestHelpers.assertEqualsSafe(SUPPORTED_PRIMITIVES.asStruct(), expected.get(i), actual.get(i));
            }
        }
    }

    /**
     * Nested variant of the partition-value-type test: the primitive columns live inside
     * an optional struct column named {@code nested}, and each target table is
     * identity-partitioned by {@code nested.<column>}.
     *
     * <p>The source table is unpartitioned and backed by one Avro file of randomly
     * generated records. The record count registered for that file is taken from the
     * generated list, so the file metadata agrees with what was actually written.
     */
    @TestTemplate
    public void testNestedPartitionValues() throws Exception {
        String[] columnNames = {"b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"};
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        Schema nestedSchema = new Schema(
                Types.NestedField.optional(1, "nested", SUPPORTED_PRIMITIVES.asStruct()));

        // Source table that wraps the generated Avro data.
        String sourceLocation = this.temp.resolve("source_table").toString();
        Table source = tables.create(nestedSchema, sourceLocation);

        List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
        File avroData = File.createTempFile("data", ".avro", this.temp.toFile());
        // createTempFile leaves an empty file on disk; it is removed before writing
        // (presumably because the local output will not overwrite an existing file - confirm).
        Assertions.assertThat(avroData.delete()).isTrue();
        try (FileAppender<GenericData.Record> appender =
                Avro.write(Files.localOutput(avroData)).schema(source.schema()).build()) {
            appender.addAll(expected);
        }

        // Register the Avro file with the source table, reporting the real number of rows.
        source.newAppend()
                .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
                        .withRecordCount(expected.size())
                        .withInputFile(Files.localInput(avroData))
                        .build())
                .commit();

        Dataset<Row> sourceDF = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(sourceLocation);

        for (String column : columnNames) {
            String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
            File parent = new File(this.temp.toFile(), desc);
            File location = new File(parent, "test");
            File dataFolder = new File(location, "data");
            Assertions.assertThat(dataFolder.mkdirs()).as("mkdirs should succeed").isTrue();

            PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("nested." + column).build();
            Table table = tables.create(nestedSchema, spec, location.toString());
            table.updateProperties().set("write.format.default", this.format).commit();

            sourceDF.write()
                    .format("iceberg")
                    .mode(SaveMode.Append)
                    .option("use-table-distribution-and-ordering", "false")
                    .option("fanout-enabled", "false")
                    .save(location.toString());

            List<Row> actual = spark.read()
                    .format("iceberg")
                    .option("vectorization-enabled", String.valueOf(this.vectorized))
                    .load(location.toString())
                    .collectAsList();

            Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
            // NOTE(review): rows are compared positionally with no explicit ordering on the read.
            for (int i = 0; i < expected.size(); ++i) {
                TestHelpers.assertEqualsSafe(nestedSchema.asStruct(), expected.get(i), actual.get(i));
            }
        }
    }

    /**
     * Writes a single row into a table identity-partitioned by the nested field
     * {@code struct.string} and checks that the number of rows read back matches the
     * number written.
     *
     * <p>NOTE(review): unlike the sibling tests, this one never sets
     * {@code write.format.default} from the {@code format} parameter - confirm whether
     * that is intentional.
     */
    @TestTemplate
    public void testPartitionedByNestedString() throws Exception {
        // Iceberg schema: required struct<string: required string>, partitioned by the inner field.
        Types.StructType innerStruct = Types.StructType.of(
                Types.NestedField.required(2, "string", Types.StringType.get()));
        Schema nestedSchema = new Schema(Types.NestedField.required(1, "struct", innerStruct));
        PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity("struct.string").build();

        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        String baseLocation = this.temp.resolve("partition_by_nested_string").toString();
        tables.create(nestedSchema, spec, baseLocation);

        // Matching Spark schema with non-nullable fields.
        StructField innerField = new StructField("string", DataTypes.StringType, false, Metadata.empty());
        StructType sparkInner = DataTypes.createStructType(new StructField[]{innerField});
        StructField[] structFields = {new StructField("struct", sparkInner, false, Metadata.empty())};

        List<Row> rows = Lists.newArrayList();
        rows.add(RowFactory.create(RowFactory.create("nested_string_value")));

        Dataset<Row> sourceDF = spark.createDataFrame(rows, new StructType(structFields));
        sourceDF.write().format("iceberg").mode(SaveMode.Append).save(baseLocation);

        List<Row> actual = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(baseLocation)
                .collectAsList();

        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(rows);
    }

    @TestTemplate
    public void testReadPartitionColumn() throws Exception {
        ((AbstractStringAssert)Assumptions.assumeThat((String)this.format).as("Temporary skip ORC", new Object[0])).isNotEqualTo((Object)"orc");
        Schema nestedSchema = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"id", (Type)Types.LongType.get()), Types.NestedField.optional((int)2, (String)"struct", (Type)Types.StructType.of((Types.NestedField[])new Types.NestedField[]{Types.NestedField.optional((int)3, (String)"innerId", (Type)Types.LongType.get()), Types.NestedField.optional((int)4, (String)"innerName", (Type)Types.StringType.get())}))});
        PartitionSpec spec = PartitionSpec.builderFor((Schema)nestedSchema).identity("struct.innerName").build();
        HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
        String baseLocation = this.temp.resolve("partition_by_nested_string").toString();
        Table table = tables.create(nestedSchema, spec, baseLocation);
        table.updateProperties().set("write.format.default", this.format).commit();
        MapFunction & Serializable func = (MapFunction & Serializable)value -> new ComplexRecord((long)value, new NestedRecord((long)value, "name_" + value));
        spark.range(0L, 10L, 1L, 1).map((MapFunction)func, Encoders.bean(ComplexRecord.class)).write().format("iceberg").mode(SaveMode.Append).save(baseLocation);
        List actual = spark.read().format("iceberg").option("vectorization-enabled", String.valueOf(this.vectorized)).load(baseLocation).select("struct.innerName", new String[0]).orderBy("struct.innerName", new String[0]).as(Encoders.STRING()).collectAsList();
        ((ListAssert)Assertions.assertThat((List)actual).as("Number of rows should match", new Object[0])).hasSize(10);
        List inputRecords = IntStream.range(0, 10).mapToObj(i -> "name_" + i).collect(Collectors.toList());
        ((ListAssert)Assertions.assertThat((List)actual).as("Read object should be matched", new Object[0])).isEqualTo(inputRecords);
    }
}

