/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestWriteMetricsConfig {
    private static final Configuration CONF = new Configuration();
    private static final Schema SIMPLE_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"data", (Type)Types.StringType.get())});
    private static final Schema COMPLEX_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"longCol", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"strCol", (Type)Types.StringType.get()), Types.NestedField.required((int)3, (String)"record", (Type)Types.StructType.of((Types.NestedField[])new Types.NestedField[]{Types.NestedField.required((int)4, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.required((int)5, (String)"data", (Type)Types.StringType.get())}))});
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private static SparkSession spark = null;
    private static JavaSparkContext sc = null;

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
        sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        sc = null;
        currentSpark.stop();
    }

    @Test
    public void testFullMetricsCollectionForParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "full");
        Table table = tables.create(SIMPLE_SCHEMA, spec, (Map)properties, tableLocation);
        ArrayList expectedRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset df = spark.createDataFrame((List)expectedRecords, SimpleRecord.class);
        df.select("id", new String[]{"data"}).coalesce(1).write().format("iceberg").option("write-format", "parquet").mode(SaveMode.Append).save(tableLocation);
        for (FileScanTask task : ((TableScan)table.newScan().includeColumnStats()).planFiles()) {
            DataFile file = (DataFile)task.file();
            Assert.assertEquals((long)2L, (long)file.nullValueCounts().size());
            Assert.assertEquals((long)2L, (long)file.valueCounts().size());
            Assert.assertEquals((long)2L, (long)file.lowerBounds().size());
            Assert.assertEquals((long)2L, (long)file.upperBounds().size());
        }
    }

    @Test
    public void testCountMetricsCollectionForParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "counts");
        Table table = tables.create(SIMPLE_SCHEMA, spec, (Map)properties, tableLocation);
        ArrayList expectedRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset df = spark.createDataFrame((List)expectedRecords, SimpleRecord.class);
        df.select("id", new String[]{"data"}).coalesce(1).write().format("iceberg").option("write-format", "parquet").mode(SaveMode.Append).save(tableLocation);
        for (FileScanTask task : ((TableScan)table.newScan().includeColumnStats()).planFiles()) {
            DataFile file = (DataFile)task.file();
            Assert.assertEquals((long)2L, (long)file.nullValueCounts().size());
            Assert.assertEquals((long)2L, (long)file.valueCounts().size());
            Assert.assertTrue((boolean)file.lowerBounds().isEmpty());
            Assert.assertTrue((boolean)file.upperBounds().isEmpty());
        }
    }

    @Test
    public void testNoMetricsCollectionForParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "none");
        Table table = tables.create(SIMPLE_SCHEMA, spec, (Map)properties, tableLocation);
        ArrayList expectedRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset df = spark.createDataFrame((List)expectedRecords, SimpleRecord.class);
        df.select("id", new String[]{"data"}).coalesce(1).write().format("iceberg").option("write-format", "parquet").mode(SaveMode.Append).save(tableLocation);
        for (FileScanTask task : ((TableScan)table.newScan().includeColumnStats()).planFiles()) {
            DataFile file = (DataFile)task.file();
            Assert.assertTrue((boolean)file.nullValueCounts().isEmpty());
            Assert.assertTrue((boolean)file.valueCounts().isEmpty());
            Assert.assertTrue((boolean)file.lowerBounds().isEmpty());
            Assert.assertTrue((boolean)file.upperBounds().isEmpty());
        }
    }

    @Test
    public void testCustomMetricCollectionForParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "counts");
        properties.put("write.metadata.metrics.column.id", "full");
        Table table = tables.create(SIMPLE_SCHEMA, spec, (Map)properties, tableLocation);
        ArrayList expectedRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset df = spark.createDataFrame((List)expectedRecords, SimpleRecord.class);
        df.select("id", new String[]{"data"}).coalesce(1).write().format("iceberg").option("write-format", "parquet").mode(SaveMode.Append).save(tableLocation);
        Schema schema = table.schema();
        Types.NestedField id = schema.findField("id");
        for (FileScanTask task : ((TableScan)table.newScan().includeColumnStats()).planFiles()) {
            DataFile file = (DataFile)task.file();
            Assert.assertEquals((long)2L, (long)file.nullValueCounts().size());
            Assert.assertEquals((long)2L, (long)file.valueCounts().size());
            Assert.assertEquals((long)1L, (long)file.lowerBounds().size());
            Assert.assertTrue((boolean)file.lowerBounds().containsKey(id.fieldId()));
            Assert.assertEquals((long)1L, (long)file.upperBounds().size());
            Assert.assertTrue((boolean)file.upperBounds().containsKey(id.fieldId()));
        }
    }

    @Test
    public void testBadCustomMetricCollectionForParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "counts");
        properties.put("write.metadata.metrics.column.ids", "full");
        AssertHelpers.assertThrows((String)"Creating a table with invalid metrics should fail", ValidationException.class, null, () -> tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation));
    }

    @Test
    public void testCustomMetricCollectionForNestedParquet() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor((Schema)COMPLEX_SCHEMA).identity("strCol").build();
        HashMap properties = Maps.newHashMap();
        properties.put("write.metadata.metrics.default", "none");
        properties.put("write.metadata.metrics.column.longCol", "counts");
        properties.put("write.metadata.metrics.column.record.id", "full");
        properties.put("write.metadata.metrics.column.record.data", "truncate(2)");
        Table table = tables.create(COMPLEX_SCHEMA, spec, (Map)properties, tableLocation);
        Iterable<InternalRow> rows = RandomData.generateSpark(COMPLEX_SCHEMA, 10, 0L);
        JavaRDD rdd = sc.parallelize((List)Lists.newArrayList(rows));
        Dataset df = spark.internalCreateDataFrame(JavaRDD.toRDD((JavaRDD)rdd), SparkSchemaUtil.convert((Schema)COMPLEX_SCHEMA), false);
        df.coalesce(1).write().format("iceberg").option("write-format", "parquet").mode(SaveMode.Append).save(tableLocation);
        Schema schema = table.schema();
        Types.NestedField longCol = schema.findField("longCol");
        Types.NestedField recordId = schema.findField("record.id");
        Types.NestedField recordData = schema.findField("record.data");
        for (FileScanTask task : ((TableScan)table.newScan().includeColumnStats()).planFiles()) {
            DataFile file = (DataFile)task.file();
            Map nullValueCounts = file.nullValueCounts();
            Assert.assertEquals((long)3L, (long)nullValueCounts.size());
            Assert.assertTrue((boolean)nullValueCounts.containsKey(longCol.fieldId()));
            Assert.assertTrue((boolean)nullValueCounts.containsKey(recordId.fieldId()));
            Assert.assertTrue((boolean)nullValueCounts.containsKey(recordData.fieldId()));
            Map valueCounts = file.valueCounts();
            Assert.assertEquals((long)3L, (long)valueCounts.size());
            Assert.assertTrue((boolean)valueCounts.containsKey(longCol.fieldId()));
            Assert.assertTrue((boolean)valueCounts.containsKey(recordId.fieldId()));
            Assert.assertTrue((boolean)valueCounts.containsKey(recordData.fieldId()));
            Map lowerBounds = file.lowerBounds();
            Assert.assertEquals((long)2L, (long)lowerBounds.size());
            Assert.assertTrue((boolean)lowerBounds.containsKey(recordId.fieldId()));
            ByteBuffer recordDataLowerBound = (ByteBuffer)lowerBounds.get(recordData.fieldId());
            Assert.assertEquals((long)2L, (long)ByteBuffers.toByteArray((ByteBuffer)recordDataLowerBound).length);
            Map upperBounds = file.upperBounds();
            Assert.assertEquals((long)2L, (long)upperBounds.size());
            Assert.assertTrue((boolean)upperBounds.containsKey(recordId.fieldId()));
            ByteBuffer recordDataUpperBound = (ByteBuffer)upperBounds.get(recordData.fieldId());
            Assert.assertEquals((long)2L, (long)ByteBuffers.toByteArray((ByteBuffer)recordDataUpperBound).length);
        }
    }
}

