/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.actions.RewritePositionDeleteFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.FourColumnRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

public class TestRewritePositionDeleteFilesAction
extends CatalogTestBase {
    private static final String TABLE_NAME = "test_table";

    /** Three optional columns: integer {@code c1} and strings {@code c2} and {@code c3}. */
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "c2", Types.StringType.get()),
                    Types.NestedField.optional(3, "c3", Types.StringType.get()));

    /**
     * Properties for the Hive test catalog. Typed string-to-string so the map is assignable to
     * {@code Map<String, String>} (the decompiled {@code (Object)} casts produced an
     * {@code ImmutableMap<Object, Object>}, which does not compile here).
     */
    private static final Map<String, String> CATALOG_PROPS =
            ImmutableMap.of(
                    "type", "hive",
                    "default-namespace", "default",
                    "cache-enabled", "false");

    /** Record count handed to the table-creation / writeRecords helpers as {@code numRecords}. */
    private static final int SCALE = 4000;

    /**
     * Count handed to writePosDeletesForFiles; presumably the number of position deletes per data
     * file (helper signature not visible here - confirm).
     */
    private static final int DELETES_SCALE = 1000;

    @TempDir
    private Path temp;

    /** File format under test, taken from the fourth column of each {@link #parameters()} row. */
    @Parameter(index = 3)
    private FileFormat format;

    /**
     * Supplies the single parameter row for this template: the Hive catalog configuration combined
     * with the Parquet file format.
     */
    @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}")
    public static Object[][] parameters() {
        SparkCatalogConfig hive = SparkCatalogConfig.HIVE;
        Object[] hiveWithParquet = {
            hive.catalogName(), hive.implementation(), CATALOG_PROPS, FileFormat.PARQUET
        };
        return new Object[][] {hiveWithParquet};
    }

    /** Drops the shared test table from the validation catalog after every test. */
    @AfterEach
    public void cleanup() {
        TableIdentifier testTable = TableIdentifier.of("default", TABLE_NAME);
        validationCatalog.dropTable(testTable);
    }

    /** Running the rewrite on a freshly created, empty table must neither remove nor add delete files. */
    @TestTemplate
    public void testEmptyTable() {
        PartitionSpec identityOnC1 = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        Table table =
                validationCatalog.createTable(
                        TableIdentifier.of("default", TABLE_NAME), SCHEMA, identityOnC1, tableProperties());

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark).rewritePositionDeletes(table).execute();

        Assertions.assertThat(result.rewrittenDeleteFilesCount()).as("No rewritten delete files").isZero();
        Assertions.assertThat(result.addedDeleteFilesCount()).as("No added delete files").isZero();
    }

    /** Checks the rewrite result when the table's delete granularity is {@code FILE}. */
    @TestTemplate
    public void testFileGranularity() throws Exception {
        DeleteGranularity granularity = DeleteGranularity.FILE;
        checkDeleteGranularity(granularity);
    }

    /** Checks the rewrite result when the table's delete granularity is {@code PARTITION}. */
    @TestTemplate
    public void testPartitionGranularity() throws Exception {
        DeleteGranularity granularity = DeleteGranularity.PARTITION;
        checkDeleteGranularity(granularity);
    }

    /**
     * Sets the table's {@code write.delete.granularity} property, rewrites all position deletes and
     * checks how many delete files the rewrite adds: two for {@code FILE}, one otherwise.
     *
     * @param deleteGranularity the granularity written into the table properties before the rewrite
     */
    private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        table.updateProperties()
                .set("write.delete.granularity", deleteGranularity.toString())
                .commit();

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        Assertions.assertThat(dataFiles).hasSize(2);

        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        List<DeleteFile> existingDeletes = deleteFiles(table);
        Assertions.assertThat(existingDeletes).hasSize(2);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        int expectedAddedFiles;
        if (deleteGranularity == DeleteGranularity.FILE) {
            expectedAddedFiles = 2;
        } else {
            expectedAddedFiles = 1;
        }
        Assertions.assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedAddedFiles);
    }

    /**
     * Compacts the two position delete files of an unpartitioned table into a single file and
     * checks that both the visible rows and the position deletes are unchanged.
     */
    @TestTemplate
    public void testUnpartitioned() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);
        List<DeleteFile> oldDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldDeleteFiles).hasSize(2);

        List<Object[]> rowsBefore = records(table);
        List<Object[]> deletesBefore = deleteRecords(table);
        Assertions.assertThat(rowsBefore).hasSize(2000);
        Assertions.assertThat(deletesBefore).hasSize(2000);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> compacted = deleteFiles(table);
        Assertions.assertThat(compacted).as("New delete files").hasSize(1);
        assertLocallySorted(compacted);
        assertNotContains(oldDeleteFiles, compacted);
        checkResult(result, oldDeleteFiles, compacted, 1);
        checkSequenceNumbers(table, oldDeleteFiles, compacted);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
        assertEquals("Position deletes", deletesBefore, deletesAfter);
    }

    /**
     * Rewrites all eight delete files of a four-partition table with an effectively unbounded
     * target size and expects four replacement files, with rows and deletes preserved.
     */
    @TestTemplate
    public void testRewriteAll() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);
        List<DeleteFile> oldDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldDeleteFiles).hasSize(8);

        List<Object[]> rowsBefore = records(table);
        List<Object[]> deletesBefore = deleteRecords(table);
        Assertions.assertThat(rowsBefore).hasSize(12000);
        Assertions.assertThat(deletesBefore).hasSize(4000);

        // Long.MAX_VALUE - 1 keeps every group below the target so nothing is split by size.
        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
                        .execute();

        List<DeleteFile> rewritten = deleteFiles(table);
        Assertions.assertThat(rewritten).hasSize(4);
        assertNotContains(oldDeleteFiles, rewritten);
        assertLocallySorted(rewritten);
        checkResult(result, oldDeleteFiles, rewritten, 4);
        checkSequenceNumbers(table, oldDeleteFiles, rewritten);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
        assertEquals("Position deletes", deletesBefore, deletesAfter);
    }

    /**
     * Rewrites only the delete files selected by a filter on {@code c3} and on the upper-cased column
     * name {@code C1}. Under the default session settings the filter is accepted and rewrites the
     * delete files of partitions c1=1 and c1=2. With Spark's case-sensitive mode enabled, the same
     * filter must fail validation because {@code C1} cannot be found.
     */
    @TestTemplate
    public void testRewriteFilter() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);
        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assertions.assertThat(deleteFiles).hasSize(8);
        table.refresh();

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(expectedDeletes).hasSize(4000);

        // "C1" is intentionally upper-case to exercise case-insensitive column resolution.
        Expression filter =
                Expressions.and(
                        Expressions.greaterThan("c3", "0"),
                        Expressions.or(Expressions.equal("C1", 1), Expressions.equal("C1", 2)));
        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .filter(filter)
                        .option("rewrite-all", "true")
                        .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
                        .execute();

        List<DeleteFile> newDeleteFiles = except(deleteFiles(table), deleteFiles);
        Assertions.assertThat(newDeleteFiles).as("Delete files").hasSize(2);
        List<DeleteFile> expectedRewrittenFiles =
                filterFiles(table, deleteFiles, ImmutableList.of(1), ImmutableList.of(2));
        assertLocallySorted(newDeleteFiles);
        checkResult(result, expectedRewrittenFiles, newDeleteFiles, 2);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows", expectedRecords, actualRecords);
        assertEquals("Position deletes", expectedDeletes, actualDeletes);

        // A properly typed Map<String, String>; the decompiled (Object)-cast map could not be cast
        // to Map<String, String> and did not compile.
        Map<String, String> caseSensitive = ImmutableMap.of(SQLConf.CASE_SENSITIVE().key(), "true");
        withSQLConf(
                caseSensitive,
                () ->
                        Assertions.assertThatThrownBy(
                                        () ->
                                                SparkActions.get(spark)
                                                        .rewritePositionDeletes(table)
                                                        .filter(filter)
                                                        .execute())
                                .isInstanceOf(ValidationException.class)
                                .hasMessageContaining("Cannot find field 'C1' in struct"));
    }

    /**
     * Rewrites all delete files with a target size of half the current average delete-file size
     * and expects eight output files, with rows and deletes preserved.
     */
    @TestTemplate
    public void testRewriteToSmallerTarget() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);

        List<Object[]> rowsBefore = records(table);
        List<Object[]> deletesBefore = deleteRecords(table);
        Assertions.assertThat(rowsBefore).hasSize(12000);
        Assertions.assertThat(deletesBefore).hasSize(4000);

        List<DeleteFile> oldDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldDeleteFiles).hasSize(8);
        long averageBytes = size(oldDeleteFiles) / (long) oldDeleteFiles.size();
        String halfAverage = String.valueOf(averageBytes / 2L);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .option("target-file-size-bytes", halfAverage)
                        .execute();

        List<DeleteFile> rewritten = deleteFiles(table);
        Assertions.assertThat(rewritten).as("New delete files").hasSize(8);
        assertNotContains(oldDeleteFiles, rewritten);
        assertLocallySorted(rewritten);
        checkResult(result, oldDeleteFiles, rewritten, 4);
        checkSequenceNumbers(table, oldDeleteFiles, rewritten);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
        assertEquals("Position deletes", deletesBefore, deletesAfter);
    }

    /**
     * Rewrites every data file first, then rewrites position deletes. The old deletes should all be
     * dropped (no new delete files, no remaining delete records) while the visible rows are unchanged.
     */
    @TestTemplate
    public void testRemoveDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assertions.assertThat(dataFiles).hasSize(4);
        List<DeleteFile> oldDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldDeleteFiles).hasSize(8);

        List<Object[]> rowsBefore = records(table);
        List<Object[]> deletesBefore = deleteRecords(table);
        Assertions.assertThat(rowsBefore).hasSize(12000);
        Assertions.assertThat(deletesBefore).hasSize(4000);

        SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true").execute();
        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> remaining = deleteFiles(table);
        Assertions.assertThat(remaining).as("New delete files").isEmpty();
        assertNotContains(oldDeleteFiles, remaining);
        assertLocallySorted(remaining);
        checkResult(result, oldDeleteFiles, remaining, 4);
        checkSequenceNumbers(table, oldDeleteFiles, remaining);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
        Assertions.assertThat(deletesAfter).as("New position deletes").isEmpty();
    }

    /**
     * Rewrites the data files of partitions c1=0 and c1=1 only, then rewrites position deletes.
     * Only the deletes of the untouched partitions (c1=2, c1=3) should remain, compacted into two
     * new delete files.
     */
    @TestTemplate
    public void testSomePartitionsDanglingDeletes() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(4);
        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assertions.assertThat(deleteFiles).hasSize(8);

        // Must be List<Object[]>: deleteRecords returns Object[] rows, and row[3] is indexed below.
        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(12000);
        Assertions.assertThat(expectedDeletes).hasSize(4000);

        Expression filter = Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1));
        SparkActions.get(spark)
                .rewriteDataFiles(table)
                .filter(filter)
                .option("rewrite-all", "true")
                .execute();
        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assertions.assertThat(newDeleteFiles).as("New delete files").hasSize(2);
        assertNotContains(deleteFiles, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, deleteFiles, newDeleteFiles, 4);
        checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

        // Element 3 of each delete record is treated as the partition tuple. Compare its first value
        // with equals(): the previous '==' compared boxed Integers by identity and only worked
        // because small Integers happen to be cached.
        List<Object[]> expectedRemainingDeletes =
                expectedDeletes.stream()
                        .filter(
                                row -> {
                                    Object partitionValue = ((Object[]) row[3])[0];
                                    return Integer.valueOf(2).equals(partitionValue)
                                            || Integer.valueOf(3).equals(partitionValue);
                                })
                        .collect(Collectors.toList());

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows", expectedRecords, actualRecords);
        assertEquals("Position deletes", expectedRemainingDeletes, actualDeletes);
    }

    /**
     * Rewrites all data files, then rewrites only the position deletes of partitions c1=0 and c1=1.
     * No new delete files should be produced, and the remaining delete records should be exactly
     * those of partitions c1=2 and c1=3.
     */
    @TestTemplate
    public void testRewriteFilterRemoveDangling() throws Exception {
        Table table = createTablePartitioned(4, 2, SCALE);
        table.refresh();
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true);
        Assertions.assertThat(dataFiles).hasSize(4);
        List<DeleteFile> oldDeleteFiles = deleteFiles(table);
        Assertions.assertThat(oldDeleteFiles).hasSize(8);
        table.refresh();

        List<Object[]> rowsBefore = records(table);
        List<Object[]> deletesBefore = deleteRecords(table);
        Assertions.assertThat(rowsBefore).hasSize(12000);
        Assertions.assertThat(deletesBefore).hasSize(4000);

        SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", "true").execute();

        Expression firstTwoPartitions =
                Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1));
        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .filter(firstTwoPartitions)
                        .option("rewrite-all", "true")
                        .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
                        .execute();

        List<DeleteFile> added = except(deleteFiles(table), oldDeleteFiles);
        Assertions.assertThat(added).as("New delete files").isEmpty();
        List<DeleteFile> expectedRewritten =
                filterFiles(table, oldDeleteFiles, ImmutableList.of(0), ImmutableList.of(1));
        checkResult(result, expectedRewritten, added, 2);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        List<Object[]> survivingDeletes =
                filterDeletes(deletesBefore, ImmutableList.of(2), ImmutableList.of(3));
        assertEquals("Rows", rowsBefore, rowsAfter);
        assertEquals("Position deletes", survivingDeletes, deletesAfter);
    }

    /**
     * Starts unpartitioned, adds {@code c1} as a partition field, writes more data and deletes, then
     * rewrites every delete file across both specs. Expects three output delete files.
     */
    @TestTemplate
    public void testPartitionEvolutionAdd() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> unpartitionedData = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, unpartitionedData);
        Assertions.assertThat(unpartitionedData).hasSize(2);
        List<DeleteFile> unpartitionedDeletes = deleteFiles(table);
        Assertions.assertThat(unpartitionedDeletes).hasSize(2);

        List<Object[]> unpartitionedDeleteRows = deleteRecords(table);
        List<Object[]> unpartitionedRows = records(table);
        Assertions.assertThat(unpartitionedRows).hasSize(2000);
        Assertions.assertThat(unpartitionedDeleteRows).hasSize(2000);

        table.updateSpec().addField("c1").commit();
        writeRecords(table, 2, SCALE, 2);
        List<DataFile> partitionedData = except(TestHelpers.dataFiles(table), unpartitionedData);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, partitionedData);
        Assertions.assertThat(partitionedData).hasSize(2);
        List<DeleteFile> partitionedDeletes = except(deleteFiles(table), unpartitionedDeletes);
        Assertions.assertThat(partitionedDeletes).hasSize(4);

        List<Object[]> deletesBefore = deleteRecords(table);
        List<Object[]> rowsBefore = records(table);
        Assertions.assertThat(deletesBefore).hasSize(4000);
        Assertions.assertThat(rowsBefore).hasSize(8000);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> allOldDeletes = new ArrayList<>(unpartitionedDeletes);
        allOldDeletes.addAll(partitionedDeletes);
        List<DeleteFile> rewritten = deleteFiles(table);
        Assertions.assertThat(rewritten).as("New delete files").hasSize(3);
        assertNotContains(allOldDeletes, rewritten);
        assertLocallySorted(rewritten);
        checkResult(result, allOldDeletes, rewritten, 3);
        checkSequenceNumbers(table, allOldDeletes, rewritten);

        List<Object[]> rowsAfter = records(table);
        List<Object[]> deletesAfter = deleteRecords(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
        assertEquals("Position deletes", deletesBefore, deletesAfter);
    }

    /**
     * Starts partitioned by {@code c1}, removes that partition field, writes more data and deletes,
     * then rewrites every delete file across both specs. Expects three output delete files, with
     * rows and deletes preserved.
     */
    @TestTemplate
    public void testPartitionEvolutionRemove() throws Exception {
        // Locals are named by spec ("original" = partitioned by c1, "evolved" = after removing c1).
        // The previous names had "partitioned"/"unpartitioned" inverted.
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> originalSpecDataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, originalSpecDataFiles);
        Assertions.assertThat(originalSpecDataFiles).hasSize(2);
        List<DeleteFile> originalSpecDeleteFiles = deleteFiles(table);
        Assertions.assertThat(originalSpecDeleteFiles).hasSize(4);

        table.updateSpec().removeField("c1").commit();
        writeRecords(table, 2, SCALE);
        List<DataFile> evolvedSpecDataFiles = except(TestHelpers.dataFiles(table), originalSpecDataFiles);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, evolvedSpecDataFiles);
        Assertions.assertThat(evolvedSpecDataFiles).hasSize(2);
        List<DeleteFile> evolvedSpecDeleteFiles = except(deleteFiles(table), originalSpecDeleteFiles);
        Assertions.assertThat(evolvedSpecDeleteFiles).hasSize(2);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedDeletes).hasSize(4000);
        Assertions.assertThat(expectedRecords).hasSize(8000);

        List<DeleteFile> expectedRewritten = deleteFiles(table);
        Assertions.assertThat(expectedRewritten).hasSize(6);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assertions.assertThat(newDeleteFiles).as("New delete files").hasSize(3);
        assertNotContains(expectedRewritten, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, expectedRewritten, newDeleteFiles, 3);
        checkSequenceNumbers(table, expectedRewritten, newDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows", expectedRecords, actualRecords);
        assertEquals("Position deletes", expectedDeletes, actualDeletes);
    }

    /**
     * Adds column {@code c4}, writes data and deletes under the new schema, then rewrites all delete
     * files from both schema versions. Only the visible rows are compared afterwards; delete records
     * are checked for count before the rewrite but not compared after it.
     */
    @TestTemplate
    public void testSchemaEvolution() throws Exception {
        Table table = createTablePartitioned(2, 2, SCALE);
        List<DataFile> oldSchemaData = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, oldSchemaData);
        Assertions.assertThat(oldSchemaData).hasSize(2);
        List<DeleteFile> oldSchemaDeletes = deleteFiles(table);
        Assertions.assertThat(oldSchemaDeletes).hasSize(4);

        table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
        writeNewSchemaRecords(table, 2, SCALE, 2, 2);

        // Files written with the new schema are recognised by having an upper bound for c4.
        int c4Id = table.schema().findField("c4").fieldId();
        List<DataFile> newSchemaData =
                TestHelpers.dataFiles(table).stream()
                        .filter(file -> file.upperBounds().containsKey(c4Id))
                        .collect(Collectors.toList());
        writePosDeletesForFiles(table, 2, DELETES_SCALE, newSchemaData);
        List<DeleteFile> newSchemaDeletes = except(deleteFiles(table), oldSchemaDeletes);
        Assertions.assertThat(newSchemaDeletes).hasSize(4);

        table.refresh();
        List<Object[]> deletesBefore = deleteRecords(table);
        List<Object[]> rowsBefore = records(table);
        Assertions.assertThat(deletesBefore).hasSize(4000);
        Assertions.assertThat(rowsBefore).hasSize(12000);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .execute();

        List<DeleteFile> allOldDeletes = new ArrayList<>(oldSchemaDeletes);
        allOldDeletes.addAll(newSchemaDeletes);
        List<DeleteFile> rewritten = deleteFiles(table);
        Assertions.assertThat(rewritten).as("New delete files").hasSize(4);
        assertNotContains(allOldDeletes, rewritten);
        assertLocallySorted(rewritten);
        checkResult(result, allOldDeletes, rewritten, 4);
        checkSequenceNumbers(table, allOldDeletes, rewritten);

        List<Object[]> rowsAfter = records(table);
        assertEquals("Rows", rowsBefore, rowsAfter);
    }

    /**
     * Checks that a custom snapshot property set on the action, plus the standard delete-file commit
     * metrics, appear in the summary of the snapshot committed by the rewrite.
     */
    @TestTemplate
    public void testSnapshotProperty() throws Exception {
        Table table = createTableUnpartitioned(2, SCALE);
        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);
        List<DeleteFile> existingDeletes = deleteFiles(table);
        Assertions.assertThat(existingDeletes).hasSize(2);

        RewritePositionDeleteFiles.Result ignored =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .snapshotProperty("key", "value")
                        .option("rewrite-all", "true")
                        .execute();

        Map<String, String> summary = table.currentSnapshot().summary();
        Assertions.assertThat(summary).containsAllEntriesOf(ImmutableMap.of("key", "value"));
        Assertions.assertThat(summary)
                .containsKeys(
                        "added-delete-files",
                        "added-position-deletes",
                        "changed-partition-count",
                        "removed-delete-files",
                        "removed-position-deletes",
                        "total-data-files",
                        "total-delete-files");
    }

    /**
     * Rewrites position deletes on a table with a very wide schema: a required {@code id} column
     * plus string columns {@code c1..c1009}, bucketed on {@code id} into two buckets.
     */
    @TestTemplate
    public void testRewriteManyColumns() throws Exception {
        // Exclusive upper bound for the generated c<i> columns. It is shared by the Iceberg schema
        // and the Spark DataFrame so that the two always agree.
        int columnBound = 1010;
        List<Types.NestedField> fields =
                Lists.newArrayList(Types.NestedField.required(0, "id", Types.LongType.get()));
        List<Types.NestedField> additionalCols =
                IntStream.range(1, columnBound)
                        .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
                        .collect(Collectors.toList());
        fields.addAll(additionalCols);
        Schema schema = new Schema(fields);
        PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
        Table table =
                validationCatalog.createTable(
                        TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());

        // Typed Dataset<Row> (previously a raw Dataset) so df.rdd() is an RDD<Row>.
        Dataset<Row> df =
                spark.range(4L)
                        .withColumns(
                                IntStream.range(1, columnBound)
                                        .boxed()
                                        .collect(
                                                Collectors.toMap(
                                                        i -> "c" + i,
                                                        i -> functions.expr("CAST(id as STRING)"))));
        StructType sparkSchema = spark.table(name(table)).schema();
        spark.createDataFrame(df.rdd(), sparkSchema)
                .coalesce(1)
                .write()
                .format("iceberg")
                .mode("append")
                .save(name(table));

        List<DataFile> dataFiles = TestHelpers.dataFiles(table);
        writePosDeletesForFiles(table, 1, 1, dataFiles);
        Assertions.assertThat(dataFiles).hasSize(2);
        List<DeleteFile> deleteFiles = deleteFiles(table);
        Assertions.assertThat(deleteFiles).hasSize(2);

        List<Object[]> expectedRecords = records(table);
        List<Object[]> expectedDeletes = deleteRecords(table);
        Assertions.assertThat(expectedRecords).hasSize(2);
        Assertions.assertThat(expectedDeletes).hasSize(2);

        RewritePositionDeleteFiles.Result result =
                SparkActions.get(spark)
                        .rewritePositionDeletes(table)
                        .option("rewrite-all", "true")
                        .option("target-file-size-bytes", Long.toString(Long.MAX_VALUE - 1))
                        .execute();

        List<DeleteFile> newDeleteFiles = deleteFiles(table);
        Assertions.assertThat(newDeleteFiles).hasSize(2);
        assertNotContains(deleteFiles, newDeleteFiles);
        assertLocallySorted(newDeleteFiles);
        checkResult(result, deleteFiles, newDeleteFiles, 2);
        checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

        List<Object[]> actualRecords = records(table);
        List<Object[]> actualDeletes = deleteRecords(table);
        assertEquals("Rows", expectedRecords, actualRecords);
        assertEquals("Position deletes", expectedDeletes, actualDeletes);
    }

    /** The rewrite action must reject a format-version 3 table with an IllegalArgumentException. */
    @TestTemplate
    public void testRewritePositionDeletesForV3TableFails() {
        Map<String, String> v3Properties = tableProperties(3);
        Table table =
                validationCatalog.createTable(
                        TableIdentifier.of("default", TABLE_NAME),
                        SCHEMA,
                        PartitionSpec.unpartitioned(),
                        v3Properties);
        writeRecords(table, 2, SCALE);

        Assertions.assertThatThrownBy(
                        () -> SparkActions.get(spark).rewritePositionDeletes(table).execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Cannot rewrite position deletes for V3 table");
    }

    /**
     * Creates the test table with an identity partition on {@code c1} and fills it via writeRecords.
     *
     * @param partitions number of partitions passed to writeRecords
     * @param files number of files passed to writeRecords
     * @param numRecords record count passed to writeRecords
     * @return the created table
     */
    private Table createTablePartitioned(int partitions, int files, int numRecords) {
        PartitionSpec byC1 = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        Table created = validationCatalog.createTable(identifier, SCHEMA, byC1, tableProperties());
        writeRecords(created, files, numRecords, partitions);
        return created;
    }

    /**
     * Creates an unpartitioned {@code default.TABLE_NAME} table and appends records to it.
     *
     * @param files number of Spark partitions used for the append
     * @param numRecords number of records to write
     * @return the created table, refreshed after the write
     */
    private Table createTableUnpartitioned(int files, int numRecords) {
        TableIdentifier identifier = TableIdentifier.of("default", TABLE_NAME);
        Table created = this.validationCatalog.createTable(identifier, SCHEMA, PartitionSpec.unpartitioned(), this.tableProperties());
        this.writeRecords(created, files, numRecords);
        return created;
    }

    /** Table properties for the default format version used by these tests (version 2). */
    private Map<String, String> tableProperties() {
        final int defaultFormatVersion = 2;
        return this.tableProperties(defaultFormatVersion);
    }

    /**
     * Builds the immutable table property map used when creating test tables.
     *
     * <p>Enables full metrics, pins the format version, uses the parameterized file format for writes
     * and requests partition-granularity deletes.
     *
     * @param formatVersion Iceberg table format version to set
     * @return an immutable property map
     */
    private Map<String, String> tableProperties(int formatVersion) {
        return ImmutableMap.<String, String>builder()
            .put("write.metadata.metrics.default", "full")
            .put("format-version", String.valueOf(formatVersion))
            .put("write.format.default", this.format.toString())
            .put("write.delete.granularity", DeleteGranularity.PARTITION.toString())
            .build();
    }

    /** Appends {@code numRecords} records using a single partition value ({@code 0}). */
    private void writeRecords(Table table, int files, int numRecords) {
        final int singlePartition = 1;
        this.writeRecords(table, files, numRecords, singlePartition);
    }

    /**
     * Appends records for partition values {@code [0]}, {@code [1]}, ..., {@code [numPartitions - 1]}.
     *
     * @param numPartitions number of single-column partition values to generate; non-positive yields none
     */
    private void writeRecords(Table table, int files, int numRecords, int numPartitions) {
        List<List<Integer>> partitionValues = Lists.newArrayList();
        for (int value = 0; value < numPartitions; value++) {
            partitionValues.add(ImmutableList.of(value));
        }
        this.writeRecordsWithPartitions(table, files, numRecords, partitionValues);
    }

    /**
     * Appends {@code numRecords} {@link ThreeColumnRecord}s per entry of {@code partitions}, then refreshes
     * the table.
     *
     * <p>How many leading columns take their value from the partition entry depends on the number of fields
     * in the table's current partition type (0, 1 or 2); the remaining columns are filled from the record
     * index.
     *
     * @param files number of Spark partitions used for the append
     * @param partitions partition values; each inner list supplies c1 (and c2 for two-field specs)
     */
    private void writeRecordsWithPartitions(Table table, int files, int numRecords, List<List<Integer>> partitions) {
        int partitionTypeSize = table.spec().partitionType().fields().size();
        Assertions.assertThat(partitionTypeSize)
            .as("This method currently supports only two columns as partition columns")
            .isLessThanOrEqualTo(2);

        BiFunction<Integer, List<Integer>, ThreeColumnRecord> recordFunction = (index, partValues) -> {
            if (partitionTypeSize == 0) {
                return new ThreeColumnRecord(index, String.valueOf(index), String.valueOf(index));
            } else if (partitionTypeSize == 1) {
                return new ThreeColumnRecord(partValues.get(0), String.valueOf(index), String.valueOf(index));
            } else if (partitionTypeSize == 2) {
                return new ThreeColumnRecord(partValues.get(0), String.valueOf(partValues.get(1)), String.valueOf(index));
            }
            throw new ValidationException("This method currently supports only two columns as partition columns");
        };

        List<ThreeColumnRecord> records = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            for (int index = 0; index < numRecords; index++) {
                records.add(recordFunction.apply(index, partition));
            }
        }

        spark.createDataFrame(records, ThreeColumnRecord.class)
            .repartition(files)
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.name(table));
        table.refresh();
    }

    /**
     * Appends {@link FourColumnRecord}s for partition values {@code startingPartition} ..
     * {@code startingPartition + partitions - 1}, {@code numRecords} per value, then refreshes the table.
     *
     * <p>The refresh matches {@code writeRecordsWithPartitions}, so the caller's {@link Table} handle
     * reflects the new snapshot instead of staying stale.
     *
     * @param files number of Spark partitions used for the append
     * @param numRecords records written per partition value
     * @param startingPartition first c1 value to write
     * @param partitions number of consecutive c1 values to write
     */
    private void writeNewSchemaRecords(Table table, int files, int numRecords, int startingPartition, int partitions) {
        List<FourColumnRecord> records = IntStream.range(startingPartition, startingPartition + partitions)
            .boxed()
            .flatMap(partition -> IntStream.range(0, numRecords)
                .mapToObj(i -> new FourColumnRecord(partition, String.valueOf(i), String.valueOf(i), String.valueOf(i))))
            .collect(Collectors.toList());
        spark.createDataFrame(records, FourColumnRecord.class)
            .repartition(files)
            .write()
            .format("iceberg")
            .mode("append")
            .save(this.name(table));
        table.refresh();
    }

    /** Reads all table rows as Java arrays, sorted by (c1, c2, c3) so the order is deterministic. */
    private List<Object[]> records(Table table) {
        Dataset<Row> tableRows = spark.read().format("iceberg").load(this.name(table));
        List<Row> sortedRows = tableRows.sort("c1", "c2", "c3").collectAsList();
        return this.rowsToJava(sortedRows);
    }

    /**
     * Reads the table's {@code position_deletes} metadata table, sorted by (file_path, pos).
     *
     * <p>Columns are file_path, pos, row, and additionally partition and spec_id for partitioned tables
     * (so index 3 holds the partition struct in that case).
     */
    private List<Object[]> deleteRecords(Table table) {
        String[] additionalFields;
        if (table.spec().isUnpartitioned()) {
            additionalFields = new String[] {"pos", "row"};
        } else {
            additionalFields = new String[] {"pos", "row", "partition", "spec_id"};
        }
        Dataset<Row> positionDeletes = spark.read().format("iceberg").load(this.name(table) + ".position_deletes");
        List<Row> sortedRows = positionDeletes.select("file_path", additionalFields).sort("file_path", "pos").collectAsList();
        return this.rowsToJava(sortedRows);
    }

    /** Writes position deletes for {@code files}, committing each delete file in its own row delta. */
    private void writePosDeletesForFiles(Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files) throws IOException {
        final boolean transactional = false;
        this.writePosDeletesForFiles(table, deleteFilesPerPartition, deletesPerDataFile, files, transactional);
    }

    /**
     * Writes position delete files covering positions {@code 0 .. deletesPerDataFile-1} of every data file
     * and commits them.
     *
     * <p>Data files are grouped by partition. Each partition's deletes are split evenly into
     * {@code deleteFilesPerPartition} files; the total must be evenly divisible, which is asserted.
     *
     * @param deleteFilesPerPartition number of delete files to produce per partition
     * @param deletesPerDataFile number of leading row positions to delete in each data file
     * @param files data files to write deletes for
     * @param transactional if true, commit all delete files in one row delta; otherwise one row delta per file
     * @throws IOException if a temporary delete file cannot be created or written
     */
    private void writePosDeletesForFiles(Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files, boolean transactional) throws IOException {
        Map<StructLike, List<DataFile>> filesByPartition = files.stream().collect(Collectors.groupingBy(ContentFile::partition));
        List<DeleteFile> deleteFiles = Lists.newArrayListWithCapacity(deleteFilesPerPartition * filesByPartition.size());
        String suffix = String.format(".%s", FileFormat.PARQUET.name().toLowerCase());
        for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry : filesByPartition.entrySet()) {
            StructLike partition = filesByPartitionEntry.getKey();
            List<DataFile> partitionFiles = filesByPartitionEntry.getValue();
            int deletesForPartition = partitionFiles.size() * deletesPerDataFile;
            Assertions.assertThat(deletesForPartition % deleteFilesPerPartition)
                .as("Number of delete files per partition should be evenly divisible by requested deletes per data file times number of data files in this partition")
                .isZero();
            int deleteFileSize = deletesForPartition / deleteFilesPerPartition;
            int counter = 0;
            List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
            for (DataFile partitionFile : partitionFiles) {
                for (int deletePos = 0; deletePos < deletesPerDataFile; ++deletePos) {
                    // Box the position as Long: the delete writer reads positions as Long, and boxing the int
                    // loop index directly would store Integer values in the pair.
                    deletes.add(Pair.of(partitionFile.location(), (long) deletePos));
                    counter++;
                    if (counter == deleteFileSize) {
                        // Flush the accumulated deletes into one file, then start the next batch.
                        OutputFile output = Files.localOutput(File.createTempFile("junit", suffix, this.temp.toFile()));
                        deleteFiles.add(FileHelpers.writeDeleteFile(table, output, partition, deletes).first());
                        counter = 0;
                        deletes.clear();
                    }
                }
            }
        }
        if (transactional) {
            RowDelta rowDelta = table.newRowDelta();
            deleteFiles.forEach(rowDelta::addDeletes);
            rowDelta.commit();
        } else {
            for (DeleteFile deleteFile : deleteFiles) {
                RowDelta rowDelta = table.newRowDelta();
                rowDelta.addDeletes(deleteFile);
                rowDelta.commit();
            }
        }
    }

    /**
     * Lists the table's current position delete files by planning a scan of its POSITION_DELETES
     * metadata table.
     *
     * <p>The planned task iterable is closed after it has been fully materialized, so the method does not
     * leak the scan's open resources.
     *
     * @throws java.io.UncheckedIOException if closing the planned tasks fails
     */
    private List<DeleteFile> deleteFiles(Table table) {
        Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        try (CloseableIterable<?> tasks = deletesTable.newBatchScan().planFiles()) {
            // Lists.newArrayList consumes the iterable eagerly, before try-with-resources closes it.
            return Lists.newArrayList(CloseableIterable.transform(tasks, t -> ((PositionDeletesScanTask) t).file()));
        } catch (IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Returns the files of {@code first} whose location does not appear in {@code second}, preserving the
     * order of {@code first}.
     */
    private <T extends ContentFile<?>> List<T> except(List<T> first, List<T> second) {
        Set<String> excludedLocations = second.stream().map(ContentFile::location).collect(Collectors.toSet());
        List<T> remaining = new ArrayList<>();
        for (T file : first) {
            if (!excludedLocations.contains(file.location())) {
                remaining.add(file);
            }
        }
        return remaining;
    }

    /** Asserts that no rewritten delete file shares a location with any original delete file. */
    private void assertNotContains(List<DeleteFile> original, List<DeleteFile> rewritten) {
        Set<String> originalLocations = original.stream().map(ContentFile::location).collect(Collectors.toSet());
        Set<String> overlap = rewritten.stream()
            .map(ContentFile::location)
            .filter(originalLocations::contains)
            .collect(Collectors.toSet());
        Assertions.assertThat(overlap).isEmpty();
    }

    /**
     * Asserts that each given delete file is non-empty and that its rows are ordered by file_path, then pos.
     *
     * <p>Rows are read from the {@code position_deletes} metadata table and restricted to one delete file
     * at a time. NOTE(review): this assumes the metadata-table scan returns a single file's rows in their
     * on-disk order (true for small, unsplit files).
     */
    private void assertLocallySorted(List<DeleteFile> deleteFiles) {
        Dataset<Row> allDeletes = spark.read().format("iceberg").load("default." + TABLE_NAME + ".position_deletes");
        for (DeleteFile deleteFile : deleteFiles) {
            // Dataset.filter returns a new Dataset; the result must be kept, otherwise every row is checked.
            Dataset<Row> deletes = allDeletes.filter(allDeletes.col("delete_file_path").equalTo(deleteFile.location()));
            List<Row> rows = deletes.collectAsList();
            Assertions.assertThat(rows).as("Empty delete file found").isNotEmpty();
            long lastPos = 0L;
            String lastPath = "";
            for (Row row : rows) {
                String path = row.getAs("file_path");
                long pos = row.<Long>getAs("pos");
                if (path.compareTo(lastPath) < 0) {
                    Assertions.fail(String.format("File_path not sorted, Found %s after %s", path, lastPath));
                } else if (path.equals(lastPath)) {
                    Assertions.assertThat(pos).as("Pos not sorted").isGreaterThanOrEqualTo(lastPos);
                }
                // Advance the cursor; without this the ordering checks above compare against the initial values only.
                lastPath = path;
                lastPos = pos;
            }
        }
    }

    /**
     * Drops the first component of the table's three-part name and returns the remaining two joined by a dot.
     * NOTE(review): presumably catalog.namespace.table -> namespace.table; confirm against the catalog setup.
     */
    private String name(Table table) {
        String[] parts = table.name().split("\\.");
        Assertions.assertThat(parts).hasSize(3);
        String namespace = parts[1];
        String leaf = parts[2];
        return String.format("%s.%s", namespace, leaf);
    }

    /** Returns the total size in bytes of the given delete files. */
    private long size(List<DeleteFile> deleteFiles) {
        long totalBytes = 0L;
        for (DeleteFile file : deleteFiles) {
            totalBytes += file.fileSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Keeps the delete rows whose partition (column index 3, as produced by {@code deleteRecords} for
     * partitioned tables) matches any of {@code partitionValues}, sorted by (file_path, pos).
     *
     * <p>With no partition values nothing matches and an empty list is returned. The previous
     * {@code reduce(...).get()} threw {@code NoSuchElementException} in that case.
     */
    private List<Object[]> filterDeletes(List<Object[]> deletes, List<?> ... partitionValues) {
        Stream<Object[]> matches = deletes.stream().filter(r -> {
            Object[] partition = (Object[]) r[3];
            return Arrays.stream(partitionValues).anyMatch(partitionValue -> this.match(partition, partitionValue));
        });
        return this.sorted(matches).collect(Collectors.toList());
    }

    /**
     * Returns true when the leading fields of {@code partition} equal {@code expectedPartition} element by
     * element.
     *
     * <p>Values are compared with {@code equals}, not {@code ==`}: reference equality on boxed values only
     * happens to work for cached small Integers and fails for Strings or larger numbers. An empty
     * expectation matches vacuously.
     */
    private boolean match(Object[] partition, List<?> expectedPartition) {
        return IntStream.range(0, expectedPartition.size())
            .allMatch(j -> java.util.Objects.equals(partition[j], expectedPartition.get(j)));
    }

    /** Sorts delete rows by file path (index 0, String), then by position (index 1, Long). */
    private Stream<Object[]> sorted(Stream<Object[]> deletes) {
        return deletes.sorted((left, right) -> {
            int byPath = ((String) left[0]).compareTo((String) right[0]);
            return byPath != 0 ? byPath : Long.compare((Long) left[1], (Long) right[1]);
        });
    }

    /**
     * Keeps the files whose partition equals one of {@code partitionValues}.
     *
     * <p>Each value list is converted to {@link PartitionData} using the first partition type among the
     * table's specs that has the same number of fields.
     *
     * @throws IllegalArgumentException if no spec has a partition type with a matching field count
     */
    private List<DeleteFile> filterFiles(Table table, List<DeleteFile> files, List<?> ... partitionValues) {
        List<Types.StructType> partitionTypes = table.specs().values().stream()
            .map(PartitionSpec::partitionType)
            .collect(Collectors.toList());
        List<PartitionData> partitionDatas = Arrays.stream(partitionValues).map(partitionValue -> {
            Types.StructType thisType = partitionTypes.stream()
                .filter(type -> type.fields().size() == partitionValue.size())
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(String.format(
                    "No partition spec with %d fields for partition value %s", partitionValue.size(), partitionValue)));
            PartitionData partition = new PartitionData(thisType);
            for (int i = 0; i < partitionValue.size(); ++i) {
                partition.set(i, partitionValue.get(i));
            }
            return partition;
        }).collect(Collectors.toList());
        return files.stream()
            .filter(f -> partitionDatas.stream().anyMatch(data -> f.partition().equals(data)))
            .collect(Collectors.toList());
    }

    /**
     * Verifies the rewrite result against the observed delete files: file counts, byte counts and group
     * count, both as totals and as sums over the per-group results.
     *
     * <p>Each assertion uses AssertJ's {@code assertThat(actual).isEqualTo(expected)} order. The action's
     * reported value is the actual and the observed files are the expected, so failure messages read
     * correctly.
     */
    private void checkResult(RewritePositionDeleteFiles.Result result, List<DeleteFile> rewrittenDeletes, List<DeleteFile> newDeletes, int expectedGroups) {
        List<RewritePositionDeleteFiles.FileGroupRewriteResult> groups = result.rewriteResults();
        Assertions.assertThat(result.rewrittenDeleteFilesCount()).as("Rewritten delete file count").isEqualTo(rewrittenDeletes.size());
        Assertions.assertThat(result.addedDeleteFilesCount()).as("New delete file count").isEqualTo(newDeletes.size());
        Assertions.assertThat(result.rewrittenBytesCount()).as("Rewritten delete byte count").isEqualTo(this.size(rewrittenDeletes));
        Assertions.assertThat(result.addedBytesCount()).as("New delete byte count").isEqualTo(this.size(newDeletes));
        Assertions.assertThat(groups.size()).as("Rewrite group count").isEqualTo(expectedGroups);
        Assertions.assertThat(groups.stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenDeleteFilesCount).sum())
            .as("Rewritten delete file count in all groups")
            .isEqualTo(rewrittenDeletes.size());
        Assertions.assertThat(groups.stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::addedDeleteFilesCount).sum())
            .as("Added delete file count in all groups")
            .isEqualTo(newDeletes.size());
        Assertions.assertThat(groups.stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenBytesCount).sum())
            .as("Rewritten delete bytes in all groups")
            .isEqualTo(this.size(rewrittenDeletes));
        Assertions.assertThat(groups.stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::addedBytesCount).sum())
            .as("Added delete bytes in all groups")
            .isEqualTo(this.size(newDeletes));
    }

    /**
     * For every partition that had rewritten delete files, asserts that each delete file added in that
     * partition carries the maximum data sequence number of the rewritten files. Partitions with no added
     * files are skipped.
     */
    private void checkSequenceNumbers(Table table, List<DeleteFile> rewrittenDeletes, List<DeleteFile> addedDeletes) {
        StructLikeMap<List<DeleteFile>> rewrittenByPartition = this.groupPerPartition(table, rewrittenDeletes);
        StructLikeMap<List<DeleteFile>> addedByPartition = this.groupPerPartition(table, addedDeletes);
        for (StructLike partition : rewrittenByPartition.keySet()) {
            List<DeleteFile> rewrittenInPartition = rewrittenByPartition.get(partition);
            Long expectedSeq = rewrittenInPartition.stream().mapToLong(ContentFile::dataSequenceNumber).max().getAsLong();
            List<DeleteFile> addedInPartition = addedByPartition.get(partition);
            if (addedInPartition != null) {
                for (DeleteFile added : addedInPartition) {
                    Assertions.assertThat(added.dataSequenceNumber())
                        .as("Sequence number should be max of rewritten set")
                        .isEqualTo(expectedSeq);
                }
            }
        }
    }

    /**
     * Groups delete files by partition, keyed by the table's unified partition type, preserving the input
     * order within each group.
     */
    private StructLikeMap<List<DeleteFile>> groupPerPartition(Table table, List<DeleteFile> deleteFiles) {
        StructLikeMap<List<DeleteFile>> grouped = StructLikeMap.create(Partitioning.partitionType(table));
        for (DeleteFile deleteFile : deleteFiles) {
            grouped.computeIfAbsent(deleteFile.partition(), ignored -> Lists.newArrayList()).add(deleteFile);
        }
        return grouped;
    }
}

