/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestSnapshotSelection {
    private static final Configuration CONF = new Configuration();
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"data", (Type)Types.StringType.get())});
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private static SparkSession spark = null;

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        currentSpark.stop();
    }

    @Test
    public void testSnapshotSelectionById() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, tableLocation);
        ArrayList firstBatchRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset firstDf = spark.createDataFrame((List)firstBatchRecords, SimpleRecord.class);
        firstDf.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(tableLocation);
        ArrayList secondBatchRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f")});
        Dataset secondDf = spark.createDataFrame((List)secondBatchRecords, SimpleRecord.class);
        secondDf.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(tableLocation);
        Assert.assertEquals((String)"Expected 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        Dataset currentSnapshotResult = spark.read().format("iceberg").load(tableLocation);
        List currentSnapshotRecords = currentSnapshotResult.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
        ArrayList expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(firstBatchRecords);
        expectedRecords.addAll(secondBatchRecords);
        Assert.assertEquals((String)"Current snapshot rows should match", (Object)expectedRecords, (Object)currentSnapshotRecords);
        Snapshot currentSnapshot = table.currentSnapshot();
        Long parentSnapshotId = currentSnapshot.parentId();
        Dataset previousSnapshotResult = spark.read().format("iceberg").option("snapshot-id", parentSnapshotId.longValue()).load(tableLocation);
        List previousSnapshotRecords = previousSnapshotResult.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals((String)"Previous snapshot rows should match", (Object)firstBatchRecords, (Object)previousSnapshotRecords);
    }

    @Test
    public void testSnapshotSelectionByTimestamp() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, tableLocation);
        ArrayList firstBatchRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset firstDf = spark.createDataFrame((List)firstBatchRecords, SimpleRecord.class);
        firstDf.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(tableLocation);
        long firstSnapshotTimestamp = System.currentTimeMillis();
        ArrayList secondBatchRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f")});
        Dataset secondDf = spark.createDataFrame((List)secondBatchRecords, SimpleRecord.class);
        secondDf.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(tableLocation);
        Assert.assertEquals((String)"Expected 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        Dataset currentSnapshotResult = spark.read().format("iceberg").load(tableLocation);
        List currentSnapshotRecords = currentSnapshotResult.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
        ArrayList expectedRecords = Lists.newArrayList();
        expectedRecords.addAll(firstBatchRecords);
        expectedRecords.addAll(secondBatchRecords);
        Assert.assertEquals((String)"Current snapshot rows should match", (Object)expectedRecords, (Object)currentSnapshotRecords);
        Dataset previousSnapshotResult = spark.read().format("iceberg").option("as-of-timestamp", firstSnapshotTimestamp).load(tableLocation);
        List previousSnapshotRecords = previousSnapshotResult.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals((String)"Previous snapshot rows should match", (Object)firstBatchRecords, (Object)previousSnapshotRecords);
    }

    @Test
    public void testSnapshotSelectionByInvalidSnapshotId() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        tables.create(SCHEMA, spec, tableLocation);
        Dataset df = spark.read().format("iceberg").option("snapshot-id", -10L).load(tableLocation);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((Dataset)df).collectAsList()).isInstanceOf(IllegalArgumentException.class)).hasMessage("Cannot find snapshot with ID -10");
    }

    @Test
    public void testSnapshotSelectionByInvalidTimestamp() throws IOException {
        long timestamp = System.currentTimeMillis();
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        tables.create(SCHEMA, spec, tableLocation);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> spark.read().format("iceberg").option("as-of-timestamp", timestamp).load(tableLocation)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Cannot find a snapshot older than");
    }

    @Test
    public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, tableLocation);
        ArrayList firstBatchRecords = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")});
        Dataset firstDf = spark.createDataFrame((List)firstBatchRecords, SimpleRecord.class);
        firstDf.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(tableLocation);
        long timestamp = System.currentTimeMillis();
        long snapshotId = table.currentSnapshot().snapshotId();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> spark.read().format("iceberg").option("snapshot-id", snapshotId).option("as-of-timestamp", timestamp).load(tableLocation)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Cannot specify both snapshot-id").hasMessageContaining("and as-of-timestamp");
    }
}

