/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests which snapshot a Spark read of an Iceberg table resolves to when the read is
 * configured with the {@code snapshot-id}, {@code as-of-timestamp}, {@code tag} or
 * {@code branch} options, including invalid and mutually exclusive combinations.
 *
 * <p>Every test is a {@link TestTemplate} that runs once per entry returned by
 * {@link #parameters()}, i.e. once with local and once with distributed planning mode set as
 * table properties. Each test creates its own Hadoop table under the {@link TempDir}.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestSnapshotSelection {
    private static final Configuration CONF = new Configuration();

    /** Two optional columns: integer {@code id} (field 1) and string {@code data} (field 2). */
    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    @TempDir
    private Path temp;

    private static SparkSession spark = null;

    /** Table properties applied when the table under test is created. */
    @Parameter(index=0)
    private Map<String, String> properties;

    /**
     * Supplies the table properties for each test invocation.
     *
     * @return one single-element row per planning mode ({@code LOCAL}, {@code DISTRIBUTED});
     *     each element is a map setting both the data and delete planning mode properties
     */
    @Parameters(name="properties = {0}")
    public static Object[] parameters() {
        return new Object[][]{
            {
                ImmutableMap.of(
                        "read.data-planning-mode", PlanningMode.LOCAL.modeName(),
                        "read.delete-planning-mode", PlanningMode.LOCAL.modeName())
            },
            {
                ImmutableMap.of(
                        "read.data-planning-mode", PlanningMode.DISTRIBUTED.modeName(),
                        "read.delete-planning-mode", PlanningMode.DISTRIBUTED.modeName())
            }
        };
    }

    /** Creates (or reuses) a local two-thread Spark session shared by all tests. */
    @BeforeAll
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    /** Clears the shared session reference and stops the session if one was started. */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        // spark is still null if startSpark() never assigned it; skip stop() in that case
        // instead of failing teardown with a NullPointerException.
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Reading with {@code snapshot-id} set to the parent of the current snapshot returns only
     * the first batch, while a plain read returns both batches.
     */
    @TestTemplate
    public void testSnapshotSelectionById() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        // each append produces one snapshot
        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);
        List<SimpleRecord> secondBatchRecords = secondBatch();
        appendRecords(secondBatchRecords, tableLocation);

        Assertions.assertThat(table.snapshots()).as("Expected 2 snapshots").hasSize(2);

        // the current snapshot contains both batches
        Dataset<Row> currentSnapshotResult = spark.read().format("iceberg").load(tableLocation);
        List<SimpleRecord> expectedRecords = new ArrayList<>(firstBatchRecords);
        expectedRecords.addAll(secondBatchRecords);
        Assertions.assertThat(collectRecords(currentSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(expectedRecords);

        // the parent of the current snapshot contains only the first batch
        Snapshot currentSnapshot = table.currentSnapshot();
        Long parentSnapshotId = currentSnapshot.parentId();
        // fail with a readable message instead of a NullPointerException on unboxing
        Assertions.assertThat(parentSnapshotId)
                .as("Current snapshot should have a parent snapshot")
                .isNotNull();
        Dataset<Row> previousSnapshotResult = spark.read()
                .format("iceberg")
                .option("snapshot-id", parentSnapshotId.longValue())
                .load(tableLocation);
        Assertions.assertThat(collectRecords(previousSnapshotResult))
                .as("Previous snapshot rows should match")
                .isEqualTo(firstBatchRecords);
    }

    /**
     * Reading with {@code as-of-timestamp} set to a wall-clock time captured between the two
     * appends returns only the first batch.
     */
    @TestTemplate
    public void testSnapshotSelectionByTimestamp() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        // captured after the first append and before the second one
        long firstSnapshotTimestamp = System.currentTimeMillis();

        List<SimpleRecord> secondBatchRecords = secondBatch();
        appendRecords(secondBatchRecords, tableLocation);

        Assertions.assertThat(table.snapshots()).as("Expected 2 snapshots").hasSize(2);

        Dataset<Row> currentSnapshotResult = spark.read().format("iceberg").load(tableLocation);
        List<SimpleRecord> expectedRecords = new ArrayList<>(firstBatchRecords);
        expectedRecords.addAll(secondBatchRecords);
        Assertions.assertThat(collectRecords(currentSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(expectedRecords);

        Dataset<Row> previousSnapshotResult = spark.read()
                .format("iceberg")
                .option("as-of-timestamp", firstSnapshotTimestamp)
                .load(tableLocation);
        Assertions.assertThat(collectRecords(previousSnapshotResult))
                .as("Previous snapshot rows should match")
                .isEqualTo(firstBatchRecords);
    }

    /**
     * Collecting a read configured with a non-existent {@code snapshot-id} fails with an
     * {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testSnapshotSelectionByInvalidSnapshotId() throws IOException {
        String tableLocation = newTableLocation();
        createTable(tableLocation);

        // the load itself sits outside the assertion; only collecting is expected to throw
        Dataset<Row> df = spark.read()
                .format("iceberg")
                .option("snapshot-id", -10L)
                .load(tableLocation);

        Assertions.assertThatThrownBy(df::collectAsList)
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot find snapshot with ID -10");
    }

    /**
     * Loading with {@code as-of-timestamp} set to a time captured before the table was created
     * fails with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testSnapshotSelectionByInvalidTimestamp() throws IOException {
        // captured before the table exists
        long timestamp = System.currentTimeMillis();

        String tableLocation = newTableLocation();
        createTable(tableLocation);

        Assertions.assertThatThrownBy(
                        () -> spark.read()
                                .format("iceberg")
                                .option("as-of-timestamp", timestamp)
                                .load(tableLocation))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Cannot find a snapshot older than");
    }

    /**
     * Specifying both {@code snapshot-id} and {@code as-of-timestamp} on one read is rejected
     * with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        appendRecords(firstBatch(), tableLocation);

        long timestamp = System.currentTimeMillis();
        long snapshotId = table.currentSnapshot().snapshotId();

        Assertions.assertThatThrownBy(
                        () -> spark.read()
                                .format("iceberg")
                                .option("snapshot-id", snapshotId)
                                .option("as-of-timestamp", timestamp)
                                .load(tableLocation))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Can specify only one of snapshot-id")
                .hasMessageContaining("as-of-timestamp")
                .hasMessageContaining("branch")
                .hasMessageContaining("tag");
    }

    /**
     * Reading with the {@code tag} option returns the rows of the tagged snapshot even after a
     * later append to the table.
     */
    @TestTemplate
    public void testSnapshotSelectionByTag() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

        // this append happens after the tag was created
        appendRecords(secondBatch(), tableLocation);

        Dataset<Row> tagResult = readWithOption("tag", "tag", tableLocation);
        Assertions.assertThat(collectRecords(tagResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);
    }

    /**
     * Reading with the {@code branch} option returns the rows of the branch even after a later
     * append that does not specify the branch.
     */
    @TestTemplate
    public void testSnapshotSelectionByBranch() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

        // this append does not set the branch option
        appendRecords(secondBatch(), tableLocation);

        Dataset<Row> branchResult = readWithOption("branch", "branch", tableLocation);
        Assertions.assertThat(collectRecords(branchResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);
    }

    /**
     * Specifying both {@code tag} and {@code branch} on one read is rejected with an
     * {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        appendRecords(firstBatch(), tableLocation);

        table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
        table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

        Assertions.assertThatThrownBy(
                        () -> spark.read()
                                .format("iceberg")
                                .option("tag", "tag")
                                .option("branch", "branch")
                                .load(tableLocation)
                                .show())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageStartingWith("Can specify only one of snapshot-id");
    }

    /**
     * Combining {@code as-of-timestamp} with either {@code branch} or {@code tag} is rejected
     * with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        appendRecords(firstBatch(), tableLocation);

        long timestamp = System.currentTimeMillis();
        table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
        table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

        Assertions.assertThatThrownBy(
                        () -> spark.read()
                                .format("iceberg")
                                .option("as-of-timestamp", timestamp)
                                .option("branch", "branch")
                                .load(tableLocation)
                                .show())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageStartingWith("Can specify only one of snapshot-id");

        Assertions.assertThatThrownBy(
                        () -> spark.read()
                                .format("iceberg")
                                .option("as-of-timestamp", timestamp)
                                .option("tag", "tag")
                                .load(tableLocation)
                                .show())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageStartingWith("Can specify only one of snapshot-id");
    }

    /**
     * A branch read reflects table schema changes: after {@code data} is dropped only
     * {@code id} is returned, and after a column named {@code data} is added again it reads
     * as {@code null} for the rows written earlier.
     */
    @TestTemplate
    public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

        Dataset<Row> branchSnapshotResult = readWithOption("branch", "branch", tableLocation);
        Assertions.assertThat(collectRecords(branchSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);

        // drop the data column: branch reads now expose only id
        table.updateSchema().deleteColumn("data").commit();

        Assertions.assertThat(
                        readWithOption("branch", "branch", tableLocation)
                                .orderBy("id")
                                .collectAsList())
                .containsExactly(RowFactory.create(1), RowFactory.create(2), RowFactory.create(3));

        // add a column named data again: the rows written earlier read it as null
        table.updateSchema().addColumn("data", Types.StringType.get()).commit();

        Assertions.assertThat(collectRecords(readWithOption("branch", "branch", tableLocation)))
                .containsExactly(
                        new SimpleRecord(1, null),
                        new SimpleRecord(2, null),
                        new SimpleRecord(3, null));
    }

    /**
     * After replacing {@code data} with an integer {@code zip} column, rows can be appended to
     * the branch using the new schema and are read back together with the earlier rows.
     */
    @TestTemplate
    public void testWritingToBranchAfterSchemaChange() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

        Dataset<Row> branchSnapshotResult = readWithOption("branch", "branch", tableLocation);
        Assertions.assertThat(collectRecords(branchSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);

        // swap the data column for an integer zip column in a single schema update
        table.updateSchema()
                .deleteColumn("data")
                .addColumn("zip", Types.IntegerType.get())
                .commit();

        // existing rows expose the new column as null
        Assertions.assertThat(
                        readWithOption("branch", "branch", tableLocation)
                                .orderBy("id")
                                .collectAsList())
                .containsExactly(
                        RowFactory.create(1, null),
                        RowFactory.create(2, null),
                        RowFactory.create(3, null));

        // append (id, zip) rows directly to the branch
        List<Row> records = Lists.newArrayList(
                RowFactory.create(4, 12345),
                RowFactory.create(5, 54321),
                RowFactory.create(6, 67890));
        Dataset<Row> dataFrame = spark.createDataFrame(
                records,
                SparkSchemaUtil.convert(
                        new Schema(
                                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                                Types.NestedField.optional(2, "zip", Types.IntegerType.get()))));
        dataFrame.select("id", "zip")
                .write()
                .format("iceberg")
                .option("branch", "branch")
                .mode("append")
                .save(tableLocation);

        // this read is unordered, so only membership and size are checked
        Assertions.assertThat(
                        readWithOption("branch", "branch", tableLocation).collectAsList())
                .hasSize(6)
                .contains(
                        RowFactory.create(1, null),
                        RowFactory.create(2, null),
                        RowFactory.create(3, null))
                .containsAll(records);
    }

    /**
     * A tag read still returns the original {@code id}/{@code data} rows after the
     * {@code data} column has been dropped from the table schema.
     */
    @TestTemplate
    public void testSnapshotSelectionByTagWithSchemaChange() throws IOException {
        String tableLocation = newTableLocation();
        Table table = createTable(tableLocation);

        List<SimpleRecord> firstBatchRecords = firstBatch();
        appendRecords(firstBatchRecords, tableLocation);

        table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

        Dataset<Row> tagSnapshotResult = readWithOption("tag", "tag", tableLocation);
        Assertions.assertThat(collectRecords(tagSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);

        table.updateSchema().deleteColumn("data").commit();

        // the tag read is expected to be unaffected by the schema change
        Dataset<Row> deletedColumnTagSnapshotResult = readWithOption("tag", "tag", tableLocation);
        Assertions.assertThat(collectRecords(deletedColumnTagSnapshotResult))
                .as("Current snapshot rows should match")
                .isEqualTo(firstBatchRecords);
    }

    /** Returns the location of the table under test inside the temporary directory. */
    private String newTableLocation() {
        return temp.resolve("iceberg-table").toFile().toString();
    }

    /** Creates an unpartitioned Hadoop table with {@link #SCHEMA} and the current properties. */
    private Table createTable(String tableLocation) {
        HadoopTables tables = new HadoopTables(CONF);
        return tables.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);
    }

    /** Returns a new mutable list holding records (1, a), (2, b), (3, c). */
    private static List<SimpleRecord> firstBatch() {
        return Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
    }

    /** Returns a new mutable list holding records (4, d), (5, e), (6, f). */
    private static List<SimpleRecord> secondBatch() {
        return Lists.newArrayList(
                new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
    }

    /** Appends the records to the Iceberg table at the given location through Spark. */
    private static void appendRecords(List<SimpleRecord> records, String tableLocation) {
        Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
    }

    /** Collects the dataset as {@link SimpleRecord} beans ordered by {@code id}. */
    private static List<SimpleRecord> collectRecords(Dataset<Row> df) {
        return df.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
    }

    /** Loads the table at the given location with a single string-valued read option. */
    private static Dataset<Row> readWithOption(String key, String value, String tableLocation) {
        return spark.read().format("iceberg").option(key, value).load(tableLocation);
    }
}

