/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.FileInfo;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestExpireSnapshotsAction
extends SparkTestBase {
    // Table factory built on a default Hadoop configuration; used to create the table under test.
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    // Three optional columns: c1 (int, id 1), c2 (string, id 2), c3 (string, id 3).
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"c1", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"c2", (Type)Types.StringType.get()), Types.NestedField.optional((int)3, (String)"c3", (Type)Types.StringType.get())});
    // NOTE(review): presumably the value meant for "spark.sql.shuffle.partitions" - confirm.
    private static final int SHUFFLE_PARTITIONS = 2;
    // Identity partitioning on column c1.
    private static final PartitionSpec SPEC = PartitionSpec.builderFor((Schema)SCHEMA).identity("c1").build();
    // Data file fixtures: each declares a 10 byte size and a single record, in partitions c1=0 .. c1=3.
    static final DataFile FILE_A = DataFiles.builder((PartitionSpec)SPEC).withPath("/path/to/data-a.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=0").withRecordCount(1L).build();
    static final DataFile FILE_B = DataFiles.builder((PartitionSpec)SPEC).withPath("/path/to/data-b.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=1").withRecordCount(1L).build();
    static final DataFile FILE_C = DataFiles.builder((PartitionSpec)SPEC).withPath("/path/to/data-c.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=2").withRecordCount(1L).build();
    static final DataFile FILE_D = DataFiles.builder((PartitionSpec)SPEC).withPath("/path/to/data-d.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=3").withRecordCount(1L).build();
    // Delete file fixtures in partition c1=0: one position-delete file and one equality-delete file.
    static final DeleteFile FILE_A_POS_DELETES = FileMetadata.deleteFileBuilder((PartitionSpec)SPEC).ofPositionDeletes().withPath("/path/to/data-a-pos-deletes.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=0").withRecordCount(1L).build();
    static final DeleteFile FILE_A_EQ_DELETES = FileMetadata.deleteFileBuilder((PartitionSpec)SPEC).ofEqualityDeletes(new int[0]).withPath("/path/to/data-a-eq-deletes.parquet").withFileSizeInBytes(10L).withPartitionPath("c1=0").withRecordCount(1L).build();
    // JUnit rule supplying the temporary folder that hosts the table of each test.
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    // Per-test state, assigned in setupTableLocation().
    private File tableDir;
    private String tableLocation;
    private Table table;

    /**
     * Creates a fresh table inside a new temporary folder before every test and
     * configures the number of shuffle partitions of the Spark session.
     *
     * @throws Exception if the temporary folder cannot be created
     */
    @Before
    public void setupTableLocation() throws Exception {
        this.tableDir = this.temp.newFolder();
        this.tableLocation = this.tableDir.toURI().toString();
        this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), this.tableLocation);
        // Use the named constant instead of repeating its value as a magic literal.
        spark.conf().set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS);
    }

    /**
     * Returns a wall-clock time that is later than the commit time of the
     * table's current snapshot.
     *
     * @return a timestamp in milliseconds after the current snapshot
     */
    private Long rightAfterSnapshot() {
        final long currentId = this.table.currentSnapshot().snapshotId();
        return this.rightAfterSnapshot(currentId);
    }

    /**
     * Spins until the wall clock has moved strictly past the commit timestamp of
     * the given snapshot and returns that time.
     *
     * @param snapshotId id of a snapshot that exists in the table
     * @return the first observed {@code System.currentTimeMillis()} value that is
     *         greater than the snapshot's timestamp
     */
    private Long rightAfterSnapshot(long snapshotId) {
        // The timestamp is looked up once instead of on every iteration, and a
        // primitive is used so the loop does not box a new Long on each spin.
        final long snapshotMillis = this.table.snapshot(snapshotId).timestampMillis();
        long end = System.currentTimeMillis();
        while (end <= snapshotMillis) {
            end = System.currentTimeMillis();
        }
        return end;
    }

    /**
     * Compares every counter of an expiration result with its expected value.
     *
     * @param expectedDatafiles expected number of deleted data files
     * @param expectedPosDeleteFiles expected number of deleted position delete files
     * @param expectedEqDeleteFiles expected number of deleted equality delete files
     * @param expectedManifestsDeleted expected number of deleted manifests
     * @param expectedManifestListsDeleted expected number of deleted manifest lists
     * @param results the result returned by the expire-snapshots action
     */
    private void checkExpirationResults(long expectedDatafiles, long expectedPosDeleteFiles, long expectedEqDeleteFiles, long expectedManifestsDeleted, long expectedManifestListsDeleted, ExpireSnapshots.Result results) {
        long manifests = results.deletedManifestsCount();
        Assert.assertEquals("Incorrect number of manifest files deleted", expectedManifestsDeleted, manifests);

        long dataFiles = results.deletedDataFilesCount();
        Assert.assertEquals("Incorrect number of datafiles deleted", expectedDatafiles, dataFiles);

        long positionDeletes = results.deletedPositionDeleteFilesCount();
        Assert.assertEquals("Incorrect number of pos deletefiles deleted", expectedPosDeleteFiles, positionDeletes);

        long equalityDeletes = results.deletedEqualityDeleteFilesCount();
        Assert.assertEquals("Incorrect number of eq deletefiles deleted", expectedEqDeleteFiles, equalityDeletes);

        long manifestLists = results.deletedManifestListsCount();
        Assert.assertEquals("Incorrect number of manifest lists deleted", expectedManifestListsDeleted, manifestLists);
    }

    /**
     * After an append, an overwrite and a second append, expiring everything
     * older than "now" must leave a single snapshot.
     */
    @Test
    public void testFilesCleaned() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        this.table.newFastAppend().appendFile(FILE_C).commit();

        long cutoff = this.rightAfterSnapshot();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table).expireOlderThan(cutoff);
        ExpireSnapshots.Result outcome = expire.execute();

        int remainingSnapshots = Iterables.size(this.table.snapshots());
        Assert.assertEquals("Table does not have 1 snapshot after expiration", 1L, remainingSnapshots);
        this.checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Runs the file deletion of an expiration on a four-thread pool and checks
     * that all four named threads took part and that the rewritten files
     * FILE_A and FILE_B were handed to the delete callback.
     */
    @Test
    public void dataFilesCleanupWithParallelTasks() throws IOException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.newFastAppend().appendFile(FILE_B).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit();
        this.table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit();
        long t4 = this.rightAfterSnapshot();

        // Both sets are written by the delete callback, which runs on the pool
        // threads, so both must be safe for concurrent modification.
        Set<String> deletedFiles = ConcurrentHashMap.newKeySet();
        Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
        AtomicInteger deleteThreadsIndex = new AtomicInteger(0);

        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table).executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement());
            // Daemon threads so the pool never keeps the JVM alive.
            thread.setDaemon(true);
            return thread;
        })).expireOlderThan(t4).deleteWith(s -> {
            deleteThreads.add(Thread.currentThread().getName());
            deletedFiles.add(s);
        }).execute();

        // Expected value first, actual second, so failure output reads correctly.
        Assert.assertEquals(Sets.newHashSet("remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"), deleteThreads);
        Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
        Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
        this.checkExpirationResults(2L, 0L, 0L, 3L, 3L, result);
    }

    /**
     * Running the action with default settings right after a single append
     * must not report any deleted file.
     */
    @Test
    public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();

        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table);
        ExpireSnapshots.Result outcome = expire.execute();

        this.checkExpirationResults(0L, 0L, 0L, 0L, 0L, outcome);
    }

    /**
     * Swaps FILE_A and FILE_B back and forth ten times and then expires every
     * snapshot older than "now".
     */
    @Test
    public void testCleanupRepeatedOverwrites() throws Exception {
        this.table.newFastAppend().appendFile(FILE_A).commit();

        int remainingRounds = 10;
        while (remainingRounds > 0) {
            this.table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
            this.table.newOverwrite().deleteFile(FILE_B).addFile(FILE_A).commit();
            remainingRounds--;
        }

        long cutoff = this.rightAfterSnapshot();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table).expireOlderThan(cutoff);
        ExpireSnapshots.Result outcome = expire.execute();
        this.checkExpirationResults(1L, 0L, 0L, 39L, 20L, outcome);
    }

    /**
     * With three snapshots, expiring everything older than "now" while
     * retaining the last two must drop only the first snapshot.
     */
    @Test
    public void testRetainLastWithExpireOlderThan() {
        this.table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = this.table.currentSnapshot().snapshotId();

        // Let the clock move past the first commit before committing again; the
        // shared helper replaces a hand-rolled wait whose result was never used.
        this.rightAfterSnapshot();

        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_C).commit();
        long t3 = this.rightAfterSnapshot();

        SparkActions.get().expireSnapshots(this.table).expireOlderThan(t3).retainLast(2).execute();

        Assert.assertEquals("Should have two snapshots.", 2L, Lists.newArrayList(this.table.snapshots()).size());
        Assert.assertNull("First snapshot should not present.", this.table.snapshot(firstSnapshotId));
    }

    /**
     * Expires the first two of three snapshots by id and checks that only the
     * third one is left.
     */
    @Test
    public void testExpireTwoSnapshotsById() throws Exception {
        this.table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_B).commit();
        long secondSnapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_C).commit();

        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .expireSnapshotId(firstSnapshotId)
            .expireSnapshotId(secondSnapshotId)
            .execute();

        Assert.assertEquals("Should have one snapshots.", 1L, Lists.newArrayList(this.table.snapshots()).size());
        // assertNull states the intent directly, like the other tests of this class.
        Assert.assertNull("First snapshot should not present.", this.table.snapshot(firstSnapshotId));
        Assert.assertNull("Second snapshot should not be present.", this.table.snapshot(secondSnapshotId));
        this.checkExpirationResults(0L, 0L, 0L, 0L, 2L, result);
    }

    /**
     * Expires the first snapshot by id while asking to retain the last three;
     * the test expects the explicitly named snapshot to be removed anyway.
     */
    @Test
    public void testRetainLastWithExpireById() {
        this.table.newAppend().appendFile(FILE_A).commit();
        long firstSnapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_B).commit();
        this.table.newAppend().appendFile(FILE_C).commit();

        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .expireSnapshotId(firstSnapshotId)
            .retainLast(3)
            .execute();

        Assert.assertEquals("Should have two snapshots.", 2L, Lists.newArrayList(this.table.snapshots()).size());
        // assertNull states the intent directly, like the other tests of this class.
        Assert.assertNull("First snapshot should not present.", this.table.snapshot(firstSnapshotId));
        this.checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
    }

    /**
     * Asking to retain three snapshots when only two exist must keep both of
     * them and delete nothing.
     */
    @Test
    public void testRetainLastWithTooFewSnapshots() {
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        long firstSnapshotId = this.table.currentSnapshot().snapshotId();
        this.table.newAppend().appendFile(FILE_C).commit();

        long cutoff = this.rightAfterSnapshot();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table).expireOlderThan(cutoff).retainLast(3);
        ExpireSnapshots.Result outcome = expire.execute();

        int snapshotCount = Lists.newArrayList(this.table.snapshots()).size();
        Assert.assertEquals("Should have two snapshots", 2L, snapshotCount);
        long retainedId = this.table.snapshot(firstSnapshotId).snapshotId();
        Assert.assertEquals("First snapshot should still present", firstSnapshotId, retainedId);
        this.checkExpirationResults(0L, 0L, 0L, 0L, 0L, outcome);
    }

    /**
     * Four snapshots, expiration cut-off at the second snapshot's timestamp and
     * retainLast(2): the test expects three snapshots, including the second, to
     * survive.
     */
    @Test
    public void testRetainLastKeepsExpiringSnapshot() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        this.table.newAppend().appendFile(FILE_D).commit();

        long cutoff = secondSnapshot.timestampMillis();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table).expireOlderThan(cutoff).retainLast(2);
        ExpireSnapshots.Result outcome = expire.execute();

        int snapshotCount = Lists.newArrayList(this.table.snapshots()).size();
        Assert.assertEquals("Should have three snapshots.", 3L, snapshotCount);
        Assert.assertNotNull("Second snapshot should present.", this.table.snapshot(secondSnapshot.snapshotId()));
        this.checkExpirationResults(0L, 0L, 0L, 0L, 1L, outcome);
    }

    /**
     * Creating the expire action for a table whose "gc.enabled" property is
     * "false" must fail with a ValidationException.
     */
    @Test
    public void testExpireSnapshotsWithDisabledGarbageCollection() {
        this.table.updateProperties().set("gc.enabled", "false").commit();
        this.table.newAppend().appendFile(FILE_A).commit();

        String failureDescription = "Should complain about expiring snapshots";
        String expectedMessage = "Cannot expire snapshots: GC is disabled";
        AssertHelpers.assertThrows(failureDescription, ValidationException.class, expectedMessage, () -> SparkActions.get().expireSnapshots(this.table));
    }

    /**
     * Calls expireOlderThan twice, first with the second and then with the third
     * snapshot's timestamp; the test expects only one snapshot to remain.
     */
    @Test
    public void testExpireOlderThanMultipleCalls() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();
        Snapshot thirdSnapshot = this.table.currentSnapshot();

        long firstCutoff = secondSnapshot.timestampMillis();
        long secondCutoff = thirdSnapshot.timestampMillis();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(firstCutoff)
            .expireOlderThan(secondCutoff);
        ExpireSnapshots.Result outcome = expire.execute();

        int snapshotCount = Lists.newArrayList(this.table.snapshots()).size();
        Assert.assertEquals("Should have one snapshots.", 1L, snapshotCount);
        Assert.assertNull("Second snapshot should not present.", this.table.snapshot(secondSnapshot.snapshotId()));
        this.checkExpirationResults(0L, 0L, 0L, 0L, 2L, outcome);
    }

    /**
     * Calls retainLast twice (2, then 1); the test expects a single snapshot to
     * remain afterwards.
     */
    @Test
    public void testRetainLastMultipleCalls() {
        this.table.newAppend().appendFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();
        this.table.newAppend().appendFile(FILE_C).commit();

        long cutoff = this.rightAfterSnapshot();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(cutoff)
            .retainLast(2)
            .retainLast(1);
        ExpireSnapshots.Result outcome = expire.execute();

        int snapshotCount = Lists.newArrayList(this.table.snapshots()).size();
        Assert.assertEquals("Should have one snapshots.", 1L, snapshotCount);
        Assert.assertNull("Second snapshot should not present.", this.table.snapshot(secondSnapshot.snapshotId()));
        this.checkExpirationResults(0L, 0L, 0L, 0L, 2L, outcome);
    }

    /**
     * retainLast(0) must be rejected with an IllegalArgumentException.
     */
    @Test
    public void testRetainZeroSnapshots() {
        String failureDescription = "Should fail retain 0 snapshots because number of snapshots to retain cannot be zero";
        String expectedMessage = "Number of snapshots to retain must be at least 1, cannot be: 0";
        AssertHelpers.assertThrows(failureDescription, IllegalArgumentException.class, expectedMessage, () -> SparkActions.get().expireSnapshots(this.table).retainLast(0).execute());
    }

    /**
     * Append, overwrite (removing FILE_A) and append again, then expire all
     * older snapshots; FILE_A must be passed to the delete callback.
     */
    @Test
    public void testScanExpiredManifestInValidSnapshotAppend() {
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        this.table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        this.table.newAppend().appendFile(FILE_D).commit();

        long cutoff = this.rightAfterSnapshot();
        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(cutoff)
            .deleteWith(deletedFiles::add);
        ExpireSnapshots.Result outcome = expire.execute();

        String fileAPath = FILE_A.path().toString();
        Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(fileAPath));
        this.checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Same scenario as the plain-append variant, but with manifest merging
     * configured through table properties and a fast append as the last commit.
     */
    @Test
    public void testScanExpiredManifestInValidSnapshotFastAppend() {
        this.table.updateProperties()
            .set("commit.manifest-merge.enabled", "true")
            .set("commit.manifest.min-count-to-merge", "1")
            .commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        this.table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit();
        this.table.newFastAppend().appendFile(FILE_D).commit();

        long cutoff = this.rightAfterSnapshot();
        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(cutoff)
            .deleteWith(deletedFiles::add);
        ExpireSnapshots.Result outcome = expire.execute();

        String fileAPath = FILE_A.path().toString();
        Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(fileAPath));
        this.checkExpirationResults(1L, 0L, 0L, 1L, 2L, outcome);
    }

    /**
     * Stages (but never publishes) an append of FILE_B, then expires everything
     * up to and including that staged snapshot. The staged snapshot's manifest
     * list, its own manifests and its added data files, plus the first
     * snapshot's manifest list, must be exactly the files that get deleted.
     */
    @Test
    public void testWithExpiringDanglingStageCommit() {
        this.table.newAppend().appendFile(FILE_A).commit();
        ((AppendFiles)this.table.newAppend().appendFile(FILE_B).stageOnly()).commit();

        TableMetadata base = ((BaseTable)this.table).operations().current();
        Snapshot snapshotA = base.snapshots().get(0);
        Snapshot snapshotB = base.snapshots().get(1);

        this.table.newAppend().appendFile(FILE_C).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .deleteWith(deletedFiles::add)
            .expireOlderThan(snapshotB.timestampMillis() + 1L)
            .execute();
        this.checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);

        Set<String> expectedDeletes = Sets.newHashSet();
        expectedDeletes.add(snapshotA.manifestListLocation());
        snapshotB.addedDataFiles(this.table.io()).forEach(i -> expectedDeletes.add(i.path().toString()));
        expectedDeletes.add(snapshotB.manifestListLocation());
        snapshotB.dataManifests(this.table.io()).forEach(file -> {
            // Only manifests written by the staged snapshot itself are expected to go.
            if (file.snapshotId().longValue() == snapshotB.snapshotId()) {
                expectedDeletes.add(file.path());
            }
        });

        // Compare the sizes by value: assertSame on autoboxed Integers checks
        // identity and only passes by accident for small, cached values.
        Assert.assertEquals("Files deleted count should be expected", expectedDeletes.size(), deletedFiles.size());
        expectedDeletes.removeAll(deletedFiles);
        Assert.assertTrue("Exactly same files should be deleted", expectedDeletes.isEmpty());
    }

    /**
     * Builds a history A -> B -> C, rolls back to A, cherry-picks B (creating D)
     * and finally makes C current again. Expiring up to C must not delete any
     * data file that was added by B, C or D.
     */
    @Test
    public void testWithCherryPickTableSnapshot() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot snapshotA = this.table.currentSnapshot();

        Set<String> deletedAFiles = Sets.newHashSet();
        OverwriteFiles overwrite = (OverwriteFiles)this.table.newOverwrite().addFile(FILE_B).deleteFile(FILE_A).deleteWith(deletedAFiles::add);
        overwrite.commit();
        Assert.assertTrue("No files should be physically deleted", deletedAFiles.isEmpty());
        Snapshot snapshotB = this.table.currentSnapshot();

        this.table.newAppend().appendFile(FILE_C).commit();
        Snapshot snapshotC = this.table.currentSnapshot();

        this.table.manageSnapshots().setCurrentSnapshot(snapshotA.snapshotId()).commit();
        this.table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit();
        Snapshot snapshotD = this.table.currentSnapshot();
        this.table.manageSnapshots().setCurrentSnapshot(snapshotC.snapshotId()).commit();

        List<String> deletedFiles = Lists.newArrayList();
        long cutoff = snapshotC.timestampMillis() + 1L;
        ExpireSnapshots.Result outcome = SparkActions.get().expireSnapshots(this.table)
            .deleteWith(deletedFiles::add)
            .expireOlderThan(cutoff)
            .execute();

        for (Snapshot snapshot : Lists.newArrayList(snapshotB, snapshotC, snapshotD)) {
            for (DataFile added : snapshot.addedDataFiles(this.table.io())) {
                Assert.assertFalse(deletedFiles.contains(added.path().toString()));
            }
        }
        this.checkExpirationResults(1L, 0L, 0L, 2L, 2L, outcome);
    }

    /**
     * Stages an append of FILE_B, cherry-picks it after another append, and then
     * expires first the staged snapshot by id and afterwards everything up to
     * the current snapshot. Data files added by the staged snapshot and by its
     * cherry-pick must never be handed to the delete callback.
     */
    @Test
    public void testWithExpiringStagedThenCherrypick() {
        this.table.newAppend().appendFile(FILE_A).commit();
        AppendFiles stagedAppend = (AppendFiles)this.table.newAppend().appendFile(FILE_B).stageOnly();
        stagedAppend.commit();

        TableMetadata base = ((BaseTable)this.table).operations().current();
        Snapshot snapshotB = base.snapshots().get(1);

        this.table.newAppend().appendFile(FILE_C).commit();
        this.table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit();

        base = ((BaseTable)this.table).operations().current();
        Snapshot snapshotD = base.snapshots().get(3);

        List<String> deletedFiles = Lists.newArrayList();

        // First pass: expire only the staged snapshot.
        ExpireSnapshots.Result firstResult = SparkActions.get().expireSnapshots(this.table)
            .deleteWith(deletedFiles::add)
            .expireSnapshotId(snapshotB.snapshotId())
            .execute();
        for (DataFile added : snapshotB.addedDataFiles(this.table.io())) {
            Assert.assertFalse(deletedFiles.contains(added.path().toString()));
        }
        this.checkExpirationResults(0L, 0L, 0L, 1L, 1L, firstResult);

        // Second pass: expire everything up to the current snapshot.
        long cutoff = this.table.currentSnapshot().timestampMillis() + 1L;
        ExpireSnapshots.Result secondResult = SparkActions.get().expireSnapshots(this.table)
            .deleteWith(deletedFiles::add)
            .expireOlderThan(cutoff)
            .execute();
        for (Snapshot snapshot : Lists.newArrayList(snapshotB, snapshotD)) {
            for (DataFile added : snapshot.addedDataFiles(this.table.io())) {
                Assert.assertFalse(deletedFiles.contains(added.path().toString()));
            }
        }
        this.checkExpirationResults(0L, 0L, 0L, 0L, 2L, secondResult);
    }

    /**
     * Two plain appends followed by an expiration of everything older than
     * "now": only the first snapshot's manifest list may be deleted and the
     * current snapshot must stay the same.
     */
    @Test
    public void testExpireOlderThan() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot firstSnapshot = this.table.currentSnapshot();
        this.rightAfterSnapshot();

        this.table.newAppend().appendFile(FILE_B).commit();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long tAfterCommits = this.rightAfterSnapshot();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(tAfterCommits)
            .deleteWith(deletedFiles::add);
        ExpireSnapshots.Result outcome = expire.execute();

        long currentId = this.table.currentSnapshot().snapshotId();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, currentId);
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(firstSnapshot.snapshotId()));
        Set<String> expectedDeletes = Sets.newHashSet(firstSnapshot.manifestListLocation());
        Assert.assertEquals("Should remove only the expired manifest list location", expectedDeletes, deletedFiles);
        this.checkExpirationResults(0L, 0L, 0L, 0L, 1L, outcome);
    }

    /**
     * Appends FILE_A, deletes it again, appends FILE_B and expires everything
     * older than "now". Both expired manifest lists, the first manifest of each
     * expired snapshot and FILE_A itself must be deleted.
     */
    @Test
    public void testExpireOlderThanWithDelete() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot firstSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, firstSnapshot.allManifests(this.table.io()).size());
        this.rightAfterSnapshot();

        this.table.newDelete().deleteFile(FILE_A).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create replace manifest with a rewritten manifest", 1L, secondSnapshot.allManifests(this.table.io()).size());

        this.table.newAppend().appendFile(FILE_B).commit();
        this.rightAfterSnapshot();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long tAfterCommits = this.rightAfterSnapshot();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(tAfterCommits)
            .deleteWith(deletedFiles::add)
            .execute();

        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(firstSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the second oldest snapshot", this.table.snapshot(secondSnapshot.snapshotId()));

        // The delete callback receives Strings, so the CharSequence path of the
        // data file is converted explicitly instead of relying on its runtime type.
        Set<String> expectedDeletes = Sets.newHashSet(
            firstSnapshot.manifestListLocation(),
            firstSnapshot.allManifests(this.table.io()).get(0).path(),
            secondSnapshot.manifestListLocation(),
            secondSnapshot.allManifests(this.table.io()).get(0).path(),
            FILE_A.path().toString());
        Assert.assertEquals("Should remove expired manifest lists and deleted data file", expectedDeletes, deletedFiles);
        this.checkExpirationResults(1L, 0L, 0L, 2L, 2L, result);
    }

    /**
     * Appends FILE_A and FILE_B in one commit, deletes FILE_A, fast-appends
     * FILE_C and expires everything older than "now". Both expired manifest
     * lists, the first snapshot's manifest and FILE_A must be deleted.
     */
    @Test
    public void testExpireOlderThanWithDeleteInMergedManifests() {
        this.table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot firstSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, firstSnapshot.allManifests(this.table.io()).size());
        this.rightAfterSnapshot();

        this.table.newDelete().deleteFile(FILE_A).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should replace manifest with a rewritten manifest", 1L, secondSnapshot.allManifests(this.table.io()).size());

        this.table.newFastAppend().appendFile(FILE_C).commit();
        this.rightAfterSnapshot();
        long snapshotId = this.table.currentSnapshot().snapshotId();
        long tAfterCommits = this.rightAfterSnapshot();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(tAfterCommits)
            .deleteWith(deletedFiles::add)
            .execute();

        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNull("Expire should remove the oldest snapshot", this.table.snapshot(firstSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the second oldest snapshot", this.table.snapshot(secondSnapshot.snapshotId()));

        // The delete callback receives Strings, so the CharSequence path of the
        // data file is converted explicitly instead of relying on its runtime type.
        Set<String> expectedDeletes = Sets.newHashSet(
            firstSnapshot.manifestListLocation(),
            firstSnapshot.allManifests(this.table.io()).get(0).path(),
            secondSnapshot.manifestListLocation(),
            FILE_A.path().toString());
        Assert.assertEquals("Should remove expired manifest lists and deleted data file", expectedDeletes, deletedFiles);
        this.checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);
    }

    /**
     * Commits a delete on top of an append, rolls back to the first snapshot and
     * expires the now orphaned second snapshot. Only that snapshot's manifest
     * list and the one manifest it introduced may be deleted.
     */
    @Test
    public void testExpireOlderThanWithRollback() {
        this.table.updateProperties().set("commit.manifest.min-count-to-merge", "0").commit();
        this.table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
        Snapshot firstSnapshot = this.table.currentSnapshot();
        int firstManifestCount = firstSnapshot.allManifests(this.table.io()).size();
        Assert.assertEquals("Should create one manifest", 1L, firstManifestCount);
        this.rightAfterSnapshot();

        this.table.newDelete().deleteFile(FILE_B).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();

        // Manifests that exist only in the second snapshot.
        Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(this.table.io()));
        secondSnapshotManifests.removeAll(firstSnapshot.allManifests(this.table.io()));
        Assert.assertEquals("Should add one new manifest for append", 1L, secondSnapshotManifests.size());

        this.table.manageSnapshots().rollbackTo(firstSnapshot.snapshotId()).commit();
        long tAfterCommits = this.rightAfterSnapshot(secondSnapshot.snapshotId());
        long snapshotId = this.table.currentSnapshot().snapshotId();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots expire = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(tAfterCommits)
            .deleteWith(deletedFiles::add);
        ExpireSnapshots.Result outcome = expire.execute();

        long currentId = this.table.currentSnapshot().snapshotId();
        Assert.assertEquals("Expire should not change current snapshot", snapshotId, currentId);
        Assert.assertNotNull("Expire should keep the oldest snapshot, current", this.table.snapshot(firstSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the orphaned snapshot", this.table.snapshot(secondSnapshot.snapshotId()));

        String orphanedManifestPath = Iterables.getOnlyElement(secondSnapshotManifests).path();
        Set<String> expectedDeletes = Sets.newHashSet(secondSnapshot.manifestListLocation(), orphanedManifestPath);
        Assert.assertEquals("Should remove expired manifest lists and reverted appended data file", expectedDeletes, deletedFiles);
        this.checkExpirationResults(0L, 0L, 0L, 1L, 1L, outcome);
    }

    /**
     * Appends FILE_A, appends FILE_B, rolls back to the first snapshot and
     * expires the orphaned second snapshot. Its manifest list, the manifest it
     * introduced and FILE_B must be deleted.
     */
    @Test
    public void testExpireOlderThanWithRollbackAndMergedManifests() {
        this.table.newAppend().appendFile(FILE_A).commit();
        Snapshot firstSnapshot = this.table.currentSnapshot();
        Assert.assertEquals("Should create one manifest", 1L, firstSnapshot.allManifests(this.table.io()).size());
        this.rightAfterSnapshot();

        this.table.newAppend().appendFile(FILE_B).commit();
        Snapshot secondSnapshot = this.table.currentSnapshot();

        // Manifests that exist only in the second snapshot.
        Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(this.table.io()));
        secondSnapshotManifests.removeAll(firstSnapshot.allManifests(this.table.io()));
        Assert.assertEquals("Should add one new manifest for append", 1L, secondSnapshotManifests.size());

        this.table.manageSnapshots().rollbackTo(firstSnapshot.snapshotId()).commit();
        long tAfterCommits = this.rightAfterSnapshot(secondSnapshot.snapshotId());
        long snapshotId = this.table.currentSnapshot().snapshotId();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(this.table)
            .expireOlderThan(tAfterCommits)
            .deleteWith(deletedFiles::add)
            .execute();

        Assert.assertEquals("Expire should not change current snapshot", snapshotId, this.table.currentSnapshot().snapshotId());
        Assert.assertNotNull("Expire should keep the oldest snapshot, current", this.table.snapshot(firstSnapshot.snapshotId()));
        Assert.assertNull("Expire should remove the orphaned snapshot", this.table.snapshot(secondSnapshot.snapshotId()));

        // The delete callback receives Strings, so the CharSequence path of the
        // data file is converted explicitly instead of relying on its runtime type.
        Set<String> expectedDeletes = Sets.newHashSet(
            secondSnapshot.manifestListLocation(),
            Iterables.getOnlyElement(secondSnapshotManifests).path(),
            FILE_B.path().toString());
        Assert.assertEquals("Should remove expired manifest lists and reverted appended data file", expectedDeletes, deletedFiles);
        this.checkExpirationResults(1L, 0L, 0L, 1L, 1L, result);
    }

    /**
     * Verifies that expiring snapshots of a format-version 2 table removes the data file, the
     * position and equality delete files, the expired manifest lists, and the manifests of the
     * third and fourth snapshots once all rows have been deleted.
     */
    @Test
    public void testExpireOlderThanWithDeleteFile() {
        table.updateProperties()
            .set("format-version", "2")
            .set("commit.manifest-merge.enabled", "false")
            .commit();

        // One snapshot per step: data file, position deletes, equality deletes, full delete.
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot firstSnapshot = table.currentSnapshot();

        table.newRowDelta().addDeletes(FILE_A_POS_DELETES).commit();
        Snapshot secondSnapshot = table.currentSnapshot();

        table.newRowDelta().addDeletes(FILE_A_EQ_DELETES).commit();
        Snapshot thirdSnapshot = table.currentSnapshot();

        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        Snapshot fourthSnapshot = table.currentSnapshot();

        long afterAllDeleted = rightAfterSnapshot();

        // A further append made after the expiration timestamp was captured.
        table.newAppend().appendFile(FILE_B).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshots.Result result =
            SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(afterAllDeleted)
                .deleteWith(deletedFiles::add)
                .execute();

        Set<String> expectedDeletes =
            Sets.newHashSet(
                firstSnapshot.manifestListLocation(),
                secondSnapshot.manifestListLocation(),
                thirdSnapshot.manifestListLocation(),
                fourthSnapshot.manifestListLocation(),
                FILE_A.path().toString(),
                FILE_A_POS_DELETES.path().toString(),
                FILE_A_EQ_DELETES.path().toString());
        expectedDeletes.addAll(
            thirdSnapshot.allManifests(table.io()).stream()
                .map(ManifestFile::path)
                .map(CharSequence::toString)
                .collect(Collectors.toSet()));
        expectedDeletes.addAll(
            fourthSnapshot.allManifests(table.io()).stream()
                .map(ManifestFile::path)
                .map(CharSequence::toString)
                .collect(Collectors.toSet()));

        Assert.assertEquals(
            "Should remove expired manifest lists and deleted data file",
            expectedDeletes,
            deletedFiles);

        // NOTE(review): argument order is presumably (data files, position deletes, equality
        // deletes, manifests, manifest lists) -- confirm against checkExpirationResults.
        checkExpirationResults(1L, 1L, 1L, 6L, 4L, result);
    }

    /**
     * Verifies that running the expire action on a table with no commits completes without
     * error, reports zero for every counter, and never invokes the delete callback.
     */
    @Test
    public void testExpireOnEmptyTable() {
        Set<String> deletedFiles = Sets.newHashSet();

        ExpireSnapshots.Result result =
            SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(System.currentTimeMillis())
                .deleteWith(deletedFiles::add)
                .execute();

        checkExpirationResults(0L, 0L, 0L, 0L, 0L, result);
        // The callback set was previously collected but never checked.
        Assert.assertEquals("Should not delete any files", 0L, deletedFiles.size());
    }

    /**
     * Verifies that {@code expireFiles()} expires the old snapshot and reports the files pending
     * deletion without invoking the delete callback, and that calling it again yields the same
     * number of rows.
     */
    @Test
    public void testExpireAction() {
        table.newAppend().appendFile(FILE_A).commit();
        Snapshot firstSnapshot = table.currentSnapshot();

        // Return value intentionally unused; presumably called so the next commit gets a later
        // timestamp -- confirm against rightAfterSnapshot().
        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        long snapshotId = table.currentSnapshot().snapshotId();
        long tAfterCommits = rightAfterSnapshot();

        Set<String> deletedFiles = Sets.newHashSet();
        ExpireSnapshotsSparkAction action =
            SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(tAfterCommits)
                .deleteWith(deletedFiles::add);

        Dataset<FileInfo> pendingDeletes = action.expireFiles();
        List<FileInfo> pending = pendingDeletes.collectAsList();

        Assert.assertEquals(
            "Should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
        Assert.assertNull(
            "Should remove the oldest snapshot", table.snapshot(firstSnapshot.snapshotId()));

        Assert.assertEquals("Pending deletes should contain one row", 1L, pending.size());
        FileInfo pendingFile = pending.get(0);
        Assert.assertEquals(
            "Pending delete should be the expired manifest list location",
            firstSnapshot.manifestListLocation(),
            pendingFile.getPath());
        Assert.assertEquals(
            "Pending delete should be a manifest list", "Manifest List", pendingFile.getType());

        // expireFiles() only reports; the delete callback must not have been called.
        Assert.assertEquals("Should not delete any files", 0L, deletedFiles.size());

        Assert.assertEquals(
            "Multiple calls to expire should return the same count of deleted files",
            pendingDeletes.count(),
            action.expireFiles().count());
    }

    /**
     * Verifies the number of Spark jobs launched when the expire action runs with the
     * {@code stream-results} option enabled and adaptive query execution disabled.
     */
    @Test
    public void testUseLocalIterator() {
        table.newFastAppend().appendFile(FILE_A).commit();
        table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).commit();
        table.newFastAppend().appendFile(FILE_C).commit();

        long end = rightAfterSnapshot();

        // Snapshot of the scheduler's next job id; the difference after execute() is the number
        // of jobs that ran in between.
        int jobsBeforeStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();

        // Plain String arguments let ImmutableMap.of infer Map<String, String> directly. Casting
        // the arguments to Object and then casting the result to Map<String, String> does not
        // compile (ImmutableMap<Object, Object> is not convertible to Map<String, String>).
        withSQLConf(
            ImmutableMap.of("spark.sql.adaptive.enabled", "false"),
            () -> {
                ExpireSnapshots.Result results =
                    SparkActions.get()
                        .expireSnapshots(table)
                        .expireOlderThan(end)
                        .option("stream-results", "true")
                        .execute();

                int jobsAfterStreamResults =
                    spark.sparkContext().dagScheduler().nextJobId().get();
                int jobsRunDuringStreamResults = jobsAfterStreamResults - jobsBeforeStreamResults;

                // NOTE(review): argument order is presumably (data files, position deletes,
                // equality deletes, manifests, manifest lists) -- confirm against
                // checkExpirationResults.
                checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);

                Assert.assertEquals(
                    "Expected total number of jobs with stream-results should match the expected number",
                    4L,
                    jobsRunDuringStreamResults);
            });
    }

    /**
     * Verifies that {@code expireFiles()} can still be called on an action after
     * {@code execute()} has already run on it, and that each such call yields exactly one row.
     */
    @Test
    public void testExpireAfterExecute() {
        table.newAppend().appendFile(FILE_A).commit();

        // Return value intentionally unused; presumably called so the next commit gets a later
        // timestamp -- confirm against rightAfterSnapshot().
        rightAfterSnapshot();

        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();

        long t3 = rightAfterSnapshot();

        ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table);
        // Configure the same action instance that is executed below.
        action.expireOlderThan(t3).retainLast(2);

        ExpireSnapshots.Result result = action.execute();
        checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);

        List<FileInfo> typedExpiredFiles = action.expireFiles().collectAsList();
        Assert.assertEquals("Expired results must match", 1L, typedExpiredFiles.size());

        List<FileInfo> untypedExpiredFiles = action.expireFiles().collectAsList();
        Assert.assertEquals("Expired results must match", 1L, untypedExpiredFiles.size());
    }

    /**
     * Runs the shared file-deletion check with more expired data files (5) than retained
     * data files (2).
     */
    @Test
    public void testExpireFileDeletionMostExpired() {
        int expiredFileCount = 5;
        int retainedFileCount = 2;
        testExpireFilesAreDeleted(expiredFileCount, retainedFileCount);
    }

    /**
     * Runs the shared file-deletion check with fewer expired data files (2) than retained
     * data files (5).
     */
    @Test
    public void testExpireFileDeletionMostRetained() {
        int expiredFileCount = 2;
        int retainedFileCount = 5;
        testExpireFilesAreDeleted(expiredFileCount, retainedFileCount);
    }

    /**
     * Shared scenario: appends {@code dataFilesExpired} data files one commit at a time, deletes
     * all rows, appends {@code dataFilesRetained} further data files, then expires snapshots and
     * checks the exact set of paths handed to the delete callback.
     *
     * @param dataFilesExpired number of data files appended before the delete commits
     * @param dataFilesRetained number of data files appended after the delete commits
     */
    public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
        // Paths of the data files appended before the deletes; all are expected to be removed.
        Set<String> dataFiles = Sets.newHashSet();
        for (int i = 0; i < dataFilesExpired; ++i) {
            DataFile df =
                DataFiles.builder(SPEC)
                    .withPath(String.format("/path/to/data-expired-%d.parquet", i))
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=1")
                    .withRecordCount(1L)
                    .build();
            dataFiles.add(df.path().toString());
            table.newFastAppend().appendFile(df).commit();
        }

        // Two identical delete commits, kept exactly as in the original.
        // NOTE(review): the second one presumably drops manifests that only hold deleted
        // entries -- confirm before simplifying.
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
        table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

        // Manifests reachable before the retained files are appended.
        Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);

        for (int i = 0; i < dataFilesRetained; ++i) {
            DataFile df =
                DataFiles.builder(SPEC)
                    .withPath(String.format("/path/to/data-retained-%d.parquet", i))
                    .withFileSizeInBytes(10L)
                    .withPartitionPath("c1=1")
                    .withRecordCount(1L)
                    .build();
            table.newFastAppend().appendFile(df).commit();
        }

        long end = rightAfterSnapshot();

        // Expected: every manifest list except the current snapshot's, plus the manifests
        // captured above, plus the data files appended before the deletes.
        Set<String> expectedDeletes = Sets.newHashSet();
        expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
        expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
        expectedDeletes.addAll(manifestsBefore);
        expectedDeletes.addAll(dataFiles);

        Set<String> deletedFiles = Sets.newHashSet();
        SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(end)
            .deleteWith(deletedFiles::add)
            .execute();

        Assert.assertEquals(
            "All reachable files before expiration should be deleted",
            expectedDeletes,
            deletedFiles);
    }

    /**
     * Verifies that only FILE_A, whose delete was committed before the expiration timestamp, is
     * handed to the delete callback. FILE_B (deleted after the timestamp), FILE_C and FILE_D
     * must not be.
     */
    @Test
    public void testExpireSomeCheckFilesDeleted() {
        table.newAppend().appendFile(FILE_A).commit();
        table.newAppend().appendFile(FILE_B).commit();
        table.newAppend().appendFile(FILE_C).commit();
        table.newDelete().deleteFile(FILE_A).commit();

        long after = rightAfterSnapshot();
        // NOTE(review): presumably blocks until the clock has passed 'after' so the following
        // commits are newer than the cutoff -- confirm against waitUntilAfter.
        waitUntilAfter(after);

        table.newAppend().appendFile(FILE_D).commit();
        table.newDelete().deleteFile(FILE_B).commit();

        Set<String> deletedFiles = Sets.newHashSet();
        SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(after)
            .deleteWith(deletedFiles::add)
            .execute();

        Assert.assertTrue(
            "FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
        Assert.assertFalse(
            "FILE_B should not be deleted", deletedFiles.contains(FILE_B.path().toString()));
        Assert.assertFalse(
            "FILE_C should not be deleted", deletedFiles.contains(FILE_C.path().toString()));
        Assert.assertFalse(
            "FILE_D should not be deleted", deletedFiles.contains(FILE_D.path().toString()));
    }
}

