/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.extensions.ProcedureUtil;
import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests for the {@code remove_orphan_files} procedure, driven through Spark SQL
 * {@code CALL} statements against the catalog handed to the constructor.
 *
 * <p>Tables, temp views and session configuration created by a test are cleaned up in
 * {@link #removeTable()}.
 */
public class TestRemoveOrphanFilesProcedure
extends SparkExtensionsTestBase {
    private static final String CREATE_TABLE_SQL = "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg";
    private static final String GC_ENABLED = "gc.enabled";
    private static final String WAP_ID_CONF = "spark.wap.id";
    private static final String INVALID_FILE_LIST_VIEW = "file_list_test";
    private static final String FILE_LIST_VIEW = "files_view";
    private static final String DATA_FILE_A = "path/to/data-a.parquet";
    private static final String DATA_FILE_B = "path/to/data-b.parquet";

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    public TestRemoveOrphanFilesProcedure(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    /**
     * Cleans up after each test: clears the WAP id, drops the temp views some tests register, and
     * drops both the Iceberg table under test and the auxiliary parquet table {@code p}.
     */
    @After
    public void removeTable() {
        // Session-level state first: these calls are no-ops when the test never set them, and
        // clearing them keeps one test from influencing the next one.
        spark.conf().unset(WAP_ID_CONF);
        spark.catalog().dropTempView(INVALID_FILE_LIST_VIEW);
        spark.catalog().dropTempView(FILE_LIST_VIEW);
        sql("DROP TABLE IF EXISTS %s PURGE", tableName);
        sql("DROP TABLE IF EXISTS p PURGE");
    }

    @Test
    public void testRemoveOrphanFilesInEmptyTable() {
        sql(CREATE_TABLE_SQL, tableName);

        List<Object[]> output = sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent);

        assertEquals("Should be no orphan files", ImmutableList.of(), output);
        assertEquals("Should have no rows", ImmutableList.of(), sql("SELECT * FROM %s", tableName));
    }

    @Test
    public void testRemoveOrphanFilesInDataFolder() throws IOException {
        createTestTableWithLocation(false);
        insertTwoRows();
        Table table = validationCatalog.loadTable(tableIdent);
        String metadataLocation = table.location() + "/metadata";
        String dataLocation = table.location() + "/data";

        // Write a plain parquet table into the data folder so it contains a file Iceberg does not track.
        sql("CREATE TABLE p (id bigint) USING parquet LOCATION '%s'", dataLocation);
        sql("INSERT INTO TABLE p VALUES (1)");
        Timestamp currentTimestamp = timestampAfterWrites();

        // Restricting the scan to the metadata folder must not report the parquet file.
        List<Object[]> output1 = sql(
            "CALL %s.system.remove_orphan_files(table => '%s',older_than => TIMESTAMP '%s',location => '%s')",
            catalogName, tableIdent, currentTimestamp, metadataLocation);
        assertEquals("Should be no orphan files in the metadata folder", ImmutableList.of(), output1);

        List<Object[]> output2 = removeOrphanFilesOlderThan(currentTimestamp);
        Assert.assertEquals("Should be orphan files in the data folder", 1L, output2.size());

        // A second run has nothing left to report.
        List<Object[]> output3 = removeOrphanFilesOlderThan(currentTimestamp);
        Assert.assertEquals("Should be no more orphan files in the data folder", 0L, output3.size());

        assertTwoRowsPresent();
    }

    @Test
    public void testRemoveOrphanFilesDryRun() throws IOException {
        createTestTableWithLocation(false);
        insertTwoRows();
        Table table = validationCatalog.loadTable(tableIdent);

        // Write a plain parquet table into the table root so it contains a file Iceberg does not track.
        sql("CREATE TABLE p (id bigint) USING parquet LOCATION '%s'", table.location());
        sql("INSERT INTO TABLE p VALUES (1)");
        Timestamp currentTimestamp = timestampAfterWrites();

        List<Object[]> output1 = sql(
            "CALL %s.system.remove_orphan_files(table => '%s',older_than => TIMESTAMP '%s',dry_run => true)",
            catalogName, tableIdent, currentTimestamp);
        Assert.assertEquals("Should be one orphan files", 1L, output1.size());

        // The same file is still reported by a real run, so the dry run did not consume it.
        List<Object[]> output2 = removeOrphanFilesOlderThan(currentTimestamp);
        Assert.assertEquals("Should be one orphan files", 1L, output2.size());

        List<Object[]> output3 = removeOrphanFilesOlderThan(currentTimestamp);
        Assert.assertEquals("Should be no more orphan files", 0L, output3.size());

        assertTwoRowsPresent();
    }

    @Test
    public void testRemoveOrphanFilesGCDisabled() {
        sql(CREATE_TABLE_SQL, tableName);
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'false')", tableName, GC_ENABLED);
        try {
            Assertions.assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent))
                .isInstanceOf(ValidationException.class)
                .hasMessage("Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
        } finally {
            // Restore the property even when the assertion fails, so removeTable() never has to
            // purge a table that still has GC switched off.
            sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, GC_ENABLED);
        }
    }

    @Test
    public void testRemoveOrphanFilesWap() {
        sql(CREATE_TABLE_SQL, tableName);
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, "write.wap.enabled");

        // Cleared again in removeTable().
        spark.conf().set(WAP_ID_CONF, "1");

        sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
        assertEquals("Should not see rows from staged snapshot", ImmutableList.of(), sql("SELECT * FROM %s", tableName));

        List<Object[]> output = sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent);
        assertEquals("Should be no orphan files", ImmutableList.of(), output);
    }

    @Test
    public void testInvalidRemoveOrphanFilesCases() {
        Assertions.assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('n', table => 't')", catalogName))
            .isInstanceOf(AnalysisException.class)
            .hasMessage("Named and positional arguments cannot be mixed");

        Assertions.assertThatThrownBy(() -> sql("CALL %s.custom.remove_orphan_files('n', 't')", catalogName))
            .isInstanceOf(NoSuchProcedureException.class)
            .hasMessage("Procedure custom.remove_orphan_files not found");

        Assertions.assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files()", catalogName))
            .isInstanceOf(AnalysisException.class)
            .hasMessage("Missing required parameters: [table]");

        Assertions.assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('n', 2.2)", catalogName))
            .isInstanceOf(AnalysisException.class)
            .hasMessageStartingWith("Wrong arg type for older_than");

        Assertions.assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", catalogName))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Cannot handle an empty identifier for argument table");
    }

    @Test
    public void testConcurrentRemoveOrphanFiles() throws IOException {
        createTestTableWithLocation(false);
        insertTwoRows();
        Table table = validationCatalog.loadTable(tableIdent);
        String dataLocation = table.location() + "/data";

        // Four separate inserts into the parquet table; the first call below expects four orphans.
        sql("CREATE TABLE p (id bigint) USING parquet LOCATION '%s'", dataLocation);
        sql("INSERT INTO TABLE p VALUES (1)");
        sql("INSERT INTO TABLE p VALUES (10)");
        sql("INSERT INTO TABLE p VALUES (100)");
        sql("INSERT INTO TABLE p VALUES (1000)");
        Timestamp currentTimestamp = timestampAfterWrites();

        String concurrentCall =
            "CALL %s.system.remove_orphan_files(table => '%s',max_concurrent_deletes => %s,older_than => TIMESTAMP '%s')";

        List<Object[]> output = sql(concurrentCall, catalogName, tableIdent, 4, currentTimestamp);
        Assert.assertEquals("Should be orphan files in the data folder", 4L, output.size());

        List<Object[]> output3 = sql(concurrentCall, catalogName, tableIdent, 4, currentTimestamp);
        Assert.assertEquals("Should be no more orphan files in the data folder", 0L, output3.size());

        assertTwoRowsPresent();
    }

    @Test
    public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
        sql(CREATE_TABLE_SQL, tableName);

        Assertions.assertThatThrownBy(() -> sql(
                "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
                catalogName, tableIdent, 0))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("max_concurrent_deletes should have value > 0, value: 0");

        Assertions.assertThatThrownBy(() -> sql(
                "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
                catalogName, tableIdent, -1))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("max_concurrent_deletes should have value > 0, value: -1");

        String fileListCall = "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')";

        // View without any columns.
        spark.emptyDataFrame().createOrReplaceTempView(INVALID_FILE_LIST_VIEW);
        Assertions.assertThatThrownBy(() -> sql(fileListCall, catalogName, tableIdent, INVALID_FILE_LIST_VIEW))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("file_path does not exist. Available: ");

        // file_path column of integer type.
        spark.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.INT(), Encoders.TIMESTAMP()))
            .toDF("file_path", "last_modified")
            .createOrReplaceTempView(INVALID_FILE_LIST_VIEW);
        Assertions.assertThatThrownBy(() -> sql(fileListCall, catalogName, tableIdent, INVALID_FILE_LIST_VIEW))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Invalid file_path column: IntegerType is not a string");

        // last_modified column of string type.
        spark.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            .toDF("file_path", "last_modified")
            .createOrReplaceTempView(INVALID_FILE_LIST_VIEW);
        Assertions.assertThatThrownBy(() -> sql(fileListCall, catalogName, tableIdent, INVALID_FILE_LIST_VIEW))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("Invalid last_modified column: StringType is not a timestamp");
    }

    @Test
    public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
        sql("CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);

        List<SimpleRecord> records = Lists.newArrayList(
            new SimpleRecord(1, "a"),
            new SimpleRecord(2, "b"),
            new SimpleRecord(3, "c"),
            new SimpleRecord(4, "d"));
        spark.createDataset(records, Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(tableName).append();

        sql("DELETE FROM %s WHERE id=1", tableName);

        Table table = Spark3Util.loadIcebergTable(spark, tableName);
        Assert.assertEquals("Should have 1 delete manifest", 1L, TestHelpers.deleteManifests(table).size());
        Assert.assertEquals("Should have 1 delete file", 1L, TestHelpers.deleteFiles(table).size());
        Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
        Path deleteFilePath = new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));

        Timestamp currentTimestamp = timestampAfterWrites();

        List<Object[]> output = removeOrphanFilesOlderThan(currentTimestamp);
        Assert.assertEquals("Should be no orphan files", 0L, output.size());

        // Delete manifests and delete files are referenced by the table and must survive the run.
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        Assert.assertTrue("Delete manifest should still exist", localFs.exists(deleteManifestPath));
        Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));

        records.remove(new SimpleRecord(1, "a"));
        List<SimpleRecord> actualRecords = spark.read()
            .format("iceberg")
            .load(tableName)
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
        Assert.assertEquals("Rows must match", records, actualRecords);
    }

    @Test
    public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
        sql("CREATE TABLE %s USING iceberg TBLPROPERTIES('format-version'='2') AS SELECT 10 int, 'abc' data", tableName);
        Table table = Spark3Util.loadIcebergTable(spark, tableName);

        String statsFileName = "stats-file-" + UUID.randomUUID();
        File statsLocation = new File(new URI(table.location())).toPath().resolve("data").resolve(statsFileName).toFile();

        // Write a one-blob Puffin file and describe it as a statistics file of the current snapshot.
        StatisticsFile statisticsFile;
        try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
            long snapshotId = table.currentSnapshot().snapshotId();
            long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
            puffinWriter.add(new Blob(
                "some-blob-type",
                ImmutableList.of(1),
                snapshotId,
                snapshotSequenceNumber,
                ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
            puffinWriter.finish();
            statisticsFile = new GenericStatisticsFile(
                snapshotId,
                statsLocation.toString(),
                puffinWriter.fileSize(),
                puffinWriter.footerSize(),
                puffinWriter.writtenBlobsMetadata().stream()
                    .map(GenericBlobMetadata::from)
                    .collect(ImmutableList.toImmutableList()));
        }

        Transaction transaction = table.newTransaction();
        transaction.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit();
        transaction.commitTransaction();

        Timestamp currentTimestamp = timestampAfterWrites();

        // While registered, the statistics file is not an orphan.
        List<Object[]> output = removeOrphanFilesOlderThan(currentTimestamp);
        Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
        Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
        Assertions.assertThat(statsLocation.length()).as("stats file length").isEqualTo(statisticsFile.fileSizeInBytes());

        transaction = table.newTransaction();
        transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
        transaction.commitTransaction();

        // Once unregistered, the same file is reported and deleted.
        output = removeOrphanFilesOlderThan(currentTimestamp);
        Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
        Assertions.assertThat(Iterables.getOnlyElement(output))
            .as("Deleted files")
            .containsExactly(statsLocation.toURI().toString());
        Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
    }

    @Test
    public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception {
        sql("CREATE TABLE %s USING iceberg TBLPROPERTIES('format-version'='2') AS SELECT 10 int, 'abc' data", tableName);
        Table table = Spark3Util.loadIcebergTable(spark, tableName);

        String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location());
        PartitionStatisticsFile partitionStatisticsFile = ProcedureUtil.writePartitionStatsFile(
            table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io());
        commitPartitionStatsTxn(table, partitionStatisticsFile);

        Timestamp currentTimestamp = timestampAfterWrites();

        // While registered, the partition statistics file is not an orphan.
        List<Object[]> output = removeOrphanFilesOlderThan(currentTimestamp);
        Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
        Assertions.assertThat(new File(partitionStatsLocation)).as("partition stats file should exist").exists();

        removePartitionStatsTxn(table, partitionStatisticsFile);

        // Once unregistered, the same file is reported and deleted.
        output = removeOrphanFilesOlderThan(currentTimestamp);
        Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
        Assertions.assertThat(Iterables.getOnlyElement(output))
            .as("Deleted files")
            .containsExactly("file:" + partitionStatsLocation);
        Assertions.assertThat(new File(partitionStatsLocation)).as("partition stats file should be deleted").doesNotExist();
    }

    /** Unregisters the partition statistics of the file's snapshot in its own transaction. */
    private static void removePartitionStatsTxn(Table table, PartitionStatisticsFile partitionStatisticsFile) {
        Transaction transaction = table.newTransaction();
        transaction.updatePartitionStatistics().removePartitionStatistics(partitionStatisticsFile.snapshotId()).commit();
        transaction.commitTransaction();
    }

    /** Registers the given partition statistics file on the table in its own transaction. */
    private static void commitPartitionStatsTxn(Table table, PartitionStatisticsFile partitionStatisticsFile) {
        Transaction transaction = table.newTransaction();
        transaction.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit();
        transaction.commitTransaction();
    }

    @Test
    public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException {
        createTestTableWithLocation(true);
        try {
            Table table = Spark3Util.loadIcebergTable(spark, tableName);
            Path originalPath = new Path(table.location());
            URI uri = originalPath.toUri();

            // Same authority and path as the table location, but under the made-up scheme "file1".
            Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
            appendPlaceholderDataFiles(table, newParentPath);
            registerFileListView(table, originalPath);

            // With the two schemes declared equal, the listed files match the table's files.
            List<Object[]> orphanFiles = sql(
                "CALL %s.system.remove_orphan_files(table => '%s',equal_schemes => map('file1', 'file'),file_list_view => '%s')",
                catalogName, tableIdent, FILE_LIST_VIEW);
            Assert.assertEquals(0L, orphanFiles.size());

            // Without that mapping the scheme mismatch is rejected.
            Assertions.assertThatThrownBy(() -> sql(
                    "CALL %s.system.remove_orphan_files(table => '%s',file_list_view => '%s')",
                    catalogName, tableIdent, FILE_LIST_VIEW))
                .isInstanceOf(ValidationException.class)
                .hasMessageEndingWith("Conflicting authorities/schemes: [(file1, file)].");
        } finally {
            // Drop without PURGE here, even on failure, so removeTable() finds nothing to purge.
            // NOTE(review): presumably PURGE cannot handle the placeholder "file1" paths - confirm.
            sql("DROP TABLE %s", tableName);
        }
    }

    @Test
    public void testRemoveOrphanFilesProcedureWithEqualAuthorities() throws NoSuchTableException, ParseException, IOException {
        createTestTableWithLocation(true);
        try {
            Table table = Spark3Util.loadIcebergTable(spark, tableName);
            Path originalPath = new Path(table.location());
            URI uri = originalPath.toUri();
            String originalAuthority = uri.getAuthority() == null ? "" : uri.getAuthority();

            // Same scheme and path as the table location, but with authority "localhost".
            Path newParentPath = new Path(uri.getScheme(), "localhost", uri.getPath());
            appendPlaceholderDataFiles(table, newParentPath);
            registerFileListView(table, originalPath);

            // With the two authorities declared equal, the listed files match the table's files.
            List<Object[]> orphanFiles = sql(
                "CALL %s.system.remove_orphan_files(table => '%s',equal_authorities => map('localhost', '%s'),file_list_view => '%s')",
                catalogName, tableIdent, originalAuthority, FILE_LIST_VIEW);
            Assert.assertEquals(0L, orphanFiles.size());

            // Without that mapping the authority mismatch is rejected.
            Assertions.assertThatThrownBy(() -> sql(
                    "CALL %s.system.remove_orphan_files(table => '%s',file_list_view => '%s')",
                    catalogName, tableIdent, FILE_LIST_VIEW))
                .isInstanceOf(ValidationException.class)
                .hasMessageEndingWith("Conflicting authorities/schemes: [(localhost, null)].");
        } finally {
            // Drop without PURGE here, even on failure, so removeTable() finds nothing to purge.
            // NOTE(review): presumably PURGE cannot handle the placeholder "localhost" paths - confirm.
            sql("DROP TABLE %s", tableName);
        }
    }

    /**
     * Creates the two-column test table. For the {@code testhadoop} catalog no location is given;
     * for every other catalog the table is placed in a fresh folder from {@link #temp}.
     *
     * @param locationAsUri whether the folder is passed to the DDL as a URI string ({@code true})
     *     or as the folder's plain path ({@code false})
     * @throws IOException if the temporary folder cannot be created
     */
    private void createTestTableWithLocation(boolean locationAsUri) throws IOException {
        if (catalogName.equals("testhadoop")) {
            sql(CREATE_TABLE_SQL, tableName);
            return;
        }
        File folder = temp.newFolder();
        String location = locationAsUri ? folder.toURI().toString() : folder.toString();
        sql(CREATE_TABLE_SQL + " LOCATION '%s'", tableName, location);
    }

    /** Inserts the rows {@code (1, 'a')} and {@code (2, 'b')} with two separate statements. */
    private void insertTwoRows() {
        sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
        sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
    }

    /** Asserts that the table still holds exactly the rows written by {@link #insertTwoRows()}. */
    private void assertTwoRowsPresent() {
        assertEquals(
            "Should have expected rows",
            ImmutableList.of(row(1L, "a"), row(2L, "b")),
            sql("SELECT * FROM %s ORDER BY id", tableName));
    }

    /**
     * Calls {@code waitUntilAfter} with the current time and then returns a fresh timestamp for use
     * as an {@code older_than} bound.
     * NOTE(review): assumes waitUntilAfter blocks until the clock has passed its argument, so files
     * written before this call end up strictly older than the returned value - confirm.
     */
    private Timestamp timestampAfterWrites() {
        waitUntilAfter(System.currentTimeMillis());
        return Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
    }

    /** Runs the procedure on the table under test with only an {@code older_than} bound. */
    private List<Object[]> removeOrphanFilesOlderThan(Timestamp olderThan) {
        return sql(
            "CALL %s.system.remove_orphan_files(table => '%s',older_than => TIMESTAMP '%s')",
            catalogName, tableIdent, olderThan);
    }

    /** Builds a metadata-only data file entry (10 bytes, 1 record) at {@code parent/relativePath}. */
    private static DataFile placeholderDataFile(Path parent, String relativePath) {
        return DataFiles.builder(PartitionSpec.unpartitioned())
            .withPath(new Path(parent, relativePath).toString())
            .withFileSizeInBytes(10L)
            .withRecordCount(1L)
            .build();
    }

    /** Commits two placeholder data files located under {@code parent} with a fast append. */
    private static void appendPlaceholderDataFiles(Table table, Path parent) {
        DataFile dataFile1 = placeholderDataFile(parent, DATA_FILE_A);
        DataFile dataFile2 = placeholderDataFile(parent, DATA_FILE_B);
        table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
    }

    /**
     * Registers the temp view {@code files_view} with columns {@code file_path} and
     * {@code last_modified}. It lists the two placeholder data files resolved against
     * {@code tableRoot}, the version hint location, the table's metadata file locations and its
     * data manifests, all with the same fixed modification time.
     */
    private void registerFileListView(Table table, Path tableRoot) {
        Timestamp lastModifiedTimestamp = new Timestamp(10000L);
        List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
            new FilePathLastModifiedRecord(new Path(tableRoot, DATA_FILE_A).toString(), lastModifiedTimestamp),
            new FilePathLastModifiedRecord(new Path(tableRoot, DATA_FILE_B).toString(), lastModifiedTimestamp),
            new FilePathLastModifiedRecord(ReachableFileUtil.versionHintLocation(table), lastModifiedTimestamp));
        for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
            allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
        }
        for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
            allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
        }
        spark.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
            .withColumnRenamed("filePath", "file_path")
            .withColumnRenamed("lastModified", "last_modified")
            .createOrReplaceTempView(FILE_LIST_VIEW);
    }
}

