/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.math.RoundingMode;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for read and write options of the {@code iceberg} Spark data source: write format,
 * Hadoop configuration overrides, split sizes, incremental scan bounds and extra snapshot
 * summary properties.
 *
 * <p>Most tests create a two-column ({@code id}, {@code data}) table through {@link HadoopTables}
 * under a per-test temporary directory; {@link #testExtraSnapshotMetadataWithDelete()} uses the
 * catalog table provided by the base class instead.
 */
public class TestDataSourceOptions extends TestBaseWithCatalog {
    private static final Configuration CONF = new Configuration();
    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** 128 MiB, used as a deliberately large split size. */
    private static final long SPLIT_SIZE_128_MB = 128L * 1024 * 1024;

    /** 32 MiB, the split size {@link #testDefaultMetadataSplitSize()} expects metadata scans to use. */
    private static final int METADATA_SPLIT_SIZE_32_MB = 32 * 1024 * 1024;

    private static SparkSession spark = null;

    @TempDir
    private Path temp;

    /** Creates (or reuses) the local Spark session shared by every test in this class. */
    @BeforeAll
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    /** Stops the shared Spark session, tolerating the case where it was never started. */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        // Guard against startSpark() having failed; an NPE here would mask the original error.
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * The {@code write-format} write option must win over the {@code write.format.default}
     * table property: the table defaults to avro, the write asks for parquet, and every planned
     * data file must be parquet.
     */
    @TestTemplate
    public void testWriteFormatOptionOverridesTableProperties() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        options.put("write.format.default", "avro");
        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", "parquet")
                .mode(SaveMode.Append)
                .save(tableLocation);

        assertAllDataFilesHaveFormat(table, FileFormat.PARQUET);
    }

    /**
     * Without a {@code write-format} option, the {@code write.format.default} table property
     * (avro here) decides the format of the written data files.
     */
    @TestTemplate
    public void testNoWriteFormatOption() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        options.put("write.format.default", "avro");
        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        df.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

        assertAllDataFilesHaveFormat(table, FileFormat.AVRO);
    }

    /**
     * Options prefixed with {@code hadoop.} are passed on both write and read to override
     * {@code fs.default.name}, which this test first points at an HDFS address on the
     * configuration obtained from the session; the round-tripped records must still match.
     */
    @TestTemplate
    public void testHadoopOptions() throws IOException {
        String tableLocation = newIcebergTableLocation();
        // NOTE(review): newHadoopConf() looks like it returns a fresh Configuration rather than the
        // session's live one - confirm whether mutating it actually influences the reads/writes below.
        Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
        String originalDefaultFS = sparkHadoopConf.get("fs.default.name");
        try {
            HadoopTables tables = new HadoopTables(CONF);
            Map<String, String> options = Maps.newHashMap();
            tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

            sparkHadoopConf.set("fs.default.name", "hdfs://localhost:9000");

            List<SimpleRecord> expectedRecords =
                    Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
            Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
            originalDf.select("id", "data")
                    .write()
                    .format("iceberg")
                    .mode("append")
                    .option("hadoop.fs.default.name", "file:///")
                    .save(tableLocation);

            Dataset<Row> resultDf = spark.read()
                    .format("iceberg")
                    .option("hadoop.fs.default.name", "file:///")
                    .load(tableLocation);
            List<SimpleRecord> resultRecords =
                    resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();

            Assertions.assertThat(resultRecords).as("Records should match").isEqualTo(expectedRecords);
        } finally {
            // Configuration.set rejects null values, so an originally absent key must be unset
            // instead; otherwise the cleanup itself would throw and hide the real test outcome.
            if (originalDefaultFS != null) {
                sparkHadoopConf.set("fs.default.name", originalDefaultFS);
            } else {
                sparkHadoopConf.unset("fs.default.name");
            }
        }
    }

    /**
     * The {@code split-size} read option must win over the {@code read.split.target-size} table
     * property: with a split size of half the single data file (rounded up), the scan must
     * produce two Spark partitions even though the table property is 128 MiB.
     */
    @TestTemplate
    public void testSplitOptionsOverridesTableProperties() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        options.put("read.split.target-size", String.valueOf(SPLIT_SIZE_128_MB));
        options.put("write.format.default", String.valueOf(FileFormat.AVRO));
        Table icebergTable = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        // A single input partition so that exactly one data file is written.
        originalDf.select("id", "data")
                .repartition(1)
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        List<DataFile> files =
                Lists.newArrayList(icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()));
        Assertions.assertThat(files).as("Should have written 1 file").hasSize(1);

        long fileSize = files.get(0).fileSizeInBytes();
        long splitSize = LongMath.divide(fileSize, 2L, RoundingMode.CEILING);

        Dataset<Row> resultDf = spark.read()
                .format("iceberg")
                .option("split-size", String.valueOf(splitSize))
                .load(tableLocation);

        Assertions.assertThat(resultDf.javaRDD().getNumPartitions())
                .as("Spark partitions should match")
                .isEqualTo(2);
    }

    /**
     * Exercises {@code start-snapshot-id} / {@code end-snapshot-id}: invalid combinations with
     * {@code snapshot-id} or {@code as-of-timestamp} and a lone {@code end-snapshot-id} must be
     * rejected, and valid bounds must return only the records appended after the start snapshot.
     */
    @TestTemplate
    public void testIncrementalScanOptions() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"), new SimpleRecord(4, "d"));
        // One append per record so that every record lands in its own snapshot.
        for (SimpleRecord record : expectedRecords) {
            Dataset<Row> originalDf = spark.createDataFrame(Lists.newArrayList(record), SimpleRecord.class);
            originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
        }

        // The expectations below treat index 3 as the oldest snapshot and index 0 as the newest.
        List<Long> snapshotIds = SnapshotUtil.currentAncestorIds(table);

        // start-snapshot-id and snapshot-id are both configured.
        Assertions.assertThatThrownBy(() -> spark.read()
                        .format("iceberg")
                        .option("snapshot-id", snapshotIds.get(3).toString())
                        .option("start-snapshot-id", snapshotIds.get(3).toString())
                        .load(tableLocation)
                        .explain())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot set start-snapshot-id and end-snapshot-id for incremental scans when either snapshot-id or as-of-timestamp is set");

        // end-snapshot-id and as-of-timestamp are both configured.
        Assertions.assertThatThrownBy(() -> spark.read()
                        .format("iceberg")
                        .option("as-of-timestamp", Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
                        .option("end-snapshot-id", snapshotIds.get(2).toString())
                        .load(tableLocation)
                        .explain())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot set start-snapshot-id and end-snapshot-id for incremental scans when either snapshot-id or as-of-timestamp is set");

        // Only end-snapshot-id is configured.
        Assertions.assertThatThrownBy(() -> spark.read()
                        .format("iceberg")
                        .option("end-snapshot-id", snapshotIds.get(2).toString())
                        .load(tableLocation)
                        .explain())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot set only end-snapshot-id for incremental scans. Please, set start-snapshot-id too.");

        // Start bound only: everything appended after the first snapshot.
        Dataset<Row> unboundedIncrementalResult = spark.read()
                .format("iceberg")
                .option("start-snapshot-id", snapshotIds.get(3).toString())
                .load(tableLocation);
        List<SimpleRecord> result1 =
                unboundedIncrementalResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assertions.assertThat(result1).as("Records should match").isEqualTo(expectedRecords.subList(1, 4));
        Assertions.assertThat(unboundedIncrementalResult.count())
                .as("Unprocessed count should match record count")
                .isEqualTo(3L);

        Row row1 = unboundedIncrementalResult.agg(functions.min("id"), functions.max("id")).head();
        Assertions.assertThat(row1.getInt(0)).as("min value should match").isEqualTo(2);
        Assertions.assertThat(row1.getInt(1)).as("max value should match").isEqualTo(4);

        // Both bounds: exactly the one record appended between the two snapshots.
        Dataset<Row> incrementalResult = spark.read()
                .format("iceberg")
                .option("start-snapshot-id", snapshotIds.get(2).toString())
                .option("end-snapshot-id", snapshotIds.get(1).toString())
                .load(tableLocation);
        List<SimpleRecord> result2 =
                incrementalResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assertions.assertThat(result2).as("Records should match").isEqualTo(expectedRecords.subList(2, 3));
        Assertions.assertThat(incrementalResult.count())
                .as("Unprocessed count should match record count")
                .isEqualTo(1L);

        Row row2 = incrementalResult.agg(functions.min("id"), functions.max("id")).head();
        Assertions.assertThat(row2.getInt(0)).as("min value should match").isEqualTo(3);
        Assertions.assertThat(row2.getInt(1)).as("max value should match").isEqualTo(3);
    }

    /**
     * For the {@code #entries} metadata table, the {@code split-size} read option must win over
     * the {@code read.split.metadata-target-size} table property: two partitions when the
     * property equals one manifest's length, one partition once the option is set to 128 MiB.
     */
    @TestTemplate
    public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        // Two separate appends so that the current snapshot references two manifests.
        originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
        originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

        List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
        Assertions.assertThat(manifests).as("Must be 2 manifests").hasSize(2);

        // Target size of one manifest: the assertion below expects this to yield two partitions.
        table.updateProperties()
                .set("read.split.metadata-target-size", String.valueOf(manifests.get(0).length()))
                .commit();

        Dataset<Row> entriesDf = spark.read().format("iceberg").load(tableLocation + "#entries");
        Assertions.assertThat(entriesDf.javaRDD().getNumPartitions())
                .as("Num partitions must match")
                .isEqualTo(2);

        // Override the table property with a much larger split size through the read option.
        entriesDf = spark.read()
                .format("iceberg")
                .option("split-size", String.valueOf(SPLIT_SIZE_128_MB))
                .load(tableLocation + "#entries");
        Assertions.assertThat(entriesDf.javaRDD().getNumPartitions())
                .as("Num partitions must match")
                .isEqualTo(1);
    }

    /**
     * With no split-size configuration, the number of partitions of an {@code #entries} scan must
     * equal the first manifest's length divided by 32 MiB, rounded up.
     */
    @TestTemplate
    public void testDefaultMetadataSplitSize() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        Table icebergTable = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

        long manifestLength = tables.load(tableLocation + "#entries")
                .currentSnapshot()
                .allManifests(icebergTable.io())
                .get(0)
                .length();
        // Ceiling division in long arithmetic, so the length is never truncated before dividing.
        int expectedSplits =
                (int) ((manifestLength + METADATA_SPLIT_SIZE_32_MB - 1) / METADATA_SPLIT_SIZE_32_MB);

        Dataset<Row> metadataDf = spark.read().format("iceberg").load(tableLocation + "#entries");
        int partitionNum = metadataDf.javaRDD().getNumPartitions();

        Assertions.assertThat(partitionNum).as("Spark partitions should match").isEqualTo(expectedSplits);
    }

    /**
     * Write options prefixed with {@code snapshot-property.} must appear, without the prefix, in
     * the summary of the snapshot produced by the write.
     */
    @TestTemplate
    public void testExtraSnapshotMetadata() throws IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        originalDf.select("id", "data")
                .write()
                .format("iceberg")
                .mode("append")
                .option("snapshot-property.extra-key", "someValue")
                .option("snapshot-property.another-key", "anotherValue")
                .save(tableLocation);

        Table table = tables.load(tableLocation);
        Assertions.assertThat(table.currentSnapshot().summary())
                .containsEntry("extra-key", "someValue")
                .containsEntry("another-key", "anotherValue");
    }

    /**
     * Commit properties supplied through {@link CommitMetadata#withCommitProperties} around a SQL
     * {@code INSERT} executed in a separate thread must show up in the second snapshot's summary
     * only; the first snapshot (written outside the wrapper) must not carry them.
     */
    @TestTemplate
    public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
        String tableLocation = newIcebergTableLocation();
        HadoopTables tables = new HadoopTables(CONF);
        Map<String, String> options = Maps.newHashMap();
        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), options, tableLocation);

        List<SimpleRecord> expectedRecords =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
        originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
        spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");

        runInNamedThread("test-extra-commit-message-writer-thread", () ->
                CommitMetadata.withCommitProperties(
                        commitPropertiesForCurrentThread(),
                        () -> {
                            spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
                            return 0;
                        },
                        RuntimeException.class));

        List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
        Assertions.assertThat(snapshots).hasSize(2);
        Assertions.assertThat(snapshots.get(0).summary().get("writer-thread")).isNull();
        Assertions.assertThat(snapshots.get(1).summary())
                .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
                .containsEntry("extra-key", "someValue")
                .containsEntry("another-key", "anotherValue");
    }

    /**
     * Same as {@link #testExtraSnapshotMetadataWithSQL()} but for a SQL {@code DELETE} against the
     * catalog table: the commit properties must appear only in the snapshot created by the delete.
     */
    @TestTemplate
    public void testExtraSnapshotMetadataWithDelete() throws InterruptedException, NoSuchTableException {
        String shufflePartitionsKey = "spark.sql.shuffle.partitions";
        // Remember the session-wide value so this test does not leak its override to later tests.
        String originalShufflePartitions = spark.sessionState().conf().getConfString(shufflePartitionsKey);
        spark.sessionState().conf().setConfString(shufflePartitionsKey, "1");
        try {
            this.sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", this.tableName);

            List<SimpleRecord> expectedRecords =
                    Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
            Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
            originalDf.repartition(5, new Column("data"))
                    .select("id", "data")
                    .writeTo(this.tableName)
                    .append();

            runInNamedThread("test-extra-commit-message-delete-thread", () ->
                    CommitMetadata.withCommitProperties(
                            commitPropertiesForCurrentThread(),
                            () -> {
                                spark.sql("DELETE FROM " + this.tableName + " where id = 1");
                                return 0;
                            },
                            RuntimeException.class));

            Table table = this.validationCatalog.loadTable(this.tableIdent);
            List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
            Assertions.assertThat(snapshots).hasSize(2);
            Assertions.assertThat(snapshots.get(0).summary().get("writer-thread")).isNull();
            Assertions.assertThat(snapshots.get(1).summary())
                    .containsEntry("writer-thread", "test-extra-commit-message-delete-thread")
                    .containsEntry("extra-key", "someValue")
                    .containsEntry("another-key", "anotherValue");
        } finally {
            spark.sessionState().conf().setConfString(shufflePartitionsKey, originalShufflePartitions);
        }
    }

    /** Returns the path of the {@code iceberg-table} directory inside this test's temp dir. */
    private String newIcebergTableLocation() {
        return this.temp.resolve("iceberg-table").toFile().toString();
    }

    /** Plans a full scan of {@code table} and asserts every data file's name maps to {@code expected}. */
    private static void assertAllDataFilesHaveFormat(Table table, FileFormat expected) throws IOException {
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            tasks.forEach(task -> {
                FileFormat fileFormat = FileFormat.fromFileName(task.file().location());
                Assertions.assertThat(fileFormat).isEqualTo(expected);
            });
        }
    }

    /** Builds the commit properties used by the writer threads; must be called on the writer thread. */
    private static Map<String, String> commitPropertiesForCurrentThread() {
        return ImmutableMap.of(
                "writer-thread", Thread.currentThread().getName(),
                "snapshot-property.extra-key", "someValue",
                "snapshot-property.another-key", "anotherValue");
    }

    /** Runs {@code action} on a new thread named {@code threadName} and waits for it to finish. */
    private static void runInNamedThread(String threadName, Runnable action) throws InterruptedException {
        Thread worker = new Thread(action);
        worker.setName(threadName);
        worker.start();
        worker.join();
    }
}

