/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.FileInfo;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestExpireSnapshotsAction
extends TestBase {
    // Table factory shared by all tests; each test creates its own table through it.
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());

    // Three optional columns: c1 (int), c2 (string), c3 (string).
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "c2", Types.StringType.get()),
                    Types.NestedField.optional(3, "c3", Types.StringType.get()));

    private static final int SHUFFLE_PARTITIONS = 2;

    // Identity-partitioned on c1; every fixture file below lives in its own c1 partition value.
    private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();

    // Data file fixtures (metadata only): 10 bytes, 1 record each, partitions c1=0..3.
    static final DataFile FILE_A =
            DataFiles.builder(SPEC)
                    .withPath("/path/to/data-a.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=0")
                    .withRecordCount(1L)
                    .build();
    static final DataFile FILE_B =
            DataFiles.builder(SPEC)
                    .withPath("/path/to/data-b.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=1")
                    .withRecordCount(1L)
                    .build();
    static final DataFile FILE_C =
            DataFiles.builder(SPEC)
                    .withPath("/path/to/data-c.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=2")
                    .withRecordCount(1L)
                    .build();
    static final DataFile FILE_D =
            DataFiles.builder(SPEC)
                    .withPath("/path/to/data-d.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=3")
                    .withRecordCount(1L)
                    .build();

    // Delete file fixtures for partition c1=0: one position-delete file and one equality-delete file.
    static final DeleteFile FILE_A_POS_DELETES =
            FileMetadata.deleteFileBuilder(SPEC)
                    .ofPositionDeletes()
                    .withPath("/path/to/data-a-pos-deletes.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=0")
                    .withRecordCount(1L)
                    .build();
    static final DeleteFile FILE_A_EQ_DELETES =
            FileMetadata.deleteFileBuilder(SPEC)
                    .ofEqualityDeletes(new int[0])
                    .withPath("/path/to/data-a-eq-deletes.parquet")
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=0")
                    .withRecordCount(1L)
                    .build();

    // Per-test temporary directory injected by JUnit.
    @TempDir
    private Path temp;
    // Directory, location URI and handle of the table created in setupTableLocation().
    private File tableDir;
    private String tableLocation;
    private Table table;

    /**
     * Creates a new table under the per-test temporary directory and sets the Spark shuffle
     * partition count before every test.
     *
     * @throws Exception declared for compatibility with the test lifecycle signature
     */
    @BeforeEach
    public void setupTableLocation() throws Exception {
        tableDir = temp.resolve("junit").toFile();
        tableLocation = tableDir.toURI().toString();
        table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
        // Use the shared constant instead of a duplicated magic number (the int widens to long).
        spark.conf().set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS);
    }

    /**
     * Waits until the wall clock is strictly past the current snapshot's timestamp.
     *
     * @return a millisecond timestamp later than the current snapshot's timestamp
     */
    private Long rightAfterSnapshot() {
        long currentSnapshotId = table.currentSnapshot().snapshotId();
        return rightAfterSnapshot(currentSnapshotId);
    }

    /**
     * Spins until {@link System#currentTimeMillis()} is strictly greater than the timestamp of the
     * given snapshot, then returns that clock reading.
     *
     * @param snapshotId id of a snapshot that exists in the table
     * @return a millisecond timestamp later than the snapshot's timestamp
     */
    private Long rightAfterSnapshot(long snapshotId) {
        // The timestamp of a given snapshot does not change, so look it up once instead of on every
        // spin.
        long snapshotTimestamp = table.snapshot(snapshotId).timestampMillis();
        // A primitive avoids re-boxing a Long on each iteration; it is boxed once on return.
        long now = System.currentTimeMillis();
        while (now <= snapshotTimestamp) {
            now = System.currentTimeMillis();
        }
        return now;
    }

    /**
     * Asserts every deletion counter reported by an expire-snapshots run.
     *
     * <p>Counters are verified in this order: manifests, data files, position delete files,
     * equality delete files, manifest lists. The first mismatch fails the test.
     *
     * @param expectedDatafiles expected value of {@code deletedDataFilesCount()}
     * @param expectedPosDeleteFiles expected value of {@code deletedPositionDeleteFilesCount()}
     * @param expectedEqDeleteFiles expected value of {@code deletedEqualityDeleteFilesCount()}
     * @param expectedManifestsDeleted expected value of {@code deletedManifestsCount()}
     * @param expectedManifestListsDeleted expected value of {@code deletedManifestListsCount()}
     * @param results the action result to inspect
     */
    private void checkExpirationResults(
            long expectedDatafiles,
            long expectedPosDeleteFiles,
            long expectedEqDeleteFiles,
            long expectedManifestsDeleted,
            long expectedManifestListsDeleted,
            ExpireSnapshots.Result results) {
        Assertions.assertThat(results.deletedManifestsCount())
                .as("Incorrect number of manifest files deleted")
                .isEqualTo(expectedManifestsDeleted);
        Assertions.assertThat(results.deletedDataFilesCount())
                .as("Incorrect number of datafiles deleted")
                .isEqualTo(expectedDatafiles);
        Assertions.assertThat(results.deletedPositionDeleteFilesCount())
                .as("Incorrect number of pos deletefiles deleted")
                .isEqualTo(expectedPosDeleteFiles);
        Assertions.assertThat(results.deletedEqualityDeleteFilesCount())
                .as("Incorrect number of eq deletefiles deleted")
                .isEqualTo(expectedEqDeleteFiles);
        Assertions.assertThat(results.deletedManifestListsCount())
                .as("Incorrect number of manifest lists deleted")
                .isEqualTo(expectedManifestListsDeleted);
    }

    /**
     * Appends FILE_A, overwrites it with FILE_B, appends FILE_C, then expires everything older
     * than a timestamp taken after the last commit. Expects one remaining snapshot and one data
     * file, one manifest and two manifest lists reported as deleted.
     */
    @Test
    public void testFilesCleaned() throws Exception {
        table.newFastAppend().appendFile(FILE_A).commit();
        table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        table.newFastAppend().appendFile(FILE_C).commit();

        long cutoff = rightAfterSnapshot();
        ExpireSnapshots.Result outcome =
                SparkActions.get().expireSnapshots(table).expireOlderThan(cutoff).execute();

        Assertions.assertThat(table.snapshots())
                .as("Table does not have 1 snapshot after expiration")
                .hasSize(1);
        checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Runs expiration with a caller-supplied four-thread delete executor and a custom delete
     * callback, then verifies that deletes ran on all four named pool threads, that FILE_A and
     * FILE_B were passed to the callback, and that the result counters match.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void dataFilesCleanupWithParallelTasks() throws IOException {
        table.newFastAppend().appendFile(FILE_A).commit();
        table.newFastAppend().appendFile(FILE_B).commit();
        table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit();
        table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit();
        long t4 = rightAfterSnapshot();

        // Concurrent sets: the delete callback is invoked from the pool threads.
        Set<String> deletedFiles = ConcurrentHashMap.newKeySet();
        Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
        AtomicInteger deleteThreadsIndex = new AtomicInteger(0);

        ExecutorService deleteExecutor =
                Executors.newFixedThreadPool(
                        4,
                        runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement());
                            // Daemon threads so a stuck delete cannot keep the JVM alive.
                            thread.setDaemon(true);
                            return thread;
                        });

        ExpireSnapshots.Result result;
        try {
            result =
                    SparkActions.get()
                            .expireSnapshots(table)
                            .executeDeleteWith(deleteExecutor)
                            .expireOlderThan(t4)
                            .deleteWith(
                                    s -> {
                                        deleteThreads.add(Thread.currentThread().getName());
                                        deletedFiles.add(s);
                                    })
                            .execute();
        } finally {
            // Release the pool threads once the action has finished, even if it failed.
            deleteExecutor.shutdown();
        }

        // The deletes must have run on exactly the threads produced by the factory above.
        Assertions.assertThat(deleteThreads)
                .isEqualTo(
                        Sets.newHashSet(
                                "remove-snapshot-0",
                                "remove-snapshot-1",
                                "remove-snapshot-2",
                                "remove-snapshot-3"));
        Assertions.assertThat(deletedFiles)
                .as("FILE_A should be deleted")
                .contains(FILE_A.path().toString());
        Assertions.assertThat(deletedFiles)
                .as("FILE_B should be deleted")
                .contains(FILE_B.path().toString());
        checkExpirationResults(2L, 0L, 0L, 3L, 3L, result);
    }

    /**
     * Runs the action with default settings right after a single append and expects every
     * deletion counter to be zero.
     */
    @Test
    public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
        table.newFastAppend().appendFile(FILE_A).commit();

        ExpireSnapshots.Result outcome = SparkActions.get().expireSnapshots(table).execute();

        checkExpirationResults(0L, 0L, 0L, 0L, 0L, outcome);
    }

    /**
     * Swaps FILE_A and FILE_B back and forth ten times via overwrites, then expires everything
     * older than a timestamp taken after the last commit. Expects one data file, 39 manifests and
     * 20 manifest lists reported as deleted.
     */
    @Test
    public void testCleanupRepeatedOverwrites() throws Exception {
        table.newFastAppend().appendFile(FILE_A).commit();

        int round = 0;
        while (round < 10) {
            table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
            table.newOverwrite().deleteFile(FILE_B).addFile(FILE_A).commit();
            round++;
        }

        long cutoff = rightAfterSnapshot();
        ExpireSnapshots.Result outcome =
                SparkActions.get().expireSnapshots(table).expireOlderThan(cutoff).execute();
        checkExpirationResults(1L, 0L, 0L, 39L, 20L, outcome);
    }

    /**
     * Commits three appends, then expires with a cutoff after the last commit combined with
     * {@code retainLast(2)}. Expects two snapshots to remain and the first snapshot to be gone.
     */
    @Test
    public void testRetainLastWithExpireOlderThan() {
        table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = table.currentSnapshot().snapshotId();
        // Let the clock move past the first snapshot before the next commits; the shared helper
        // replaces a duplicated spin loop whose result was never used.
        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();
        long t3 = rightAfterSnapshot();

        SparkActions.get().expireSnapshots(table).expireOlderThan(t3).retainLast(2).execute();

        Assertions.assertThat(table.snapshots()).as("Should have two snapshots.").hasSize(2);
        Assertions.assertThat(table.snapshot(firstSnapshotId))
                .as("First snapshot should not present.")
                .isNull();
    }

    /**
     * Expires the first two of three snapshots explicitly by id. Expects one snapshot to remain,
     * both expired ids to be unresolvable, and two manifest lists reported as deleted.
     */
    @Test
    public void testExpireTwoSnapshotsById() throws Exception {
        table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = table.currentSnapshot().snapshotId();
        table.newAppend().appendFile(FILE_B).commit();
        long secondSnapshotId = table.currentSnapshot().snapshotId();
        table.newAppend().appendFile(FILE_C).commit();

        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireSnapshotId(firstSnapshotId)
                        .expireSnapshotId(secondSnapshotId)
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have one snapshot.").hasSize(1);
        Assertions.assertThat(table.snapshot(firstSnapshotId))
                .as("First snapshot should not present.")
                .isNull();
        Assertions.assertThat(table.snapshot(secondSnapshotId))
                .as("Second snapshot should not present.")
                .isNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 2L, outcome);
    }

    /**
     * Expires the first snapshot by id while also asking to retain the last three. Expects the
     * explicit id to win: two snapshots remain, the first is gone, one manifest list is deleted.
     */
    @Test
    public void testRetainLastWithExpireById() {
        table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = table.currentSnapshot().snapshotId();
        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();

        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireSnapshotId(firstSnapshotId)
                        .retainLast(3)
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have 2 snapshots.").hasSize(2);
        Assertions.assertThat(table.snapshot(firstSnapshotId))
                .as("First snapshot should not present.")
                .isNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 1L, outcome);
    }

    /**
     * Asks to retain three snapshots when only two exist. Expects both snapshots to survive and
     * every deletion counter to be zero.
     */
    @Test
    public void testRetainLastWithTooFewSnapshots() {
        table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        long firstSnapshotId = table.currentSnapshot().snapshotId();
        table.newAppend().appendFile(FILE_C).commit();
        long t2 = rightAfterSnapshot();

        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(t2)
                        .retainLast(3)
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have two snapshots.").hasSize(2);
        Assertions.assertThat(table.snapshot(firstSnapshotId))
                .as("First snapshot should still be present.")
                .isNotNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 0L, outcome);
    }

    /**
     * Commits four appends, then expires with the second snapshot's timestamp as cutoff combined
     * with {@code retainLast(2)}. Expects three snapshots to remain, the second snapshot among
     * them, and one manifest list reported as deleted.
     */
    @Test
    public void testRetainLastKeepsExpiringSnapshot() {
        table.newAppend().appendFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_B).commit();
        Snapshot secondSnapshot = table.currentSnapshot();
        table.newAppend().appendFile(FILE_C).commit();
        table.newAppend().appendFile(FILE_D).commit();

        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(secondSnapshot.timestampMillis())
                        .retainLast(2)
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have three snapshots.").hasSize(3);
        // The description previously said "First snapshot" although the second one is checked.
        Assertions.assertThat(table.snapshot(secondSnapshot.snapshotId()))
                .as("Second snapshot should be present.")
                .isNotNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
    }

    /**
     * With {@code gc.enabled=false} on the table, merely creating the expire-snapshots action
     * (no {@code execute()} call) is expected to throw a {@link ValidationException}.
     */
    @Test
    public void testExpireSnapshotsWithDisabledGarbageCollection() {
        table.updateProperties().set("gc.enabled", "false").commit();
        table.newAppend().appendFile(FILE_A).commit();

        Assertions.assertThatThrownBy(() -> SparkActions.get().expireSnapshots(table))
                .isInstanceOf(ValidationException.class)
                .hasMessage(
                        "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
    }

    /**
     * Calls {@code expireOlderThan} twice, first with the second snapshot's timestamp and then
     * with the third's. Expects one snapshot to remain, the second snapshot to be gone, and two
     * manifest lists reported as deleted.
     */
    @Test
    public void testExpireOlderThanMultipleCalls() {
        table.newAppend().appendFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_B).commit();
        Snapshot second = table.currentSnapshot();
        table.newAppend().appendFile(FILE_C).commit();
        Snapshot third = table.currentSnapshot();

        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(second.timestampMillis())
                        .expireOlderThan(third.timestampMillis())
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have one snapshot.").hasSize(1);
        Assertions.assertThat(table.snapshot(second.snapshotId()))
                .as("Second snapshot should not present.")
                .isNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 2L, outcome);
    }

    /**
     * Calls {@code retainLast} twice (2, then 1) with a cutoff after the last commit. Expects one
     * snapshot to remain, the second snapshot to be gone, and two manifest lists deleted.
     */
    @Test
    public void testRetainLastMultipleCalls() {
        table.newAppend().appendFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_B).commit();
        Snapshot second = table.currentSnapshot();
        table.newAppend().appendFile(FILE_C).commit();
        long t3 = rightAfterSnapshot();

        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(t3)
                        .retainLast(2)
                        .retainLast(1)
                        .execute();

        Assertions.assertThat(table.snapshots()).as("Should have one snapshot.").hasSize(1);
        Assertions.assertThat(table.snapshot(second.snapshotId()))
                .as("Second snapshot should not present.")
                .isNull();
        checkExpirationResults(0L, 0L, 0L, 0L, 2L, outcome);
    }

    /** Expects {@code retainLast(0)} to be rejected with an {@link IllegalArgumentException}. */
    @Test
    public void testRetainZeroSnapshots() {
        Assertions.assertThatThrownBy(
                        () -> SparkActions.get().expireSnapshots(table).retainLast(0).execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("Number of snapshots to retain must be at least 1, cannot be: 0");
    }

    /**
     * Appends FILE_A and FILE_B, overwrites FILE_A with FILE_C, then appends FILE_D using a
     * regular append. After expiring with a cutoff past the last commit, FILE_A must have been
     * passed to the delete callback; counters: 1 data file, 1 manifest, 2 manifest lists.
     */
    @Test
    public void testScanExpiredManifestInValidSnapshotAppend() {
        table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_D).commit();
        long t3 = rightAfterSnapshot();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(t3)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(removedPaths)
                .as("FILE_A should be deleted")
                .contains(FILE_A.path().toString());
        checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Same scenario as the regular-append variant, but with manifest merging enabled
     * (min-count-to-merge = 1) and the final commit made through a fast append. FILE_A must have
     * been passed to the delete callback; counters: 1 data file, 1 manifest, 2 manifest lists.
     */
    @Test
    public void testScanExpiredManifestInValidSnapshotFastAppend() {
        table.updateProperties()
                .set("commit.manifest-merge.enabled", "true")
                .set("commit.manifest.min-count-to-merge", "1")
                .commit();
        table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        table.newFastAppend().appendFile(FILE_D).commit();
        long t3 = rightAfterSnapshot();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(t3)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(removedPaths)
                .as("FILE_A should be deleted")
                .contains(FILE_A.path().toString());
        checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Commits FILE_A, stages (but does not publish) a commit with FILE_B, then commits FILE_C.
     * Expiring with a cutoff one millisecond past the staged snapshot's timestamp is expected to
     * delete exactly: snapshot A's manifest list, snapshot B's manifest list, the data files
     * added by snapshot B, and the data manifests written by snapshot B.
     */
    @Test
    public void testWithExpiringDanglingStageCommit() {
        table.newAppend().appendFile(FILE_A).commit();
        // Staged only: the snapshot is recorded in metadata without becoming current.
        table.newAppend().appendFile(FILE_B).stageOnly().commit();

        TableMetadata base = ((BaseTable) table).operations().current();
        Snapshot snapshotA = base.snapshots().get(0);
        Snapshot snapshotB = base.snapshots().get(1);

        table.newAppend().appendFile(FILE_C).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .deleteWith(deletedFiles::add)
                        .expireOlderThan(snapshotB.timestampMillis() + 1L)
                        .execute();
        checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);

        // Build the set of paths that should have been handed to the delete callback.
        Set<String> expectedDeletes = Sets.newHashSet();
        expectedDeletes.add(snapshotA.manifestListLocation());
        for (DataFile added : snapshotB.addedDataFiles(table.io())) {
            expectedDeletes.add(added.path().toString());
        }
        expectedDeletes.add(snapshotB.manifestListLocation());
        for (ManifestFile manifest : snapshotB.dataManifests(table.io())) {
            // Only manifests written by the staged snapshot itself are expected to go.
            if (manifest.snapshotId().longValue() == snapshotB.snapshotId()) {
                expectedDeletes.add(manifest.path());
            }
        }

        // Same size plus nothing left after removing the actual deletes means the sets are equal.
        Assertions.assertThat(expectedDeletes)
                .as("Files deleted count should be expected")
                .hasSameSizeAs(deletedFiles);
        expectedDeletes.removeAll(deletedFiles);
        Assertions.assertThat(expectedDeletes)
                .as("Exactly same files should be deleted")
                .isEmpty();
    }

    /**
     * Builds snapshots A, B (overwrite of FILE_A by FILE_B) and C, rolls the table back to A,
     * cherry-picks B to produce D, and finally makes C current again. After expiring with a
     * cutoff one millisecond past C's timestamp, none of the data files added by B, C or D may
     * have been passed to the delete callback.
     */
    @Test
    public void testWithCherryPickTableSnapshot() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot snapshotA = table.currentSnapshot();

        Set<String> deletedAFiles = Sets.newHashSet();
        table.newOverwrite()
                .addFile(FILE_B)
                .deleteFile(FILE_A)
                .deleteWith(deletedAFiles::add)
                .commit();
        Assertions.assertThat(deletedAFiles)
                .as("No files should be physically deleted")
                .isEmpty();
        Snapshot snapshotB = table.currentSnapshot();

        table.newAppend().appendFile(FILE_C).commit();
        Snapshot snapshotC = table.currentSnapshot();

        table.manageSnapshots().setCurrentSnapshot(snapshotA.snapshotId()).commit();
        table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit();
        Snapshot snapshotD = table.currentSnapshot();
        table.manageSnapshots().setCurrentSnapshot(snapshotC.snapshotId()).commit();

        List<String> deletedFiles = Lists.newArrayList();
        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .deleteWith(deletedFiles::add)
                        .expireOlderThan(snapshotC.timestampMillis() + 1L)
                        .execute();

        for (Snapshot snapshot : Lists.newArrayList(snapshotB, snapshotC, snapshotD)) {
            for (DataFile added : snapshot.addedDataFiles(table.io())) {
                Assertions.assertThat(deletedFiles).doesNotContain(added.path().toString());
            }
        }
        checkExpirationResults(1L, 0L, 0L, 2L, 2L, result);
    }

    /**
     * Stages snapshot B, commits C, cherry-picks B (producing D), then expires in two passes:
     * first B by id, then everything older than one millisecond past the current snapshot. After
     * each pass, data files added by the checked snapshots must not appear among the deletes.
     */
    @Test
    public void testWithExpiringStagedThenCherrypick() {
        table.newAppend().appendFile(FILE_A).commit();
        // Staged only: recorded in metadata without becoming the current snapshot.
        table.newAppend().appendFile(FILE_B).stageOnly().commit();

        TableMetadata base = ((BaseTable) table).operations().current();
        Snapshot snapshotB = base.snapshots().get(1);

        table.newAppend().appendFile(FILE_C).commit();
        table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit();

        base = ((BaseTable) table).operations().current();
        Snapshot snapshotD = base.snapshots().get(3);

        // Collects delete callbacks from both expiration passes.
        List<String> deletedFiles = Lists.newArrayList();

        ExpireSnapshots.Result firstResult =
                SparkActions.get()
                        .expireSnapshots(table)
                        .deleteWith(deletedFiles::add)
                        .expireSnapshotId(snapshotB.snapshotId())
                        .execute();
        for (DataFile added : snapshotB.addedDataFiles(table.io())) {
            Assertions.assertThat(deletedFiles).doesNotContain(added.path().toString());
        }
        checkExpirationResults(0L, 0L, 0L, 1L, 1L, firstResult);

        ExpireSnapshots.Result secondResult =
                SparkActions.get()
                        .expireSnapshots(table)
                        .deleteWith(deletedFiles::add)
                        .expireOlderThan(table.currentSnapshot().timestampMillis() + 1L)
                        .execute();
        for (Snapshot snapshot : Lists.newArrayList(snapshotB, snapshotD)) {
            for (DataFile added : snapshot.addedDataFiles(table.io())) {
                Assertions.assertThat(deletedFiles).doesNotContain(added.path().toString());
            }
        }
        checkExpirationResults(0L, 0L, 0L, 0L, 2L, secondResult);
    }

    /**
     * Commits two appends and expires with a cutoff after the second. Expects the current
     * snapshot to be unchanged, the first snapshot to be gone, and only the first snapshot's
     * manifest list to be passed to the delete callback.
     */
    @Test
    public void testExpireOlderThan() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot first = table.currentSnapshot();
        // Result ignored: used only to wait until the clock is past the first snapshot.
        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        long currentId = table.currentSnapshot().snapshotId();
        long tAfterCommits = rightAfterSnapshot();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(tAfterCommits)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Expire should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(first.snapshotId()))
                .as("Expire should remove the oldest snapshot.")
                .isNull();
        Assertions.assertThat(removedPaths)
                .as("Should remove only the expired manifest list location.")
                .isEqualTo(Sets.newHashSet(first.manifestListLocation()));
        checkExpirationResults(0L, 0L, 0L, 0L, 1L, outcome);
    }

    /**
     * Appends FILE_A, deletes it, appends FILE_B, then expires with a cutoff after the last
     * commit. Expects the first two snapshots to be gone and the delete callback to receive both
     * of their manifest lists, the first manifest of each, and FILE_A itself.
     */
    @Test
    public void testExpireOlderThanWithDelete() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot first = table.currentSnapshot();
        Assertions.assertThat(first.allManifests(table.io()))
                .as("Should create one manifest")
                .hasSize(1);
        // Result ignored: used only to wait until the clock is past the latest snapshot.
        rightAfterSnapshot();

        table.newDelete().deleteFile(FILE_A).commit();
        Snapshot second = table.currentSnapshot();
        Assertions.assertThat(second.allManifests(table.io()))
                .as("Should create replace manifest with a rewritten manifest")
                .hasSize(1);

        table.newAppend().appendFile(FILE_B).commit();
        rightAfterSnapshot();
        long currentId = table.currentSnapshot().snapshotId();
        long tAfterCommits = rightAfterSnapshot();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(tAfterCommits)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Expire should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(first.snapshotId()))
                .as("Expire should remove the oldest snapshot.")
                .isNull();
        Assertions.assertThat(table.snapshot(second.snapshotId()))
                .as("Expire should remove the second oldest snapshot.")
                .isNull();

        Set<CharSequence> expectedRemoved =
                Sets.newHashSet(
                        first.manifestListLocation(),
                        first.allManifests(table.io()).get(0).path(),
                        second.manifestListLocation(),
                        second.allManifests(table.io()).get(0).path(),
                        FILE_A.path());
        Assertions.assertThat(removedPaths)
                .as("Should remove expired manifest lists and deleted data file.")
                .isEqualTo(expectedRemoved);
        checkExpirationResults(1L, 0L, 0L, 2L, 2L, outcome);
    }

    /**
     * With {@code commit.manifest.min-count-to-merge=0}, appends FILE_A and FILE_B, deletes
     * FILE_A, fast-appends FILE_C, then expires with a cutoff after the last commit. Expects the
     * delete callback to receive both expired manifest lists, the first snapshot's manifest, and
     * FILE_A.
     */
    @Test
    public void testExpireOlderThanWithDeleteInMergedManifests() {
        table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();

        table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot first = table.currentSnapshot();
        Assertions.assertThat(first.allManifests(table.io()))
                .as("Should create one manifest")
                .hasSize(1);
        // Result ignored: used only to wait until the clock is past the latest snapshot.
        rightAfterSnapshot();

        table.newDelete().deleteFile(FILE_A).commit();
        Snapshot second = table.currentSnapshot();
        Assertions.assertThat(second.allManifests(table.io()))
                .as("Should replace manifest with a rewritten manifest")
                .hasSize(1);

        table.newFastAppend().appendFile(FILE_C).commit();
        rightAfterSnapshot();
        long currentId = table.currentSnapshot().snapshotId();
        long tAfterCommits = rightAfterSnapshot();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(tAfterCommits)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Expire should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(first.snapshotId()))
                .as("Expire should remove the oldest snapshot.")
                .isNull();
        Assertions.assertThat(table.snapshot(second.snapshotId()))
                .as("Expire should remove the second oldest snapshot.")
                .isNull();

        Set<CharSequence> expectedRemoved =
                Sets.newHashSet(
                        first.manifestListLocation(),
                        first.allManifests(table.io()).get(0).path(),
                        second.manifestListLocation(),
                        FILE_A.path());
        Assertions.assertThat(removedPaths)
                .as("Should remove expired manifest lists and deleted data file.")
                .isEqualTo(expectedRemoved);
        checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * With {@code commit.manifest.min-count-to-merge=0}, appends FILE_A and FILE_B, deletes
     * FILE_B in a second snapshot, rolls back to the first snapshot, then expires with a cutoff
     * after the second snapshot. Expects the first (current) snapshot to survive, the second to
     * be gone, and the delete callback to receive the second snapshot's manifest list plus the
     * one manifest that only the second snapshot referenced.
     */
    @Test
    public void testExpireOlderThanWithRollback() {
        table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();

        table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot first = table.currentSnapshot();
        Assertions.assertThat(first.allManifests(table.io()))
                .as("Should create one manifest")
                .hasSize(1);
        // Result ignored: used only to wait until the clock is past the first snapshot.
        rightAfterSnapshot();

        table.newDelete().deleteFile(FILE_B).commit();
        Snapshot second = table.currentSnapshot();
        // Manifests referenced by the second snapshot but not by the first.
        Set<ManifestFile> secondOnlyManifests = Sets.newHashSet(second.allManifests(table.io()));
        secondOnlyManifests.removeAll(first.allManifests(table.io()));
        Assertions.assertThat(secondOnlyManifests)
                .as("Should add one new manifest for append")
                .hasSize(1);

        table.manageSnapshots().rollbackTo(first.snapshotId()).commit();
        long tAfterCommits = rightAfterSnapshot(second.snapshotId());
        long currentId = table.currentSnapshot().snapshotId();

        Set<String> removedPaths = Sets.newHashSet();
        ExpireSnapshots.Result outcome =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(tAfterCommits)
                        .deleteWith(removedPaths::add)
                        .execute();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Expire should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(first.snapshotId()))
                .as("Expire should keep the oldest snapshot, current.")
                .isNotNull();
        Assertions.assertThat(table.snapshot(second.snapshotId()))
                .as("Expire should remove the orphaned snapshot.")
                .isNull();

        Set<String> expectedRemoved =
                Sets.newHashSet(
                        second.manifestListLocation(),
                        Iterables.getOnlyElement(secondOnlyManifests).path());
        Assertions.assertThat(removedPaths)
                .as("Should remove expired manifest lists and reverted appended data file")
                .isEqualTo(expectedRemoved);
        checkExpirationResults(0L, 0L, 0L, 1L, 1L, outcome);
    }

    /**
     * Verifies expiration after a rollback: FILE_A and FILE_B are appended in separate commits,
     * the table is rolled back to the first snapshot, and snapshots older than the cutoff
     * obtained from {@code rightAfterSnapshot} for the second snapshot are expired.
     *
     * <p>The rolled-back snapshot must be gone afterwards, and the delete callback must have
     * received exactly its manifest list, the single manifest it added, and FILE_B.
     */
    @Test
    public void testExpireOlderThanWithRollbackAndMergedManifests() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot baseSnapshot = table.currentSnapshot();
        Assertions.assertThat(baseSnapshot.allManifests(table.io()))
                .as("Should create one manifest")
                .hasSize(1);

        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        Snapshot rolledBackSnapshot = table.currentSnapshot();

        // Keep only the manifests that are new in the second snapshot.
        Set<ManifestFile> addedManifests =
                Sets.newHashSet(rolledBackSnapshot.allManifests(table.io()));
        addedManifests.removeAll(baseSnapshot.allManifests(table.io()));
        Assertions.assertThat(addedManifests)
                .as("Should add one new manifest for append")
                .hasSize(1);

        table.manageSnapshots().rollbackTo(baseSnapshot.snapshotId()).commit();

        long cutoff = rightAfterSnapshot(rolledBackSnapshot.snapshotId());
        long currentId = table.currentSnapshot().snapshotId();

        // Record what the action asks to delete instead of deleting anything.
        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(cutoff)
                        .deleteWith(deletedFiles::add)
                        .execute();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Expire should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(baseSnapshot.snapshotId()))
                .as("Expire should keep the oldest snapshot, current.")
                .isNotNull();
        Assertions.assertThat(table.snapshot(rolledBackSnapshot.snapshotId()))
                .as("Expire should remove the orphaned snapshot.")
                .isNull();

        Set<CharSequence> expectedDeletes =
                Sets.newHashSet(
                        rolledBackSnapshot.manifestListLocation(),
                        Iterables.getOnlyElement(addedManifests).path(),
                        FILE_B.path());
        Assertions.assertThat(deletedFiles)
                .as("Should remove expired manifest lists and reverted appended data file")
                .isEqualTo(expectedDeletes);

        checkExpirationResults(1L, 0L, 0L, 1L, 1L, result);
    }

    /**
     * Verifies expiration on a format-version 2 table (manifest merging disabled) that contains
     * position and equality delete files.
     *
     * <p>Four snapshots are created (append FILE_A, add position deletes, add equality deletes,
     * delete all rows), a cutoff is taken, and FILE_B is appended. Expiring older than the
     * cutoff must hand the four manifest lists, FILE_A, both delete files, and every manifest
     * of the third and fourth snapshots to the delete callback.
     */
    @Test
    public void testExpireOlderThanWithDeleteFile() {
        table.updateProperties()
                .set("format-version", "2")
                .set("commit.manifest-merge.enabled", "false")
                .commit();

        table.newAppend().appendFile(FILE_A).commit();
        Snapshot appendSnapshot = table.currentSnapshot();

        table.newRowDelta().addDeletes(FILE_A_POS_DELETES).commit();
        Snapshot posDeleteSnapshot = table.currentSnapshot();

        table.newRowDelta().addDeletes(FILE_A_EQ_DELETES).commit();
        Snapshot eqDeleteSnapshot = table.currentSnapshot();

        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        Snapshot deleteAllSnapshot = table.currentSnapshot();

        long cutoff = rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(cutoff)
                        .deleteWith(deletedFiles::add)
                        .execute();

        // Expectations are assembled after the action has run, as in the original ordering.
        Set<String> expectedDeletes =
                Sets.newHashSet(
                        appendSnapshot.manifestListLocation(),
                        posDeleteSnapshot.manifestListLocation(),
                        eqDeleteSnapshot.manifestListLocation(),
                        deleteAllSnapshot.manifestListLocation(),
                        FILE_A.path().toString(),
                        FILE_A_POS_DELETES.path().toString(),
                        FILE_A_EQ_DELETES.path().toString());
        for (Snapshot snapshot : new Snapshot[] {eqDeleteSnapshot, deleteAllSnapshot}) {
            for (ManifestFile manifest : snapshot.allManifests(table.io())) {
                expectedDeletes.add(manifest.path());
            }
        }

        Assertions.assertThat(deletedFiles)
                .as("Should remove expired manifest lists and deleted data file")
                .isEqualTo(expectedDeletes);

        checkExpirationResults(1L, 1L, 1L, 6L, 4L, result);
    }

    /**
     * Runs the expire action against a table with no commits and checks that it completes,
     * reports zero for every counter checked by {@code checkExpirationResults}, and never
     * invokes the delete callback.
     */
    @Test
    public void testExpireOnEmptyTable() {
        Set<String> deletedFiles = Sets.newHashSet();

        ExpireSnapshots.Result result =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(System.currentTimeMillis())
                        .deleteWith(deletedFiles::add)
                        .execute();

        checkExpirationResults(0L, 0L, 0L, 0L, 0L, result);

        // The callback set was previously collected but never inspected; verify nothing
        // was handed to it so a spurious delete on an empty table cannot go unnoticed.
        Assertions.assertThat(deletedFiles)
                .as("Should not delete any files on an empty table")
                .isEmpty();
    }

    /**
     * Exercises {@code expireFiles()} directly instead of {@code execute()}.
     *
     * <p>After two appends, the pending-deletes dataset must contain a single row describing
     * the first snapshot's manifest list, the delete callback must not have been invoked, and
     * calling {@code expireFiles()} again must yield the same row count.
     */
    @Test
    public void testExpireAction() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot oldestSnapshot = table.currentSnapshot();
        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        long currentId = table.currentSnapshot().snapshotId();
        long cutoff = rightAfterSnapshot();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshotsSparkAction action =
                SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(cutoff)
                        .deleteWith(deletedFiles::add);

        Dataset<FileInfo> pendingDeletes = action.expireFiles();
        List<FileInfo> pending = pendingDeletes.collectAsList();

        Assertions.assertThat(table.currentSnapshot().snapshotId())
                .as("Should not change current snapshot.")
                .isEqualTo(currentId);
        Assertions.assertThat(table.snapshot(oldestSnapshot.snapshotId()))
                .as("Should remove the oldest snapshot")
                .isNull();
        Assertions.assertThat(pending)
                .as("Pending deletes should contain one row")
                .hasSize(1);

        // Only read the row once the size assertion above has passed.
        FileInfo onlyPending = pending.get(0);
        Assertions.assertThat(onlyPending.getPath())
                .as("Pending delete should be the expired manifest list location")
                .isEqualTo(oldestSnapshot.manifestListLocation());
        Assertions.assertThat(onlyPending.getType())
                .as("Pending delete should be a manifest list")
                .isEqualTo("Manifest List");

        Assertions.assertThat(deletedFiles)
                .as("Should not delete any files")
                .hasSize(0);

        long repeatedCount = action.expireFiles().count();
        Assertions.assertThat(repeatedCount)
                .as("Multiple calls to expire should return the same count of deleted files")
                .isEqualTo(pendingDeletes.count());
    }

    /**
     * Runs the expire action with the {@code stream-results} option enabled and adaptive query
     * execution disabled, then checks both the expiration counters and the number of Spark
     * jobs started while the action ran.
     */
    @Test
    public void testUseLocalIterator() {
        table.newFastAppend().appendFile(FILE_A).commit();
        table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        table.newFastAppend().appendFile(FILE_C).commit();

        long cutoff = rightAfterSnapshot();

        // Next job id before the action; the difference afterwards is the job count.
        int jobIdBefore = spark.sparkContext().dagScheduler().nextJobId().get();

        withSQLConf(
                ImmutableMap.of("spark.sql.adaptive.enabled", "false"),
                () -> {
                    ExpireSnapshots.Result results =
                            SparkActions.get()
                                    .expireSnapshots(table)
                                    .expireOlderThan(cutoff)
                                    .option("stream-results", "true")
                                    .execute();

                    int jobIdAfter = spark.sparkContext().dagScheduler().nextJobId().get();
                    int jobsRun = jobIdAfter - jobIdBefore;

                    checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);
                    Assertions.assertThat(jobsRun)
                            .as("Expected total number of jobs with stream-results should match the expected number")
                            .isEqualTo(4L);
                });
    }

    /**
     * Checks that {@code expireFiles()} can still be called, twice, on an action that has
     * already been executed, and that each call yields a single row.
     *
     * <p>The action is configured with the cutoff taken after three appends and
     * {@code retainLast(2)}.
     */
    @Test
    public void testExpireAfterExecute() {
        table.newAppend().appendFile(FILE_A).commit();
        rightAfterSnapshot();
        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();
        long cutoff = rightAfterSnapshot();

        ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table);
        action.expireOlderThan(cutoff).retainLast(2);

        ExpireSnapshots.Result result = action.execute();
        checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);

        // First call after execute().
        Assertions.assertThat(action.expireFiles().collectAsList())
                .as("Expired results must match")
                .hasSize(1);

        // Second call must report the same single row.
        Assertions.assertThat(action.expireFiles().collectAsList())
                .as("Expired results must match")
                .hasSize(1);
    }

    /** Runs the shared deletion check with 5 expired data files and 2 retained ones. */
    @Test
    public void testExpireFileDeletionMostExpired() {
        textExpireAllCheckFilesDeleted(5, 2);
    }

    /** Runs the shared deletion check with 2 expired data files and 5 retained ones. */
    @Test
    public void testExpireFileDeletionMostRetained() {
        textExpireAllCheckFilesDeleted(2, 5);
    }

    /**
     * Shared scenario: appends {@code dataFilesExpired} files one commit at a time, issues two
     * delete-everything commits, appends {@code dataFilesRetained} more files, then expires
     * everything older than the cutoff and compares the paths given to the delete callback
     * against the expected set.
     *
     * <p>The expected set is every manifest list location except the current snapshot's, the
     * manifests reachable before the retained files were added, and the expired data files.
     *
     * @param dataFilesExpired number of data files appended before the delete commits
     * @param dataFilesRetained number of data files appended after the delete commits
     */
    public void textExpireAllCheckFilesDeleted(int dataFilesExpired, int dataFilesRetained) {
        Set<String> expiredDataPaths = Sets.newHashSet();
        for (int idx = 0; idx < dataFilesExpired; idx++) {
            DataFile expiredFile =
                    DataFiles.builder(SPEC)
                            .withPath(String.format("/path/to/data-expired-%d.parquet", idx))
                            .withFileSizeInBytes(10L)
                            .withPartitionPath("c1=1")
                            .withRecordCount(1L)
                            .build();
            expiredDataPaths.add(expiredFile.path().toString());
            table.newFastAppend().appendFile(expiredFile).commit();
        }

        // Two consecutive delete-all commits, exactly as in the original scenario.
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

        Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);

        for (int idx = 0; idx < dataFilesRetained; idx++) {
            DataFile retainedFile =
                    DataFiles.builder(SPEC)
                            .withPath(String.format("/path/to/data-retained-%d.parquet", idx))
                            .withFileSizeInBytes(10L)
                            .withPartitionPath("c1=1")
                            .withRecordCount(1L)
                            .build();
            table.newFastAppend().appendFile(retainedFile).commit();
        }

        long cutoff = rightAfterSnapshot();

        Set<String> expectedDeletes = Sets.newHashSet();
        expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
        // The current snapshot's manifest list is not expected among the deletions.
        expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
        expectedDeletes.addAll(manifestsBefore);
        expectedDeletes.addAll(expiredDataPaths);

        Set<String> deletedFiles = Sets.newHashSet();
        SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(cutoff)
                .deleteWith(deletedFiles::add)
                .execute();

        Assertions.assertThat(deletedFiles)
                .as("All reachable files before expiration should be deleted")
                .isEqualTo(expectedDeletes);
    }

    /**
     * Appends FILE_A, FILE_B and FILE_C, deletes FILE_A, takes a cutoff, then appends FILE_D
     * and deletes FILE_B. Expiring older than the cutoff must pass FILE_A to the delete
     * callback and must not pass FILE_B, FILE_C or FILE_D.
     */
    @Test
    public void testExpireSomeCheckFilesDeleted() {
        table.newAppend().appendFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();
        table.newDelete().deleteFile(FILE_A).commit();

        long cutoff = rightAfterSnapshot();
        waitUntilAfter(cutoff);

        table.newAppend().appendFile(FILE_D).commit();
        table.newDelete().deleteFile(FILE_B).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(cutoff)
                .deleteWith(deletedFiles::add)
                .execute();

        // Only FILE_A is expected among the deleted paths.
        Assertions.assertThat(deletedFiles).contains(FILE_A.path().toString());
        Assertions.assertThat(deletedFiles).doesNotContain(FILE_B.path().toString());
        Assertions.assertThat(deletedFiles).doesNotContain(FILE_C.path().toString());
        Assertions.assertThat(deletedFiles).doesNotContain(FILE_D.path().toString());
    }
}

