/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkFileWriterFactory;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.IteratorAssert;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public abstract class TestIcebergSourceTablesBase
extends TestBase {
    /** Two optional columns: {@code id} (int, field id 1) and {@code data} (string, field id 2). */
    private static final Schema SCHEMA =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** {@link #SCHEMA} plus an optional string column {@code category} (field id 3). */
    private static final Schema SCHEMA2 =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()),
            Types.NestedField.optional(3, "category", Types.StringType.get()));

    /** Only {@code id} (field id 1) and {@code category} (field id 3); there is no {@code data} column. */
    private static final Schema SCHEMA3 =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(3, "category", Types.StringType.get()));

    /** Partition spec over {@link #SCHEMA} with an identity partition on {@code id}. */
    private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();

    /** Temporary directory; tests in this class use it as the location of {@code parquet_table}. */
    @TempDir
    protected Path temp;

    /**
     * Creates a table for the given identifier, schema and partition spec.
     *
     * <p>The last argument is a map of table properties: tests in this class pass either an empty map
     * or a map such as {@code format-version -> 1}.
     */
    public abstract Table createTable(TableIdentifier var1, Schema var2, PartitionSpec var3, Map<String, String> var4);

    /**
     * Loads the table selected by the second argument for the given identifier.
     *
     * <p>Tests in this class pass names such as {@code "entries"}, {@code "all_entries"} and
     * {@code "history"} (presumably metadata table names -- confirm against the implementations).
     */
    public abstract Table loadTable(TableIdentifier var1, String var2);

    /**
     * Returns the string that tests hand to Spark's {@code load(...)} in order to read the table named
     * by the second argument (for example {@code "entries"} or {@code "files"}) of the given identifier.
     */
    public abstract String loadLocation(TableIdentifier var1, String var2);

    /**
     * Returns the string that tests hand to Spark's {@code save(...)} and {@code load(...)} in order to
     * write to or read from the table with the given identifier.
     */
    public abstract String loadLocation(TableIdentifier var1);

    /**
     * Drops the table with the given identifier.
     *
     * <p>NOTE(review): not called in the visible part of this class; presumably used for per-test
     * cleanup elsewhere -- confirm.
     *
     * @throws IOException declared for implementations to report a failed drop
     */
    public abstract void dropTable(TableIdentifier var1) throws IOException;

    /** Runs after every test and drops the Spark table {@code parquet_table} if it exists. */
    @AfterEach
    public void removeTable() {
        String dropStatement = "DROP TABLE IF EXISTS parquet_table";
        spark.sql(dropStatement);
    }

    /**
     * Convenience overload that delegates to the four-argument {@code createTable} with an empty
     * property map.
     */
    private Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
        Map<String, String> noProperties = ImmutableMap.of();
        return createTable(ident, schema, spec, noProperties);
    }

    /**
     * Appends three records to an unpartitioned table through the {@code iceberg} format and checks
     * that reading the table back, ordered by {@code id}, yields the same records.
     */
    @Test
    public synchronized void testTablesSupport() {
        TableIdentifier ident = TableIdentifier.of("db", "table");
        createTable(ident, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> written =
            Lists.newArrayList(
                new SimpleRecord(1, "1"), new SimpleRecord(2, "2"), new SimpleRecord(3, "3"));

        Dataset<Row> source = spark.createDataFrame(written, SimpleRecord.class);
        source
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(ident));

        Dataset<Row> readBack = spark.read().format("iceberg").load(loadLocation(ident));
        List<SimpleRecord> found =
            readBack.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();

        Assertions.assertThat(found).as("Records should match").isEqualTo(written);
    }

    /**
     * Writes one record, then checks that the single row of the {@code entries} table equals the
     * entry read directly from the snapshot's only manifest.
     *
     * <p>Type parameters are spelled out on the datasets, lists and the Avro iterable: with raw types
     * the {@code forEach} lambda parameter is {@code Object} and {@code row.put}/{@code row.get} do
     * not type-check.
     */
    @Test
    public void testEntriesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "entries");

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
        inputDf
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> entriesTableDs =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries"));
        List<Row> actual = TestHelpers.selectNonDerived(entriesTableDs).collectAsList();

        Snapshot snapshot = table.currentSnapshot();
        List<ManifestFile> manifests = snapshot.allManifests(table.io());
        Assertions.assertThat(manifests).as("Should only contain one manifest").hasSize(1);
        InputFile manifest = table.io().newInputFile(manifests.get(0).path());

        List<GenericData.Record> expected = Lists.newArrayList();
        try (AvroIterable<GenericData.Record> rows =
            Avro.read(manifest).project(entriesTable.schema()).build()) {
            rows.forEach(
                row -> {
                    // Positions 2 and 3 are overwritten with 1 before comparing; presumably these are
                    // the sequence-number columns that the manifest file itself leaves unset -- confirm.
                    row.put(2, 1L);
                    row.put(3, 1L);
                    GenericData.Record file = (GenericData.Record) row.get("data_file");
                    TestHelpers.asMetadataRecord(file);
                    expected.add(row);
                });
        }

        Assertions.assertThat(expected).as("Entries table should have one row").hasSize(1);
        Assertions.assertThat(actual).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(
            TestHelpers.nonDerivedSchema(entriesTableDs), expected.get(0), actual.get(0));
    }

    /**
     * Writes one record to a table partitioned by {@code id} and checks that selecting only
     * {@code status} from the {@code entries} table returns a single row whose status is 1.
     */
    @Test
    public void testEntriesTablePartitionedPrune() {
        TableIdentifier ident = TableIdentifier.of("db", "entries_test");
        Table table = createTable(ident, SCHEMA, SPEC);

        List<SimpleRecord> input = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> source = spark.createDataFrame(input, SimpleRecord.class);
        source.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));
        table.refresh();

        Dataset<Row> entries = spark.read().format("iceberg").load(loadLocation(ident, "entries"));
        List<Row> statusRows = entries.select("status").collectAsList();

        Assertions.assertThat(statusRows).as("Results should contain only one status").hasSize(1);
        int status = statusRows.get(0).getInt(0);
        Assertions.assertThat(status).as("That status should be Added (1)").isEqualTo(1);
    }

    /**
     * Checks that projecting the single nested column {@code data_file.file_path} from the
     * {@code entries} table returns the location of the one data file that was added.
     */
    @Test
    public void testEntriesTableDataFilePrune() {
        TableIdentifier ident = TableIdentifier.of("db", "entries_test");
        Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> input = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> source = spark.createDataFrame(input, SimpleRecord.class);
        source.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));
        table.refresh();

        DataFile addedFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

        Dataset<Row> entries = spark.read().format("iceberg").load(loadLocation(ident, "entries"));
        List<Row> projected = entries.select("data_file.file_path").collectAsList();
        List<Object[]> singleActual = rowsToJava(projected);

        List<Object[]> singleExpected = ImmutableList.of(row(addedFile.location()));
        assertEquals(
            "Should prune a single element from a nested struct", singleExpected, singleActual);
    }

    /**
     * Checks that projecting several nested {@code data_file} columns (path, value counts, record
     * count, column sizes) from the {@code entries} table matches the added data file's metadata.
     */
    @Test
    public void testEntriesTableDataFilePruneMulti() {
        TableIdentifier ident = TableIdentifier.of("db", "entries_test");
        Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> input = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> source = spark.createDataFrame(input, SimpleRecord.class);
        source.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));
        table.refresh();

        DataFile addedFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

        Dataset<Row> entries = spark.read().format("iceberg").load(loadLocation(ident, "entries"));
        List<Row> projected =
            entries
                .select(
                    "data_file.file_path",
                    "data_file.value_counts",
                    "data_file.record_count",
                    "data_file.column_sizes")
                .collectAsList();
        List<Object[]> multiActual = rowsToJava(projected);

        Object[] expectedRow =
            row(
                addedFile.location(),
                addedFile.valueCounts(),
                addedFile.recordCount(),
                addedFile.columnSizes());
        List<Object[]> multiExpected = ImmutableList.of(expectedRow);
        assertEquals("Should prune a single element from a nested struct", multiExpected, multiActual);
    }

    /**
     * Checks that selecting path, value counts, record count and column sizes from the {@code files}
     * table matches the metadata of the one data file that was added.
     */
    @Test
    public void testFilesSelectMap() {
        TableIdentifier ident = TableIdentifier.of("db", "entries_test");
        Table table = createTable(ident, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> input = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> source = spark.createDataFrame(input, SimpleRecord.class);
        source.select("id", "data").write().format("iceberg").mode("append").save(loadLocation(ident));
        table.refresh();

        DataFile addedFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

        Dataset<Row> files = spark.read().format("iceberg").load(loadLocation(ident, "files"));
        List<Row> projected =
            files.select("file_path", "value_counts", "record_count", "column_sizes").collectAsList();
        List<Object[]> multiActual = rowsToJava(projected);

        Object[] expectedRow =
            row(
                addedFile.location(),
                addedFile.valueCounts(),
                addedFile.recordCount(),
                addedFile.columnSizes());
        List<Object[]> multiExpected = ImmutableList.of(expectedRow);
        assertEquals("Should prune a single element from a row", multiExpected, multiActual);
    }

    /**
     * Appends a record, deletes it with a row filter, appends another record, and then checks that the
     * {@code all_entries} table (ordered by {@code snapshot_id}) matches the entries read from every
     * manifest of every snapshot.
     *
     * <p>Each manifest reader is opened in a try-with-resources statement. The earlier hand-written
     * {@code finally} block contained a {@code continue}, which would discard any exception raised
     * while reading. Type parameters are spelled out so the lambdas below type-check.
     */
    @Test
    public void testAllEntriesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "all_entries");

        Dataset<Row> df1 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> entriesTableDs =
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "all_entries"))
                .orderBy("snapshot_id");
        List<Row> actual = TestHelpers.selectNonDerived(entriesTableDs).collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        Iterable<ManifestFile> allManifests =
            Iterables.concat(Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())));
        for (ManifestFile manifest : allManifests) {
            InputFile in = table.io().newInputFile(manifest.path());
            try (AvroIterable<GenericData.Record> rows =
                Avro.read(in).project(entriesTable.schema()).build()) {
                rows.forEach(
                    row -> {
                        // Positions 2 and 3 are overwritten before comparing: 3 for entries of the
                        // current snapshot, 1 otherwise. Presumably these are the sequence-number
                        // columns that the manifest file itself leaves unset -- confirm.
                        if (row.get("snapshot_id").equals(table.currentSnapshot().snapshotId())) {
                            row.put(2, 3L);
                            row.put(3, 3L);
                        } else {
                            row.put(2, 1L);
                            row.put(3, 1L);
                        }
                        GenericData.Record file = (GenericData.Record) row.get("data_file");
                        TestHelpers.asMetadataRecord(file);
                        expected.add(row);
                    });
            }
        }

        // Same ordering as the orderBy("snapshot_id") applied to the Spark result above.
        expected.sort(Comparator.comparing(o -> (Long) o.get("snapshot_id")));

        Assertions.assertThat(expected).as("Entries table should have 3 rows").hasSize(3);
        Assertions.assertThat(actual).as("Actual results should have 3 rows").hasSize(3);
        for (int i = 0; i < expected.size(); ++i) {
            TestHelpers.assertEqualsSafe(
                TestHelpers.nonDerivedSchema(entriesTableDs), expected.get(i), actual.get(i));
        }
    }

    /**
     * Writes one record and checks that {@code count()} on both the {@code entries} and the
     * {@code all_entries} tables returns 1.
     *
     * <p>The expected count is a named {@code long} used by both assertions; it replaces an unused
     * {@code boolean} local of the same name.
     */
    @Test
    public void testCountEntriesTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "count_entries_test");
        createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
        inputDf
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        final long expectedEntryCount = 1L;

        long entriesCount =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count();
        Assertions.assertThat(entriesCount)
            .as("Count should return " + expectedEntryCount)
            .isEqualTo(expectedEntryCount);

        long allEntriesCount =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count();
        Assertions.assertThat(allEntriesCount)
            .as("Count should return " + expectedEntryCount)
            .isEqualTo(expectedEntryCount);
    }

    /**
     * Appends two records in separate commits to a table partitioned by {@code id}, deletes
     * {@code id = 1} with a row filter, and checks that the {@code files} table has exactly the one
     * remaining data file found in the current snapshot's data manifests.
     *
     * <p>Each manifest reader is opened in a try-with-resources statement. The earlier hand-written
     * {@code finally} block contained a {@code continue}, which would discard any exception raised
     * while reading.
     */
    @Test
    public void testFilesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
        Table table = createTable(tableIdentifier, SCHEMA, SPEC);
        Table entriesTable = loadTable(tableIdentifier, "entries");

        Dataset<Row> df1 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();

        Dataset<Row> filesTableDs =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
        List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
            InputFile in = table.io().newInputFile(manifest.path());
            try (AvroIterable<GenericData.Record> rows =
                Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record record : rows) {
                    // Keep only entries with status below 2 (1 is "Added"; 2 and above are
                    // presumably deleted entries -- confirm against the manifest entry spec).
                    if ((Integer) record.get("status") < 2) {
                        GenericData.Record file = (GenericData.Record) record.get("data_file");
                        TestHelpers.asMetadataRecord(file);
                        expected.add(file);
                    }
                }
            }
        }

        Assertions.assertThat(expected).as("Files table should have one row").hasSize(1);
        Assertions.assertThat(actual).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(
            TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
    }

    /**
     * Enables {@code compatibility.snapshot-id-inheritance.enabled}, imports a two-partition parquet
     * table into the Iceberg table, and checks that the {@code files} table has the two data files
     * found in the current snapshot's data manifests.
     *
     * <p>Each manifest reader is opened in a try-with-resources statement. The earlier hand-written
     * {@code finally} block contained a {@code continue}, which would discard any exception raised
     * while reading.
     */
    @Test
    public void testFilesTableWithSnapshotIdInheritance() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
        Table table = createTable(tableIdentifier, SCHEMA, SPEC);
        table
            .updateProperties()
            .set("compatibility.snapshot-id-inheritance.enabled", "true")
            .commit();
        Table entriesTable = loadTable(tableIdentifier, "entries");

        spark.sql(
            String.format(
                "CREATE TABLE parquet_table (data string, id int) USING parquet PARTITIONED BY (id) LOCATION '%s'",
                this.temp.toFile()));

        List<SimpleRecord> records =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
        inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

        // Store a default name mapping derived from the table schema before importing the files.
        NameMapping mapping = MappingUtil.create(table.schema());
        String mappingJson = NameMappingParser.toJson(mapping);
        table.updateProperties().set("schema.name-mapping.default", mappingJson).commit();

        String stagingLocation = table.location() + "/metadata";
        SparkTableUtil.importSparkTable(
            spark,
            new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
            table,
            stagingLocation);

        Dataset<Row> filesTableDs =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
        List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
            InputFile in = table.io().newInputFile(manifest.path());
            try (AvroIterable<GenericData.Record> rows =
                Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record record : rows) {
                    GenericData.Record file = (GenericData.Record) record.get("data_file");
                    TestHelpers.asMetadataRecord(file);
                    expected.add(file);
                }
            }
        }

        Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
        Assertions.assertThat(expected).as("Files table should have 2 rows").hasSize(2);
        Assertions.assertThat(actual).as("Actual results should have 2 rows").hasSize(2);
        TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
        TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
    }

    /**
     * Creates a {@code format-version=1} table with snapshot id inheritance enabled, imports a
     * two-partition parquet table, and checks that both rows of the {@code entries} table report
     * sequence number 0 and the current snapshot's id.
     */
    @Test
    public void testV1EntriesTableWithSnapshotIdInheritance() throws Exception {
        TableIdentifier ident = TableIdentifier.of("db", "entries_inheritance_test");
        Map<String, String> v1Properties = ImmutableMap.of("format-version", "1");
        Table table = createTable(ident, SCHEMA, SPEC, v1Properties);
        table
            .updateProperties()
            .set("compatibility.snapshot-id-inheritance.enabled", "true")
            .commit();

        String createParquetTable =
            String.format(
                "CREATE TABLE parquet_table (data string, id int) USING parquet PARTITIONED BY (id) LOCATION '%s'",
                this.temp.toFile());
        spark.sql(createParquetTable);

        List<SimpleRecord> input =
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
        Dataset<Row> source = spark.createDataFrame(input, SimpleRecord.class);
        source.select("data", "id").write().mode("overwrite").insertInto("parquet_table");

        String stagingLocation = table.location() + "/metadata";
        org.apache.spark.sql.catalyst.TableIdentifier parquetIdent =
            new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table");
        SparkTableUtil.importSparkTable(spark, parquetIdent, table, stagingLocation);

        Dataset<Row> entries = spark.read().format("iceberg").load(loadLocation(ident, "entries"));
        List<Row> actual =
            entries.select("sequence_number", "snapshot_id", "data_file").collectAsList();

        table.refresh();
        long snapshotId = table.currentSnapshot().snapshotId();

        Assertions.assertThat(actual).as("Entries table should have 2 rows").hasSize(2);
        // Column 0 is sequence_number and column 1 is snapshot_id, per the select above.
        for (Row entry : actual) {
            Assertions.assertThat(entry.getLong(0)).as("Sequence number must match").isEqualTo(0L);
            Assertions.assertThat(entry.getLong(1)).as("Snapshot id must match").isEqualTo(snapshotId);
        }
    }

    /**
     * Appends two records in separate commits to an unpartitioned table, deletes the first commit's
     * data file, and checks that the {@code files} table has exactly the one remaining data file
     * found in the current snapshot's data manifests.
     *
     * <p>Each manifest reader is opened in a try-with-resources statement. The earlier hand-written
     * {@code finally} block contained a {@code continue}, which would discard any exception raised
     * while reading.
     */
    @Test
    public void testFilesUnpartitionedTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table entriesTable = loadTable(tableIdentifier, "entries");

        Dataset<Row> df1 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();
        // Remember the file written by the first append so it can be removed after the second one.
        DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedDataFiles(table.io()));

        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.newDelete().deleteFile(toDelete).commit();

        Dataset<Row> filesTableDs =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
        List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();

        List<GenericData.Record> expected = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
            InputFile in = table.io().newInputFile(manifest.path());
            try (AvroIterable<GenericData.Record> rows =
                Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record record : rows) {
                    // Keep only entries with status below 2 (2 and above are presumably deleted
                    // entries -- confirm against the manifest entry spec).
                    if ((Integer) record.get("status") < 2) {
                        GenericData.Record file = (GenericData.Record) record.get("data_file");
                        TestHelpers.asMetadataRecord(file);
                        expected.add(file);
                    }
                }
            }
        }

        Assertions.assertThat(expected).as("Files table should have one row").hasSize(1);
        Assertions.assertThat(actual).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(
            TestHelpers.nonDerivedSchema(filesTableDs), expected.get(0), actual.get(0));
    }

    /**
     * With {@code write.wap.enabled} set on the table and {@code spark.wap.id} set on the session,
     * performs two appends and checks that the table has snapshots but no current snapshot, while
     * {@code all_data_files}, {@code all_manifests} and {@code all_entries} each return two rows.
     *
     * <p>{@code spark.wap.id} is set on the shared Spark session, so it is cleared again in a
     * {@code finally} block to keep the setting from leaking into other tests.
     */
    @Test
    public void testAllMetadataTablesWithStagedCommits() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test");
        Table table = createTable(tableIdentifier, SCHEMA, SPEC);

        table.updateProperties().set("write.wap.enabled", "true").commit();
        spark.conf().set("spark.wap.id", "1234567");
        try {
            Dataset<Row> df1 =
                spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
            Dataset<Row> df2 =
                spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

            df1.select("id", "data")
                .write()
                .format("iceberg")
                .mode("append")
                .save(loadLocation(tableIdentifier));
            df2.select("id", "data")
                .write()
                .format("iceberg")
                .mode("append")
                .save(loadLocation(tableIdentifier));

            List<Row> actualAllData =
                spark
                    .read()
                    .format("iceberg")
                    .load(loadLocation(tableIdentifier, "all_data_files"))
                    .collectAsList();
            List<Row> actualAllManifests =
                spark
                    .read()
                    .format("iceberg")
                    .load(loadLocation(tableIdentifier, "all_manifests"))
                    .collectAsList();
            List<Row> actualAllEntries =
                spark
                    .read()
                    .format("iceberg")
                    .load(loadLocation(tableIdentifier, "all_entries"))
                    .collectAsList();

            Assertions.assertThat(table.snapshots().iterator())
                .as("Stage table should have some snapshots")
                .hasNext();
            Assertions.assertThat(table.currentSnapshot())
                .as("Stage table should have null currentSnapshot")
                .isNull();
            Assertions.assertThat(actualAllData).as("Actual results should have two rows").hasSize(2);
            Assertions.assertThat(actualAllManifests)
                .as("Actual results should have two rows")
                .hasSize(2);
            Assertions.assertThat(actualAllEntries).as("Actual results should have two rows").hasSize(2);
        } finally {
            // Restore the shared session so later tests do not see the WAP id.
            spark.conf().unset("spark.wap.id");
        }
    }

    /**
     * Appends a record, deletes it with a row filter, appends another record, and then checks that the
     * {@code all_data_files} table matches the data files read from the data manifests of every
     * snapshot. Both sides are sorted before the row-by-row comparison.
     *
     * <p>Each manifest reader is opened in a try-with-resources statement. The earlier hand-written
     * {@code finally} block contained a {@code continue}, which would discard any exception raised
     * while reading. Type parameters are spelled out so the comparator lambdas type-check.
     */
    @Test
    public void testAllDataFilesTable() throws Exception {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
        Table table = createTable(tableIdentifier, SCHEMA, SPEC);
        Table entriesTable = loadTable(tableIdentifier, "entries");

        Dataset<Row> df1 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> filesTableDs =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_data_files"));
        List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
        // Column 1 is presumably file_path, matching the sort key used for `expected` -- confirm.
        actual.sort(Comparator.comparing(o -> o.getString(1)));

        List<GenericData.Record> expected = Lists.newArrayList();
        Iterable<ManifestFile> dataManifests =
            Iterables.concat(
                Iterables.transform(table.snapshots(), snapshot -> snapshot.dataManifests(table.io())));
        for (ManifestFile manifest : dataManifests) {
            InputFile in = table.io().newInputFile(manifest.path());
            try (AvroIterable<GenericData.Record> rows =
                Avro.read(in).project(entriesTable.schema()).build()) {
                for (GenericData.Record record : rows) {
                    // Keep only entries with status below 2 (2 and above are presumably deleted
                    // entries -- confirm against the manifest entry spec).
                    if ((Integer) record.get("status") < 2) {
                        GenericData.Record file = (GenericData.Record) record.get("data_file");
                        TestHelpers.asMetadataRecord(file);
                        expected.add(file);
                    }
                }
            }
        }
        expected.sort(Comparator.comparing(o -> o.get("file_path").toString()));

        Assertions.assertThat(expected).as("Files table should have two rows").hasSize(2);
        Assertions.assertThat(actual).as("Actual results should have two rows").hasSize(2);
        for (int i = 0; i < expected.size(); ++i) {
            TestHelpers.assertEqualsSafe(
                TestHelpers.nonDerivedSchema(filesTableDs), expected.get(i), actual.get(i));
        }
    }

    /**
     * Checks the rows exposed by the {@code history} metadata table.
     *
     * <p>The test appends twice, rolls the table back to the first snapshot and appends a third
     * time. Four history rows are expected, in that order; the row for the second snapshot is
     * expected to be flagged as not being an ancestor of the current snapshot.
     */
    @Test
    public void testHistoryTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "history_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table historyTable = this.loadTable(tableIdentifier, "history");

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);

        // First append.
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        Snapshot firstSnapshot = table.currentSnapshot();
        long firstSnapshotTimestamp = firstSnapshot.timestampMillis();
        long firstSnapshotId = firstSnapshot.snapshotId();

        // Second append.
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        Snapshot secondSnapshot = table.currentSnapshot();
        long secondSnapshotTimestamp = secondSnapshot.timestampMillis();
        long secondSnapshotId = secondSnapshot.snapshotId();

        // Roll back to the first snapshot; the newest history entry carries the rollback time.
        table.manageSnapshots().rollbackTo(firstSnapshotId).commit();
        HistoryEntry rollbackEntry = Iterables.getLast(table.history());
        long rollbackTimestamp = rollbackEntry.timestampMillis();

        // Third append, on top of the restored first snapshot.
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        Snapshot thirdSnapshot = table.currentSnapshot();
        long thirdSnapshotTimestamp = thirdSnapshot.timestampMillis();
        long thirdSnapshotId = thirdSnapshot.snapshotId();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "history"))
            .collectAsList();

        // Timestamps are multiplied by 1000 to go from snapshot milliseconds to the table's unit.
        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
        GenericData.Record firstAppendRow = builder
            .set("made_current_at", firstSnapshotTimestamp * 1000L)
            .set("snapshot_id", firstSnapshotId)
            .set("parent_id", null)
            .set("is_current_ancestor", true)
            .build();
        GenericData.Record secondAppendRow = builder
            .set("made_current_at", secondSnapshotTimestamp * 1000L)
            .set("snapshot_id", secondSnapshotId)
            .set("parent_id", firstSnapshotId)
            .set("is_current_ancestor", false)
            .build();
        GenericData.Record rollbackRow = builder
            .set("made_current_at", rollbackTimestamp * 1000L)
            .set("snapshot_id", firstSnapshotId)
            .set("parent_id", null)
            .set("is_current_ancestor", true)
            .build();
        GenericData.Record thirdAppendRow = builder
            .set("made_current_at", thirdSnapshotTimestamp * 1000L)
            .set("snapshot_id", thirdSnapshotId)
            .set("parent_id", firstSnapshotId)
            .set("is_current_ancestor", true)
            .build();
        List<GenericData.Record> expected =
            Lists.newArrayList(firstAppendRow, secondAppendRow, rollbackRow, thirdAppendRow);

        Assertions.assertThat(actual).as("History table should have a row for each commit").hasSize(4);
        for (int row = 0; row < 4; row++) {
            TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(row), actual.get(row));
        }
    }

    /**
     * Checks the rows exposed by the {@code snapshots} metadata table.
     *
     * <p>The test appends one record, deletes everything and then rolls back to the first
     * snapshot. Two rows are expected: one for the append and one for the delete, each with its
     * manifest list location and summary map.
     */
    @Test
    public void testSnapshotsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
        Table snapTable = this.loadTable(tableIdentifier, "snapshots");

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        Snapshot appendSnapshot = table.currentSnapshot();
        long firstSnapshotTimestamp = appendSnapshot.timestampMillis();
        long firstSnapshotId = appendSnapshot.snapshotId();
        String firstManifestList = appendSnapshot.manifestListLocation();

        // The delete is committed through this table handle, so no refresh is issued here.
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        Snapshot deleteSnapshot = table.currentSnapshot();
        long secondSnapshotTimestamp = deleteSnapshot.timestampMillis();
        long secondSnapshotId = deleteSnapshot.snapshotId();
        String secondManifestList = deleteSnapshot.manifestListLocation();

        table.manageSnapshots().rollbackTo(firstSnapshotId).commit();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "snapshots"))
            .collectAsList();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
        GenericData.Record appendRow = builder
            .set("committed_at", firstSnapshotTimestamp * 1000L)
            .set("snapshot_id", firstSnapshotId)
            .set("parent_id", null)
            .set("operation", "append")
            .set("manifest_list", firstManifestList)
            .set(
                "summary",
                ImmutableMap.of(
                    "added-records", "1",
                    "added-data-files", "1",
                    "changed-partition-count", "1",
                    "total-data-files", "1",
                    "total-records", "1"))
            .build();
        GenericData.Record deleteRow = builder
            .set("committed_at", secondSnapshotTimestamp * 1000L)
            .set("snapshot_id", secondSnapshotId)
            .set("parent_id", firstSnapshotId)
            .set("operation", "delete")
            .set("manifest_list", secondManifestList)
            .set(
                "summary",
                ImmutableMap.of(
                    "deleted-records", "1",
                    "deleted-data-files", "1",
                    "changed-partition-count", "1",
                    "total-records", "0",
                    "total-data-files", "0"))
            .build();
        List<GenericData.Record> expected = Lists.newArrayList(appendRow, deleteRow);

        Assertions.assertThat(actual).as("Snapshots table should have a row for each snapshot").hasSize(2);
        for (int row = 0; row < 2; row++) {
            TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(row), actual.get(row));
        }
    }

    /**
     * Checks a column-pruned read of the {@code snapshots} metadata table.
     *
     * <p>Only {@code operation}, {@code committed_at}, {@code summary} and {@code parent_id} are
     * selected; the expected records are built against the schema of the projected DataFrame.
     */
    @Test
    public void testPrunedSnapshotsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
        inputDf.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        Snapshot appendSnapshot = table.currentSnapshot();
        long firstSnapshotTimestamp = appendSnapshot.timestampMillis();
        long firstSnapshotId = appendSnapshot.snapshotId();

        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();

        table.manageSnapshots().rollbackTo(firstSnapshotId).commit();

        Dataset<Row> actualDf = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "snapshots"))
            .select("operation", "committed_at", "summary", "parent_id");
        Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());
        List<Row> actual = actualDf.collectAsList();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema, "snapshots"));
        GenericData.Record appendRow = builder
            .set("committed_at", firstSnapshotTimestamp * 1000L)
            .set("parent_id", null)
            .set("operation", "append")
            .set(
                "summary",
                ImmutableMap.of(
                    "added-records", "1",
                    "added-data-files", "1",
                    "changed-partition-count", "1",
                    "total-data-files", "1",
                    "total-records", "1"))
            .build();
        GenericData.Record deleteRow = builder
            .set("committed_at", secondSnapshotTimestamp * 1000L)
            .set("parent_id", firstSnapshotId)
            .set("operation", "delete")
            .set(
                "summary",
                ImmutableMap.of(
                    "deleted-records", "1",
                    "deleted-data-files", "1",
                    "changed-partition-count", "1",
                    "total-records", "0",
                    "total-data-files", "0"))
            .build();
        List<GenericData.Record> expected = Lists.newArrayList(appendRow, deleteRow);

        Assertions.assertThat(actual).as("Snapshots table should have a row for each snapshot").hasSize(2);
        for (int row = 0; row < 2; row++) {
            TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(row), actual.get(row));
        }
    }

    /**
     * Checks the rows exposed by the {@code manifests} metadata table.
     *
     * <p>The test appends two records (one with a {@code null} id), switches the table to format
     * version 2 and commits a position delete file. Two manifest rows are expected. For each
     * manifest the data-file counters are populated only when its content is
     * {@link ManifestContent#DATA} and the delete-file counters only when its content is
     * {@link ManifestContent#DELETES}; the other group is expected to be zero.
     */
    @Test
    public void testManifestsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table manifestTable = this.loadTable(tableIdentifier, "manifests");

        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")),
            SimpleRecord.class);
        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .option("distribution-mode", "none")
            .save(this.loadLocation(tableIdentifier));

        table.updateProperties().set("format-version", "2").commit();
        DeleteFile deleteFile = this.writePosDeleteFile(table);
        table.newRowDelta().addDeletes(deleteFile).commit();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "manifests"))
            .collectAsList();
        table.refresh();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(manifestTable.schema(), "manifests"));
        GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                manifestTable.schema().findType("partition_summaries.element").asStructType(),
                "partition_summary"));

        List<GenericData.Record> expected = Lists.transform(
            table.currentSnapshot().allManifests(table.io()),
            manifest -> builder
                .set("content", manifest.content().id())
                .set("path", manifest.path())
                .set("length", manifest.length())
                .set("partition_spec_id", manifest.partitionSpecId())
                .set("added_snapshot_id", manifest.snapshotId())
                .set(
                    "added_data_files_count",
                    manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0)
                .set(
                    "existing_data_files_count",
                    manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0)
                .set(
                    "deleted_data_files_count",
                    manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0)
                .set(
                    "added_delete_files_count",
                    manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0)
                .set(
                    "existing_delete_files_count",
                    manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0)
                .set(
                    "deleted_delete_files_count",
                    manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0)
                .set(
                    "partition_summaries",
                    Lists.transform(
                        manifest.partitions(),
                        partition -> summaryBuilder
                            // Must be a Boolean, like contains_nan below; the previous
                            // "? 1 : 0" form boxed to an Integer.
                            .set("contains_null", manifest.content() == ManifestContent.DATA)
                            .set("contains_nan", false)
                            .set("lower_bound", "1")
                            .set("upper_bound", "1")
                            .build()))
                .build());

        Assertions.assertThat(actual).as("Manifests table should have two manifest rows").hasSize(2);
        TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
        TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(1), actual.get(1));
    }

    /**
     * Checks column-pruned reads of the {@code manifests} metadata table.
     *
     * <p>Unless the Spark version string starts with {@code "2"}, selecting a single nested field
     * of {@code partition_summaries} is expected to fail with a {@link SparkException}. Selecting
     * the whole {@code partition_summaries} column is then verified against the single manifest
     * of the current snapshot.
     */
    @Test
    public void testPruneManifestsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table manifestTable = this.loadTable(tableIdentifier, "manifests");

        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")),
            SimpleRecord.class);
        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));

        if (!spark.version().startsWith("2")) {
            Assertions.assertThatThrownBy(
                    () -> spark.read()
                        .format("iceberg")
                        .load(this.loadLocation(tableIdentifier, "manifests"))
                        .select("partition_spec_id", "path", "partition_summaries.contains_null")
                        .collectAsList())
                .isInstanceOf(SparkException.class)
                .hasMessageContaining("Cannot project a partial list element struct");
        }

        // The first read only supplies the projected schema; the rows come from a second read.
        Dataset<Row> actualDf = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "manifests"))
            .select("partition_spec_id", "path", "partition_summaries");
        Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "manifests"))
            .select("partition_spec_id", "path", "partition_summaries")
            .collectAsList();
        table.refresh();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert((Type) projectedSchema.asStruct()));
        GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                projectedSchema.findType("partition_summaries.element").asStructType(),
                "partition_summary"));

        List<GenericData.Record> expected = Lists.transform(
            table.currentSnapshot().allManifests(table.io()),
            manifest -> builder
                .set("partition_spec_id", manifest.partitionSpecId())
                .set("path", manifest.path())
                .set(
                    "partition_summaries",
                    Lists.transform(
                        manifest.partitions(),
                        partition -> summaryBuilder
                            .set("contains_null", true)
                            .set("contains_nan", false)
                            .set("lower_bound", "1")
                            .set("upper_bound", "1")
                            .build()))
                .build());

        Assertions.assertThat(actual).as("Manifests table should have one manifest row").hasSize(1);
        TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
    }

    /**
     * Checks the rows exposed by the {@code all_manifests} metadata table.
     *
     * <p>The test appends one record, upgrades to format version 2, commits a position delete
     * file and finally deletes all rows. Every (snapshot, manifest) pair reachable from the
     * table's snapshots is turned into an expected record via {@code manifestRecord}; expected
     * and actual rows are both ordered by manifest path before being compared.
     */
    @Test
    public void testAllManifestsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table manifestTable = this.loadTable(tableIdentifier, "all_manifests");

        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));

        table.updateProperties().set("format-version", "2").commit();
        DeleteFile deleteFile = this.writePosDeleteFile(table);
        table.newRowDelta().addDeletes(deleteFile).commit();
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

        // The stream carries (snapshot id, manifest) pairs, not records, so it is typed as such;
        // this removes the need for casts when the pairs are unpacked below.
        Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
            StreamSupport.stream(table.snapshots().spliterator(), false)
                .flatMap(
                    snapshot -> snapshot.allManifests(table.io()).stream()
                        .map(manifest -> Pair.of(snapshot.snapshotId(), manifest)));

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "all_manifests"))
            .orderBy("path")
            .collectAsList();
        table.refresh();

        List<GenericData.Record> expected = snapshotIdToManifests
            .map(
                snapshotManifest -> this.manifestRecord(
                    manifestTable, snapshotManifest.first(), snapshotManifest.second()))
            .sorted(Comparator.comparing(o -> o.get("path").toString()))
            .collect(Collectors.toList());

        Assertions.assertThat(actual).as("Manifests table should have 5 manifest rows").hasSize(5);
        for (int i = 0; i < expected.size(); ++i) {
            TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Checks the {@code partitions} metadata table of an unpartitioned table.
     *
     * <p>The metadata table schema is expected to match the struct built below, which has no
     * {@code partition} field, and a single row describing the one appended data file is
     * expected.
     */
    @Test
    public void testUnpartitionedPartitionsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_partitions_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        Dataset<Row> df = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        // Refresh explicitly before reading currentSnapshot(), as the sibling tests do after an
        // append performed through Spark.
        // NOTE(review): presumably the handle would otherwise rely on an implicit refresh - confirm.
        table.refresh();

        Types.StructType expectedSchema = Types.StructType.of(
            Types.NestedField.required(
                2, "record_count", Types.LongType.get(), "Count of records in data files"),
            Types.NestedField.required(
                3, "file_count", Types.IntegerType.get(), "Count of data files"),
            Types.NestedField.required(
                11,
                "total_data_file_size_in_bytes",
                Types.LongType.get(),
                "Total size in bytes of data files"),
            Types.NestedField.required(
                5,
                "position_delete_record_count",
                Types.LongType.get(),
                "Count of records in position delete files"),
            Types.NestedField.required(
                6,
                "position_delete_file_count",
                Types.IntegerType.get(),
                "Count of position delete files"),
            Types.NestedField.required(
                7,
                "equality_delete_record_count",
                Types.LongType.get(),
                "Count of records in equality delete files"),
            Types.NestedField.required(
                8,
                "equality_delete_file_count",
                Types.IntegerType.get(),
                "Count of equality delete files"),
            Types.NestedField.optional(
                9,
                "last_updated_at",
                Types.TimestampType.withZone(),
                "Commit time of snapshot that last updated this partition"),
            Types.NestedField.optional(
                10,
                "last_updated_snapshot_id",
                Types.LongType.get(),
                "Id of snapshot that last updated this partition"));

        Table partitionsTable = this.loadTable(tableIdentifier, "partitions");
        Assertions.assertThat(expectedSchema)
            .as("Schema should not have partition field")
            .isEqualTo(partitionsTable.schema().asStruct());

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
        GenericData.Record expectedRow = builder
            .set("last_updated_at", table.currentSnapshot().timestampMillis() * 1000L)
            .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set(
                "total_data_file_size_in_bytes",
                this.totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .build();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .collectAsList();

        Assertions.assertThat(actual).as("Unpartitioned partitions table should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(expectedSchema, expectedRow, actual.get(0));
    }

    /**
     * Checks the {@code partitions} metadata table of a partitioned table.
     *
     * <p>Two appends create one partition each ({@code id = 1} and {@code id = 2}). The test
     * verifies the full table, a read pinned to the first snapshot via the {@code snapshot-id}
     * option, a filter that matches only the first partition, and an OR filter that matches both
     * partitions.
     */
    @Test
    public void testPartitionsTable() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table partitionsTable = this.loadTable(tableIdentifier, "partitions");

        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        long firstCommitId = table.currentSnapshot().snapshotId();

        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        long secondCommitId = table.currentSnapshot().snapshotId();

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
        GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                partitionsTable.schema().findType("partition").asStructType(), "partition"));

        List<GenericData.Record> expected = Lists.newArrayList();
        expected.add(
            builder
                .set("partition", partitionBuilder.set("id", 1).build())
                .set("record_count", 1L)
                .set("file_count", 1)
                .set(
                    "total_data_file_size_in_bytes",
                    this.totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
                .set("position_delete_record_count", 0L)
                .set("position_delete_file_count", 0)
                .set("equality_delete_record_count", 0L)
                .set("equality_delete_file_count", 0)
                .set("spec_id", 0)
                .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000L)
                .set("last_updated_snapshot_id", firstCommitId)
                .build());
        expected.add(
            builder
                .set("partition", partitionBuilder.set("id", 2).build())
                .set("record_count", 1L)
                .set("file_count", 1)
                .set(
                    "total_data_file_size_in_bytes",
                    this.totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
                .set("position_delete_record_count", 0L)
                .set("position_delete_file_count", 0)
                .set("equality_delete_record_count", 0L)
                .set("equality_delete_file_count", 0)
                .set("spec_id", 0)
                .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000L)
                .set("last_updated_snapshot_id", secondCommitId)
                .build());

        Assertions.assertThat(expected).as("Partitions table should have two rows").hasSize(2);
        Assertions.assertThat(actual).as("Actual results should have two rows").hasSize(2);
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
        }

        // Read pinned to the first snapshot: only the first partition exists at that point.
        List<Row> actualAfterFirstCommit = spark.read()
            .format("iceberg")
            .option("snapshot-id", String.valueOf(firstCommitId))
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();
        Assertions.assertThat(actualAfterFirstCommit).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(
            partitionsTable.schema().asStruct(), expected.get(0), actualAfterFirstCommit.get(0));

        // Filter on the partition column that keeps only the first partition.
        List<Row> filtered = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2")
            .collectAsList();
        Assertions.assertThat(filtered).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));

        // OR filter that keeps both partitions. The rows are ordered so they can be compared by
        // index, and the comparison targets nonFiltered itself rather than the earlier result.
        List<Row> nonFiltered = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2 or record_count=1")
            .orderBy("partition.id")
            .collectAsList();
        Assertions.assertThat(nonFiltered).as("Actual results should have two rows").hasSize(2);
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(
                partitionsTable.schema().asStruct(), expected.get(i), nonFiltered.get(i));
        }
    }

    /**
     * Checks the {@code last_updated_at} / {@code last_updated_snapshot_id} columns of the
     * {@code partitions} metadata table.
     *
     * <p>Two appends are followed by a manifest rewrite. Partition {@code id = 1} is expected to
     * report the first commit and partition {@code id = 2} the second. After the first snapshot
     * is expired, partition {@code id = 1} is expected to report {@code null} for both columns.
     */
    @Test
    public void testPartitionsTableLastUpdatedSnapshot() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table partitionsTable = this.loadTable(tableIdentifier, "partitions");

        Dataset<Row> df1 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, "2")),
            SimpleRecord.class);
        Dataset<Row> df2 = spark.createDataFrame(
            Lists.newArrayList(new SimpleRecord(2, "20")), SimpleRecord.class);

        df1.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        long firstCommitId = table.currentSnapshot().snapshotId();

        df2.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.loadLocation(tableIdentifier));
        table.refresh();
        long secondCommitId = table.currentSnapshot().snapshotId();

        // Compact the two manifests into one before reading the partitions table.
        RewriteManifests.Result rewriteManifestResult =
            SparkActions.get().rewriteManifests(table).execute();
        Assertions.assertThat(rewriteManifestResult.rewrittenManifests())
            .as("rewrite replaced 2 manifests")
            .hasSize(2);
        Assertions.assertThat(rewriteManifestResult.addedManifests())
            .as("rewrite added 1 manifests")
            .hasSize(1);

        List<Row> actual = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();

        GenericRecordBuilder builder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
        GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(
            AvroSchemaUtil.convert(
                partitionsTable.schema().findType("partition").asStructType(), "partition"));

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        this.assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

        List<GenericData.Record> expected = Lists.newArrayList();
        expected.add(
            builder
                .set("partition", partitionBuilder.set("id", 1).build())
                .set("record_count", 1L)
                .set("file_count", 1)
                .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
                .set("position_delete_record_count", 0L)
                .set("position_delete_file_count", 0)
                .set("equality_delete_record_count", 0L)
                .set("equality_delete_file_count", 0)
                .set("spec_id", 0)
                .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000L)
                .set("last_updated_snapshot_id", firstCommitId)
                .build());
        expected.add(
            builder
                .set("partition", partitionBuilder.set("id", 2).build())
                .set("record_count", 2L)
                .set("file_count", 2)
                .set(
                    "total_data_file_size_in_bytes",
                    dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
                .set("position_delete_record_count", 0L)
                .set("position_delete_file_count", 0)
                .set("equality_delete_record_count", 0L)
                .set("equality_delete_file_count", 0)
                .set("spec_id", 0)
                .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000L)
                .set("last_updated_snapshot_id", secondCommitId)
                .build());

        Assertions.assertThat(expected).as("Partitions table should have two rows").hasSize(2);
        Assertions.assertThat(actual).as("Actual results should have two rows").hasSize(2);
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
        }

        // Filter on the partition column that keeps only the first partition.
        List<Row> filtered = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .filter("partition.id < 2")
            .collectAsList();
        Assertions.assertThat(filtered).as("Actual results should have one row").hasSize(1);
        TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));

        // Expire the snapshot that last updated partition id = 1; its last-updated columns are
        // then expected to be null.
        SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
        GenericData.Record newPartitionRecord = builder
            .set("partition", partitionBuilder.set("id", 1).build())
            .set("record_count", 1L)
            .set("file_count", 1)
            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
            .set("position_delete_record_count", 0L)
            .set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 0L)
            .set("equality_delete_file_count", 0)
            .set("spec_id", 0)
            .set("last_updated_at", null)
            .set("last_updated_snapshot_id", null)
            .build();
        expected.set(0, newPartitionRecord);

        // Ordered like the first read so the index-wise comparison below does not depend on the
        // order in which Spark happens to return the rows.
        List<Row> actualAfterSnapshotExpiration = spark.read()
            .format("iceberg")
            .load(this.loadLocation(tableIdentifier, "partitions"))
            .orderBy("partition.id")
            .collectAsList();
        Assertions.assertThat(actualAfterSnapshotExpiration)
            .as("Actual results should have two rows")
            .hasSize(2);
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(
                partitionsTable.schema().asStruct(),
                expected.get(i),
                actualAfterSnapshotExpiration.get(i));
        }
    }

    /**
     * Verifies that the "partitions" metadata table reports position-delete and equality-delete
     * record/file counts per partition, together with the snapshot that last updated each
     * partition.
     *
     * <p>The test appends one batch of rows with id=1 and one with id=2, commits two position-delete
     * files, checks the partitions table, then commits two equality-delete files and checks again.
     */
    @Test
    public void testPartitionsTableDeleteStats() {
        TableIdentifier tableIdentifier = TableIdentifier.of((String[])new String[]{"db", "partitions_test"});
        Table table = this.createTable(tableIdentifier, SCHEMA, SPEC);
        Table partitionsTable = this.loadTable(tableIdentifier, "partitions");
        // Two batches of three rows each; all rows in a batch share the same id (1 and 2).
        // NOTE(review): SPEC is presumably an identity partition on "id" (the expected records
        // below populate a partition struct with an "id" field) - confirm against its definition.
        Dataset df1 = spark.createDataFrame((List)Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")}), SimpleRecord.class);
        Dataset df2 = spark.createDataFrame((List)Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")}), SimpleRecord.class);
        // First append (id=1 rows); remember its snapshot id for the expected values.
        df1.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(this.loadLocation(tableIdentifier));
        table.refresh();
        long firstCommitId = table.currentSnapshot().snapshotId();
        // Second append (id=2 rows).
        df2.select("id", new String[]{"data"}).write().format("iceberg").mode("append").save(this.loadLocation(tableIdentifier));
        // Set the table's format-version property to "2" before any delete files are committed.
        table.updateProperties().set("format-version", "2").commit();
        // Two position-delete files (positions 0 and 1); writePosDeleteFile targets the first data
        // file added by the table's current snapshot.
        DeleteFile deleteFile1 = this.writePosDeleteFile(table, 0L);
        DeleteFile deleteFile2 = this.writePosDeleteFile(table, 1L);
        table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
        table.refresh();
        long posDeleteCommitId = table.currentSnapshot().snapshotId();
        // Read the partitions table ordered by partition id so rows line up with `expected`.
        List actual = spark.read().format("iceberg").load(this.loadLocation(tableIdentifier, "partitions")).orderBy("partition.id", new String[0]).collectAsList();
        ((ListAssert)Assertions.assertThat((List)actual).as("Actual results should have two rows", new Object[0])).hasSize(2);
        // Both builders are reused for every expected record below.
        GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert((Schema)partitionsTable.schema(), (String)"partitions"));
        GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert((Types.StructType)partitionsTable.schema().findType("partition").asStructType(), (String)"partition"));
        ArrayList expected = Lists.newArrayList();
        // Partition id=1: no deletes yet, last updated by the first append.
        expected.add(builder.set("partition", (Object)partitionBuilder.set("id", (Object)1).build()).set("record_count", (Object)3L).set("file_count", (Object)1).set("total_data_file_size_in_bytes", (Object)this.totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))).set("position_delete_record_count", (Object)0L).set("position_delete_file_count", (Object)0).set("equality_delete_record_count", (Object)0L).set("equality_delete_file_count", (Object)0).set("spec_id", (Object)0).set("last_updated_at", (Object)(table.snapshot(firstCommitId).timestampMillis() * 1000L)).set("last_updated_snapshot_id", (Object)firstCommitId).build());
        // Partition id=2: two position-delete records in two files, last updated by the
        // position-delete commit.
        // NOTE(review): the expected data file size is taken from the files added by firstCommitId,
        // not by the second append - presumably both data files have the same size; confirm.
        expected.add(builder.set("partition", (Object)partitionBuilder.set("id", (Object)2).build()).set("record_count", (Object)3L).set("file_count", (Object)1).set("total_data_file_size_in_bytes", (Object)this.totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))).set("position_delete_record_count", (Object)2L).set("position_delete_file_count", (Object)2).set("equality_delete_record_count", (Object)0L).set("equality_delete_file_count", (Object)0).set("spec_id", (Object)0).set("last_updated_at", (Object)(table.snapshot(posDeleteCommitId).timestampMillis() * 1000L)).set("last_updated_snapshot_id", (Object)posDeleteCommitId).build());
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), (GenericData.Record)expected.get(i), (Row)actual.get(i));
        }
        // Two equality-delete files on the "data" column (values "d" and "f"); writeEqDeleteFile
        // writes them with partition value 1.
        DeleteFile eqDeleteFile1 = this.writeEqDeleteFile(table, "d");
        DeleteFile eqDeleteFile2 = this.writeEqDeleteFile(table, "f");
        table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
        table.refresh();
        long eqDeleteCommitId = table.currentSnapshot().snapshotId();
        actual = spark.read().format("iceberg").load(this.loadLocation(tableIdentifier, "partitions")).orderBy("partition.id", new String[0]).collectAsList();
        ((ListAssert)Assertions.assertThat((List)actual).as("Actual results should have two rows", new Object[0])).hasSize(2);
        // Replace the expected row for partition id=1: it now carries two equality-delete records
        // in two files and was last updated by the equality-delete commit.
        // NOTE(review): this record does not set total_data_file_size_in_bytes or spec_id;
        // presumably the reused builder keeps the values set for the previous record - confirm.
        expected.remove(0);
        expected.add(0, builder.set("partition", (Object)partitionBuilder.set("id", (Object)1).build()).set("record_count", (Object)3L).set("file_count", (Object)1).set("position_delete_record_count", (Object)0L).set("position_delete_file_count", (Object)0).set("equality_delete_record_count", (Object)2L).set("equality_delete_file_count", (Object)2).set("last_updated_at", (Object)(table.snapshot(eqDeleteCommitId).timestampMillis() * 1000L)).set("last_updated_snapshot_id", (Object)eqDeleteCommitId).build());
        for (int i = 0; i < 2; ++i) {
            TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), (GenericData.Record)expected.get(i), (Row)actual.get(i));
        }
    }

    /**
     * Verifies that reading an older snapshot by id returns the rows and the Spark schema that
     * existed before a column was added to the table.
     *
     * <p>Steps: write three rows with the original schema, add the {@code category} column, write
     * two more rows, then check both the current table contents and a read pinned to the snapshot
     * taken before the schema change.
     *
     * <p>Assertions are written as {@code assertThat(actual).isEqualTo(expected)} so that a failure
     * reports the Spark result as the actual value, the same way the sibling snapshot-read tests do.
     */
    @Test
    public synchronized void testSnapshotReadAfterAddColumn() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        // Initial write with the two-column (id, data) schema.
        List<Row> originalRecords =
            Lists.newArrayList(
                RowFactory.create(1, "x"), RowFactory.create(2, "y"), RowFactory.create(3, "z"));
        StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
        Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
        inputDf
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> resultDf = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(resultDf.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(originalRecords);

        // Remember the snapshot that predates the schema change, then add the new column.
        Snapshot snapshotBeforeAddColumn = table.currentSnapshot();
        table.updateSchema().addColumn("category", Types.StringType.get()).commit();

        // Second write uses the three-column schema.
        List<Row> newRecords =
            Lists.newArrayList(RowFactory.create(4, "xy", "B"), RowFactory.create(5, "xyz", "C"));
        StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
        Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
        inputDf2
            .select("id", "data", "category")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        // Reading the current table: old rows are expected to show null for the added column.
        List<Row> updatedRecords =
            Lists.newArrayList(
                RowFactory.create(1, "x", null),
                RowFactory.create(2, "y", null),
                RowFactory.create(3, "z", null),
                RowFactory.create(4, "xy", "B"),
                RowFactory.create(5, "xyz", "C"));
        Dataset<Row> resultDf2 = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(resultDf2.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(updatedRecords);

        // Reading the pre-change snapshot: original rows and original schema are expected.
        Dataset<Row> resultDf3 =
            spark
                .read()
                .format("iceberg")
                .option("snapshot-id", snapshotBeforeAddColumn.snapshotId())
                .load(loadLocation(tableIdentifier));
        Assertions.assertThat(resultDf3.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(originalRecords);
        Assertions.assertThat(resultDf3.schema())
            .as("Schemas should match")
            .isEqualTo(originalSparkSchema);
    }

    /**
     * Checks time travel by timestamp around a column drop: reads as of a timestamp taken before
     * the drop, and as of one taken right after the drop, are both expected to return the original
     * rows with the original three-column schema.
     */
    @Test
    public synchronized void testSnapshotReadAfterDropColumn() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
        Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());

        // Seed the table with three rows carrying id, data and category.
        List<Row> rowsBeforeDrop =
            Lists.newArrayList(
                RowFactory.create(1, "x", "A"),
                RowFactory.create(2, "y", "A"),
                RowFactory.create(3, "z", "B"));
        StructType schemaBeforeDrop = SparkSchemaUtil.convert(SCHEMA2);
        spark
            .createDataFrame(rowsBeforeDrop, schemaBeforeDrop)
            .select("id", "data", "category")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> currentBeforeDrop =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(currentBeforeDrop.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(rowsBeforeDrop);

        // Bracket the schema change with two timestamps; no data is written between them.
        long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
        table.updateSchema().deleteColumn("data").commit();
        long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());

        // Append two rows that only have id and category.
        List<Row> rowsAfterDrop =
            Lists.newArrayList(RowFactory.create(4, "B"), RowFactory.create(5, "C"));
        StructType schemaAfterDrop = SparkSchemaUtil.convert(SCHEMA3);
        spark
            .createDataFrame(rowsAfterDrop, schemaAfterDrop)
            .select("id", "category")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        // The current table is expected to expose all five rows without the dropped column.
        List<Row> allRowsWithoutData =
            Lists.newArrayList(
                RowFactory.create(1, "A"),
                RowFactory.create(2, "A"),
                RowFactory.create(3, "B"),
                RowFactory.create(4, "B"),
                RowFactory.create(5, "C"));
        Dataset<Row> currentAfterDrop =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(currentAfterDrop.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(allRowsWithoutData);

        // Time travel to just before the drop.
        Dataset<Row> asOfBeforeDrop =
            spark
                .read()
                .format("iceberg")
                .option("as-of-timestamp", tsBeforeDropColumn)
                .load(loadLocation(tableIdentifier));
        Assertions.assertThat(asOfBeforeDrop.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(rowsBeforeDrop);
        Assertions.assertThat(asOfBeforeDrop.schema())
            .as("Schemas should match")
            .isEqualTo(schemaBeforeDrop);

        // Time travel to just after the drop (but before the second append): the test expects the
        // same rows and schema as before the drop.
        Dataset<Row> asOfAfterDrop =
            spark
                .read()
                .format("iceberg")
                .option("as-of-timestamp", tsAfterDropColumn)
                .load(loadLocation(tableIdentifier));
        Assertions.assertThat(asOfAfterDrop.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(rowsBeforeDrop);
        Assertions.assertThat(asOfAfterDrop.schema())
            .as("Schemas should match")
            .isEqualTo(schemaBeforeDrop);
    }

    /**
     * Checks a snapshot-id read after two schema changes: a column is added, more rows are
     * written, another column is dropped, and a read pinned to the very first snapshot is still
     * expected to return the original rows and the original two-column schema.
     */
    @Test
    public synchronized void testSnapshotReadAfterAddAndDropColumn() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        // Step 1: three rows with (id, data).
        List<Row> initialRows =
            Lists.newArrayList(
                RowFactory.create(1, "x"), RowFactory.create(2, "y"), RowFactory.create(3, "z"));
        StructType initialSparkSchema = SparkSchemaUtil.convert(SCHEMA);
        spark
            .createDataFrame(initialRows, initialSparkSchema)
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        Dataset<Row> afterInitialWrite =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(afterInitialWrite.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(initialRows);

        // Step 2: capture the snapshot, add "category", and write two rows that populate it.
        Snapshot snapshotBeforeAddColumn = table.currentSnapshot();
        table.updateSchema().addColumn("category", Types.StringType.get()).commit();

        List<Row> rowsWithCategory =
            Lists.newArrayList(RowFactory.create(4, "xy", "B"), RowFactory.create(5, "xyz", "C"));
        StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
        spark
            .createDataFrame(rowsWithCategory, sparkSchemaAfterAddColumn)
            .select("id", "data", "category")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        table.refresh();

        List<Row> expectedAfterAdd =
            Lists.newArrayList(
                RowFactory.create(1, "x", null),
                RowFactory.create(2, "y", null),
                RowFactory.create(3, "z", null),
                RowFactory.create(4, "xy", "B"),
                RowFactory.create(5, "xyz", "C"));
        Dataset<Row> afterAddColumn =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(afterAddColumn.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(expectedAfterAdd);

        // Step 3: drop "data"; the current table is expected to expose (id, category) only.
        table.updateSchema().deleteColumn("data").commit();

        List<Row> expectedAfterDrop =
            Lists.newArrayList(
                RowFactory.create(1, null),
                RowFactory.create(2, null),
                RowFactory.create(3, null),
                RowFactory.create(4, "B"),
                RowFactory.create(5, "C"));
        Dataset<Row> afterDropColumn =
            spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        Assertions.assertThat(afterDropColumn.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(expectedAfterDrop);

        // Step 4: read pinned to the first snapshot.
        Dataset<Row> pinnedToFirstSnapshot =
            spark
                .read()
                .format("iceberg")
                .option("snapshot-id", snapshotBeforeAddColumn.snapshotId())
                .load(loadLocation(tableIdentifier));
        Assertions.assertThat(pinnedToFirstSnapshot.orderBy("id").collectAsList())
            .as("Records should match")
            .isEqualTo(initialRows);
        Assertions.assertThat(pinnedToFirstSnapshot.schema())
            .as("Schemas should match")
            .isEqualTo(initialSparkSchema);
    }

    /**
     * Exercises the delete-orphan-files action: after one Iceberg append and one plain Parquet
     * write into the table's data directory, scanning only the metadata location must report no
     * orphans, scanning the whole table must report exactly one, and the table rows must be
     * unchanged afterwards.
     *
     * @throws InterruptedException if the one-second pause before running the action is interrupted
     */
    @Test
    public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
        Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

        List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
        Dataset<?> input = spark.createDataFrame(records, SimpleRecord.class);

        // A regular Iceberg append, tracked by the table.
        input
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        // A raw Parquet write straight into the data directory, bypassing the Iceberg writer.
        input.write().mode("append").parquet(table.location() + "/data");

        // Pause so the files written above are older than the olderThan(now) cut-offs used below.
        Thread.sleep(1000L);

        SparkActions actions = SparkActions.get();

        DeleteOrphanFiles.Result metadataOnlyResult =
            actions
                .deleteOrphanFiles(table)
                .location(table.location() + "/metadata")
                .olderThan(System.currentTimeMillis())
                .execute();
        Assertions.assertThat(metadataOnlyResult.orphanFileLocations())
            .as("Should not delete any metadata files")
            .isEmpty();

        DeleteOrphanFiles.Result wholeTableResult =
            actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
        Assertions.assertThat(wholeTableResult.orphanFileLocations())
            .as("Should delete 1 data file")
            .hasSize(1);

        Dataset<?> tableContents = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
        List<SimpleRecord> actualRecords =
            tableContents.as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assertions.assertThat(actualRecords).as("Rows must match").isEqualTo(records);
    }

    /**
     * Verifies that the "files" metadata table reports the partition spec id of each data file:
     * one file is written under the initial spec and one after the partition field is removed,
     * and the spec ids read back (sorted) must be exactly those two.
     */
    @Test
    public void testFilesTablePartitionId() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
        PartitionSpec identityOnId = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = createTable(tableIdentifier, SCHEMA, identityOnId);
        int spec0 = table.spec().specId();

        Dataset<?> firstBatch =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
        Dataset<?> secondBatch =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

        // Write under the original spec.
        firstBatch
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();

        // Evolve the spec, then write again under the new one.
        table.updateSpec().removeField("id").commit();
        int spec1 = table.spec().specId();
        secondBatch
            .select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        String specIdColumn = DataFile.SPEC_ID.name();
        List<Integer> actual =
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "files"))
                .sort(specIdColumn)
                .collectAsList()
                .stream()
                .map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name()))
                .collect(Collectors.toList());

        Assertions.assertThat(actual)
            .as("Should have two partition specs")
            .isEqualTo(ImmutableList.of(spec0, spec1));
    }

    /**
     * Verifies that the "all_manifests" metadata table can be filtered by
     * {@code reference_snapshot_id}: four appends are committed, and a filter on the first two
     * snapshot ids must return exactly the manifests reachable from those two snapshots.
     */
    @Test
    public void testAllManifestTableSnapshotFiltering() {
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
        Table table = createTable(tableIdentifier, SCHEMA, SPEC);
        Table manifestTable = loadTable(tableIdentifier, "all_manifests");
        Dataset<Row> df =
            spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

        // (snapshot id, manifest) pairs that the filtered read is expected to return.
        List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();

        // First append: record every manifest reachable from snapshot 1.
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();
        Snapshot snapshot1 = table.currentSnapshot();
        snapshotIdToManifests.addAll(
            snapshot1.allManifests(table.io()).stream()
                .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
                .collect(Collectors.toList()));

        // Second append: snapshot 2 must reference two manifests; record both.
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        table.refresh();
        Snapshot snapshot2 = table.currentSnapshot();
        Assertions.assertThat(snapshot2.allManifests(table.io()))
            .as("Should have two manifests")
            .hasSize(2);
        snapshotIdToManifests.addAll(
            snapshot2.allManifests(table.io()).stream()
                .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
                .collect(Collectors.toList()));

        // Two more appends whose snapshots are NOT part of the filter below.
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode("append")
            .save(loadLocation(tableIdentifier));

        // Builds the "(id1,id2)" list used in the SQL filter; the joiner is rendered by the string
        // concatenation below, so no separate toString() call is needed.
        StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
        snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
        snapshotIds.add(String.valueOf(snapshot2.snapshotId()));

        List<Row> actual =
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier, "all_manifests"))
                .filter("reference_snapshot_id in " + snapshotIds)
                .orderBy("path")
                .collectAsList();
        table.refresh();

        // Expected rows, sorted by path to match the orderBy("path") of the actual read.
        List<GenericData.Record> expected =
            snapshotIdToManifests.stream()
                .map(
                    snapshotManifest ->
                        manifestRecord(
                            manifestTable, snapshotManifest.first(), snapshotManifest.second()))
                .sorted(Comparator.comparing(o -> o.get("path").toString()))
                .collect(Collectors.toList());

        Assertions.assertThat(actual).as("Manifests table should have 3 manifest rows").hasSize(3);
        // Guard the index-based comparison: without this, a shorter expected list would silently
        // skip rows and a longer one would fail with an IndexOutOfBoundsException.
        Assertions.assertThat(expected)
            .as("Expected manifest rows should line up with the actual rows")
            .hasSameSizeAs(actual);
        for (int i = 0; i < expected.size(); ++i) {
            TestHelpers.assertEqualsSafe(
                manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
        }
    }

    /**
     * Imports a Spark Parquet table written with the Parquet output timestamp type set to
     * {@code INT96} into an Iceberg table and checks that timestamp filters return the same rows
     * from both tables.
     *
     * <p>The scenario is repeated for every combination of Parquet dictionary encoding (on/off)
     * and Iceberg vectorized reads (on/off).
     *
     * @throws IOException declared by the original test signature
     */
    @Test
    public void testTableWithInt96Timestamp() throws IOException {
        String parquetTableLocation = temp.resolve("table_timestamp_int96").toFile().toURI().toString();
        Schema schema =
            new Schema(
                Types.NestedField.optional(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "tmp_col", Types.TimestampType.withZone()));

        spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96");

        // Generated timestamps are spread over [2000-01-31, 2100-01-01) in UTC epoch seconds.
        long startSec = LocalDateTime.of(2000, 1, 31, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
        long endSec = LocalDateTime.of(2100, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
        long spanSec = endSec - startSec;

        Column idColumn = functions.expr("id");
        Column secondsColumn = functions.expr("(id % " + spanSec + " + " + startSec + ")").as("seconds");
        Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col");

        boolean[] switches = {true, false};
        for (boolean useDict : switches) {
            for (boolean useVectorization : switches) {
                // Recreate the source Parquet table for this combination.
                spark.sql("DROP TABLE IF EXISTS parquet_table");
                spark
                    .range(0L, 5000L, 100L, 1)
                    .select(idColumn, secondsColumn)
                    .select(idColumn, timestampColumn)
                    .write()
                    .format("parquet")
                    .option("parquet.enable.dictionary", useDict)
                    .mode("overwrite")
                    .option("path", parquetTableLocation)
                    .saveAsTable("parquet_table");

                // Create the Iceberg table and import the Parquet files into it.
                TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96");
                Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned());
                table
                    .updateProperties()
                    .set("read.parquet.vectorization.enabled", String.valueOf(useVectorization))
                    .commit();
                String stagingLocation = table.location() + "/metadata";
                SparkTableUtil.importSparkTable(
                    spark,
                    new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
                    table,
                    stagingLocation);

                // Compare both tables under each comparison operator.
                testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
                testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
                testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
                testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
                testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);

                dropTable(tableIdentifier);
            }
        }
    }

    /**
     * Applies the same filter to the {@code tmp_col} column of the Spark table
     * {@code parquet_table} and of the given Iceberg table, and asserts that both return the same
     * rows regardless of order.
     *
     * @param filterExpr SQL filter expression applied to both reads
     * @param tableIdentifier identifier of the Iceberg table to compare against
     */
    private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
        // Reference result from the plain Parquet table.
        List fromParquet =
            spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();

        // Result under test from the Iceberg table.
        List fromIceberg =
            spark
                .read()
                .format("iceberg")
                .load(loadLocation(tableIdentifier))
                .select("tmp_col")
                .filter(filterExpr)
                .collectAsList();

        ListAssert rowsAssert = (ListAssert) Assertions.assertThat((List) fromIceberg).as("Rows must match", new Object[0]);
        rowsAssert.containsExactlyInAnyOrderElementsOf((Iterable) fromParquet);
    }

    /**
     * Verifies that Iceberg read/write options can be supplied through Spark session
     * configuration keys prefixed with {@code spark.datasource.iceberg.}: a snapshot property set
     * that way must show up in the snapshot summary, and a snapshot id set that way must pin the
     * read to that snapshot.
     */
    @Test
    public void testSessionConfigSupport() {
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        TableIdentifier tableIdentifier = TableIdentifier.of("db", "session_config_table");
        Table table = createTable(tableIdentifier, SCHEMA, spec);

        List<SimpleRecord> initialRecords =
            Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<?> df = spark.createDataFrame(initialRecords, SimpleRecord.class);

        // Baseline append; its snapshot id is used for the pinned read at the end.
        df.select("id", "data")
            .write()
            .format("iceberg")
            .mode(SaveMode.Append)
            .save(loadLocation(tableIdentifier));
        long s1 = table.currentSnapshot().snapshotId();

        // Second append with a snapshot property supplied via the session configuration.
        withSQLConf(
            ImmutableMap.of("spark.datasource.iceberg.snapshot-property.foo", "bar"),
            () ->
                df.select("id", "data")
                    .write()
                    .format("iceberg")
                    .mode(SaveMode.Append)
                    .save(loadLocation(tableIdentifier)));

        table.refresh();
        Assertions.assertThat(table.currentSnapshot().summary()).containsEntry("foo", "bar");

        // Read with the snapshot id supplied via the session configuration: only the rows of the
        // first append are expected.
        withSQLConf(
            ImmutableMap.of("spark.datasource.iceberg.snapshot-id", String.valueOf(s1)),
            () -> {
                Dataset<?> result = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
                List<SimpleRecord> actual = result.as(Encoders.bean(SimpleRecord.class)).collectAsList();
                Assertions.assertThat(actual)
                    .as("Rows must match")
                    .containsExactlyInAnyOrderElementsOf(initialRecords);
            });
    }

    /**
     * Builds the Avro record expected in the manifests metadata table for one manifest file.
     *
     * <p>File counts are reported in the {@code *_data_files_count} fields when the manifest
     * content is {@code DATA} and in the {@code *_delete_files_count} fields when it is
     * {@code DELETES}; the fields of the other kind are set to 0. Every partition summary is
     * filled with fixed values (no nulls, no NaNs, lower and upper bound {@code "1"}).
     *
     * @param manifestTable metadata table whose schema defines the record layout
     * @param referenceSnapshotId value stored in {@code reference_snapshot_id}
     * @param manifest manifest file the record describes
     * @return the populated record
     */
    private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
        Schema tableSchema = manifestTable.schema();
        GenericRecordBuilder recordBuilder =
            new GenericRecordBuilder(AvroSchemaUtil.convert(tableSchema, "manifests"));
        GenericRecordBuilder summaryBuilder =
            new GenericRecordBuilder(
                AvroSchemaUtil.convert(
                    tableSchema.findType("partition_summaries.element").asStructType(),
                    "partition_summary"));

        boolean holdsData = manifest.content() == ManifestContent.DATA;
        boolean holdsDeletes = manifest.content() == ManifestContent.DELETES;

        // Identity of the manifest.
        recordBuilder.set("content", manifest.content().id());
        recordBuilder.set("path", manifest.path());
        recordBuilder.set("length", manifest.length());
        recordBuilder.set("partition_spec_id", manifest.partitionSpecId());
        recordBuilder.set("added_snapshot_id", manifest.snapshotId());

        // File counts, routed to the data or delete columns depending on the manifest content.
        recordBuilder.set("added_data_files_count", holdsData ? manifest.addedFilesCount() : 0);
        recordBuilder.set("existing_data_files_count", holdsData ? manifest.existingFilesCount() : 0);
        recordBuilder.set("deleted_data_files_count", holdsData ? manifest.deletedFilesCount() : 0);
        recordBuilder.set("added_delete_files_count", holdsDeletes ? manifest.addedFilesCount() : 0);
        recordBuilder.set("existing_delete_files_count", holdsDeletes ? manifest.existingFilesCount() : 0);
        recordBuilder.set("deleted_delete_files_count", holdsDeletes ? manifest.deletedFilesCount() : 0);

        // One fixed summary per partition field summary of the manifest.
        recordBuilder.set(
            "partition_summaries",
            Lists.transform(
                manifest.partitions(),
                partition ->
                    summaryBuilder
                        .set("contains_null", false)
                        .set("contains_nan", false)
                        .set("lower_bound", "1")
                        .set("upper_bound", "1")
                        .build()));

        recordBuilder.set("reference_snapshot_id", referenceSnapshotId);
        return recordBuilder.build();
    }

    /**
     * Creates a position-delete writer for the given table, spec and partition.
     *
     * <p>The output file comes from an {@code OutputFileFactory} built with partition id 0 and
     * task id 0; the writer itself comes from a default {@code SparkFileWriterFactory}.
     *
     * @param table table the delete file will belong to
     * @param spec partition spec of the delete file
     * @param partition partition value of the delete file
     * @return a new, open position-delete writer
     */
    private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(Table table, PartitionSpec spec, StructLike partition) {
        EncryptedOutputFile target =
            OutputFileFactory.builderFor(table, 0, 0L).build().newOutputFile(spec, partition);
        return SparkFileWriterFactory.builderFor(table)
            .build()
            .newPositionDeleteWriter(target, spec, partition);
    }

    /**
     * Writes the given position deletes into a new delete file and returns its metadata.
     *
     * @param table table the delete file belongs to
     * @param spec partition spec of the delete file
     * @param partition partition value of the delete file
     * @param deletes position deletes to write, in iteration order
     * @return the {@code DeleteFile} produced by the writer
     * @throws UncheckedIOException if writing or closing the writer fails with an
     *     {@code IOException}
     */
    private DeleteFile writePositionDeletes(Table table, PartitionSpec spec, StructLike partition, Iterable<PositionDelete<InternalRow>> deletes) {
        PositionDeleteWriter<InternalRow> deleteWriter = newPositionDeleteWriter(table, spec, partition);

        // The try-with-resources block closes the writer before its DeleteFile is requested below.
        try (PositionDeleteWriter<InternalRow> openWriter = deleteWriter) {
            for (PositionDelete<InternalRow> positionDelete : deletes) {
                openWriter.write(positionDelete);
            }
        } catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }

        return deleteWriter.toDeleteFile();
    }

    /**
     * Convenience overload that writes a position-delete file for row position 0.
     *
     * @param table table to write the delete file for
     * @return the written delete file
     */
    private DeleteFile writePosDeleteFile(Table table) {
        long firstRowPosition = 0L;
        return writePosDeleteFile(table, firstRowPosition);
    }

    /**
     * Writes a position-delete file containing a single delete for row {@code pos} of the first
     * data file added by the table's current snapshot. The delete file uses the spec and
     * partition of that data file.
     *
     * @param table table to write the delete file for; must have a current snapshot that added
     *     at least one data file
     * @param pos row position within the data file to mark as deleted
     * @return the written delete file
     * @throws IllegalStateException if the table has no current snapshot, or the current snapshot
     *     added no data files
     */
    private DeleteFile writePosDeleteFile(Table table, long pos) {
        // Fail with a clear message instead of a bare NullPointerException further down.
        if (table.currentSnapshot() == null) {
            throw new IllegalStateException(
                "Cannot write position deletes: table has no current snapshot");
        }

        DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
        if (dataFile == null) {
            throw new IllegalStateException(
                "Cannot write position deletes: current snapshot added no data files");
        }

        PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
        StructLike dataFilePartition = dataFile.partition();

        // Only the file location and position are set; the row payload is passed as null.
        PositionDelete<InternalRow> delete = PositionDelete.create();
        delete.set(dataFile.location(), pos, null);

        return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
    }

    /**
     * Writes an equality-delete file that deletes rows whose {@code data} column equals
     * {@code dataValue}. The file is created as a temporary file under the test's temp directory
     * and is written with a fixed partition value of 1.
     *
     * @param table table to write the delete file for
     * @param dataValue value of the {@code data} column to delete by
     * @return the written delete file
     * @throws UncheckedIOException if the temporary file cannot be created or the delete file
     *     cannot be written; this matches the IO error handling of {@code writePositionDeletes}
     */
    private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
        // The delete schema contains only the "data" column.
        Schema deleteRowSchema = SCHEMA.select("data");
        GenericRecord delete = GenericRecord.create(deleteRowSchema);
        try {
            return FileHelpers.writeDeleteFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
                TestHelpers.Row.of(1),
                Lists.newArrayList(delete.copy("data", dataValue)),
                deleteRowSchema);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Sums the sizes of the given data files.
     *
     * @param dataFiles data files to measure
     * @return the sum of {@code fileSizeInBytes()} over all files; 0 when there are none
     */
    private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
        long totalBytes = 0L;
        for (DataFile dataFile : dataFiles) {
            totalBytes += dataFile.fileSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Asserts that the data files and the expected partition ids have the same length and that,
     * position by position, the first partition field of each data file (read as an
     * {@code Integer}) equals the expected id.
     *
     * @param dataFiles data files to check, in the order matching {@code expectedPartitionIds}
     * @param expectedPartitionIds expected value of partition field 0 for each data file
     */
    private void assertDataFilePartitions(List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
        Assertions.assertThat(dataFiles)
            .as("Table should have " + expectedPartitionIds.size() + " data files")
            .hasSameSizeAs(expectedPartitionIds);

        for (int index = 0; index < dataFiles.size(); index++) {
            Integer expectedId = expectedPartitionIds.get(index);
            int actualId = dataFiles.get(index).partition().get(0, Integer.class);
            Assertions.assertThat(actualId)
                .as("Data file should have partition of id " + expectedId)
                .isEqualTo(expectedId.intValue());
        }
    }
}

