/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.actions.RewritePositionDeleteFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.FourColumnRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

public class TestRewritePositionDeleteFilesAction
extends SparkCatalogTestBase {
    /** Name of the single table every test creates (in the "default" namespace) and drops afterwards. */
    private static final String TABLE_NAME = "test_table";
    /** Three optional columns: c1 (int, used as the identity partition source), c2 and c3 (strings). */
    private static final Schema SCHEMA = new Schema(
        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
        Types.NestedField.optional(2, "c2", Types.StringType.get()),
        Types.NestedField.optional(3, "c3", Types.StringType.get()));
    /** Hive catalog properties; caching is disabled so the catalog is not served stale table state. */
    private static final Map<String, String> CATALOG_PROPS =
        ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false");
    /** Record count per partition used when writing test data. */
    private static final int SCALE = 4000;
    /** Position-delete count handed to writePosDeletesForFiles by the tests. */
    private static final int DELETES_SCALE = 1000;
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    /** Default data file format for created tables, supplied by the parameterized runner. */
    private final FileFormat format;

    /**
     * Parameter rows for the JUnit {@link Parameterized} runner. Each row is
     * {catalogName, implementation, config, fileFormat}, matching the constructor's argument order.
     * The name pattern placeholders are zero-based indexes into that row.
     */
    @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}")
    public static Object[][] parameters() {
        return new Object[][] {
            {
                SparkCatalogConfig.HIVE.catalogName(),
                SparkCatalogConfig.HIVE.implementation(),
                CATALOG_PROPS,
                FileFormat.PARQUET
            }
        };
    }

    /**
     * Builds one parameterized test instance.
     *
     * @param catalogName catalog name forwarded to the base class
     * @param implementation catalog implementation class forwarded to the base class
     * @param config catalog configuration forwarded to the base class
     * @param format default data file format written into table properties
     */
    public TestRewritePositionDeleteFilesAction(
            String catalogName, String implementation, Map<String, String> config, FileFormat format) {
        super(catalogName, implementation, config);
        this.format = format;
    }

    /** Drops the shared test table after each test so the next one can recreate it. */
    @After
    public void cleanup() {
        TableIdentifier testTable = TableIdentifier.of("default", TABLE_NAME);
        validationCatalog.dropTable(testTable);
    }

    /** Rewriting position deletes on a table with no data must neither remove nor add delete files. */
    @Test
    public void testEmptyTable() {
        PartitionSpec byC1 = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        Table emptyTable = validationCatalog.createTable(identifier, SCHEMA, byC1, tableProperties());

        RewritePositionDeleteFiles.Result outcome =
            SparkActions.get(spark).rewritePositionDeletes(emptyTable).execute();

        Assert.assertEquals("No rewritten delete files", 0L, outcome.rewrittenDeleteFilesCount());
        Assert.assertEquals("No added delete files", 0L, outcome.addedDeleteFilesCount());
    }

    /** With file-level delete granularity the rewrite keeps one delete file per data file. */
    @Test
    public void testFileGranularity() throws Exception {
        DeleteGranularity granularity = DeleteGranularity.FILE;
        checkDeleteGranularity(granularity);
    }

    /** With partition-level delete granularity the rewrite merges deletes into a single file. */
    @Test
    public void testPartitionGranularity() throws Exception {
        DeleteGranularity granularity = DeleteGranularity.PARTITION;
        checkDeleteGranularity(granularity);
    }

    /**
     * Writes two data files plus position deletes to an unpartitioned table, sets
     * {@code write.delete.granularity}, rewrites all deletes, and checks how many delete files were added:
     * 2 for {@link DeleteGranularity#FILE}, 1 otherwise.
     */
    private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        table.updateProperties()
            .set("write.delete.granularity", deleteGranularity.toString())
            .commit();

        List<DataFile> writtenDataFiles = TestHelpers.dataFiles(table);
        Assertions.assertThat(writtenDataFiles).hasSize(2);

        writePosDeletesForFiles(table, 2, DELETES_SCALE, writtenDataFiles);
        List<DeleteFile> writtenDeleteFiles = deleteFiles(table);
        Assertions.assertThat(writtenDeleteFiles).hasSize(2);

        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        int expectedAdded;
        if (deleteGranularity == DeleteGranularity.FILE) {
            expectedAdded = 2;
        } else {
            expectedAdded = 1;
        }
        Assertions.assertThat(outcome.addedDeleteFilesCount()).isEqualTo(expectedAdded);
    }

    /** Rewriting all deletes of an unpartitioned table yields one sorted delete file with identical content. */
    @Test
    public void testUnpartitioned() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(2L, dataFiles.size());

        List<DeleteFile> originalDeletes = deleteFiles(table);
        Assert.assertEquals(2L, originalDeletes.size());

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(2000L, expectedRecords.size());
        Assert.assertEquals(2000L, expectedDeletes.size());

        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> producedDeletes = deleteFiles(table);
        Assert.assertEquals("Expected 1 new delete file", 1L, producedDeletes.size());
        assertLocallySorted(producedDeletes);
        assertNotContains(originalDeletes, producedDeletes);
        checkResult(outcome, originalDeletes, producedDeletes, 1);
        checkSequenceNumbers(table, originalDeletes, producedDeletes);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /** Rewriting all deletes of a 4-partition table with a near-unbounded target size yields 4 delete files. */
    @Test
    public void testRewriteAll() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(4L, dataFiles.size());

        List<DeleteFile> originalDeletes = deleteFiles(table);
        Assert.assertEquals(8L, originalDeletes.size());

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        // Long.MAX_VALUE - 1 keeps the target size from splitting output files.
        String hugeTarget = Long.toString(Long.MAX_VALUE - 1);
        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .option("target-file-size-bytes", hugeTarget)
            .execute();

        List<DeleteFile> producedDeletes = deleteFiles(table);
        Assert.assertEquals("Should have 4 delete files", 4L, producedDeletes.size());
        assertNotContains(originalDeletes, producedDeletes);
        assertLocallySorted(producedDeletes);
        checkResult(outcome, originalDeletes, producedDeletes, 4);
        checkSequenceNumbers(table, originalDeletes, producedDeletes);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Rewrites only the deletes matching a filter on partitions c1=1 and c1=2. Expects two new delete
     * files and unchanged table contents.
     */
    @Test
    public void testRewriteFilter() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(4L, dataFiles.size());

        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assert.assertEquals(8L, deleteFiles.size());

        table.refresh();
        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        Expression filter = Expressions.and(
            Expressions.greaterThan("c3", "0"),
            Expressions.or(Expressions.equal("c1", 1), Expressions.equal("c1", 2)));
        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .filter(filter)
            .option("rewrite-all", "true")
            .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
            .execute();

        List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
        // Message previously claimed 4 files while asserting 2; one new file per matched partition.
        Assert.assertEquals("Should have 2 delete files", 2L, newDeleteFiles.size());

        List<DeleteFile> expectedRewrittenFiles =
            filterFiles(table, deleteFiles, new List[] {ImmutableList.of(1), ImmutableList.of(2)});
        assertLocallySorted(newDeleteFiles);
        checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /** A target size of half the average delete file size splits the rewritten output into 8 files. */
    @Test
    public void testRewriteToSmallerTarget() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(4L, dataFiles.size());

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        List<DeleteFile> originalDeletes = deleteFiles(table);
        Assert.assertEquals(8L, originalDeletes.size());

        long averageBytes = size(originalDeletes) / originalDeletes.size();
        String halfAverage = String.valueOf(averageBytes / 2L);
        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .option("target-file-size-bytes", halfAverage)
            .execute();

        List<DeleteFile> producedDeletes = deleteFiles(table);
        Assert.assertEquals("Should have 8 new delete files", 8L, producedDeletes.size());
        assertNotContains(originalDeletes, producedDeletes);
        assertLocallySorted(producedDeletes);
        checkResult(outcome, originalDeletes, producedDeletes, 4);
        checkSequenceNumbers(table, originalDeletes, producedDeletes);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * After every data file is rewritten, all existing position deletes are dangling. Expects the delete
     * rewrite to drop them without producing any new delete file.
     */
    @Test
    public void testRemoveDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assert.assertEquals(4L, dataFiles.size());

        List<DeleteFile> originalDeletes = deleteFiles(table);
        Assert.assertEquals(8L, originalDeletes.size());

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true").execute();
        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> producedDeletes = deleteFiles(table);
        Assert.assertEquals("Should have 0 new delete files", 0L, producedDeletes.size());
        assertNotContains(originalDeletes, producedDeletes);
        assertLocallySorted(producedDeletes);
        checkResult(outcome, originalDeletes, producedDeletes, 4);
        checkSequenceNumbers(table, originalDeletes, producedDeletes);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        Assert.assertEquals("Should be no new position deletes", 0L, actualDeletes.size());
    }

    /**
     * Rewrites data files only in partitions c1=0 and c1=1, which leaves the deletes there dangling.
     * Then rewrites all deletes and expects only the deletes for c1=2 and c1=3 to survive.
     */
    @Test
    public void testSomePartitionsDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(4L, dataFiles.size());

        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assert.assertEquals(8L, deleteFiles.size());

        List<Object[]> expectedRecords = records(table);
        // Was declared List<Object>, which cannot hold deleteRecords' List<Object[]> nor be indexed below.
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        Expression filter = Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1));
        SparkActions.get(spark).rewriteDataFiles(table).filter(filter).option("rewrite-all", "true").execute();
        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assert.assertEquals("Should have 2 new delete files", 2L, newDeleteFiles.size());
        assertNotContains(deleteFiles, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, deleteFiles, newDeleteFiles, 4);
        checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

        // Only deletes in partitions whose data files were NOT rewritten (c1=2, c1=3) should remain.
        // NOTE(review): assumes index 3 of a delete record is the partition struct - confirm against
        // deleteRecords' column order. Values are compared with equals(), not boxed-reference ==.
        expectedDeletes = expectedDeletes.stream()
            .filter(r -> {
                Object[] partition = (Object[]) r[3];
                return Integer.valueOf(2).equals(partition[0]) || Integer.valueOf(3).equals(partition[0]);
            })
            .collect(Collectors.toList());

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * After all data files are rewritten, a filtered delete rewrite on c1=0/c1=1 removes those dangling
     * deletes without writing new ones. Deletes in c1=2/c1=3 are left untouched.
     */
    @Test
    public void testRewriteFilterRemoveDangling() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assert.assertEquals(4L, dataFiles.size());

        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assert.assertEquals(8L, deleteFiles.size());

        table.refresh();
        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(12000L, expectedRecords.size());
        Assert.assertEquals(4000L, expectedDeletes.size());

        SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true").execute();

        Expression filter = Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1));
        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .filter(filter)
            .option("rewrite-all", "true")
            .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
            .execute();

        List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
        // Message previously said 2 while the assertion (correctly) expects 0: dangling deletes are dropped.
        Assert.assertEquals("Should have 0 new delete files", 0L, newDeleteFiles.size());

        List<DeleteFile> expectedRewrittenFiles =
            filterFiles(table, deleteFiles, new List[] {ImmutableList.of(0), ImmutableList.of(1)});
        checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);

        List<Object[]> actualRecords = records(table);
        List<Object[]> allDeletes = deleteRecords(table);
        List<Object[]> expectedDeletesFiltered =
            filterDeletes(expectedDeletes, new List[] {ImmutableList.of(2), ImmutableList.of(3)});
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletesFiltered, allDeletes);
    }

    /**
     * Writes data and deletes unpartitioned, then adds c1 as a partition field and writes more. Rewriting
     * all deletes yields 3 delete files and preserves table contents.
     */
    @Test
    public void testPartitionEvolutionAdd() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> beforeSpecData = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, beforeSpecData);
        Assert.assertEquals(2L, beforeSpecData.size());

        List<DeleteFile> beforeSpecDeletes = deleteFiles(table);
        Assert.assertEquals(2L, beforeSpecDeletes.size());

        List<Object[]> unpartitionedDeleteRows = deleteRecords(table);
        List<Object[]> unpartitionedRows = records(table);
        Assert.assertEquals(2000L, unpartitionedRows.size());
        Assert.assertEquals(2000L, unpartitionedDeleteRows.size());

        table.updateSpec().addField("c1").commit();
        writeRecords(table, 2, SCALE, 2);

        List<DataFile> afterSpecData = except(TestHelpers.dataFiles(table), beforeSpecData);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, afterSpecData);
        Assert.assertEquals(2L, afterSpecData.size());

        List<DeleteFile> afterSpecDeletes = except(deleteFiles(table), beforeSpecDeletes);
        Assert.assertEquals(4L, afterSpecDeletes.size());

        List<Object[]> expectedDeletes = deleteRecords(table);
        List<Object[]> expectedRecords = records(table);
        Assert.assertEquals(4000L, expectedDeletes.size());
        Assert.assertEquals(8000L, expectedRecords.size());

        RewritePositionDeleteFiles.Result outcome = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> allOriginalDeletes = new ArrayList<>(beforeSpecDeletes);
        allOriginalDeletes.addAll(afterSpecDeletes);

        List<DeleteFile> producedDeletes = deleteFiles(table);
        Assert.assertEquals("Should have 3 new delete files", 3L, producedDeletes.size());
        assertNotContains(allOriginalDeletes, producedDeletes);
        assertLocallySorted(producedDeletes);
        checkResult(outcome, allOriginalDeletes, producedDeletes, 3);
        checkSequenceNumbers(table, allOriginalDeletes, producedDeletes);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Writes data and deletes under an identity(c1) spec, then removes the partition field and writes
     * more. Rewriting all deletes yields 3 delete files and preserves table contents.
     */
    @Test
    public void testPartitionEvolutionRemove() throws Exception {
        // The table starts partitioned by identity(c1): these are the PARTITIONED files
        // (the previous local names had partitioned/unpartitioned swapped).
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> partitionedDataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, partitionedDataFiles);
        Assert.assertEquals(2L, partitionedDataFiles.size());

        List<DeleteFile> partitionedDeleteFiles = deleteFiles(table);
        Assert.assertEquals(4L, partitionedDeleteFiles.size());

        // After removing c1 from the spec, newly written files are UNPARTITIONED.
        table.updateSpec().removeField("c1").commit();
        writeRecords(table, 2, SCALE);

        List<DataFile> unpartitionedDataFiles = except(TestHelpers.dataFiles(table), partitionedDataFiles);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, unpartitionedDataFiles);
        Assert.assertEquals(2L, unpartitionedDataFiles.size());

        List<DeleteFile> unpartitionedDeleteFiles = except(deleteFiles(table), partitionedDeleteFiles);
        Assert.assertEquals(2L, unpartitionedDeleteFiles.size());

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assert.assertEquals(4000L, expectedDeletes.size());
        Assert.assertEquals(8000L, expectedRecords.size());

        List<DeleteFile> expectedRewritten = deleteFiles(table);
        Assert.assertEquals(6L, expectedRewritten.size());

        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assert.assertEquals("Should have 3 new delete files", 3L, newDeleteFiles.size());
        assertNotContains(expectedRewritten, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, expectedRewritten, newDeleteFiles, 3);
        checkSequenceNumbers(table, expectedRewritten, newDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Writes data and deletes, adds a column c4, then writes more data and deletes with the new schema.
     * Rewriting all deletes yields 4 delete files and preserves table rows.
     */
    @Test
    public void testSchemaEvolution() throws Exception {
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assert.assertEquals(2L, dataFiles.size());

        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assert.assertEquals(4L, deleteFiles.size());

        table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
        writeNewSchemaRecords(table, 2, SCALE, 2, 2);

        // Files written with the new schema are identified by having an upper bound for c4.
        int newColId = table.schema().findField("c4").fieldId();
        List<DataFile> newSchemaDataFiles = TestHelpers.dataFiles(table).stream()
            .filter(f -> f.upperBounds().containsKey(newColId))
            .collect(Collectors.toList());
        writePosDeletesForFiles(table, 2, DELETES_SCALE, newSchemaDataFiles);

        List<DeleteFile> newSchemaDeleteFiles = except(deleteFiles(table), deleteFiles);
        Assert.assertEquals(4L, newSchemaDeleteFiles.size());

        table.refresh();
        List<Object[]> expectedDeletes = deleteRecords(table);
        List<Object[]> expectedRecords = records(table);
        Assert.assertEquals(4000L, expectedDeletes.size());
        Assert.assertEquals(12000L, expectedRecords.size());

        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> rewrittenDeleteFiles =
            Stream.concat(deleteFiles.stream(), newSchemaDeleteFiles.stream()).collect(Collectors.toList());
        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        // Message previously said 2 while asserting 4.
        Assert.assertEquals("Should have 4 new delete files", 4L, newDeleteFiles.size());
        assertNotContains(rewrittenDeleteFiles, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 4);
        checkSequenceNumbers(table, rewrittenDeleteFiles, newDeleteFiles);

        List<Object[]> actualRecords = records(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
    }

    /**
     * Rewrites position deletes on a very wide table: a long id plus string columns c1..c1009,
     * bucketed on id into 2 buckets. Expects the rewrite to preserve rows and deletes.
     */
    @Test
    public void testRewriteManyColumns() throws Exception {
        // Exclusive upper bound for the extra string columns c1..c1009; shared by schema and data.
        final int columnLimit = 1010;

        List<Types.NestedField> fields = Lists.newArrayList(Types.NestedField.optional(0, "id", Types.LongType.get()));
        List<Types.NestedField> additionalCols = IntStream.range(1, columnLimit)
            .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
            .collect(Collectors.toList());
        fields.addAll(additionalCols);
        Schema schema = new Schema(fields);
        PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
        Table table = validationCatalog.createTable(
            TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());

        Dataset<Row> df = spark.range(4L).withColumns(
            IntStream.range(1, columnLimit).boxed()
                .collect(Collectors.toMap(i -> "c" + i, i -> functions.expr("CAST(id as STRING)"))));
        StructType sparkSchema = spark.table(name(table)).schema();
        spark.createDataFrame(df.rdd(), sparkSchema)
            .coalesce(1)
            .write()
            .format("iceberg")
            .mode("append")
            .save(name(table));

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 1, 1, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);

        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assertions.assertThat(deleteFiles).hasSize(2);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(2);
        Assertions.assertThat(expectedDeletes).hasSize(2);

        RewritePositionDeleteFiles.Result result = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
            .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assertions.assertThat(newDeleteFiles).hasSize(2);
        assertNotContains(deleteFiles, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, deleteFiles, newDeleteFiles, 2);
        checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows must match", expectedRecords, actualRecords);
        assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
    }

    /**
     * Creates the test table partitioned by identity(c1) and fills it with data.
     *
     * @param partitions number of distinct c1 values (0 .. partitions-1) to write
     * @param files number of Spark partitions the write is repartitioned into
     * @param numRecords records written per c1 value
     * @return the created table, refreshed after the write
     */
    private Table createTablePartitioned(int partitions, int files, int numRecords) {
        PartitionSpec identityOnC1 = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        Table created = validationCatalog.createTable(identifier, SCHEMA, identityOnC1, tableProperties());
        writeRecords(created, files, numRecords, partitions);
        return created;
    }

    /**
     * Creates the test table without partitioning and fills it with data.
     *
     * @param files number of Spark partitions the write is repartitioned into
     * @param numRecords total records written
     * @return the created table, refreshed after the write
     */
    private Table createTableUnpartitioned(int files, int numRecords) {
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        PartitionSpec noPartitioning = PartitionSpec.unpartitioned();
        Table created = validationCatalog.createTable(identifier, SCHEMA, noPartitioning, tableProperties());
        writeRecords(created, files, numRecords);
        return created;
    }

    /**
     * Table properties shared by every test table: full column metrics, format version 2 (required
     * for position deletes), and the parameterized default file format.
     * The previous {@code (Object)} casts made {@code ImmutableMap.of} infer {@code <Object, Object>},
     * which is not assignable to the declared {@code Map<String, String>} return type.
     */
    private Map<String, String> tableProperties() {
        return ImmutableMap.of(
            "write.metadata.metrics.default", "full",
            "format-version", "2",
            "write.format.default", format.toString());
    }

    /** Appends {@code numRecords} rows for a single partition value (0) in {@code files} Spark partitions. */
    private void writeRecords(Table table, int files, int numRecords) {
        final int singlePartition = 1;
        writeRecords(table, files, numRecords, singlePartition);
    }

    /**
     * Appends {@code numRecords} rows for each single-column partition value 0 .. numPartitions-1.
     */
    private void writeRecords(Table table, int files, int numRecords, int numPartitions) {
        List<List<Integer>> partitionValues = new ArrayList<>();
        for (int value = 0; value < numPartitions; value++) {
            partitionValues.add(ImmutableList.of(value));
        }
        writeRecordsWithPartitions(table, files, numRecords, partitionValues);
    }

    /**
     * Appends {@code numRecords} {@link ThreeColumnRecord}s per entry of {@code partitions} via Spark,
     * repartitioned into {@code files}, then refreshes {@code table}.
     * How partition values map onto columns depends on the current spec's partition field count:
     * 0 fields: c1 = i; 1 field: c1 = value[0]; 2 fields: c1 = value[0], c2 = value[1].
     * In every case c3 = i.
     *
     * @throws ValidationException if the spec has more than two partition fields
     */
    private void writeRecordsWithPartitions(Table table, int files, int numRecords, List<List<Integer>> partitions) {
        int partitionTypeSize = table.spec().partitionType().fields().size();
        Assert.assertTrue("This method currently supports only two columns as partition columns", partitionTypeSize <= 2);

        BiFunction<Integer, List<Integer>, ThreeColumnRecord> recordFunction = (i, partValues) -> {
            switch (partitionTypeSize) {
                case 0:
                    return new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i));
                case 1:
                    return new ThreeColumnRecord(partValues.get(0), String.valueOf(i), String.valueOf(i));
                case 2:
                    return new ThreeColumnRecord(partValues.get(0), String.valueOf(partValues.get(1)), String.valueOf(i));
                default:
                    throw new ValidationException("This method currently supports only two columns as partition columns");
            }
        };

        List<ThreeColumnRecord> records = partitions.stream()
            .flatMap(partition -> IntStream.range(0, numRecords).mapToObj(i -> recordFunction.apply(i, partition)))
            .collect(Collectors.toList());
        spark.createDataFrame(records, ThreeColumnRecord.class)
            .repartition(files)
            .write()
            .format("iceberg")
            .mode("append")
            .save(name(table));
        table.refresh();
    }

    /**
     * Appends {@code numRecords} {@link FourColumnRecord} rows for each partition value in
     * {@code [startingPartition, startingPartition + partitions)}. The table object is not
     * refreshed here.
     */
    private void writeNewSchemaRecords(Table table, int files, int numRecords, int startingPartition, int partitions) {
        List<FourColumnRecord> records = Lists.newArrayList();
        for (int partition = startingPartition; partition < startingPartition + partitions; ++partition) {
            for (int i = 0; i < numRecords; ++i) {
                String value = String.valueOf(i);
                records.add(new FourColumnRecord(partition, value, value, value));
            }
        }
        spark.createDataFrame(records, FourColumnRecord.class)
            .repartition(files)
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.name(table));
    }

    /** Reads all table rows through Spark, ordered by {@code c1, c2, c3}. */
    private List<Object[]> records(Table table) {
        Dataset<Row> rows = spark.read().format("iceberg").load(this.name(table));
        return this.rowsToJava(rows.sort("c1", "c2", "c3").collectAsList());
    }

    /**
     * Reads the {@code position_deletes} metadata table ordered by {@code file_path, pos}.
     * Partitioned tables additionally project {@code partition} and {@code spec_id}, so each row is
     * {@code [file_path, pos, row(, partition, spec_id)]}.
     */
    private List<Object[]> deleteRecords(Table table) {
        String[] additionalFields;
        if (table.spec().isUnpartitioned()) {
            additionalFields = new String[]{"pos", "row"};
        } else {
            additionalFields = new String[]{"pos", "row", "partition", "spec_id"};
        }
        Dataset<Row> selected = spark.read()
            .format("iceberg")
            .load(this.name(table) + ".position_deletes")
            .select("file_path", additionalFields);
        return this.rowsToJava(selected.sort("file_path", "pos").collectAsList());
    }

    /** Writes position deletes for {@code files}, committing each delete file separately. */
    private void writePosDeletesForFiles(Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files) throws IOException {
        boolean commitAllInOneTransaction = false;
        this.writePosDeletesForFiles(table, deleteFilesPerPartition, deletesPerDataFile, files, commitAllInOneTransaction);
    }

    /**
     * Writes {@code deletesPerDataFile} position deletes (positions {@code 0..n-1}) for every data
     * file, packed into {@code deleteFilesPerPartition} delete files per partition.
     *
     * @param transactional if true, all delete files are added in a single row-delta commit;
     *     otherwise each delete file is committed on its own
     * @throws IOException if writing a delete file fails
     */
    private void writePosDeletesForFiles(Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files, boolean transactional) throws IOException {
        Map<StructLike, List<DataFile>> filesByPartition = files.stream().collect(Collectors.groupingBy(ContentFile::partition));
        List<DeleteFile> deleteFiles = Lists.newArrayListWithCapacity(deleteFilesPerPartition * filesByPartition.size());
        for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry : filesByPartition.entrySet()) {
            StructLike partition = filesByPartitionEntry.getKey();
            List<DataFile> partitionFiles = filesByPartitionEntry.getValue();
            int deletesForPartition = partitionFiles.size() * deletesPerDataFile;
            // An even split guarantees every buffered delete is flushed by the counter check below.
            Assert.assertEquals("Number of delete files per partition should be evenly divisible by requested deletes per data file times number of data files in this partition", 0L, (long) (deletesForPartition % deleteFilesPerPartition));
            int deleteFileSize = deletesForPartition / deleteFilesPerPartition;
            int counter = 0;
            List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
            for (DataFile partitionFile : partitionFiles) {
                for (int deletePos = 0; deletePos < deletesPerDataFile; ++deletePos) {
                    // Positions must be boxed as Long: the delete writer unboxes them as long.
                    deletes.add(Pair.of(partitionFile.path(), (long) deletePos));
                    if (++counter == deleteFileSize) {
                        OutputFile output = Files.localOutput(this.temp.newFile());
                        deleteFiles.add(FileHelpers.writeDeleteFile(table, output, partition, deletes).first());
                        counter = 0;
                        deletes.clear();
                    }
                }
            }
        }
        if (transactional) {
            RowDelta rowDelta = table.newRowDelta();
            deleteFiles.forEach(rowDelta::addDeletes);
            rowDelta.commit();
        } else {
            for (DeleteFile deleteFile : deleteFiles) {
                table.newRowDelta().addDeletes(deleteFile).commit();
            }
        }
    }

    /**
     * Lists the delete files currently referenced by the table, via a batch scan of its
     * {@code POSITION_DELETES} metadata table.
     *
     * <p>The planned task iterable is fully materialized and then closed.
     */
    private List<DeleteFile> deleteFiles(Table table) {
        Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        try (CloseableIterable<?> tasks = deletesTable.newBatchScan().planFiles()) {
            return Lists.newArrayList(CloseableIterable.transform(tasks, task -> ((PositionDeletesScanTask) task).file()));
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to close position delete scan tasks", e);
        }
    }

    /** Returns the files of {@code first} whose path does not appear in {@code second}, in order. */
    private <T extends ContentFile<?>> List<T> except(List<T> first, List<T> second) {
        Set<String> excludedPaths = second.stream().map(file -> file.path().toString()).collect(Collectors.toSet());
        List<T> remaining = Lists.newArrayList();
        for (T file : first) {
            if (!excludedPaths.contains(file.path().toString())) {
                remaining.add(file);
            }
        }
        return remaining;
    }

    /** Asserts that no path in {@code rewritten} also appears in {@code original}. */
    private void assertNotContains(List<DeleteFile> original, List<DeleteFile> rewritten) {
        Set<String> originalPaths = original.stream().map(file -> file.path().toString()).collect(Collectors.toSet());
        long sharedPaths = rewritten.stream()
            .map(file -> file.path().toString())
            .distinct()
            .filter(originalPaths::contains)
            .count();
        Assert.assertEquals(0L, sharedPaths);
    }

    /**
     * Asserts that every delete file's rows are ordered by {@code file_path}, then {@code pos}.
     *
     * <p>Rows are read from {@code default.test_table.position_deletes} and restricted to the
     * current delete file via its {@code delete_file_path}. Each delete file must contain at least
     * one row.
     */
    private void assertLocallySorted(List<DeleteFile> deleteFiles) {
        for (DeleteFile deleteFile : deleteFiles) {
            Dataset<Row> deletes = spark.read().format("iceberg").load("default.test_table.position_deletes");
            // Dataset.filter returns a new Dataset; keep it, or every delete file's rows are read.
            Dataset<Row> fileDeletes = deletes.filter(deletes.col("delete_file_path").equalTo(deleteFile.path().toString()));
            List<Row> rows = fileDeletes.collectAsList();
            Assert.assertFalse("Empty delete file found", rows.isEmpty());
            long lastPos = 0L;
            String lastPath = "";
            for (Row row : rows) {
                String path = row.getAs("file_path");
                long pos = (Long) row.getAs("pos");
                if (path.compareTo(lastPath) < 0) {
                    Assert.fail(String.format("File_path not sorted, Found %s after %s", path, lastPath));
                } else if (path.equals(lastPath)) {
                    Assert.assertTrue("Pos not sorted", pos >= lastPos);
                }
                // Track the previous row; without this the ordering checks compare against constants.
                lastPath = path;
                lastPos = pos;
            }
        }
    }

    /**
     * Converts a three-part table name ({@code catalog.db.table}) into {@code db.table}, asserting
     * that exactly three dot-separated parts are present.
     */
    private String name(Table table) {
        String[] parts = table.name().split("\\.");
        Assert.assertEquals(3L, (long) parts.length);
        return parts[1] + "." + parts[2];
    }

    /** Total {@code fileSizeInBytes} across the given delete files. */
    private long size(List<DeleteFile> deleteFiles) {
        long totalBytes = 0L;
        for (DeleteFile file : deleteFiles) {
            totalBytes += file.fileSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Keeps the delete rows whose partition (column index 3, as produced by {@code deleteRecords}
     * for partitioned tables) matches any of {@code partitionValues}. The result is ordered by
     * file path, then position.
     *
     * <p>With no partition values, no row matches and the result is empty.
     */
    private List<Object[]> filterDeletes(List<Object[]> deletes, List<?> ... partitionValues) {
        Stream<Object[]> matches = deletes.stream().filter(r -> {
            Object[] partition = (Object[]) r[3];
            return Arrays.stream(partitionValues).anyMatch(partitionValue -> this.match(partition, partitionValue));
        });
        return this.sorted(matches).collect(Collectors.toList());
    }

    /**
     * Returns whether the leading {@code expectedPartition.size()} values of {@code partition}
     * equal the expected values.
     *
     * <p>Values are compared with {@code equals}, not {@code ==`}: boxed integers outside the
     * small-value cache and strings read back from Spark are distinct objects even when equal.
     */
    private boolean match(Object[] partition, List<?> expectedPartition) {
        return IntStream.range(0, expectedPartition.size())
            .allMatch(j -> java.util.Objects.equals(partition[j], expectedPartition.get(j)));
    }

    /** Orders delete rows by file path (index 0), then by position (index 1). */
    private Stream<Object[]> sorted(Stream<Object[]> deletes) {
        return deletes.sorted((left, right) -> {
            int byPath = ((String) left[0]).compareTo((String) right[0]);
            return byPath != 0 ? byPath : Long.compare((Long) left[1], (Long) right[1]);
        });
    }

    /**
     * Keeps the files whose partition equals any of {@code partitionValues}.
     *
     * <p>Each value list is turned into a {@link PartitionData} using the first table spec whose
     * partition type has the same number of fields as the list.
     *
     * @throws IllegalArgumentException if no spec has a partition type of the required arity
     */
    private List<DeleteFile> filterFiles(Table table, List<DeleteFile> files, List<?> ... partitionValues) {
        List<Types.StructType> partitionTypes = table.specs().values().stream()
            .map(PartitionSpec::partitionType)
            .collect(Collectors.toList());
        List<PartitionData> partitionDatas = Arrays.stream(partitionValues).map(partitionValue -> {
            Types.StructType thisType = partitionTypes.stream()
                .filter(type -> type.fields().size() == partitionValue.size())
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                    "No partition spec with " + partitionValue.size() + " fields for partition values " + partitionValue));
            PartitionData partition = new PartitionData(thisType);
            for (int i = 0; i < partitionValue.size(); ++i) {
                partition.set(i, partitionValue.get(i));
            }
            return partition;
        }).collect(Collectors.toList());
        return files.stream()
            .filter(f -> partitionDatas.stream().anyMatch(data -> f.partition().equals(data)))
            .collect(Collectors.toList());
    }

    /**
     * Checks the action's reported totals against the expected rewritten/added delete files, and
     * checks that the per-group results add up to the same totals.
     */
    private void checkResult(RewritePositionDeleteFiles.Result result, List<DeleteFile> rewrittenDeletes, List<DeleteFile> newDeletes, int expectedGroups) {
        long expectedRewrittenFiles = rewrittenDeletes.size();
        long expectedAddedFiles = newDeletes.size();
        long expectedRewrittenBytes = this.size(rewrittenDeletes);
        long expectedAddedBytes = this.size(newDeletes);

        // Action-level totals.
        Assert.assertEquals("Expected rewritten delete file count does not match", expectedRewrittenFiles, (long) result.rewrittenDeleteFilesCount());
        Assert.assertEquals("Expected new delete file count does not match", expectedAddedFiles, (long) result.addedDeleteFilesCount());
        Assert.assertEquals("Expected rewritten delete byte count does not match", expectedRewrittenBytes, (long) result.rewrittenBytesCount());
        Assert.assertEquals("Expected new delete byte count does not match", expectedAddedBytes, (long) result.addedBytesCount());
        Assert.assertEquals("Expected rewrite group count does not match", (long) expectedGroups, (long) result.rewriteResults().size());

        // Per-group results summed across all groups.
        long groupRewrittenFiles = result.rewriteResults().stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenDeleteFilesCount).sum();
        long groupAddedFiles = result.rewriteResults().stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::addedDeleteFilesCount).sum();
        long groupRewrittenBytes = result.rewriteResults().stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenBytesCount).sum();
        long groupAddedBytes = result.rewriteResults().stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::addedBytesCount).sum();
        Assert.assertEquals("Expected rewritten delete file count in all groups to match", expectedRewrittenFiles, groupRewrittenFiles);
        Assert.assertEquals("Expected added delete file count in all groups to match", expectedAddedFiles, groupAddedFiles);
        Assert.assertEquals("Expected rewritten delete bytes in all groups to match", expectedRewrittenBytes, groupRewrittenBytes);
        Assert.assertEquals("Expected added delete bytes in all groups to match", expectedAddedBytes, groupAddedBytes);
    }

    /**
     * For every partition that had rewritten delete files, asserts that each added delete file in
     * the same partition carries the maximum data sequence number of the rewritten files.
     * Partitions with no added files are skipped.
     */
    private void checkSequenceNumbers(Table table, List<DeleteFile> rewrittenDeletes, List<DeleteFile> addedDeletes) {
        StructLikeMap<List<DeleteFile>> rewrittenFilesPerPartition = this.groupPerPartition(table, rewrittenDeletes);
        StructLikeMap<List<DeleteFile>> addedFilesPerPartition = this.groupPerPartition(table, addedDeletes);
        for (StructLike partition : rewrittenFilesPerPartition.keySet()) {
            Long maxRewrittenSeq = rewrittenFilesPerPartition.get(partition).stream()
                .mapToLong(ContentFile::dataSequenceNumber)
                .max()
                .getAsLong();
            List<DeleteFile> addedPartitionFiles = addedFilesPerPartition.get(partition);
            if (addedPartitionFiles == null) {
                continue;
            }
            for (DeleteFile added : addedPartitionFiles) {
                // Expected value first so JUnit failure messages read correctly.
                Assert.assertEquals("Sequence number should be max of rewritten set", (Object) maxRewrittenSeq, (Object) added.dataSequenceNumber());
            }
        }
    }

    /**
     * Groups delete files by their partition tuple, keyed in a {@link StructLikeMap} built over the
     * table's combined partition type ({@code Partitioning.partitionType(table)}).
     */
    private StructLikeMap<List<DeleteFile>> groupPerPartition(Table table, List<DeleteFile> deleteFiles) {
        StructLikeMap<List<DeleteFile>> result = StructLikeMap.create(Partitioning.partitionType(table));
        for (DeleteFile deleteFile : deleteFiles) {
            result.computeIfAbsent(deleteFile.partition(), ignored -> Lists.newArrayList()).add(deleteFile);
        }
        return result;
    }
}

