/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestPositionDeletesTable
extends SparkCatalogTestBase {
    // Base table schema used by most tests: a required int "id" and a required string "data".
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.required((int)2, (String)"data", (Type)Types.StringType.get())});
    // Catalog properties handed to the test base class: "type"="hive", "default-namespace"="default", "cache-enabled"="false".
    private static final Map<String, String> CATALOG_PROPS = ImmutableMap.of((Object)"type", (Object)"hive", (Object)"default-namespace", (Object)"default", (Object)"cache-enabled", (Object)"false");
    // Column names of the position deletes table other than the delete file path.
    // NOTE(review): the usage of this list is outside the visible part of the file - confirm before relying on it.
    private static final List<String> NON_PATH_COLS = ImmutableList.of((Object)"file_path", (Object)"pos", (Object)"row", (Object)"partition", (Object)"spec_id");
    // File format this parameterized run was created with (see parameters()).
    private final FileFormat format;
    // JUnit rule providing temporary files for the data and delete files written by the tests.
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * Supplies the parameter rows for the {@link Parameterized} runner.
     *
     * <p>Each row holds exactly four values, in constructor order: catalog name, catalog
     * implementation, catalog properties and file format. The display-name pattern therefore
     * uses placeholders {0}..{3}; the previous pattern labelled a non-existent "formatVersion"
     * slot and referenced {4}, which shifted every label and left one placeholder unfilled.
     *
     * @return one row per tested file format (Parquet, Avro, ORC), all against the Hive catalog
     */
    @Parameterized.Parameters(name="catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}")
    public static Object[][] parameters() {
        String catalogName = SparkCatalogConfig.HIVE.catalogName();
        String implementation = SparkCatalogConfig.HIVE.implementation();
        return new Object[][]{
            {catalogName, implementation, CATALOG_PROPS, FileFormat.PARQUET},
            {catalogName, implementation, CATALOG_PROPS, FileFormat.AVRO},
            {catalogName, implementation, CATALOG_PROPS, FileFormat.ORC}
        };
    }

    /**
     * Creates one parameterized instance of the test class.
     *
     * @param catalogName catalog name, forwarded to the {@code SparkCatalogTestBase} constructor
     * @param implementation catalog implementation, forwarded to the base constructor
     * @param config catalog properties, forwarded to the base constructor
     * @param format file format for this run; stored in {@link #format}
     */
    public TestPositionDeletesTable(String catalogName, String implementation, Map<String, String> config, FileFormat format) {
        super(catalogName, implementation, config);
        this.format = format;
    }

    /**
     * Writes a position delete file that carries only (path, position) pairs and checks that the
     * position deletes table returns exactly those two deletes for an unpartitioned table.
     *
     * @throws IOException if a temporary file cannot be created or a file cannot be written
     */
    @Test
    public void testNullRows() throws IOException {
        String tableName = "null_rows";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());

        DataFile dFile = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dFile).commit();

        // Deletes are given as path/position pairs only; no row payload is supplied here.
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
        deletes.add(Pair.of(dFile.path(), 0L));
        deletes.add(Pair.of(dFile.path(), 1L));
        OutputFile deleteOutput = Files.localOutput(temp.newFile());
        DeleteFile posDeleteFile = (DeleteFile) FileHelpers.writeDeleteFile(tab, deleteOutput, TestHelpers.Row.of(0), deletes).first();
        tab.newRowDelta().addDeletes(posDeleteFile).commit();

        StructLikeSet actualRows = actual(tableName, tab);

        List<PositionDelete<?>> expectedDeletes = Lists.newArrayList();
        expectedDeletes.add(positionDelete(dFile.path(), 0L));
        expectedDeletes.add(positionDelete(dFile.path(), 1L));
        StructLikeSet expectedRows = expected(tab, expectedDeletes, null, posDeleteFile.path().toString());

        Assert.assertEquals("Position Delete table should contain expected rows", expectedRows, actualRows);
        dropTable(tableName);
    }

    /**
     * Creates a table partitioned by identity("data"), adds delete files for partitions "a" and
     * "b", and checks that filtering the position deletes table on {@code row.data='b'} returns
     * only the deletes written for partition "b".
     *
     * @throws IOException if a data or delete file cannot be written
     */
    @Test
    public void testPartitionedTable() throws IOException {
        String tableName = "partitioned_table";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Read with a filter on the nested row column.
        StructLikeSet actualRows = actual(tableName, tab, "row.data='b'");

        // Expected rows carry the partition tuple of the "b" delete file.
        GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType());
        partitionB.setField("data", "b");
        String deleteFilePathB = deletesB.second().path().toString();
        StructLikeSet expectedRows = expected(tab, deletesB.first(), partitionB, deleteFilePathB);

        Assert.assertEquals("Position Delete table should contain expected rows", expectedRows, actualRows);
        dropTable(tableName);
    }

    /**
     * Projects {@code row.id}, {@code pos}, {@code delete_file_path} and Spark's input file name
     * from the position deletes table and compares the sorted result with rows derived from the
     * deletes that were written.
     *
     * @throws IOException if a data or delete file cannot be written
     */
    @Test
    public void testSelect() throws IOException {
        String tableName = "select";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        Dataset df = spark.read()
            .format("iceberg")
            .load("default." + tableName + ".position_deletes")
            .withColumn("input_file", functions.input_file_name())
            .select("row.id", new String[]{"pos", "delete_file_path", "input_file"});
        List<Object[]> actualRows = rowsToJava(df.collectAsList());

        // One expected row per written delete: (row.id, pos, delete file path, delete file path).
        // The delete file path is used for both the delete_file_path and input_file columns.
        List<Object[]> expectedRows = Lists.newArrayList();
        for (Pair<List<PositionDelete<?>>, DeleteFile> written : ImmutableList.of(deletesA, deletesB)) {
            for (PositionDelete<?> delete : written.first()) {
                int rowId = delete.get(2, GenericRecord.class).get(0, Integer.class);
                long pos = delete.get(1, Long.class);
                DeleteFile file = written.second();
                expectedRows.add(row(rowId, pos, file.path().toString(), file.path().toString()));
            }
        }

        // Order both sides by row id, then by delete file path, before comparing.
        Comparator<Object[]> byIdThenPath = Comparator
            .comparing((Object[] r) -> (Integer) r[0])
            .thenComparing(r -> (String) r[2]);
        actualRows.sort(byIdThenPath);
        expectedRows.sort(byIdThenPath);

        assertEquals("Position Delete table should contain expected rows", expectedRows, actualRows);
        dropTable(tableName);
    }

    /**
     * Writes 500 rows and 500 matching position deletes with "read.split.target-size" set to 100,
     * then checks the number of planned splits (more than one for Avro, exactly one otherwise)
     * and that the position deletes table still returns every delete.
     *
     * @throws IOException if a temporary file cannot be created or a file cannot be written
     */
    @Test
    public void testSplitTasks() throws IOException {
        String tableName = "big_table";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        tab.updateProperties().set("read.split.target-size", "100").commit();

        int records = 500;

        GenericRecord template = GenericRecord.create(tab.schema());
        List<Record> dataRecords = Lists.newArrayList();
        for (int id = 0; id < records; id++) {
            dataRecords.add(template.copy("id", id, "data", String.valueOf(id)));
        }
        OutputFile dataOutput = Files.localOutput(temp.newFile());
        DataFile dFile = FileHelpers.writeDataFile(tab, dataOutput, TestHelpers.Row.of(), dataRecords);
        tab.newAppend().appendFile(dFile).commit();

        // One position delete per written row, carrying the deleted row's values.
        List<PositionDelete<?>> deletes = Lists.newArrayList();
        for (int pos = 0; pos < records; pos++) {
            deletes.add(positionDelete(tab.schema(), dFile.path(), (long) pos, pos, String.valueOf(pos)));
        }
        OutputFile deleteOutput = Files.localOutput(temp.newFile());
        DeleteFile posDeletes = FileHelpers.writePosDeleteFile(tab, deleteOutput, TestHelpers.Row.of(0), deletes);
        tab.newRowDelta().addDeletes(posDeletes).commit();

        Table deleteTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        int splitCount = Iterables.size(deleteTable.newBatchScan().planTasks());
        if (format.equals(FileFormat.AVRO)) {
            Assert.assertTrue("Position delete scan should produce more than one split", splitCount > 1);
        } else {
            Assert.assertEquals("Position delete scan should produce one split", 1L, (long) splitCount);
        }

        StructLikeSet actualRows = actual(tableName, tab);
        StructLikeSet expectedRows = expected(tab, deletes, null, posDeletes.path().toString());

        Assert.assertEquals("Position Delete table should contain expected rows", expectedRows, actualRows);
        dropTable(tableName);
    }

    /**
     * Checks that the position deletes table returns all deletes without a filter and only the
     * deletes of partition "a" when filtered with {@code partition.data = 'a' AND pos >= 0}.
     *
     * @throws IOException if a data or delete file cannot be written
     */
    @Test
    public void testPartitionFilter() throws IOException {
        String tableName = "partition_filter";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);
        Table deletesTab = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        // The delete file for partition "b" must reference the data file of partition "b";
        // pairing it with dataFileA mixed a partition-"a" data file into a partition-"b" delete file.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Expected partition tuples for both delete files.
        GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        Record partitionB = partitionRecordTemplate.copy("data", "b");
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
        StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
        StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        // Unfiltered read returns the deletes of both partitions.
        StructLikeSet actual = actual(tableName, tab);
        Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);

        // Filtered read returns partition "a" only.
        StructLikeSet actual2 = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
        dropTable(tableName);
    }

    /**
     * Same scenario as a plain partition filter, but on a table partitioned by
     * {@code truncate("data", 1)}: checks the unfiltered result and the result of filtering on
     * {@code partition.data_trunc = 'a' AND pos >= 0}.
     *
     * @throws IOException if a data or delete file cannot be written
     */
    @Test
    public void testPartitionTransformFilter() throws IOException {
        // Use a table name that no other test in this class uses, so a table left behind by a
        // failed run of another test cannot collide with this one.
        String tableName = "partition_transform_filter";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("data", 1).build();
        Table tab = createTable(tableName, SCHEMA, spec);
        Table deletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);

        DataFile dataFileA = dataFile(tab, new Object[]{"aa"}, new Object[]{"a"});
        DataFile dataFileB = dataFile(tab, new Object[]{"bb"}, new Object[]{"b"});
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[]{"aa"}, new Object[]{"a"});
        // The delete file for truncated partition "b" must reference dataFileB, not dataFileA.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, new Object[]{"bb"}, new Object[]{"b"});
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Expected partition tuples use the transformed field name "data_trunc".
        GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionRecordTemplate.copy("data_trunc", "a");
        Record partitionB = partitionRecordTemplate.copy("data_trunc", "b");
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
        StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
        StructLikeSet allExpected = StructLikeSet.create(deletesTable.schema().asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        // Unfiltered read returns the deletes of both partitions.
        StructLikeSet actual = actual(tableName, tab);
        Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);

        // Filtered read returns truncated partition "a" only.
        StructLikeSet actual2 = actual(tableName, tab, "partition.data_trunc = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
        dropTable(tableName);
    }

    /**
     * Replaces the partition field "data" with "id" and checks that deletes written under the
     * old spec and under the new spec are both returned with the right partition tuple and
     * spec id when filtered by their respective partition columns.
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testPartitionEvolutionReplace() throws Exception {
        String tableName = "partition_evolution";
        PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, originalSpec);
        int dataSpec = tab.spec().specId();

        // Files written under the original spec (partitioned by data).
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        // Each delete file must reference the data file of its own partition (was dataFileA).
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Swap the partition field: data -> id.
        tab.updateSpec().removeField("data").addField("id").commit();

        // Files written under the new spec (partitioned by id).
        DataFile dataFile10 = dataFile(tab, 10);
        DataFile dataFile99 = dataFile(tab, 99);
        tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10);
        // Same pairing rule for the new spec (was dataFile10).
        Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99);
        tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();

        // The partition tuple of the metadata table is built from Partitioning.partitionType(tab).
        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        // Deletes written under the old spec.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // Deletes written under the new spec.
        Record partition10 = partitionRecordTemplate.copy("id", 10);
        StructLikeSet expected10 = expected(tab, deletes10.first(), partition10, tab.spec().specId(), deletes10.second().path().toString());
        StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expected10, actual10);
        dropTable(tableName);
    }

    /**
     * Starts unpartitioned, then adds the partition field "data", and checks that deletes from
     * both specs are returned with the matching spec id when filtered on {@code partition.data}
     * (a value for the new spec, {@code IS NULL} for the unpartitioned one).
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testPartitionEvolutionAdd() throws Exception {
        String tableName = "partition_evolution_add";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int specId0 = tab.spec().specId();

        // Files written while the table is still unpartitioned.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned = deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        // Evolve the spec: add "data" as a partition field.
        tab.updateSpec().addField("data").commit();
        int specId1 = tab.spec().specId();

        // Files written under the evolved spec.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        // Deletes of partition "a" carry the new spec id.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        String deletePathA = deletesA.second().path().toString();
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, specId1, deletePathA);
        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // Deletes written before the evolution have a null "data" partition value and the old spec id.
        Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
        String deletePathUnpartitioned = deletesUnpartitioned.second().path().toString();
        StructLikeSet expectedUnpartitioned = expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, deletePathUnpartitioned);
        StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedUnpartitioned, actualUnpartitioned);
        dropTable(tableName);
    }

    /**
     * Starts partitioned by identity("data"), then removes that partition field, and checks that
     * deletes from both specs are returned with the matching spec id when filtered on
     * {@code partition.data}.
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testPartitionEvolutionRemove() throws Exception {
        String tableName = "partition_evolution_remove";
        PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, originalSpec);
        int specId0 = tab.spec().specId();

        // Files written under the original, partitioned spec.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Evolve the spec: drop the "data" partition field.
        tab.updateSpec().removeField("data").commit();
        int specId1 = tab.spec().specId();

        // Files written after the partition field was removed.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned = deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        // Deletes of partition "a" carry the original spec id.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        String deletePathA = deletesA.second().path().toString();
        StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, specId0, deletePathA);
        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // Deletes written after the removal have a null "data" partition value and the new spec id.
        Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
        String deletePathUnpartitioned = deletesUnpartitioned.second().path().toString();
        StructLikeSet expectedUnpartitioned = expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId1, deletePathUnpartitioned);
        StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedUnpartitioned, actualUnpartitioned);
        dropTable(tableName);
    }

    /**
     * Writes deletes under an unpartitioned spec and under a spec partitioned by "data", then
     * checks that filtering the position deletes table on {@code spec_id} returns only the
     * deletes written under that spec.
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testSpecIdFilter() throws Exception {
        String tableName = "spec_id_filter";
        Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int unpartitionedSpec = tab.spec().specId();

        // Files written while the table is unpartitioned.
        DataFile dataFileUnpartitioned = dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned = deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        // Evolve the spec: partition by "data".
        tab.updateSpec().addField("data").commit();
        int dataSpec = tab.spec().specId();

        // Files written under the partitioned spec.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // The untouched template record is used as the partition tuple of the unpartitioned deletes.
        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        String unpartitionedDeletePath = deletesUnpartitioned.second().path().toString();
        StructLikeSet expectedUnpartitioned = expected(tab, deletesUnpartitioned.first(), partitionRecordTemplate, unpartitionedSpec, unpartitionedDeletePath);
        StructLikeSet actualUnpartitioned = actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpec));
        Assert.assertEquals("Position Delete table should contain expected rows", expectedUnpartitioned, actualUnpartitioned);

        // Both "a" and "b" deletes belong to the partitioned spec.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        Record partitionB = partitionRecordTemplate.copy("data", "b");
        StructLikeSet expectedForDataSpec = expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
        StructLikeSet expectedForB = expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second().path().toString());
        expectedForDataSpec.addAll(expectedForB);
        StructLikeSet actualForDataSpec = actual(tableName, tab, String.format("spec_id = %d", dataSpec));
        Assert.assertEquals("Position Delete table should contain expected rows", expectedForDataSpec, actualForDataSpec);
        dropTable(tableName);
    }

    /**
     * Adds two columns to the table schema between two rounds of writes and checks that deletes
     * written before the change are returned with the new columns set to null, while deletes
     * written after the change are returned as written.
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testSchemaEvolutionAdd() throws Exception {
        String tableName = "schema_evolution_add";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = createTable(tableName, SCHEMA, spec);

        // Files written with the original two-column schema.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Evolve the schema: append two integer columns.
        tab.updateSchema().addColumn("new_col_1", Types.IntegerType.get()).addColumn("new_col_2", Types.IntegerType.get()).commit();

        // Files written with the evolved schema. Append the newly written files C and D;
        // the previous code re-appended A and B and never added C and D to the table.
        DataFile dataFileC = dataFile(tab, "c");
        DataFile dataFileD = dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        // Old deletes: rebuild each nested row against the current schema, with the two new
        // columns (positions 2 and 3) set to null.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        expectedDeletesA.forEach(d -> {
            GenericRecord nested = d.get(2, GenericRecord.class);
            GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
            padded.set(0, nested.get(0));
            padded.set(1, nested.get(1));
            padded.set(2, null);
            padded.set(3, null);
            d.set(2, padded);
        });
        StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // New deletes are compared as written.
        Record partitionC = partitionRecordTemplate.copy("data", "c");
        StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
        StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
        dropTable(tableName);
    }

    /**
     * Drops two columns from the table schema between two rounds of writes and checks that
     * deletes written before the change are returned without the dropped columns, while deletes
     * written after the change are returned as written.
     *
     * @throws Exception if a file cannot be written or a table operation fails
     */
    @Test
    public void testSchemaEvolutionRemove() throws Exception {
        String tableName = "schema_evolution_remove";
        Schema oldSchema = new Schema(new Types.NestedField[]{
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()),
            Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
            Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get())});
        PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
        Table tab = createTable(tableName, oldSchema, spec);

        // Files written with the original four-column schema.
        DataFile dataFileA = dataFile(tab, "a");
        DataFile dataFileB = dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        // Evolve the schema: drop the two optional columns.
        tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();

        // Files written with the reduced schema. Append the newly written files C and D;
        // the previous code re-appended A and B and never added C and D to the table.
        DataFile dataFileC = dataFile(tab, "c");
        DataFile dataFileD = dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));

        // Old deletes: rebuild each nested row against the current schema, keeping only the
        // first two columns.
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        expectedDeletesA.forEach(d -> {
            GenericRecord nested = d.get(2, GenericRecord.class);
            GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
            padded.set(0, nested.get(0));
            padded.set(1, nested.get(1));
            d.set(2, padded);
        });
        StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
        StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // New deletes are compared as written.
        Record partitionC = partitionRecordTemplate.copy("data", "c");
        StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
        StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
        Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
        dropTable(tableName);
    }

    /**
     * Rewrites the position delete files of partitions "a" and "b" through the
     * position_deletes metadata table, one partition at a time, and verifies that the
     * metadata table still returns the originally written delete rows afterwards.
     */
    @Test
    public void testWrite() throws IOException, NoSuchTableException {
        String tableName = "test_write";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = this.createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = this.dataFile(tab, "a");
        DataFile dataFileB = this.dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = this.deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = this.deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        for (String partValue : ImmutableList.of("a", "b")) {
            // The planned tasks are closed whether or not the rewrite of this partition succeeds,
            // and any failure inside the block propagates to the test runner.
            try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", partValue)) {
                String fileSetID = UUID.randomUUID().toString();
                this.stageTask(tab, fileSetID, tasks);
                Dataset<?> scanDF = spark.read()
                    .format("iceberg")
                    .option("scan-task-set-id", fileSetID)
                    .option("file-open-cost", Integer.MAX_VALUE)
                    .load(posDeletesTableName);
                // The staged tasks are expected to be read as a single Spark partition.
                Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
                scanDF.writeTo(posDeletesTableName)
                    .option("rewritten-file-scan-task-set-id", fileSetID)
                    .append();
                this.commit(tab, posDeletesTable, fileSetID, 1);
            }
        }

        GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        Record partitionB = partitionRecordTemplate.copy("data", "b");
        // A null delete file path makes expected(...) build rows without the delete_file_path
        // column, matching the NON_PATH_COLS projection read back below.
        StructLikeSet expectedA = this.expected(tab, deletesA.first(), partitionA, null);
        StructLikeSet expectedB = this.expected(tab, deletesB.first(), partitionB, null);
        // Field id 0x7FFFFFFE is dropped from the metadata table schema; presumably this is the
        // delete_file_path column, which changes once the files are rewritten (TODO confirm).
        StructLikeSet allExpected = StructLikeSet.create(
            TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(0x7FFFFFFE)).asStruct());
        allExpected.addAll(expectedA);
        allExpected.addAll(expectedB);

        StructLikeSet actual = this.actual(tableName, tab, null, NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
        this.dropTable(tableName);
    }

    /**
     * Rewrites a position delete file that carries no row data on an unpartitioned table and
     * checks the file_path, pos, row and spec_id columns read back from the position_deletes
     * table.
     */
    @Test
    public void testWriteUnpartitionedNullRows() throws Exception {
        String tableName = "write_null_rows";
        Table tab = this.createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());

        DataFile dFile = this.dataFile(tab);
        tab.newAppend().appendFile(dFile).commit();

        // Two (path, position) pairs without any accompanying deleted row.
        List<Pair<CharSequence, Long>> pathAndPositions = Lists.newArrayList();
        pathAndPositions.add(Pair.of(dFile.path(), 0L));
        pathAndPositions.add(Pair.of(dFile.path(), 1L));
        OutputFile deleteOutput = Files.localOutput(this.temp.newFile());
        DeleteFile rowlessDeleteFile =
            FileHelpers.writeDeleteFile(tab, deleteOutput, TestHelpers.Row.of(0), pathAndPositions).first();
        tab.newRowDelta().addDeletes(rowlessDeleteFile).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
            String fileSetID = UUID.randomUUID().toString();
            this.stageTask(tab, fileSetID, tasks);
            Dataset<?> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("file-open-cost", Integer.MAX_VALUE)
                .load(posDeletesTableName);
            Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
            scanDF.writeTo(posDeletesTableName)
                .option("rewritten-file-scan-task-set-id", fileSetID)
                .append();
            this.commit(tab, posDeletesTable, fileSetID, 1);
        }

        List<String> selectedCols = ImmutableList.of("file_path", "pos", "row", "spec_id");
        StructLikeSet actual = this.actual(tableName, tab, null, selectedCols);

        List<PositionDelete<?>> expectedDeletes = Lists.newArrayList(
            this.positionDelete(dFile.path(), 0L),
            this.positionDelete(dFile.path(), 1L));
        StructLikeSet expected = this.expected(tab, expectedDeletes, null, null);

        Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
        this.dropTable(tableName);
    }

    /**
     * Rewrites position deletes on a table where the delete file of partition "a" stores no
     * row data while the delete file of partition "b" stores the deleted rows, then verifies
     * that the position_deletes table returns both kinds of entries.
     */
    @Test
    public void testWriteMixedRows() throws Exception {
        String tableName = "write_mixed_rows";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = this.createTable(tableName, SCHEMA, spec);

        DataFile dataFileA = this.dataFile(tab, "a");
        DataFile dataFileB = this.dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();

        // Partition "a": (path, position) pairs only, no deleted row content.
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
        deletes.add(Pair.of(dataFileA.path(), 0L));
        deletes.add(Pair.of(dataFileA.path(), 1L));
        DeleteFile deletesWithoutRow = FileHelpers.writeDeleteFile(
            tab, Files.localOutput(this.temp.newFile()), TestHelpers.Row.of("a"), deletes).first();
        // Partition "b": deletes written together with their rows.
        Pair<List<PositionDelete<?>>, DeleteFile> deletesWithRow = this.deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesWithoutRow).addDeletes(deletesWithRow.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        for (String partValue : ImmutableList.of("a", "b")) {
            // The planned tasks are closed whether or not the rewrite of this partition succeeds,
            // and any failure inside the block propagates to the test runner.
            try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", partValue)) {
                String fileSetID = UUID.randomUUID().toString();
                this.stageTask(tab, fileSetID, tasks);
                Dataset<?> scanDF = spark.read()
                    .format("iceberg")
                    .option("scan-task-set-id", fileSetID)
                    .load(posDeletesTableName);
                Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
                scanDF.writeTo(posDeletesTableName)
                    .option("rewritten-file-scan-task-set-id", fileSetID)
                    .append();
                this.commit(tab, posDeletesTable, fileSetID, 1);
            }
        }

        List<String> selectedCols = ImmutableList.of("file_path", "pos", "row", "partition", "spec_id");
        StructLikeSet actual = this.actual(tableName, tab, null, selectedCols);

        GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        Record partitionB = partitionRecordTemplate.copy("data", "b");
        // Field id 0x7FFFFFFE is excluded from the expected struct; presumably the
        // delete_file_path column, which is also absent from selectedCols (TODO confirm).
        StructLikeSet allExpected = StructLikeSet.create(
            TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(0x7FFFFFFE)).asStruct());
        List<PositionDelete<?>> expectedWithoutRow = Lists.newArrayList(
            this.positionDelete(dataFileA.path(), 0L),
            this.positionDelete(dataFileA.path(), 1L));
        allExpected.addAll(this.expected(tab, expectedWithoutRow, partitionA, null));
        allExpected.addAll(this.expected(tab, deletesWithRow.first(), partitionB, null));

        Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
        this.dropTable(tableName);
    }

    /**
     * Rewrites position deletes on a table that started unpartitioned and later gained a
     * "data" partition field. Deletes written under the original spec are rewritten first
     * and checked against the original spec id; deletes of partitions "a" and "b" are
     * rewritten afterwards and checked against the new spec id.
     */
    @Test
    public void testWritePartitionEvolutionAdd() throws Exception {
        String tableName = "write_partition_evolution_add";
        Table tab = this.createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        int specId0 = tab.spec().specId();

        // Data and deletes written while the table is still unpartitioned.
        DataFile dataFileUnpartitioned = this.dataFile(tab);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
            this.deleteFile(tab, dataFileUnpartitioned);
        tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();

        // Evolve the spec, then write data and deletes under the new spec.
        tab.updateSpec().addField("data").commit();
        int specId1 = tab.spec().specId();
        DataFile dataFileA = this.dataFile(tab, "a");
        DataFile dataFileB = this.dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = this.deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = this.deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite the deletes whose partition.data is null, i.e. those of the original spec.
        try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan()
                .filter(Expressions.isNull("partition.data"))
                .planFiles()) {
            String fileSetID = UUID.randomUUID().toString();
            this.stageTask(tab, fileSetID, tasks);
            Dataset<?> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("file-open-cost", Integer.MAX_VALUE)
                .load(posDeletesTableName);
            Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
            scanDF.writeTo(posDeletesTableName)
                .option("rewritten-file-scan-task-set-id", fileSetID)
                .append();
            this.commit(tab, posDeletesTable, fileSetID, 1);
        }

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
        StructLikeSet expectedUnpartitioned =
            this.expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, null);
        StructLikeSet actualUnpartitioned =
            this.actual(tableName, tab, "partition.data IS NULL", NON_PATH_COLS);
        Assert.assertEquals(
            "Position Delete table should contain expected rows", expectedUnpartitioned, actualUnpartitioned);

        // Rewrite the deletes of each partition written under the evolved spec.
        for (String partValue : ImmutableList.of("a", "b")) {
            // The planned tasks are closed whether or not the rewrite of this partition succeeds,
            // and any failure inside the block propagates to the test runner.
            try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", partValue)) {
                String fileSetID = UUID.randomUUID().toString();
                this.stageTask(tab, fileSetID, tasks);
                Dataset<?> scanDF = spark.read()
                    .format("iceberg")
                    .option("scan-task-set-id", fileSetID)
                    .option("file-open-cost", Integer.MAX_VALUE)
                    .load(posDeletesTableName);
                Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
                scanDF.writeTo(posDeletesTableName)
                    .option("rewritten-file-scan-task-set-id", fileSetID)
                    .append();
                this.commit(tab, posDeletesTable, fileSetID, 1);
            }
        }

        Record partitionA = partitionRecordTemplate.copy("data", "a");
        Record partitionB = partitionRecordTemplate.copy("data", "b");
        // Field id 0x7FFFFFFE is excluded from the expected struct; presumably the
        // delete_file_path column, which NON_PATH_COLS also omits (TODO confirm).
        StructLikeSet expectedAll = StructLikeSet.create(
            TypeUtil.selectNot(posDeletesTable.schema(), ImmutableSet.of(0x7FFFFFFE)).asStruct());
        expectedAll.addAll(this.expected(tab, deletesA.first(), partitionA, specId1, null));
        expectedAll.addAll(this.expected(tab, deletesB.first(), partitionB, specId1, null));
        StructLikeSet actualAll = this.actual(
            tableName, tab, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", expectedAll, actualAll);
        this.dropTable(tableName);
    }

    /**
     * Verifies that appending to the position_deletes table fails with an
     * {@code AnalysisException} when the base table's partition spec was changed after the
     * scan tasks were staged and the scan DataFrame was created.
     */
    @Test
    public void testWritePartitionEvolutionDisallowed() throws Exception {
        // Declared outside the try block because the lambda passed to assertThrows below reads it;
        // it is assigned exactly once inside the try block.
        Dataset scanDF;
        String tableName = "write_partition_evolution_write";
        Table tab = this.createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
        DataFile dataFileUnpartitioned = this.dataFile(tab, new Object[0]);
        tab.newAppend().appendFile(dataFileUnpartitioned).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned = this.deleteFile(tab, dataFileUnpartitioned, new Object[0]);
        tab.newRowDelta().addDeletes((DeleteFile)deletesUnpartitioned.second()).commit();
        Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance((Table)tab, (MetadataTableType)MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        String fileSetID = UUID.randomUUID().toString();
        try (CloseableIterable tasks = posDeletesTable.newBatchScan().planFiles();){
            this.stageTask(tab, fileSetID, tasks);
            scanDF = spark.read().format("iceberg").option("scan-task-set-id", fileSetID).option("file-open-cost", Integer.MAX_VALUE).load(posDeletesTableName);
            Assert.assertEquals((long)1L, (long)scanDF.javaRDD().getNumPartitions());
            // Evolve the partition spec only after the tasks were staged and the scan was built,
            // so the write attempted below targets a table whose spec has changed in between.
            tab.updateSpec().addField("data").commit();
        }
        // The append is expected to be rejected with an AnalysisException.
        Assert.assertThrows(AnalysisException.class, () -> scanDF.writeTo(posDeletesTableName).option("rewritten-file-scan-task-set-id", fileSetID).append());
        this.dropTable(tableName);
    }

    /**
     * Rewrites position deletes on a table whose schema gained two optional integer columns
     * after the first delete files were written. Deletes of partition "a" (written with the
     * old schema) are expected to come back with rows padded to the new schema; deletes of
     * partition "c" (written with the new schema) are expected to come back unchanged.
     */
    @Test
    public void testWriteSchemaEvolutionAdd() throws Exception {
        String tableName = "write_schema_evolution_add";
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = this.createTable(tableName, SCHEMA, spec);

        // Data and deletes written with the original schema.
        DataFile dataFileA = this.dataFile(tab, "a");
        DataFile dataFileB = this.dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = this.deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = this.deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        tab.updateSchema()
            .addColumn("new_col_1", Types.IntegerType.get())
            .addColumn("new_col_2", Types.IntegerType.get())
            .commit();

        // Data and deletes written with the evolved schema. The new data files C and D are the
        // ones appended here, so the deletes committed next refer to files present in the table.
        DataFile dataFileC = this.dataFile(tab, "c");
        DataFile dataFileD = this.dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = this.deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = this.deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";

        // Rewrite the deletes of partition "a".
        try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", "a")) {
            String fileSetID = UUID.randomUUID().toString();
            this.stageTask(tab, fileSetID, tasks);
            Dataset<?> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("file-open-cost", Integer.MAX_VALUE)
                .load(posDeletesTableName);
            Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
            scanDF.writeTo(posDeletesTableName)
                .option("rewritten-file-scan-task-set-id", fileSetID)
                .append();
            this.commit(tab, posDeletesTable, fileSetID, 1);
        }

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        // Replace each expected row (position 2 of the delete) with a record of the current
        // table schema: the two original values are copied and the two added columns are null.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        for (PositionDelete<?> delete : expectedDeletesA) {
            GenericRecord nested = delete.get(2, GenericRecord.class);
            GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
            padded.set(0, nested.get(0));
            padded.set(1, nested.get(1));
            padded.set(2, null);
            padded.set(3, null);
            delete.set(2, padded);
        }
        StructLikeSet expectedA = this.expected(tab, expectedDeletesA, partitionA, null);
        StructLikeSet actualA = this.actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        // Rewrite the deletes of partition "c".
        try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", "c")) {
            String fileSetID = UUID.randomUUID().toString();
            this.stageTask(tab, fileSetID, tasks);
            Dataset<?> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("file-open-cost", Integer.MAX_VALUE)
                .load(posDeletesTableName);
            Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
            scanDF.writeTo(posDeletesTableName)
                .option("rewritten-file-scan-task-set-id", fileSetID)
                .append();
            this.commit(tab, posDeletesTable, fileSetID, 1);
        }

        Record partitionC = partitionRecordTemplate.copy("data", "c");
        StructLikeSet expectedC = this.expected(tab, deletesC.first(), partitionC, null);
        StructLikeSet actualC = this.actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
        this.dropTable(tableName);
    }

    /**
     * Rewrites position deletes on a table whose schema lost two columns after the first
     * delete files were written. Deletes of partition "a" (written with the four-column
     * schema) are expected to come back with rows trimmed to the current schema; deletes of
     * partition "c" (written after the columns were dropped) are expected to come back
     * unchanged.
     */
    @Test
    public void testWriteSchemaEvolutionRemove() throws Exception {
        String tableName = "write_schema_evolution_remove";
        Schema oldSchema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()),
            Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
            Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
        Table tab = this.createTable(tableName, oldSchema, spec);

        // Data and deletes written with the original four-column schema.
        DataFile dataFileA = this.dataFile(tab, "a");
        DataFile dataFileB = this.dataFile(tab, "b");
        tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesA = this.deleteFile(tab, dataFileA, "a");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesB = this.deleteFile(tab, dataFileB, "b");
        tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();

        tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();

        // Data and deletes written with the reduced schema. The new data files C and D are the
        // ones appended here, so the deletes committed next refer to files present in the table.
        DataFile dataFileC = this.dataFile(tab, "c");
        DataFile dataFileD = this.dataFile(tab, "d");
        tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
        Pair<List<PositionDelete<?>>, DeleteFile> deletesC = this.deleteFile(tab, dataFileC, "c");
        Pair<List<PositionDelete<?>>, DeleteFile> deletesD = this.deleteFile(tab, dataFileD, "d");
        tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();

        Table posDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
            // The planned tasks are closed whether or not the rewrite of this partition succeeds,
            // and any failure inside the block propagates to the test runner.
            try (CloseableIterable<ScanTask> tasks = this.tasks(posDeletesTable, "data", partValue)) {
                String fileSetID = UUID.randomUUID().toString();
                this.stageTask(tab, fileSetID, tasks);
                Dataset<?> scanDF = spark.read()
                    .format("iceberg")
                    .option("scan-task-set-id", fileSetID)
                    .option("file-open-cost", Integer.MAX_VALUE)
                    .load(posDeletesTableName);
                Assert.assertEquals(1L, (long) scanDF.javaRDD().getNumPartitions());
                scanDF.writeTo(posDeletesTableName)
                    .option("rewritten-file-scan-task-set-id", fileSetID)
                    .append();
                this.commit(tab, posDeletesTable, fileSetID, 1);
            }
        }

        GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
        Record partitionA = partitionRecordTemplate.copy("data", "a");
        // Replace each expected row (position 2 of the delete) with a record of the current
        // table schema that keeps only the first two values of the originally written row.
        List<PositionDelete<?>> expectedDeletesA = deletesA.first();
        for (PositionDelete<?> delete : expectedDeletesA) {
            GenericRecord nested = delete.get(2, GenericRecord.class);
            GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
            padded.set(0, nested.get(0));
            padded.set(1, nested.get(1));
            delete.set(2, padded);
        }
        StructLikeSet expectedA = this.expected(tab, expectedDeletesA, partitionA, null);
        StructLikeSet actualA = this.actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);

        Record partitionC = partitionRecordTemplate.copy("data", "c");
        StructLikeSet expectedC = this.expected(tab, deletesC.first(), partitionC, null);
        StructLikeSet actualC = this.actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
        Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
        this.dropTable(tableName);
    }

    /**
     * Verifies that a plain append to the position_deletes table, i.e. one that does not
     * supply a rewritten-file-scan-task-set-id option, is rejected with an
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testNormalWritesNotAllowed() throws IOException {
        String tableName = "test_normal_write_not_allowed";
        PartitionSpec dataIdentitySpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table tab = this.createTable(tableName, SCHEMA, dataIdentitySpec);

        DataFile dataFileA = this.dataFile(tab, "a");
        tab.newAppend().appendFile(dataFileA).commit();

        DeleteFile deleteFileA = this.deleteFile(tab, dataFileA, "a").second();
        tab.newRowDelta().addDeletes(deleteFileA).commit();

        String posDeletesTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        Dataset<?> scanDF = spark.read().format("iceberg").load(posDeletesTableName);

        Assert.assertThrows(
            "position_deletes table can only be written by RewriteDeleteFiles",
            IllegalArgumentException.class,
            () -> scanDF.writeTo(posDeletesTableName).append());
        this.dropTable(tableName);
    }

    /** Reads every row and every column of the table's position_deletes metadata table. */
    private StructLikeSet actual(String tableName, Table table) {
        String noFilter = null;
        List<String> allColumns = null;
        return this.actual(tableName, table, noFilter, allColumns);
    }

    /**
     * Reads the rows of the table's position_deletes metadata table that match {@code filter},
     * keeping every column.
     */
    private StructLikeSet actual(String tableName, Table table, String filter) {
        List<String> allColumns = null;
        return this.actual(tableName, table, filter, allColumns);
    }

    /**
     * Loads the position_deletes metadata table of {@code tableName} through Spark and
     * collects its rows into a {@code StructLikeSet}.
     *
     * @param tableName name of the base table inside the "default" namespace
     * @param table base table, used to derive the metadata table schema
     * @param filter Spark filter expression applied to the rows, or null for no filter
     * @param cols column names to select, or null to keep every column
     * @return the collected rows, typed by the metadata table schema restricted to
     *     {@code cols} when it is given
     */
    private StructLikeSet actual(String tableName, Table table, String filter, List<String> cols) {
        String metadataTableName = this.catalogName + ".default." + tableName + ".position_deletes";
        Dataset df = spark.read().format("iceberg").load(metadataTableName);
        if (filter != null) {
            df = df.filter(filter);
        }
        if (cols != null) {
            String firstCol = cols.get(0);
            String[] otherCols = cols.subList(1, cols.size()).toArray(new String[0]);
            df = df.select(firstCol, otherCols);
        }

        Table deletesTable =
            MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        Types.StructType fullStruct = deletesTable.schema().asStruct();
        final Types.StructType rowType;
        if (cols == null) {
            rowType = fullStruct;
        } else {
            // Keep the schema fields named in cols, in schema order.
            List<Types.NestedField> selectedFields = new ArrayList<>();
            for (Types.NestedField field : fullStruct.fields()) {
                if (cols.contains(field.name())) {
                    selectedFields.add(field);
                }
            }
            rowType = Types.StructType.of(selectedFields);
        }

        StructLikeSet rows = StructLikeSet.create(rowType);
        // A separate wrapper instance is created for every collected row.
        df.collectAsList().forEach(row -> {
            SparkStructLike wrapper = new SparkStructLike(rowType);
            rows.add((StructLike)wrapper.wrap(row));
        });
        return rows;
    }

    /**
     * Creates table {@code name} in the "default" namespace of the validation catalog with
     * format-version 2 and this test's file format as the default write format.
     */
    protected Table createTable(String name, Schema schema, PartitionSpec spec) {
        Map<String, String> tableProperties = ImmutableMap.of(
            "format-version", "2",
            "write.format.default", this.format.toString());
        TableIdentifier identifier = TableIdentifier.of("default", name);
        return this.validationCatalog.createTable(identifier, schema, spec, tableProperties);
    }

    /**
     * Drops table {@code name} from the "default" namespace of the validation catalog,
     * passing {@code false} as the second argument of the catalog's dropTable call.
     */
    protected void dropTable(String name) {
        TableIdentifier identifier = TableIdentifier.of("default", name);
        this.validationCatalog.dropTable(identifier, false);
    }

    /** Builds a position delete for {@code path} at {@code position} that carries no row. */
    private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
        PositionDelete<GenericRecord> rowlessDelete = PositionDelete.create();
        long pos = position;
        rowlessDelete.set(path, pos, null);
        return rowlessDelete;
    }

    /**
     * Builds a position delete for {@code path} at {@code position} whose row is a record of
     * {@code tableSchema} with its leading fields set, in order, to {@code values}.
     */
    private PositionDelete<GenericRecord> positionDelete(Schema tableSchema, CharSequence path, Long position, Object ... values) {
        GenericRecord deletedRow = GenericRecord.create(tableSchema);
        int fieldIndex = 0;
        for (Object value : values) {
            deletedRow.set(fieldIndex, value);
            fieldIndex++;
        }
        PositionDelete<GenericRecord> deleteWithRow = PositionDelete.create();
        long pos = position;
        deleteWithRow.set(path, pos, deletedRow);
        return deleteWithRow;
    }

    /**
     * Builds the set of rows the position_deletes metadata table of {@code testTable} is
     * expected to contain for {@code deletes}.
     *
     * @param testTable base table whose position_deletes schema types the rows
     * @param deletes position deletes supplying file_path, pos and row
     * @param partitionStruct value for the partition column, left unset when null
     * @param specId value for the spec_id column
     * @param deleteFilePath value for the delete_file_path column; when null the column is
     *     left unset and field id 0x7FFFFFFE is removed from the row type
     * @return one record per delete, collected in a {@code StructLikeSet}
     */
    private StructLikeSet expected(Table testTable, List<PositionDelete<?>> deletes, StructLike partitionStruct, int specId, String deleteFilePath) {
        Table deletesTable =
            MetadataTableUtils.createMetadataTableInstance(testTable, MetadataTableType.POSITION_DELETES);
        boolean includeDeleteFilePath = deleteFilePath != null;
        Types.StructType rowType = includeDeleteFilePath
            ? deletesTable.schema().asStruct()
            : TypeUtil.selectNot(deletesTable.schema(), ImmutableSet.of(0x7FFFFFFE)).asStruct();

        StructLikeSet rows = StructLikeSet.create(rowType);
        for (PositionDelete<?> delete : deletes) {
            GenericRecord row = GenericRecord.create(rowType);
            row.setField("file_path", delete.path());
            row.setField("pos", delete.pos());
            row.setField("row", delete.row());
            if (partitionStruct != null) {
                row.setField("partition", partitionStruct);
            }
            row.setField("spec_id", specId);
            if (includeDeleteFilePath) {
                row.setField("delete_file_path", deleteFilePath);
            }
            rows.add(row);
        }
        return rows;
    }

    /** Same as the five-argument overload, using the spec id of the table's current spec. */
    private StructLikeSet expected(Table testTable, List<PositionDelete<?>> deletes, StructLike partitionStruct, String deleteFilePath) {
        int currentSpecId = testTable.spec().specId();
        return this.expected(testTable, deletes, partitionStruct, currentSpecId, deleteFilePath);
    }

    /**
     * Writes a data file using {@code partValues} both as the source of the records'
     * partition column values and as the file's partition tuple.
     */
    private DataFile dataFile(Table tab, Object ... partValues) throws IOException {
        Object[] partDataValues = partValues;
        Object[] partFieldValues = partValues;
        return this.dataFile(tab, partDataValues, partFieldValues);
    }

    /**
     * Writes a data file with four records to a new temporary file.
     *
     * <p>The "id" and "data" values of the records default to (29, "c"), (43, "k"),
     * (61, "r") and (89, "t"). When the table's current spec has a partition field named
     * "id" or "data", the matching entry of {@code partDataValues} replaces that column's
     * value in every record. Any table column after the first two is set to its own
     * position index.
     *
     * @param tab table whose schema and spec drive the records
     * @param partDataValues per-partition-field values copied into the records
     * @param partFieldValues partition tuple handed to the file writer
     */
    private DataFile dataFile(Table tab, Object[] partDataValues, Object[] partFieldValues) throws IOException {
        GenericRecord template = GenericRecord.create(tab.schema());

        List<String> partitionFieldNames = Lists.newArrayList();
        for (PartitionField field : tab.spec().fields()) {
            partitionFieldNames.add(field.name());
        }
        int idIndex = partitionFieldNames.indexOf("id");
        int dataIndex = partitionFieldNames.indexOf("data");
        Integer idPartition = null;
        if (idIndex != -1) {
            idPartition = (Integer) partDataValues[idIndex];
        }
        String dataPartition = null;
        if (dataIndex != -1) {
            dataPartition = (String) partDataValues[dataIndex];
        }

        int[] defaultIds = {29, 43, 61, 89};
        String[] defaultData = {"c", "k", "r", "t"};
        List<Record> records = Lists.newArrayList();
        for (int i = 0; i < defaultIds.length; i++) {
            Integer id = idPartition != null ? idPartition : Integer.valueOf(defaultIds[i]);
            String data = dataPartition != null ? dataPartition : defaultData[i];
            records.add(template.copy("id", id, "data", data));
        }

        // Columns beyond "id" and "data" receive their own position index as value.
        int columnCount = tab.schema().columns().size();
        for (int pos = 2; pos < columnCount; pos++) {
            for (Record record : records) {
                record.set(pos, pos);
            }
        }

        TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
        OutputFile output = Files.localOutput(this.temp.newFile());
        return FileHelpers.writeDataFile(tab, output, partitionInfo, records);
    }

    /**
     * Writes a position delete file for {@code dataFile} using {@code partValues} both as
     * the source of the deleted rows' partition column values and as the file's partition
     * tuple.
     */
    private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(Table tab, DataFile dataFile, Object ... partValues) throws IOException {
        Object[] partDataValues = partValues;
        Object[] partFieldValues = partValues;
        return this.deleteFile(tab, dataFile, partDataValues, partFieldValues);
    }

    /**
     * Writes a position delete file with two deletes (positions 0 and 1) against
     * {@code dataFile} to a new temporary file.
     *
     * <p>The deleted rows default to (29, "c") and (61, "r"). When the table's current spec
     * has a partition field named "id" or "data", the matching entry of
     * {@code partDataValues} replaces that column's value in both rows. Any table column
     * after the first two is set to its own position index.
     *
     * @param tab table whose schema and spec drive the deleted rows
     * @param dataFile data file whose path the deletes reference
     * @param partDataValues per-partition-field values copied into the deleted rows
     * @param partFieldValues partition tuple handed to the file writer
     * @return the in-memory deletes paired with the written delete file
     */
    private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(Table tab, DataFile dataFile, Object[] partDataValues, Object[] partFieldValues) throws IOException {
        List<String> partitionFieldNames = Lists.newArrayList();
        for (PartitionField field : tab.spec().fields()) {
            partitionFieldNames.add(field.name());
        }
        int idIndex = partitionFieldNames.indexOf("id");
        int dataIndex = partitionFieldNames.indexOf("data");
        Integer idPartition = null;
        if (idIndex != -1) {
            idPartition = (Integer) partDataValues[idIndex];
        }
        String dataPartition = null;
        if (dataIndex != -1) {
            dataPartition = (String) partDataValues[dataIndex];
        }

        Integer firstId = idPartition != null ? idPartition : Integer.valueOf(29);
        String firstData = dataPartition != null ? dataPartition : "c";
        Integer secondId = idPartition != null ? idPartition : Integer.valueOf(61);
        String secondData = dataPartition != null ? dataPartition : "r";
        List<PositionDelete<?>> deletes = Lists.newArrayList(
            this.positionDelete(tab.schema(), dataFile.path(), 0L, firstId, firstData),
            this.positionDelete(tab.schema(), dataFile.path(), 1L, secondId, secondData));

        // Columns beyond "id" and "data" receive their own position index as value in the
        // row record stored at position 2 of each delete.
        int columnCount = tab.schema().columns().size();
        for (int pos = 2; pos < columnCount; pos++) {
            for (PositionDelete<?> delete : deletes) {
                GenericRecord deletedRow = delete.get(2, GenericRecord.class);
                deletedRow.set(pos, pos);
            }
        }

        TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
        OutputFile output = Files.localOutput(this.temp.newFile());
        DeleteFile deleteFile = FileHelpers.writePosDeleteFile(tab, output, partitionInfo, deletes);
        return Pair.of(deletes, deleteFile);
    }

    /**
     * Materializes {@code tasks} into a list and stages it with the shared {@link ScanTaskSetManager}
     * under {@code fileSetID}.
     *
     * <p>This method does not close {@code tasks}; that is left to the caller.
     *
     * @param tab table the tasks are staged for
     * @param fileSetID identifier the task set is registered under
     * @param tasks tasks to stage; fully iterated by this call
     * @param <T> concrete scan task type
     */
    private <T extends ScanTask> void stageTask(Table tab, String fileSetID, CloseableIterable<T> tasks) {
        // No raw (List) cast: the element type is inferred, so the call is checked at compile time.
        ScanTaskSetManager.get().stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
    }

    /**
     * Checks the outcome of a staged position-delete rewrite and commits it to the base table.
     *
     * <p>The rewritten files are the delete files of the tasks staged under {@code fileSetID}; the
     * added files are those the rewrite coordinator holds for the same id. Both counts are
     * asserted, the sorted path lists are asserted to differ, and the rewritten delete files are
     * then replaced by the added ones in a single rewrite commit on {@code baseTab}.
     *
     * @param baseTab table the rewrite is committed to
     * @param posDeletesTable position deletes metadata table used as the lookup key
     * @param fileSetID identifier of the staged task set / rewrite result
     * @param expectedSourceFiles expected number of rewritten (source) delete files
     * @param expectedTargetFiles expected number of newly added delete files
     */
    private void commit(Table baseTab, Table posDeletesTable, String fileSetID, int expectedSourceFiles, int expectedTargetFiles) {
        PositionDeletesRewriteCoordinator rewriteCoordinator = PositionDeletesRewriteCoordinator.get();
        Set<DeleteFile> rewrittenFiles = ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetID).stream()
                .map(task -> ((PositionDeletesScanTask) task).file())
                .collect(Collectors.toSet());
        Set<DeleteFile> addedFiles = rewriteCoordinator.fetchNewFiles(posDeletesTable, fileSetID);

        Assert.assertEquals(expectedSourceFiles, rewrittenFiles.size());
        Assert.assertEquals(expectedTargetFiles, addedFiles.size());

        // The rewrite must have produced different files, not handed the originals back.
        List<String> sortedAddedFiles = addedFiles.stream()
                .map(file -> file.path().toString())
                .sorted()
                .collect(Collectors.toList());
        List<String> sortedRewrittenFiles = rewrittenFiles.stream()
                .map(file -> file.path().toString())
                .sorted()
                .collect(Collectors.toList());
        Assert.assertNotEquals("Lists should not be the same", sortedAddedFiles, sortedRewrittenFiles);

        // Only delete files change hands; no data files are removed or added.
        baseTab.newRewrite()
                .rewriteFiles(ImmutableSet.of(), rewrittenFiles, ImmutableSet.of(), addedFiles)
                .commit();
    }

    /**
     * Commits a staged rewrite, expecting the same number of rewritten and newly added delete files.
     *
     * @param baseTab table the rewrite is committed to
     * @param posDeletesTable position deletes metadata table used as the lookup key
     * @param fileSetID identifier of the staged task set / rewrite result
     * @param expectedFiles count expected for both the source and the target files
     */
    private void commit(Table baseTab, Table posDeletesTable, String fileSetID, int expectedFiles) {
        int expectedSourceFiles = expectedFiles;
        int expectedTargetFiles = expectedFiles;
        commit(baseTab, posDeletesTable, fileSetID, expectedSourceFiles, expectedTargetFiles);
    }

    /**
     * Plans scan tasks of the position deletes table for a single partition value.
     *
     * <p>The scan is filtered on {@code partition.<partitionColumn> == partitionValue}, and each
     * planned task is then re-checked against {@code partitionValue}. The re-check always reads
     * field 0 of the task's partition tuple as a {@code String}, regardless of
     * {@code partitionColumn}.
     *
     * @param posDeletesTable position deletes metadata table to scan
     * @param partitionColumn partition field name used in the scan filter
     * @param partitionValue partition value to keep
     * @return lazily filtered tasks; the caller is responsible for closing the iterable
     */
    private CloseableIterable<ScanTask> tasks(Table posDeletesTable, String partitionColumn, String partitionValue) {
        Expression filter = Expressions.equal("partition." + partitionColumn, partitionValue);
        CloseableIterable<ScanTask> files = posDeletesTable.newBatchScan().filter(filter).planFiles();

        // Presumably guards against the scan filter letting through files of other partitions
        // (TODO confirm); tasks with a null or different value at field 0 are dropped.
        return CloseableIterable.filter(files, task -> {
            StructLike filePartition = ((PositionDeletesScanTask) task).partition();
            String filePartitionValue = filePartition.get(0, String.class);
            return filePartitionValue != null && filePartitionValue.equals(partitionValue);
        });
    }
}

